import json
import logging
import os
import re
import time

import cachetools.func
import requests

from lofar.sas.tmss.tmss.exceptions import TMSSException
from lofar.sas.tmss.tmss.tmssapp import models

# Obtain the logger through getLogger() so that it is part of the logging hierarchy and
# inherits the handlers/levels configured by the application. A directly instantiated
# logging.Logger has no parent and its records never reach the configured handlers.
logger = logging.getLogger(__name__)

KEYCLOAK_TOKEN_URL = os.environ.get('KEYCLOAK_TOKEN_URL', 'https://keycloak.astron.nl/auth/realms/SDC/protocol/openid-connect/token')
KEYCLOAK_ADMIN_USER = os.environ.get('KEYCLOAK_ADMIN_USER', 'secret')
KEYCLOAK_ADMIN_PASSWORD = os.environ.get('KEYCLOAK_ADMIN_PASSWORD', 'secret')
KEYCLOAK_API_BASE_URL = os.environ.get('KEYCLOAK_API_BASE_URL', 'https://keycloak.astron.nl/auth/admin/realms/SDC')

# Maps a TMSS project role to the legacy-style Keycloak group attribute that is used as
# a fallback when the group has no attribute named after the role itself.
_LEGACY_ROLE_KEYS = {'pi': 'lofarProjectPI',
                     'friend_of_project': 'lofarProjectFriend',
                     'contact': 'lofarProjectContactauthor'}


class KeycloakAdminAPISession(requests.Session):
    """
    A requests session for the Keycloak admin API.
    Every GET request is sent with a bearer token and returns the decoded JSON response body.
    """

    # Number of seconds an obtained token is reused by a session before a new one is requested.
    _TOKEN_TTL_SECONDS = 30

    # The token is cached per instance (class-level defaults, shadowed on first assignment).
    # A ttl_cache decorator on the method would key on 'self': it never shares a token
    # between sessions and keeps already closed sessions alive inside the cache.
    _cached_token = None
    _cached_token_expires_at = 0.0

    def get_token(self):
        """
        returns an access token for the Keycloak admin API.
        The token is reused by this session for _TOKEN_TTL_SECONDS before a new one is requested.
        :raises TMSSException: if Keycloak does not answer the token request with status 200.
        """
        if self._cached_token is not None and time.monotonic() < self._cached_token_expires_at:
            return self._cached_token

        data = {'username': KEYCLOAK_ADMIN_USER,
                'password': KEYCLOAK_ADMIN_PASSWORD,
                'grant_type': 'password',
                'client_id': 'admin-cli'}
        response = self.post(url=KEYCLOAK_TOKEN_URL, data=data)
        if response.status_code != 200:
            raise TMSSException('Keycloak admin API token could not be obtained: %s' % response.text)

        token = json.loads(response.content.decode('utf-8'))['access_token']
        logger.info('Obtained Keycloak API token')
        self._cached_token = token
        self._cached_token_expires_at = time.monotonic() + self._TOKEN_TTL_SECONDS
        return token

    def get(self, *args, **kwargs):
        """
        performs an authenticated GET request and returns the decoded JSON response body.
        Accepts the same arguments as requests.Session.get.
        :raises TMSSException: if the response status is not 200.
        """
        token = self.get_token()
        # copy the headers, so that a dict that was passed in by the caller is not modified
        headers = dict(kwargs.pop('headers', None) or {})
        headers["Authorization"] = "Bearer %s" % token
        response = super().get(*args, headers=headers, **kwargs)
        if response.status_code != 200:
            raise TMSSException('Keycloak admin API query failed: %s' % response.text)
        return json.loads(response.content.decode('utf-8'))


def get_users_by_role_in_project(role, project):
    """
    returns the list of users that have the specified role in the specified project.
    An empty list is returned when the project or the role is unknown.
    """
    # we fetch all and cache them instead of hitting Keycloak every time this gets called.
    # But we don't care about all the legacy projects, so only request what's known to TMSS.
    # This also works but is a little less efficient (for a relatively small number of projects):
    # project_persons = get_project_persons(include_projects=(project,))
    projects_known_to_tmss = models.Project.objects.values_list('name', flat=True)
    project_persons = get_project_persons(include_projects=tuple(projects_known_to_tmss))
    # return a copy, so that callers cannot modify the cached result of get_project_persons
    return list(project_persons.get(project, {}).get(role, []))


def _resolve_user_references(user_references):
    """
    converts the user references of a Keycloak project attribute (LDAP DNs or legacy freeform
    strings) to email addresses, and logs a warning for each reference that cannot be resolved.
    """
    # guess the likely username from LDAP DN (to query a sensible amount of data, unfortunately we cannot filter for LDAP attributes directly)
    # (Note: this is not exact since the Keycloak filter returns partial matches, but that does not harm us.
    #  Also, there is a theoretical risk that the Keycloak username differs from the LDAP cn, in
    #  which case we do not request the required detailed info of that user.
    #  But since get_user_mapping does match on LDAP DN, the matches we get should always be correct.)
    usernames = tuple(reference.split('cn=')[1].split(',')[0]
                      for reference in user_references if 'cn=' in reference)

    # then request the email addresses of these users and map their LDAP DN to their email address
    user_map = get_user_mapping(include_usernames=usernames)

    mapped_users = []
    for reference in user_references:
        if reference in user_map:
            mapped_users.append(user_map[reference])
            continue

        # no account was found for this reference
        logger.warning("Could not match Keycloak user reference '%s' to a known user.", reference)
        if not reference.startswith('cn='):
            logger.warning("LOFAR allowed to reference a person by a freeform string instead of a user account. '%s' seems to be such a legacy reference. This needs to be fixed in the identity management.", reference)
    return mapped_users


