nalodeni.pirati.cz/keycloak_oidc/views.py

338 lines
12 KiB
Python

# encoding: utf8
from importlib import import_module
import json
import logging

from jwkest.jwt import JWT
from requests.exceptions import HTTPError
from django.contrib import auth
from django.contrib import messages
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Permission
from django.contrib.contenttypes.models import ContentType
from django.http import HttpResponse
from django.shortcuts import redirect, resolve_url
from django.urls import reverse
from django.utils.http import is_safe_url
from django.conf import settings as appSettings

from . import auth as _auth
from . import exceptions as sso_exc
# Module-level logger for the SSO views.
logger = logging.getLogger(__name__)

# Optional project settings. Each one falls back to the default given here
# when the attribute is missing from the Django settings object.
LOGIN_REDIRECT_URL = getattr(appSettings, 'LOGIN_REDIRECT_URL', '/')

# A value of None is treated exactly like a missing setting.
LOGOUT_REDIRECT_URL = getattr(appSettings, 'LOGOUT_REDIRECT_URL', None)
if LOGOUT_REDIRECT_URL is None:
    LOGOUT_REDIRECT_URL = '/sso/logout/'

# Scope passed to the authorization request in login().
AUTH_SCOPE = getattr(appSettings, 'AUTH_SCOPE', ('openid',))

# Identity-provider hints that login() accepts; anything else is dropped.
AUTH_AVAIL_IDP = getattr(appSettings, 'AUTH_AVAIL_IDP', [])

# "module:attribute" path of the callable that resolves the local user.
GET_USER_FUNCTION = getattr(
    appSettings, 'AUTH_GET_USER_FUNCTION', 'keycloak_oidc:get_user_by_username'
)

# Redirect target used by logout() when no SSO end-session redirect happens.
LOGOUT_REDIRECT_VIEW = getattr(
    appSettings, 'AUTH_LOGOUT_REDIRECT_VIEW', 'nalodeni:about'
)

# Role the client must have to log in (None disables the check).
LOGIN_ROLE_REQUIRED = getattr(appSettings, 'AUTH_LOGIN_ROLE_REQUIRED', None)

# When true, a user with no client roles at all is refused.
LOGIN_ROLE_REQUIRED_ANY = getattr(
    appSettings, 'AUTH_LOGIN_ROLE_REQUIRED_ANY', False
)

# Shared JWT helper used to unpack token payloads.
jwt_singleton = JWT()
def _import_object(path, def_name):
    """Import and return an attribute of a module.

    ``path`` is either ``"module:attribute"`` or just ``"module"``; in the
    latter case ``def_name`` is used as the attribute name.
    """
    module_name, separator, attr_name = path.partition(':')
    if not separator:
        # No colon in the path: the whole string is the module name.
        attr_name = def_name
    module = import_module(module_name)
    return getattr(module, attr_name)
# Resolve the configured user-lookup callable once, at import time.
# 'get_user' is the attribute name used when GET_USER_FUNCTION has no ":" part.
get_user = _import_object(GET_USER_FUNCTION, 'get_user')
def login(request, idp_hint=None):
    """Redirect the browser to the SSO server's authorization URL.

    The post-login return path is read from the query string
    (``auth.REDIRECT_FIELD_NAME``, default ``"/"``) and passed to the SSO
    server as ``state``. ``idp_hint`` is forwarded as ``kc_idp_hint`` only
    when it is listed in ``AUTH_AVAIL_IDP``.

    If the SSO server object cannot be obtained, an error message is queued
    and the user is redirected to the site root.
    """
    return_path = request.GET.get(auth.REDIRECT_FIELD_NAME, "/")
    if idp_hint and idp_hint not in AUTH_AVAIL_IDP:
        # Unknown identity provider: ignore the hint.
        idp_hint = None

    try:
        srv = _auth.get_server()
    except Exception as e:
        messages.error(request, "Bohužel nastaly potíže s centrálním přihlášením uživatele.")
        logger.error('Chyba SSO login: %s', e)
        return redirect(request.build_absolute_uri("/"))

    # Reuse the server object obtained above instead of fetching it again
    # outside the try block.
    authorize_url = srv.authorize(
        redirect_uri=request.build_absolute_uri(reverse("keycloak_oidc:login-done")),
        state=return_path,
        scope=AUTH_SCOPE,
    )
    authorize_url += "&kc_locale=" + appSettings.AUTH_SSO_LOCALE
    if idp_hint is not None:
        authorize_url += '&kc_idp_hint=' + idp_hint
    return redirect(authorize_url)
