# encoding: utf8
"""Django views for the Keycloak OpenID Connect login/logout flow."""

from importlib import import_module
import json
import logging

from jwkest.jwt import JWT
from requests.exceptions import HTTPError

from django.contrib import auth
from django.contrib import messages
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Permission
from django.contrib.contenttypes.models import ContentType
from django.http import HttpResponse
from django.shortcuts import redirect, resolve_url
from django.urls import reverse
from django.utils.http import is_safe_url
from django.conf import settings as appSettings

from . import auth as _auth
from . import exceptions as sso_exc

logger = logging.getLogger(__name__)

# Optional settings; each falls back to the default given here when the
# project settings do not define it.
LOGIN_REDIRECT_URL = getattr(appSettings, 'LOGIN_REDIRECT_URL', '/')

LOGOUT_REDIRECT_URL = getattr(appSettings, 'LOGOUT_REDIRECT_URL', None)
if LOGOUT_REDIRECT_URL is None:
    LOGOUT_REDIRECT_URL = '/sso/logout/'

AUTH_SCOPE = getattr(appSettings, 'AUTH_SCOPE', ('openid',))
AUTH_AVAIL_IDP = getattr(appSettings, 'AUTH_AVAIL_IDP', [])
GET_USER_FUNCTION = getattr(
    appSettings, 'AUTH_GET_USER_FUNCTION', 'keycloak_oidc:get_user_by_username')
LOGOUT_REDIRECT_VIEW = getattr(
    appSettings, 'AUTH_LOGOUT_REDIRECT_VIEW', 'nalodeni:about')
LOGIN_ROLE_REQUIRED = getattr(appSettings, 'AUTH_LOGIN_ROLE_REQUIRED', None)
LOGIN_ROLE_REQUIRED_ANY = getattr(
    appSettings, 'AUTH_LOGIN_ROLE_REQUIRED_ANY', False)

jwt_singleton = JWT()


def _import_object(path, def_name):
    """Import and return the object named by ``path``.

    ``path`` is either ``"module:attribute"`` or just ``"module"``; in the
    latter case ``def_name`` is used as the attribute name.
    """
    mod, sep, attr = path.partition(':')
    if not sep:
        attr = def_name
    return getattr(import_module(mod), attr)


get_user = _import_object(GET_USER_FUNCTION, 'get_user')


def _redirect_home(request):
    """Return a redirect to the absolute URL of the site root."""
    return redirect(request.build_absolute_uri("/"))


def _account_url():
    """Return the URL of the account page on the SSO server."""
    return appSettings.AUTH_SERVER + "account/"


def _client_roles(token_payload):
    """Return the roles granted to the authorized party (``azp``) client.

    Reads ``resource_access[<azp>]['roles']`` from the token payload and
    returns an empty list when any part of that path is missing.
    Realm-level roles are not considered.
    """
    resource_access = token_payload.get('resource_access') or {}
    client_access = resource_access.get(token_payload.get('azp')) or {}
    return client_access.get('roles') or []


def _get_local_user(request, id_token_data):
    """Resolve the local user for the ID token payload.

    Returns the user with the model backend attached, or ``None`` after
    queuing the matching error messages when ``get_user`` raises one of the
    known SSO exceptions.
    """
    try:
        user = get_user(id_token_data)
        user.backend = 'django.contrib.auth.backends.ModelBackend'
    except sso_exc.MissingSsoInfo as e:
        messages.error(request, "Účet na Pirátské identitě není řádně vyplněn. %s" % e)
        # The format string previously had no placeholder for the URL, which
        # raised TypeError; the message is tagged "safe", so it carries a link.
        messages.info(
            request,
            'Zkontrolujte údaje na <a href="%s">auth.pirati.cz</a>.' % _account_url(),
            extra_tags="safe")
        logger.error('SSO account incomplete 1: %s', e)
        return None
    except sso_exc.EmailVersusIdentityMismatch as e:
        messages.error(request, "Účet v nalodění a na Pirátské identitě nesouhlasí.")
        messages.info(request, "Obraťte se na technický odbor Pirátů.")
        logger.error('SSO account email vs. identity mismatch: %s', e)
        return None
    except sso_exc.UsernameAlreadyTaken as e:
        messages.error(request, "%s" % e)
        messages.info(request, 'Pokud se jedná o vaše uživatelské jméno,'
                      + 'obraťte se na technický odbor Pirátů.')
        logger.error('Username already taken in Nalodeni: %s', e)
        return None
    except sso_exc.EmailAlreadyTaken as e:
        messages.error(request, "%s" % e)
        messages.info(request, 'Pokud se jedná o váš email,'
                      + 'obraťte se na technický odbor Pirátů.')
        logger.error('Email already taken in Nalodeni: %s', e)
        return None
    return user