@cachetools.func.ttl_cache(ttl=600)
def get_project_persons(include_projects: tuple = None):
    """
    returns a mapping of projects names to a dict that contains the users that have a particular role in that project.
    Optionally filter for a list of projects to include in the response.
    The filter has to be a tuple (or None), because the arguments are used as the cache key.
    The result is cached for 600 seconds and shared between callers, so it should not be modified.
    """
    # build the lookup set once instead of scanning the tuple for each project
    wanted_projects = None if include_projects is None else frozenset(include_projects)
    project_persons_map = {}

    with KeycloakAdminAPISession() as ksession:
        groups = ksession.get(url='%s/groups/' % KEYCLOAK_API_BASE_URL)
        for group in groups:
            if group['name'] != 'Project':
                continue

            for project in group['subGroups']:
                if wanted_projects is not None and project['name'] not in wanted_projects:
                    continue

                project_detail = ksession.get(url='%s/groups/%s/' % (KEYCLOAK_API_BASE_URL, project['id']))
                attributes = project_detail.get('attributes', {})

                for project_role in models.ProjectRole.Choices:
                    # get role attribute from project:
                    role = project_role.value
                    users = attributes.get(role, [])

                    # fall back to legacy-style attribute:
                    if not users and role in _LEGACY_ROLE_KEYS:
                        users = attributes.get(_LEGACY_ROLE_KEYS[role], [])

                    # convert user list (LDAP DNs) to something we can use in TMSS (email).
                    project_persons_map.setdefault(project['name'], {})[role] = _resolve_user_references(users)

    return project_persons_map


# todo: consider looking up / creating / returning user objects directly (but that generates a ton of lookups and unnecessary users)
# todo: we need to review that all used references are actually unique, especially Keycloak's reference by string representation does not look safe!
@cachetools.func.ttl_cache(ttl=600)
def get_user_mapping(include_usernames: tuple = None, include_email: tuple = None):
    """
    returns a mapping of both the string ('Project, Tobitha') or LDAP ('cn=to_project,ou=Users,o=lofartest,c=eu')
    representations of users that are returned by Keycloak to a reference that we can use to identify a user in TMSS,
    i.e. email.
    Optionally filter for email address and/or username since querying all users is very expensive.
    The filters have to be tuples (or None), because the arguments are used as the cache key.
    The result is cached for 600 seconds and shared between callers, so it should not be modified.
    """
    users_url = '%s/users/' % KEYCLOAK_API_BASE_URL

    with KeycloakAdminAPISession() as ksession:
        if include_usernames is None and include_email is None:
            users = ksession.get(url=users_url, params={'max': 99999})
        else:
            # The filter values are passed as request params, so that they get URL-encoded.
            # An email address like 'first+tag@example.org' would otherwise be misinterpreted.
            users = []
            for username in include_usernames or ():
                users.extend(ksession.get(url=users_url, params={'username': username}))
            for email in include_email or ():
                users.extend(ksession.get(url=users_url, params={'email': email}))

    user_map = {}
    for user in users:
        if 'email' not in user:
            # without an email address there is nothing we can map this user to
            continue

        email = user['email']
        attributes = user.get('attributes') or {}
        for dn_attribute in ('LDAP_ENTRY_DN', 'KEYCLOAK_DN'):
            for ldap_dn in attributes.get(dn_attribute, []):
                user_map[ldap_dn] = email

        if 'firstName' in user and 'lastName' in user:
            user_map['%s, %s' % (user['lastName'], user['firstName'])] = email

    return user_map


@cachetools.func.ttl_cache(ttl=600)
def get_email_to_name_mapping(include_email: tuple = None):
    """
    returns a mapping of email address to a single name for that user.
    The names are the keys of get_user_mapping; where an email address has several names,
    the first one that is not an LDAP DN (does not contain 'cn=') is preferred.
    The result is cached for 600 seconds and shared between callers, so it should not be modified.
    """
    # first create a reverse map of unique-email to multiple-usernames-for-the-same-person
    names_by_email = {}
    for name, email in get_user_mapping(include_email=include_email).items():
        names_by_email.setdefault(email, []).append(name)

    # then pick the first name that is not the 'ugly' local ldap name,
    # or just the first name if there is no other choice, we have to choose something
    return {email: next((name for name in names if 'cn=' not in name), names[0])
            for email, names in names_by_email.items()}


def get_names_by_role_in_project(role, project):
    """
    returns the list of user display names that have the specified role in the specified project.
    The email address is returned for users for which no name is known.
    """
    emails = get_users_by_role_in_project(role, project)
    email_to_name = get_email_to_name_mapping(include_email=tuple(emails))
    return [email_to_name.get(email, email) for email in emails]