338 lines
12 KiB
Python
338 lines
12 KiB
Python
# encoding: utf8
|
|
import json
import logging
from importlib import import_module

from jwkest.jwt import JWT
from requests.exceptions import HTTPError

from django.conf import settings as appSettings
from django.contrib import auth
from django.contrib import messages
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Permission
from django.contrib.contenttypes.models import ContentType
from django.http import HttpResponse
from django.shortcuts import redirect, resolve_url
from django.urls import reverse
from django.utils.http import is_safe_url

from . import auth as _auth
from . import exceptions as sso_exc
|
|
|
|
# Module-level logger used by all SSO views.
logger = logging.getLogger(__name__)

# Optional project settings. Each one falls back to the default given here
# when the project settings do not define it.
LOGIN_REDIRECT_URL = getattr(appSettings, 'LOGIN_REDIRECT_URL', '/')

# An explicit None is treated the same way as a missing setting.
LOGOUT_REDIRECT_URL = getattr(appSettings, 'LOGOUT_REDIRECT_URL', None)
if LOGOUT_REDIRECT_URL is None:
    LOGOUT_REDIRECT_URL = '/sso/logout/'

AUTH_SCOPE = getattr(appSettings, 'AUTH_SCOPE', ('openid',))
AUTH_AVAIL_IDP = getattr(appSettings, 'AUTH_AVAIL_IDP', [])
GET_USER_FUNCTION = getattr(
    appSettings, 'AUTH_GET_USER_FUNCTION', 'keycloak_oidc:get_user_by_username'
)
LOGOUT_REDIRECT_VIEW = getattr(
    appSettings, 'AUTH_LOGOUT_REDIRECT_VIEW', 'nalodeni:about'
)
LOGIN_ROLE_REQUIRED = getattr(appSettings, 'AUTH_LOGIN_ROLE_REQUIRED', None)
LOGIN_ROLE_REQUIRED_ANY = getattr(
    appSettings, 'AUTH_LOGIN_ROLE_REQUIRED_ANY', False
)

# Shared JWT helper; the views use it to unpack tokens and read their payloads.
jwt_singleton = JWT()
|
|
|
|
|
|
def _import_object(path, def_name):
    """
    Import and return an attribute of a module.

    ``path`` is either ``"module.path:attribute"`` or just ``"module.path"``;
    in the latter case ``def_name`` is used as the attribute name.
    """
    module_path, separator, attr_name = path.partition(':')
    if not separator:
        # No explicit attribute given, use the caller's default name.
        attr_name = def_name

    return getattr(import_module(module_path), attr_name)
|
|
|
|
# Resolve the configured user-lookup callable once, at import time.
# A GET_USER_FUNCTION value without ':' falls back to the attribute 'get_user'.
get_user = _import_object(GET_USER_FUNCTION, 'get_user')
|
|
|
|
|
|
def login(request, idp_hint=None):
    """
    Start the SSO login by redirecting to the server's authorization URL.

    The path to return to after login is read from the query string
    (``auth.REDIRECT_FIELD_NAME``, default "/") and passed on as ``state``.
    ``idp_hint`` is forwarded as ``kc_idp_hint`` only when it is listed in
    ``AUTH_AVAIL_IDP``; any other value is dropped.

    When the SSO server object cannot be obtained, an error message is
    queued and the user is redirected to the site root instead.
    """
    return_path = request.GET.get(auth.REDIRECT_FIELD_NAME, "/")

    # Empty or unlisted hints are discarded, so no dangling
    # "&kc_idp_hint=" is ever appended to the URL.
    if not idp_hint or idp_hint not in AUTH_AVAIL_IDP:
        idp_hint = None

    try:
        srv = _auth.get_server()
    except Exception as e:
        messages.error(request, "Bohužel nastaly potíže s centrálním přihlášením uživatele.")
        logger.error('Chyba SSO login: %s', e)
        return redirect(request.build_absolute_uri("/"))

    # Reuse the server object obtained above; fetching it a second time
    # outside the try block could fail without being handled.
    authorize_url = srv.authorize(
        redirect_uri=request.build_absolute_uri(reverse("keycloak_oidc:login-done")),
        state=return_path,
        scope=AUTH_SCOPE,
    )
    authorize_url += "&kc_locale=" + appSettings.AUTH_SSO_LOCALE
    if idp_hint is not None:
        authorize_url += '&kc_idp_hint=' + idp_hint

    return redirect(authorize_url)