def login(request, idp_hint=None):
    """Redirect the browser to the SSO server's authorization endpoint.

    The post-login return path is read from the ``next``-style query
    parameter (``auth.REDIRECT_FIELD_NAME``) and passed as the OIDC ``state``.
    ``idp_hint`` is forwarded as ``kc_idp_hint`` only when it is listed in
    ``AUTH_AVAIL_IDP``. If the SSO server object cannot be obtained, an error
    message is queued and the user is redirected to the site root.
    """
    return_path = request.GET.get(auth.REDIRECT_FIELD_NAME, "/")
    if idp_hint and idp_hint not in AUTH_AVAIL_IDP:
        idp_hint = None

    try:
        srv = _auth.get_server()
    except Exception as e:
        messages.error(request, "Bohužel nastaly potíže s centrálním přihlášením uživatele.")
        logger.error('Chyba SSO login: %s', e)
        return _redirect_home(request)

    # Reuse the server obtained above rather than fetching it a second time
    # outside the guarded section.
    target = srv.authorize(
        redirect_uri=request.build_absolute_uri(reverse("keycloak_oidc:login-done")),
        state=return_path,
        scope=AUTH_SCOPE,
    )
    target += "&kc_locale=" + appSettings.AUTH_SSO_LOCALE
    if idp_hint:
        target += '&kc_idp_hint=' + idp_hint
    return redirect(target)


def renew_session(request, returnHttp=False):
    """Refresh the session using the stored refresh token.

    Returns:
        ``-3`` when the session holds no refresh token (regardless of
        ``returnHttp``). Otherwise, when ``returnHttp`` is true, a redirect to
        the site root in every case; when it is false, ``-1`` if the SSO
        server could not be obtained, ``-2`` if the server returned no new
        token, and ``0`` on success.
    """
    reftoken = request.session.get('refresh_token', None)
    if reftoken is None:
        return -3

    try:
        srv = _auth.get_server()
    except Exception as e:
        messages.error(request, "Bohužel nastaly potíže s centrálním SSO serverem.")
        logger.error('Chyba SSO renew: %s', e)
        return _redirect_home(request) if returnHttp else -1

    new_token = srv.refresh_session(reftoken)
    if new_token is None:
        return _redirect_home(request) if returnHttp else -2

    new_refresh_token = new_token._data['refresh_token']
    refresh_payload = jwt_singleton.unpack(new_refresh_token).payload()

    # Save the new refresh token together with its expiry.
    request.session['refresh_token'] = new_refresh_token
    request.session['refresh_expires_at'] = refresh_payload['exp']

    return _redirect_home(request) if returnHttp else 0


def callback(request):
    """Handle the redirect back from the SSO server after authentication.

    Exchanges the authorization ``code`` for tokens, resolves the local user,
    enforces the configured role requirements, stores the tokens and the
    derived site permissions in the session, logs the user in and redirects
    to the ``state`` return path when it is a safe URL for this host
    (otherwise to ``LOGIN_REDIRECT_URL``). Every handled failure queues a
    message and redirects to the site root.
    """
    return_path = request.GET.get("state", '/')
    sso_code = request.GET.get('code', '')

    if sso_code == "" and return_path == "":
        messages.error(request, "Chyba při přihlášení na centrálním serveru SSO.")
        return _redirect_home(request)

    try:
        res = _auth.get_server().request_token(
            redirect_uri=request.build_absolute_uri(reverse("keycloak_oidc:login-done")),
            code=sso_code,
        )
    except HTTPError as e:
        messages.error(request, "Přihlášení uživatele se nezdařilo. Chyba komunikace s centrálním serverem přihlášení.")
        logger.error('Chyba SSO callback: %s', e)
        return _redirect_home(request)

    id_token_data = jwt_singleton.unpack(res.id_token).payload()
    reftoken = jwt_singleton.unpack(res._data['refresh_token']).payload()

    # Get the user from the local DB.
    user = _get_local_user(request, id_token_data)
    if user is None:
        return _redirect_home(request)

    # Process the user roles, if any (taken from bossoidc.backend).
    access_payload = jwt_singleton.unpack(res.access_token).payload()
    resource_roles = _client_roles(access_payload)

    lacks_any_role = LOGIN_ROLE_REQUIRED_ANY and not resource_roles
    lacks_required_role = (LOGIN_ROLE_REQUIRED is not None
                           and LOGIN_ROLE_REQUIRED not in resource_roles)
    if lacks_any_role or lacks_required_role:
        messages.error(request, "Váš pirátský účet nemá přidělené potřebné přístupové role. Přístup odmítnut.")
        logger.error('SSO account %s: no roles assigned', user.ssoUid)
        return _redirect_home(request)

    # Update user information.
    if 'given_name' not in access_payload or 'family_name' not in access_payload:
        messages.error(request, "Účet na Pirátské identitě není řádně vyplněn.")
        messages.info(
            request,
            'Zkontrolujte údaje na <a href="%s">auth.pirati.cz</a>.' % _account_url(),
            extra_tags="safe")
        # No exception is in scope here, so log the token subject instead.
        logger.error('SSO account incomplete 2: %s',
                     access_payload.get('preferred_username',
                                        access_payload.get('sub')))
        return _redirect_home(request)

    user.first_name = access_payload['given_name']
    user.last_name = access_payload['family_name']
    user.loginSession = access_payload['session_state']
    user.save()

    # Authenticate the user.
    auth.login(request, user)
    request.session['openid_token'] = res.id_token
    request.session['openid'] = id_token_data
    request.session['refresh_token'] = res._data['refresh_token']
    request.session['refresh_expires_at'] = reftoken['exp']
    request.session['loginSession'] = user.loginSession

    # Set client-system roles based on SSO roles.
    # After setting permissions the User object should be reloaded,
    # but we do a redirect, so it is fine.
    user.user_permissions.clear()

    # Prepend "sso_" so these do not mix with other permissions.
    request.session['site_perms'] = ['sso_' + role for role in resource_roles]

    # Tell the User to calculate the needed permissions.
    request.session['spc'] = user.do_site_perms_calc(request.session['site_perms'])

    if appSettings.DEBUG:
        logger.debug("Roles assigned: %s", request.session['site_perms'])
        logger.debug("Roles calculated: %s", request.session['spc'])

    # allowed_hosts must be a set containing the host itself; set(host) would
    # be a set of its individual characters.
    url_is_safe = is_safe_url(
        url=return_path,
        allowed_hosts={request.get_host()},
        require_https=request.is_secure(),
    )
    if not url_is_safe:
        return redirect(resolve_url(LOGIN_REDIRECT_URL))
    return redirect(return_path)


