Newer
Older
Jörn Künsemöller
committed
import requests
import logging
import cachetools.func
import os
import json
import re
from lofar.sas.tmss.tmss.exceptions import TMSSException
Jörn Künsemöller
committed
from lofar.sas.tmss.tmss.tmssapp import models
Jörn Künsemöller
committed
logger = logging.Logger(__name__)
KEYCLOAK_TOKEN_URL = os.environ.get('KEYCLOAK_TOKEN_URL', 'https://keycloak.astron.nl/auth/realms/SDC/protocol/openid-connect/token')
Jörn Künsemöller
committed
KEYCLOAK_ADMIN_USER = os.environ.get('KEYCLOAK_ADMIN_USER', 'secret')
KEYCLOAK_ADMIN_PASSWORD = os.environ.get('KEYCLOAK_ADMIN_PASSWORD', 'secret')
KEYCLOAK_API_BASE_URL = os.environ.get('KEYCLOAK_API_BASE_URL', 'https://keycloak.astron.nl/auth/admin/realms/SDC')
Jörn Künsemöller
committed
class KeycloakAdminAPISession(requests.Session):
@cachetools.func.ttl_cache(ttl=30)
def get_token(self):
data = {'username': KEYCLOAK_ADMIN_USER, 'password': KEYCLOAK_ADMIN_PASSWORD, 'grant_type': 'password',
'client_id': 'admin-cli'}
response = self.post(url=KEYCLOAK_TOKEN_URL, data=data) # , headers={'Accept': 'text/plain'})
if response.status_code == 200:
response_dict = json.loads(response.content.decode('utf-8'))
token = response_dict['access_token']
logger.info('Obtained Keycloak API token')
return token
else:
raise TMSSException('Keycloak admin API token could not be obtained: %s' % response.text)
Jörn Künsemöller
committed
def get(self, *args, **kwargs):
token = self.get_token()
headers = kwargs.pop('headers', {})
headers["Authorization"] = "Bearer %s" % token
response = super().get(*args, headers=headers, **kwargs)
if response.status_code == 200:
return json.loads(response.content.decode('utf-8'))
else:
raise TMSSException('Keycloak admin API query failed: %s' % response.text)
Jörn Künsemöller
committed
def get_users_by_role_in_project(role, project):
"""
returns the list of users that have the specified role in the specified project
"""
Jörn Künsemöller
committed
# we fetch all and cache them instead of hitting Keycloak every time this gets called.
# But we don't care about all the legacy projects, so only request what's known to TMSS.
# This also works but is a little less efficient (for a relatively small number of projects):
# project_persons = get_project_persons(include_projects=(project,))
projects_known_to_tmss = models.Project.objects.values_list('name', flat=True)
project_persons = get_project_persons(include_projects=tuple(projects_known_to_tmss))
Jörn Künsemöller
committed
if project in project_persons:
return project_persons[project][role]
else:
return []
Jörn Künsemöller
committed
@cachetools.func.ttl_cache(ttl=600)
Jörn Künsemöller
committed
def get_project_persons(include_projects: tuple = None):
Jörn Künsemöller
committed
"""
returns a mapping of projects names to a dict that contains the users that
have a particular role in that project.
Jörn Künsemöller
committed
Optionally filter for a list of projects to include in the response.
Jörn Künsemöller
committed
"""
project_persons_map = {}
with KeycloakAdminAPISession() as ksession:
groups = ksession.get(url='%s/groups/' % KEYCLOAK_API_BASE_URL)
for group in groups:
if group['name'] == 'Project':
projects = group['subGroups']
for project in projects:
Jörn Künsemöller
committed
if include_projects is None or project['name'] in include_projects:
project_detail = ksession.get(url='%s/groups/%s/' % (KEYCLOAK_API_BASE_URL, project['id']))
attributes = project_detail.get('attributes', {})
Jörn Künsemöller
committed
Jörn Künsemöller
committed
legacy_role_keys = {'pi': 'lofarProjectPI',
'friend_of_project': 'lofarProjectFriend',
'contact': 'lofarProjectContactauthor'}
Jörn Künsemöller
committed
Jörn Künsemöller
committed
for project_role in models.ProjectRole.Choices:
# get role attribute from project:
role = project_role.value
users = attributes.get(role, [])
# fall back to legacy-style attribute:
if not users and role in legacy_role_keys:
users = attributes.get(legacy_role_keys[role], [])
Jörn Künsemöller
committed
Jörn Künsemöller
committed
# convert user list (LDAP DNs) to something we can use in TMSS (email).
# guess the likely username from LDAP DN (to query a sensible amount of data, unfortunately we cannot filter for LDAP attributes directly)
# (Note: this is not exact since the Keycloak filter returns partial matches, but that does not harm us.
# Also, there is a theoretical risk that the Keycloak username differs from the LDAP cn, in
# which case we do not request the required detailed info of that user.
# But since get_user_mapping does match on LDAP DN, the matches we get should always be correct.)
usernames = tuple([user.split('cn=')[1].split(',')[0] for user in users if 'cn=' in user])
# then request the email addresses of these users and map their LDAP DN to their email address
user_map = get_user_mapping(include_usernames=usernames)
Jörn Künsemöller
committed
mapped_users = [user_map[user] for user in users if user in user_map] # email list of referenced users
unmappable_users = [user for user in users if user not in user_map] # list of references for which no account was found
for unmappable_user in unmappable_users:
Jörn Künsemöller
committed
logger.warning("Could not match Keycloak user reference '%s' to a known user." % unmappable_user)
if not unmappable_user.startswith('cn='):
logger.warning("LOFAR allowed to reference a person by a freeform string instead of a user account. '%s' seems to be such a legacy reference. This needs to be fixed in the identity management." % unmappable_user)
Jörn Künsemöller
committed
project_persons_map.setdefault(project['name'], {})[role] = mapped_users
Jörn Künsemöller
committed
return project_persons_map
Jörn Künsemöller
committed
@cachetools.func.ttl_cache(ttl=600)
def get_user_mapping(include_usernames: tuple = None, include_email: tuple = None):
Jörn Künsemöller
committed
"""
returns a mapping of both the string ('Project, Tobitha') or LDAP ('cn=to_project,ou=Users,o=lofartest,c=eu')
representations of users that are returned by Keycloak to a reference that we can use to identify a user
in TMSS, i.e. email.
Jörn Künsemöller
committed
Optionally filter for email address and/or username since querying all users is very expensive.
Jörn Künsemöller
committed
# todo: consider looking up / creating / returning user objects directly (but that generates a ton of lookups and unnecessary users)
# todo: we need to review that all used references are actually unique, especially Keycloak's reference by string representation does not look safe!
"""
user_map = {}
with KeycloakAdminAPISession() as ksession:
Jörn Künsemöller
committed
if include_usernames is None and include_email is None:
users = ksession.get(url='%s/users/?max=99999' % KEYCLOAK_API_BASE_URL)
else:
users = []
for username in include_usernames or []:
users += (ksession.get(url='%s/users/?username=%s' % (KEYCLOAK_API_BASE_URL, username)))
for email in include_email or []:
users += (ksession.get(url='%s/users/?email=%s' % (KEYCLOAK_API_BASE_URL, email)))
Jörn Künsemöller
committed
for user in users:
if 'attributes' in user and 'email' in user:
Jörn Künsemöller
committed
for ldap_dn in user['attributes'].get('LDAP_ENTRY_DN', []):
user_map[ldap_dn] = user['email']
for ldap_dn in user['attributes'].get('KEYCLOAK_DN', []):
user_map[ldap_dn] = user['email']
if 'firstName' in user and 'lastName' in user and 'email' in user:
Jörn Künsemöller
committed
user_map['%s, %s' % (user['lastName'], user['firstName'])] = user['email']
@cachetools.func.ttl_cache(ttl=600)
def get_email_to_name_mapping(include_email: tuple = None):
# first create a reverse map of unique-email to multiple-usernames-for-the-same-person
reverse_users_map = {}
for name, email in get_user_mapping(include_email=include_email).items():
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
if email not in reverse_users_map:
reverse_users_map[email] = []
reverse_users_map[email].append(name)
email_to_name_map = {}
for email, names in reverse_users_map.items():
if len(names) == 1:
# just one username for this unique email address, so use it
email_to_name_map[email] = names[0]
elif len(names) > 1:
# multiple usernames for this unique email address, so determine the 'best' one
# filter out the 'ugly' local ldap name
names2 = [n for n in names if 'cn=' not in n]
if names2:
# pick the first not 'ugly' local ldap name
email_to_name_map[email] = names2[0]
else:
# just pick the first, we have to choose something
email_to_name_map[email] = names[0]
return email_to_name_map
def get_names_by_role_in_project(role, project):
"""
returns the list of user display names that have the specified role in the specified project
"""
emails = get_users_by_role_in_project(role, project)
map = get_email_to_name_mapping(include_email=tuple(emails))
return [map.get(email, email) for email in emails]