|
|
|
|
def renew_session(request, returnHttp=False):
    """
    Refresh the session using the refresh token stored in it.

    Returns -3 straight away when the session holds no refresh token.
    Otherwise the outcome is one of the status codes below, unless
    ``returnHttp`` is true, in which case a redirect to the site root is
    returned in every one of these cases:

        -1  the SSO server object could not be obtained
        -2  the server returned no new token
         0  the new refresh token and its expiry were stored in the session
    """
    current_token = request.session.get('refresh_token', None)
    if current_token is None:
        return -3

    def _outcome(status):
        # Callers asking for an HTTP response always get the same redirect.
        if returnHttp:
            return redirect(request.build_absolute_uri("/"))
        return status

    try:
        srv = _auth.get_server()
    except Exception as e:
        messages.error(request,"Bohužel nastaly potíže s centrálním SSO serverem.")
        logger.error('Chyba SSO renew: %s' % e)
        return _outcome(-1)

    new_token = srv.refresh_session(current_token)
    if new_token is None:
        return _outcome(-2)

    fresh_token = new_token._data['refresh_token']
    fresh_payload = jwt_singleton.unpack(fresh_token).payload()

    # save the new refresh token together with its expiry
    request.session['refresh_token'] = fresh_token
    request.session['refresh_expires_at'] = fresh_payload['exp']

    return _outcome(0)
|
|
|
|
|
|
def callback(request):
    """
    Finish the SSO login after the server redirects back with a code.

    Steps, in order:
      1. exchange the ``code`` query parameter for tokens,
      2. look up the local user from the ID token payload via ``get_user``,
      3. read the client roles from the access token and enforce
         ``LOGIN_ROLE_REQUIRED`` / ``LOGIN_ROLE_REQUIRED_ANY``,
      4. update the user's name and ``loginSession`` and log the user in,
      5. store the tokens and the "sso_"-prefixed roles in the session,
      6. redirect to the ``state`` path when it is safe for this host,
         otherwise to ``LOGIN_REDIRECT_URL``.

    Every failure queues a user-facing message and redirects to the
    site root.
    """
    return_path = request.GET.get("state", '/')
    sso_code = request.GET.get('code', '')
    home_url = request.build_absolute_uri("/")

    # Without an authorization code there is nothing to exchange for tokens.
    if not sso_code:
        messages.error(request, "Chyba při přihlášení na centrálním serveru SSO.")
        return redirect(home_url)

    try:
        res = _auth.get_server().request_token(
            redirect_uri=request.build_absolute_uri(reverse("keycloak_oidc:login-done")),
            code=sso_code,
        )
    except HTTPError as e:
        messages.error(request, "Přihlášení uživatele se nezdařilo. Chyba komunikace s centrálním serverem přihlášení.")
        logger.error('Chyba SSO callback: %s', e)
        return redirect(home_url)

    id_token_data = jwt_singleton.unpack(res.id_token).payload()
    reftoken = jwt_singleton.unpack(res._data['refresh_token']).payload()

    # Hint shown whenever the SSO account lacks required data.
    account_hint = (
        "Zkontrolujte údaje na <a href='%s' target='_blank'>auth.pirati.cz</a>."
        % (appSettings.AUTH_SERVER + "account/")
    )

    # Get the user from local DB
    try:
        user = get_user(id_token_data)
        user.backend = 'django.contrib.auth.backends.ModelBackend'
    except sso_exc.MissingSsoInfo as e:
        messages.error(request, "Účet na Pirátské identitě není řádně vyplněn. %s" % e)
        messages.info(request, account_hint, extra_tags="safe")
        logger.error('SSO account incomplete 1: %s', e)
        return redirect(home_url)
    except sso_exc.EmailVersusIdentityMismatch as e:
        messages.error(request, "Účet v nalodění a na Pirátské identitě nesouhlasí.")
        messages.info(request, "Obraťte se na technický odbor Pirátů.")
        logger.error('SSO account email vs. identity mismatch: %s', e)
        return redirect(home_url)
    except sso_exc.UsernameAlreadyTaken as e:
        messages.error(request, "%s" % e)
        messages.info(request, 'Pokud se jedná o vaše uživatelské jméno, '
                               'obraťte se na technický odbor Pirátů.')
        logger.error('Username already taken in Nalodeni: %s', e)
        return redirect(home_url)
    except sso_exc.EmailAlreadyTaken as e:
        messages.error(request, "%s" % e)
        messages.info(request, 'Pokud se jedná o váš email, '
                               'obraťte se na technický odbor Pirátů.')
        logger.error('Email already taken in Nalodeni: %s', e)
        return redirect(home_url)

    # Process the user roles, if any ( taken from bossoidc.backend ).
    # Only the roles granted for this client ('azp') are considered; any
    # missing level of the claim simply yields no roles.
    access_data = jwt_singleton.unpack(res.access_token).payload()
    resource_roles = (
        access_data.get('resource_access', {})
        .get(access_data.get('azp'), {})
        .get('roles', [])
    )

    lacks_any_role = LOGIN_ROLE_REQUIRED_ANY and not resource_roles
    lacks_required_role = (
        LOGIN_ROLE_REQUIRED is not None
        and LOGIN_ROLE_REQUIRED not in resource_roles
    )
    if lacks_any_role or lacks_required_role:
        messages.error(request, "Váš pirátský účet nemá přidělené potřebné přístupové role. Přístup odmítnut.")
        logger.error('SSO account %s: no roles assigned', user.ssoUid)
        return redirect(home_url)

    # Update user information
    if 'given_name' not in access_data or 'family_name' not in access_data:
        messages.error(request, "Účet na Pirátské identitě není řádně vyplněn.")
        messages.info(request, account_hint, extra_tags="safe")
        # No exception is in scope here, so log the affected account instead.
        logger.error(
            'SSO account incomplete 2: given_name/family_name missing for %s',
            user.ssoUid,
        )
        return redirect(home_url)

    user.first_name = access_data['given_name']
    user.last_name = access_data['family_name']
    user.loginSession = access_data['session_state']
    user.save()

    # Authenticate the user
    auth.login(request, user)

    request.session['openid_token'] = res.id_token
    request.session['openid'] = id_token_data
    request.session['refresh_token'] = res._data['refresh_token']
    request.session['refresh_expires_at'] = reftoken['exp']
    request.session['loginSession'] = user.loginSession

    ##
    # Set client-system roles based on SSO roles
    #
    # After setting permissions, the User object should be reloaded,
    # but we do a redirect, so it is fine.
    ##
    user.user_permissions.clear()

    # prepend "sso_" not to mix with other perms
    request.session['site_perms'] = ['sso_' + role for role in resource_roles]

    # tell the User to calculate needed permissions
    request.session['spc'] = user.do_site_perms_calc(request.session['site_perms'])

    # tell us, what roles do we have
    if appSettings.DEBUG:
        print("Roles assigned:", request.session['site_perms'])
        print("Roles calculated:", request.session['spc'])

    # allowed_hosts must be a set holding the host itself; set(<str>) would
    # be a set of its single characters. The deprecated ``host`` keyword is
    # not passed, as the current host is already covered by allowed_hosts.
    url_is_safe = is_safe_url(
        url=return_path,
        allowed_hosts={request.get_host()},
        require_https=request.is_secure(),
    )
    if not url_is_safe:
        return redirect(resolve_url(LOGIN_REDIRECT_URL))
    return redirect(return_path)