def logout(request, multiSessionFound=False, ssoLogout=False):
    """Log the user out of the application and optionally out of the SSO.

    Args:
        multiSessionFound: when true, the user's stored ``loginSession`` is
            left untouched; otherwise it is reset because the user asked for
            the logout.
        ssoLogout: when true and the server exposes an end-session endpoint
            and an ID token is available, redirect to the SSO end-session URL.

    Returns:
        A redirect to the SSO end-session URL, to the site root (when the SSO
        server cannot be obtained), or to ``LOGOUT_REDIRECT_VIEW``.
    """
    # Read the token before auth.logout() discards the session data.
    id_token = request.session.get('openid_token', None)

    if request.user.is_authenticated:
        if not multiSessionFound:
            # Reset the DB session, because this is a user requested logout.
            request.user.loginSession = None
            request.user.save()
            request.session['loginSession'] = None
        auth.logout(request)

    # NOTE(review): the branch nesting below was reconstructed from a source
    # with no indentation; confirm the else belongs to `if ssoLogout`.
    if ssoLogout:
        try:
            srv = _auth.get_server()
        except Exception as e:
            messages.error(request, "Bohužel nastaly potíže s centrálním odhlášením uživatele.")
            logger.error('Chyba SSO logout: %s', e)
            return _redirect_home(request)

        if srv.end_session_endpoint and id_token is not None:
            messages.info(request, "Byli jste odhlášení z aplikace i Centrální identity.")
            return redirect(srv.end_session(
                post_logout_redirect_uri=request.build_absolute_uri(LOGOUT_REDIRECT_URL),
                state='',
                id_token_hint=id_token,
            ))
    elif id_token:
        # The format string previously had no placeholder for the URL, which
        # raised TypeError; the message is tagged "safe", so it carries a link.
        messages.info(
            request,
            'Byli jste odhlášení z aplikace. Vaše centrální identita na '
            '<a href="%s">auth.pirati.cz</a> zůstává přihlášena.' % _account_url(),
            extra_tags="safe")
    else:
        messages.info(request, "Byli jste odhlášení z aplikace.")

    return redirect(LOGOUT_REDIRECT_VIEW)


def account(request):
    """Redirect to the account page on the SSO server, with the SSO locale."""
    return redirect(
        request.build_absolute_uri(_account_url())
        + "?kc_locale=" + appSettings.AUTH_SSO_LOCALE
    )


def refresh_session(request):
    """Return the session's refresh-token expiry as JSON.

    The body is ``{"exp": <refresh_expires_at or null>}``; the value is read
    from the session and nothing is refreshed here.
    """
    resp = {'exp': request.session.get('refresh_expires_at', None)}
    return HttpResponse(json.dumps(resp), content_type="application/json")