"""Helpers that query the Keycloak admin API for project roles and user references used by TMSS."""

import json
import logging
import os
import re

import cachetools.func
import requests

from lofar.sas.tmss.tmss.exceptions import TMSSException
from lofar.sas.tmss.tmss.tmssapp import models

# Loggers must be obtained via getLogger so that they are part of the logging hierarchy and pick up
# the handlers/levels configured by the application (a directly instantiated Logger is detached).
logger = logging.getLogger(__name__)

# Connection settings; each one can be overridden through the environment variable of the same name.
KEYCLOAK_TOKEN_URL = os.environ.get('KEYCLOAK_TOKEN_URL', 'https://sdc-dev.astron.nl/auth/realms/master/protocol/openid-connect/token')
KEYCLOAK_ADMIN_USER = os.environ.get('KEYCLOAK_ADMIN_USER', 'secret')
KEYCLOAK_ADMIN_PASSWORD = os.environ.get('KEYCLOAK_ADMIN_PASSWORD', 'secret')
KEYCLOAK_API_BASE_URL = os.environ.get('KEYCLOAK_API_BASE_URL', 'https://sdc-dev.astron.nl/auth/admin/realms/master')

# Maps a project role value to the legacy-style Keycloak group attribute that may hold its users.
_LEGACY_ROLE_KEYS = {'pi': 'lofarProjectPI',
                     'friend_of_project': 'lofarProjectFriend',
                     'contact': 'lofarProjectContactauthor'}


class KeycloakAdminAPISession(requests.Session):
    """
    A requests session for the Keycloak admin API.

    Note that ``get`` is overridden: it adds a bearer token to the request and returns the decoded
    JSON body of the response instead of a ``requests.Response`` object.
    """

    # NOTE(review): the cache key of ttl_cache includes ``self``, so every session instance obtains
    # its own token, and the cache keeps a reference to the instance until the entry is evicted.
    @cachetools.func.ttl_cache(ttl=30)
    def get_token(self):
        """
        returns an access token for the Keycloak admin API, obtained with the configured admin credentials.
        raises a TMSSException if Keycloak does not answer the token request with status 200.
        """
        data = {'username': KEYCLOAK_ADMIN_USER,
                'password': KEYCLOAK_ADMIN_PASSWORD,
                'grant_type': 'password',
                'client_id': 'admin-cli'}
        response = self.post(url=KEYCLOAK_TOKEN_URL, data=data)
        if response.status_code != 200:
            raise TMSSException('Keycloak admin API token could not be obtained: %s' % response.text)

        token = json.loads(response.content.decode('utf-8'))['access_token']
        logger.info('Obtained Keycloak API token')
        return token

    def get(self, *args, **kwargs):
        """
        performs an authenticated GET request and returns the JSON-decoded response body.
        accepts the same arguments as requests.Session.get; an 'Authorization' header is always set.
        raises a TMSSException if the response status is not 200.
        """
        # Work on a copy so that a headers dict passed in by the caller is not modified,
        # and tolerate an explicit headers=None.
        headers = dict(kwargs.pop('headers', None) or {})
        headers["Authorization"] = "Bearer %s" % self.get_token()
        response = super().get(*args, headers=headers, **kwargs)
        if response.status_code != 200:
            raise TMSSException('Keycloak admin API query failed: %s' % response.text)
        return json.loads(response.content.decode('utf-8'))


def get_users_by_role_in_project(role, project):
    """
    returns the list of users that have the specified role in the specified project.
    returns an empty list if the project is unknown.
    raises a KeyError if the project is known, but has no entry for the given role.
    """
    project_persons = get_project_persons()
    if project not in project_persons:
        return []
    # Hand out a copy, so that callers cannot modify the cached project/role mapping.
    return list(project_persons[project][role])