def renew_session(request, returnHttp=False):
    """
    Refresh the session using the refresh token stored in the session.

    Returns -3 when the session holds no refresh token (regardless of
    ``returnHttp``). Otherwise the outcome is a status code: -1 when the
    SSO server could not be obtained, -2 when the server returned no new
    token, 0 on success. With ``returnHttp`` true, a redirect to the site
    root is returned in place of that status code.
    """
    def _finish(status):
        # Callers that asked for an HTTP response always get the redirect.
        if returnHttp:
            return redirect(request.build_absolute_uri("/"))
        return status

    current_token = request.session.get('refresh_token', None)
    if current_token is None:
        return -3

    try:
        server = _auth.get_server()
    except Exception as exc:
        messages.error(request,"Bohužel nastaly potíže s centrálním SSO serverem.")
        logger.error('Chyba SSO renew: %s' % exc)
        return _finish(-1)

    refreshed = server.refresh_session(current_token)
    if refreshed is None:
        return _finish(-2)

    fresh_token = refreshed._data['refresh_token']
    fresh_payload = jwt_singleton.unpack(fresh_token).payload()

    # Store the new refresh token together with its expiry time.
    request.session['refresh_token'] = fresh_token
    request.session['refresh_expires_at'] = fresh_payload['exp']
    return _finish(0)
def _client_roles(token_payload):
    """Return the roles granted to the token's ``azp`` client.

    Reads ``resource_access[azp]['roles']`` from the decoded access token
    and returns an empty list when any part of that path is missing.
    """
    client_id = token_payload.get('azp')
    client_access = token_payload.get('resource_access', {}).get(client_id, {})
    return client_access.get('roles', [])


def callback(request):
    """Handle the redirect back from the SSO server and log the user in.

    Steps, in order:
      1. Exchange the ``code`` query parameter for tokens.
      2. Resolve the local user from the ID token via ``get_user``.
      3. Enforce ``LOGIN_ROLE_REQUIRED`` / ``LOGIN_ROLE_REQUIRED_ANY``
         against the client roles in the access token.
      4. Copy first/last name and the SSO session id onto the user.
      5. Log the user in and store tokens and ``sso_``-prefixed roles in
         the session.
      6. Redirect to the ``state`` return path if it is a safe URL,
         otherwise to ``LOGIN_REDIRECT_URL``.

    Every failure queues a user-facing message, logs the problem and
    redirects to the site root.
    """
    return_path = request.GET.get("state", '/')
    sso_code = request.GET.get('code', '')
    if not sso_code:
        # Without an authorization code there is nothing to exchange.
        messages.error(request, "Chyba při přihlášení na centrálním serveru SSO.")
        return redirect(request.build_absolute_uri("/"))

    try:
        res = _auth.get_server().request_token(
            redirect_uri=request.build_absolute_uri(reverse("keycloak_oidc:login-done")),
            code=sso_code,
        )
    except HTTPError as e:
        messages.error(request, "Přihlášení uživatele se nezdařilo. Chyba komunikace s centrálním serverem přihlášení.")
        logger.error('Chyba SSO callback: %s', e)
        return redirect(request.build_absolute_uri("/"))

    id_token_data = jwt_singleton.unpack(res.id_token).payload()
    refresh_token = res._data['refresh_token']
    refresh_payload = jwt_singleton.unpack(refresh_token).payload()

    # Get the user from local DB
    #
    try:
        user = get_user(id_token_data)
    except sso_exc.MissingSsoInfo as e:
        messages.error(request, "Účet na Pirátské identitě není řádně vyplněn. %s" % e)
        messages.info(request, "Zkontrolujte údaje na <a href='%s' target='_blank'>auth.pirati.cz</a>." % (appSettings.AUTH_SERVER + "account/"), extra_tags="safe")
        logger.error('SSO account incomplete 1: %s', e)
        return redirect(request.build_absolute_uri("/"))
    except sso_exc.EmailVersusIdentityMismatch as e:
        messages.error(request, "Účet v nalodění a na Pirátské identitě nesouhlasí.")
        messages.info(request, "Obraťte se na technický odbor Pirátů.")
        logger.error('SSO account email vs. identity mismatch: %s', e)
        return redirect(request.build_absolute_uri("/"))
    except sso_exc.UsernameAlreadyTaken as e:
        messages.error(request, "%s" % e)
        messages.info(request, 'Pokud se jedná o vaše uživatelské jméno,'
                      + 'obraťte se na technický odbor Pirátů.')
        logger.error('Username already taken in Nalodeni: %s', e)
        return redirect(request.build_absolute_uri("/"))
    except sso_exc.EmailAlreadyTaken as e:
        messages.error(request, "%s" % e)
        messages.info(request, 'Pokud se jedná o váš email,'
                      + 'obraťte se na technický odbor Pirátů.')
        logger.error('Email already taken in Nalodeni: %s', e)
        return redirect(request.build_absolute_uri("/"))
    user.backend = 'django.contrib.auth.backends.ModelBackend'

    # Process the user roles, if any ( taken from bossoidc.backend )
    #
    access_data = jwt_singleton.unpack(res.access_token).payload()
    resource_roles = _client_roles(access_data)

    lacks_any_role = LOGIN_ROLE_REQUIRED_ANY and not resource_roles
    lacks_required_role = (
        LOGIN_ROLE_REQUIRED is not None
        and LOGIN_ROLE_REQUIRED not in resource_roles
    )
    if lacks_any_role or lacks_required_role:
        messages.error(request, "Váš pirátský účet nemá přidělené potřebné přístupové role. Přístup odmítnut.")
        logger.error('SSO account %s: no roles assigned', user.ssoUid)
        return redirect(request.build_absolute_uri("/"))

    # Update user information
    #
    if 'given_name' not in access_data or 'family_name' not in access_data:
        messages.error(request, "Účet na Pirátské identitě není řádně vyplněn.")
        messages.info(request, "Zkontrolujte údaje na <a href='%s' target='_blank'>auth.pirati.cz</a>." % (appSettings.AUTH_SERVER + "account/"), extra_tags="safe")
        # No exception is in scope here, so identify the account instead.
        logger.error('SSO account incomplete 2: %s', user.ssoUid)
        return redirect(request.build_absolute_uri("/"))
    user.first_name = access_data['given_name']
    user.last_name = access_data['family_name']
    user.loginSession = access_data['session_state']
    user.save()

    # Authenticate the user
    #
    auth.login(request, user)
    request.session['openid_token'] = res.id_token
    request.session['openid'] = id_token_data
    request.session['refresh_token'] = refresh_token
    request.session['refresh_expires_at'] = refresh_payload['exp']
    request.session['loginSession'] = user.loginSession

    ##
    # Set client-system roles based on SSO roles
    #
    # After setting permissions, the User object should be reloaded,
    # but we do a redirect, so it is fine.
    ##
    user.user_permissions.clear()
    # prepend "sso_" not to mix with other perms
    site_perms = ['sso_' + role for role in resource_roles]
    request.session['site_perms'] = site_perms
    # tell the User to calculate needed permissions
    request.session['spc'] = user.do_site_perms_calc(site_perms)

    # tell us, what roles do we have
    if appSettings.DEBUG:
        print("Roles assigned:", request.session['site_perms'])
        print("Roles calculated:", request.session['spc'])

    # allowed_hosts must be a set containing the host name; set(<str>) would
    # instead produce a set of its individual characters.
    url_is_safe = is_safe_url(
        url=return_path,
        allowed_hosts={request.get_host()},
        require_https=request.is_secure(),
    )
    if not url_is_safe:
        return redirect(resolve_url(LOGIN_REDIRECT_URL))
    return redirect(return_path)