|
|
|
|
|
|
def logout(request, multiSessionFound=False, ssoLogout=False):
    """
    Log the user out of the application and optionally out of the SSO server.

    ``multiSessionFound`` – when false, the logout is treated as requested
    by the user and the ``loginSession`` stored on the user record is
    cleared as well.

    ``ssoLogout`` – when true and both an end-session endpoint and an ID
    token are available, the browser is redirected to the SSO server's
    end-session URL, which returns to ``LOGOUT_REDIRECT_URL``.

    In every other case the user is redirected to ``LOGOUT_REDIRECT_VIEW``.
    """
    # Read the ID token first: auth.logout() below discards the session data.
    id_token = request.session.get('openid_token', None)

    # NOTE(review): nesting of the two session statements below was
    # reconstructed from unindented source - confirm they belong inside
    # the authenticated branch.
    if request.user.is_authenticated:
        if not multiSessionFound:
            # reset the DB session, because this is
            # a user requested logout
            request.user.loginSession = None
            request.user.save()

        request.session['loginSession'] = None
        auth.logout(request)

    # NOTE(review): the message branch is taken to be the alternative to
    # ``ssoLogout`` (not to the endpoint check) - confirm against history.
    if ssoLogout:
        try:
            srv = _auth.get_server()
        except Exception as e:
            messages.error(request,"Bohužel nastaly potíže s centrálním odhlášením uživatele.")
            logger.error('Chyba SSO logout: %s', e)
            return redirect(request.build_absolute_uri("/"))

        # Reuse the server object obtained above rather than fetching it
        # again outside the try block.
        if srv.end_session_endpoint and id_token is not None:
            messages.info(request,"Byli jste odhlášení z aplikace i Centrální identity.")
            return redirect(srv.end_session(
                post_logout_redirect_uri=request.build_absolute_uri(LOGOUT_REDIRECT_URL),
                state='',
                id_token_hint=id_token,
            ))
    elif id_token:
        messages.info(request,"Byli jste odhlášení z aplikace. Vaše centrální identita na <a href='%s' target='_blank'>auth.pirati.cz</a> zůstává přihlášena." % (appSettings.AUTH_SERVER + "account/"), extra_tags="safe")
    else:
        messages.info(request,"Byli jste odhlášení z aplikace.")

    return redirect(LOGOUT_REDIRECT_VIEW)
|
|
|
|
def account(request):
    """
    Redirect to the account page on the SSO server.

    The target is ``AUTH_SERVER`` + "account/" with the configured
    ``kc_locale`` appended as a query parameter.
    """
    account_url = request.build_absolute_uri(appSettings.AUTH_SERVER+"account/")
    locale_query = "?kc_locale=" + appSettings.AUTH_SSO_LOCALE
    return redirect(account_url + locale_query)
|
|
|
|
def refresh_session(request):
    """
    Report when the session's refresh token expires, as JSON.

    The response body is ``{"exp": <value>}``, where the value is the
    ``refresh_expires_at`` session entry, or ``null`` when the session has
    none. This view only reads the session.
    NOTE(review): presumably the actual token refresh is done by a
    middleware before this view runs - confirm.
    """
    resp = {'exp': request.session.get('refresh_expires_at', None)}

    return HttpResponse(json.dumps(resp), content_type="application/json")
|
|
|