def _resolve_user_references(users, user_map):
    """
    returns the user_map values (emails) for those references in users that are known in user_map.
    logs a warning for every reference that cannot be resolved.
    """
    mapped_users = [user_map[user] for user in users if user in user_map]  # email list of referenced users
    for unmappable_user in users:
        if unmappable_user in user_map:
            continue
        # Note: Usually Keycloak should return DN references to user accounts. For PI's, it was
        # allowed to specify a freeform string instead, to refer to people who may or may not
        # have an account. Even if the person has a user account, there is no way to replicate the exact
        # string 'representation' Keycloak returns, since the string may contain typos, or info that is not
        # stored in the user accounts (like titles).
        # An unsafe hack that stripped titles ('Dr.', 'Prof.', 'ir.', 'Ir.', 'apl.') and repeated spaces
        # from the string and then retried the lookup is disabled; such references are only reported.
        logger.warning("Could not match Keycloak user reference '%s' to a known user.", unmappable_user)
        if not unmappable_user.startswith('cn='):
            logger.warning("LOFAR allowed to reference a person by a freeform string instead of a user account. '%s' seems to be such a legacy reference. This needs to be fixed in the identity management.", unmappable_user)
    return mapped_users


@cachetools.func.ttl_cache(ttl=600)
def get_project_persons():
    """
    returns a mapping of projects names to a dict that contains the users that
    have a particular role in that project.
    The result is cached for 600 seconds and shared between callers, so it should not be modified.
    """
    project_persons_map = {}
    with KeycloakAdminAPISession() as ksession:
        groups = ksession.get(url='%s/groups/' % KEYCLOAK_API_BASE_URL)
        for group in groups:
            if group['name'] != 'Project':
                continue

            # convert user list (LDAP DNs) to something we can use in TMSS (email);
            # the mapping is the same for all projects and roles, so look it up only once.
            user_map = get_user_mapping()

            # tolerate a 'Project' group that is returned without a 'subGroups' entry
            for project in group.get('subGroups', []):
                project_detail = ksession.get(url='%s/groups/%s/' % (KEYCLOAK_API_BASE_URL, project['id']))
                attributes = project_detail.get('attributes', {})
                for project_role in models.ProjectRole.Choices:
                    # get role attribute from project:
                    role = project_role.value
                    users = attributes.get(role, [])
                    # fall back to legacy-style attribute:
                    if not users and role in _LEGACY_ROLE_KEYS:
                        users = attributes.get(_LEGACY_ROLE_KEYS[role], [])
                    mapped_users = _resolve_user_references(users, user_map)
                    project_persons_map.setdefault(project['name'], {})[role] = mapped_users
    return project_persons_map


@cachetools.func.ttl_cache(ttl=600)
def get_user_mapping():
    """
    returns a mapping of both the string ('Project, Tobitha') or LDAP ('cn=to_project,ou=Users,o=lofartest,c=eu')
    representations of users that are returned by Keycloak to a reference that we can use to identify a user in TMSS,
    i.e. email.
    Users for which Keycloak returns no email are skipped, since there is nothing to map them to.
    The result is cached for 600 seconds and shared between callers, so it should not be modified.
    # todo: consider looking up / creating / returning user objects directly (but that generates a ton of lookups and unnecessary users)
    # todo: we need to review that all used references are actually unique, especially Keycloak's reference by string representation does not look safe!
    """
    user_map = {}
    with KeycloakAdminAPISession() as ksession:
        # NOTE(review): the Keycloak admin REST API documents 'first'/'max' paging parameters for the
        # users endpoint - confirm that this single request actually returns all users.
        users = ksession.get(url='%s/users/' % KEYCLOAK_API_BASE_URL)

    for user in users:
        if 'email' not in user:
            # an account without email must not abort the mapping of all other users
            logger.debug("Skipping Keycloak user '%s' because it has no email.", user.get('username'))
            continue
        email = user['email']
        if 'attributes' in user:
            for ldap_dn in user['attributes'].get('LDAP_ENTRY_DN', []):
                user_map[ldap_dn] = email
            for ldap_dn in user['attributes'].get('KEYCLOAK_DN', []):
                user_map[ldap_dn] = email
        if 'firstName' in user and 'lastName' in user:
            user_map['%s, %s' % (user['lastName'], user['firstName'])] = email
    return user_map