def logout(request, multiSessionFound=False, ssoLogout=False):
    """Log the user out of the application and optionally out of SSO.

    ``multiSessionFound`` – when false, the logout is treated as requested
    by the user and the user's stored ``loginSession`` is reset as well.
    ``ssoLogout`` – when true and the SSO server exposes an end-session
    endpoint and an ID token is available, the browser is redirected to
    that endpoint; otherwise the user is redirected to
    ``LOGOUT_REDIRECT_VIEW``.
    """
    # NOTE(review): block nesting was reconstructed from an unindented
    # source; confirm against the original file.
    # Read the ID token first: auth.logout() below discards the session.
    id_token = request.session.get('openid_token', None)

    if request.user.is_authenticated:
        if not multiSessionFound:
            # reset the DB session, because this is
            # a user requested logout
            request.user.loginSession = None
            request.user.save()
        request.session['loginSession'] = None
        auth.logout(request)

    if ssoLogout:
        try:
            srv = _auth.get_server()
        except Exception as e:
            messages.error(request, "Bohužel nastaly potíže s centrálním odhlášením uživatele.")
            logger.error('Chyba SSO logout: %s', e)
            return redirect(request.build_absolute_uri("/"))
        # Reuse the server object obtained above instead of fetching it again.
        if srv.end_session_endpoint and id_token is not None:
            messages.info(request, "Byli jste odhlášení z aplikace i Centrální identity.")
            return redirect(srv.end_session(
                post_logout_redirect_uri=request.build_absolute_uri(LOGOUT_REDIRECT_URL),
                state='',
                id_token_hint=id_token,
            ))
    elif id_token:
        messages.info(request, "Byli jste odhlášení z aplikace. Vaše centrální identita na <a href='%s' target='_blank'>auth.pirati.cz</a> zůstává přihlášena." % (appSettings.AUTH_SERVER + "account/"), extra_tags="safe")
    else:
        messages.info(request, "Byli jste odhlášení z aplikace.")
    return redirect(LOGOUT_REDIRECT_VIEW)
def account(request):
    """Redirect to the SSO server's account page, passing the configured locale."""
    account_url = request.build_absolute_uri(appSettings.AUTH_SERVER + "account/")
    locale_query = "?kc_locale=" + appSettings.AUTH_SSO_LOCALE
    return redirect(account_url + locale_query)
def refresh_session(request):
    """
    Report when the session's refresh token expires.

    Responds with the JSON object ``{"exp": <value>}``, where the value is
    the session's ``refresh_expires_at`` entry (``null`` when absent).
    This view does not refresh anything itself; per the original note the
    refresh is presumably done by a middleware - TODO confirm.
    """
    resp = {'exp': request.session.get('refresh_expires_at', None)}
    return HttpResponse(json.dumps(resp), content_type="application/json")