Newer
Older
Jörn Künsemöller
committed
import requests
import logging
import cachetools.func
import os
import json
import re
from lofar.sas.tmss.tmss.exceptions import TMSSException
Jörn Künsemöller
committed
from lofar.sas.tmss.tmss.tmssapp import models
Jörn Künsemöller
committed
logger = logging.Logger(__name__)
KEYCLOAK_TOKEN_URL = os.environ.get('KEYCLOAK_TOKEN_URL', 'https://sdc-dev.astron.nl/auth/realms/master/protocol/openid-connect/token')
KEYCLOAK_ADMIN_USER = os.environ.get('KEYCLOAK_ADMIN_USER', 'secret')
KEYCLOAK_ADMIN_PASSWORD = os.environ.get('KEYCLOAK_ADMIN_PASSWORD', 'secret')
KEYCLOAK_API_BASE_URL = os.environ.get('KEYCLOAK_API_BASE_URL', 'https://sdc-dev.astron.nl/auth/admin/realms/master')
class KeycloakAdminAPISession(requests.Session):
@cachetools.func.ttl_cache(ttl=30)
def get_token(self):
data = {'username': KEYCLOAK_ADMIN_USER, 'password': KEYCLOAK_ADMIN_PASSWORD, 'grant_type': 'password',
'client_id': 'admin-cli'}
response = self.post(url=KEYCLOAK_TOKEN_URL, data=data) # , headers={'Accept': 'text/plain'})
if response.status_code == 200:
response_dict = json.loads(response.content.decode('utf-8'))
token = response_dict['access_token']
logger.info('Obtained Keycloak API token')
return token
else:
raise TMSSException('Keycloak admin API token could not be obtained: %s' % response.text)
Jörn Künsemöller
committed
def get(self, *args, **kwargs):
token = self.get_token()
headers = kwargs.pop('headers', {})
headers["Authorization"] = "Bearer %s" % token
response = super().get(*args, headers=headers, **kwargs)
if response.status_code == 200:
return json.loads(response.content.decode('utf-8'))
else:
raise TMSSException('Keycloak admin API query failed: %s' % response.text)
Jörn Künsemöller
committed
def get_users_by_role_in_project(role, project):
"""
returns the list of users that have the specified role in the specified project
"""
project_persons = get_project_persons()
Jörn Künsemöller
committed
if project in project_persons:
return project_persons[project][role]
else:
[]
Jörn Künsemöller
committed
@cachetools.func.ttl_cache(ttl=600)
def get_project_persons():
"""
returns a mapping of projects names to a dict that contains the users that
have a particular role in that project.
"""
project_persons_map = {}
with KeycloakAdminAPISession() as ksession:
groups = ksession.get(url='%s/groups/' % KEYCLOAK_API_BASE_URL)
for group in groups:
if group['name'] == 'Project':
projects = group['subGroups']
for project in projects:
project_detail = ksession.get(url='%s/groups/%s/' % (KEYCLOAK_API_BASE_URL, project['id']))
attributes = project_detail.get('attributes', {})
Jörn Künsemöller
committed
legacy_role_keys = {'pi': 'lofarProjectPI',
'friend_of_project': 'lofarProjectFriend',
'contact': 'lofarProjectContactauthor'}
for project_role in models.ProjectRole.Choices:
# get role attribute from project:
role = project_role.value
users = attributes.get(role, [])
# fall back to legacy-style attribute:
if not users and role in legacy_role_keys:
users = attributes.get(legacy_role_keys[role], [])
# convert user list (LDAP DNs) to something we can use in TMSS (email)
Jörn Künsemöller
committed
user_map = get_user_mapping()
Jörn Künsemöller
committed
mapped_users = [user_map[user] for user in users if user in user_map] # email list of referenced users
unmappable_users = [user for user in users if user not in user_map] # list of references for which no account was found
Jörn Künsemöller
committed
for unmappable_user in unmappable_users:
Jörn Künsemöller
committed
# Note: Usually Keycloak should return DN references to user accounts. For PI's, someone had the
# great idea to allow to specify a freeform string instead, to refer to people who may or may not
# have an account. Even if the person has a user account, there is no way to replicate the exact
# string 'representation' Keycloak returns, since the string may contain typos, or info that is not
# stored in the user accounts (like titles).
# The following unsafe hack tries to determine whether there is a user account that matches the
# name given in the string (ignore titles since they are not part of the user account):
# unmappable_user_fixed = re.sub('Dr\.', '', unmappable_user)
# unmappable_user_fixed = re.sub('Prof\.', '', unmappable_user_fixed)
# unmappable_user_fixed = re.sub('ir\.', '', unmappable_user_fixed)
# unmappable_user_fixed = re.sub('Ir\.', '', unmappable_user_fixed)
# unmappable_user_fixed = re.sub('apl\.', '', unmappable_user_fixed)
# unmappable_user_fixed = re.sub(' +', ' ', unmappable_user_fixed)
#
# if unmappable_user_fixed in user_map:
# mapped_users.append(user_map[unmappable_user_fixed])
# else:
logger.warning("Could not match Keycloak user reference '%s' to a known user." % unmappable_user)
if not unmappable_user.startswith('cn='):
logger.warning("LOFAR allowed to reference a person by a freeform string instead of a user account. '%s' seems to be such a legacy reference. This needs to be fixed in the identity management." % unmappable_user)
Jörn Künsemöller
committed
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
project_persons_map.setdefault(project['name'], {})[role] = mapped_users
return project_persons_map
@cachetools.func.ttl_cache(ttl=600)
def get_user_mapping():
"""
returns a mapping of both the string ('Project, Tobitha') or LDAP ('cn=to_project,ou=Users,o=lofartest,c=eu')
representations of users that are returned by Keycloak to a reference that we can use to identify a user
in TMSS, i.e. email.
# todo: consider looking up / creating / returning user objects directly (but that generates a ton of lookups and unnecessary users)
# todo: we need to review that all used references are actually unique, especially Keycloak's reference by string representation does not look safe!
"""
user_map = {}
with KeycloakAdminAPISession() as ksession:
users = ksession.get(url='%s/users/' % KEYCLOAK_API_BASE_URL)
for user in users:
if 'attributes' in user:
for ldap_dn in user['attributes'].get('LDAP_ENTRY_DN', []):
user_map[ldap_dn] = user['email']
for ldap_dn in user['attributes'].get('KEYCLOAK_DN', []):
user_map[ldap_dn] = user['email']
if 'firstName' in user and 'lastName' in user:
user_map['%s, %s' % (user['lastName'], user['firstName'])] = user['email']
return user_map