Skip to content
Snippets Groups Projects
Commit c16b21e6 authored by Jörn Künsemöller's avatar Jörn Künsemöller
Browse files

TMSS-453: Add a simple Keycloak admin API client to query a list of users by role in project

parent f3749ef3
No related branches found
No related tags found
3 merge requests!634WIP: COBALT commissioning delta,!481Draft: SW-971 SW-973 SW-975: Various fixes to build LOFAR correctly.,!476Resolve TMSS-453
......@@ -24,7 +24,7 @@ RUN mkdir -p /opt/oracle && \
unzip instantclient-basic-linux.x64-21.1.0.0.0.zip
ENV LD_LIBRARY_PATH /opt/oracle/instantclient_21_1:$LD_LIBRARY_PATH
RUN pip3 install cython kombu lxml requests pygcn xmljson mysql-connector-python python-dateutil Django==3.0.9 djangorestframework==3.11.1 djangorestframework-xml ldap==1.0.2 flask fabric coverage python-qpid-proton PyGreSQL numpy h5py psycopg2 testing.postgresql Flask-Testing scipy Markdown django-filter python-ldap python-ldap-test ldap3 django-jsonforms django-json-widget django-jsoneditor drf-yasg flex swagger-spec-validator django-auth-ldap mozilla-django-oidc jsonschema comet pyxb==1.2.5 graphviz isodate astropy packaging django-debug-toolbar pymysql astroplan SimpleWebSocketServer websocket_client drf-flex-fields django-property-filter cx_Oracle
RUN pip3 install cython kombu lxml requests pygcn xmljson mysql-connector-python python-dateutil Django==3.0.9 djangorestframework==3.11.1 djangorestframework-xml ldap==1.0.2 flask fabric coverage python-qpid-proton PyGreSQL numpy h5py psycopg2 testing.postgresql Flask-Testing scipy Markdown django-filter python-ldap python-ldap-test ldap3 django-jsonforms django-json-widget django-jsoneditor drf-yasg flex swagger-spec-validator django-auth-ldap mozilla-django-oidc jsonschema comet pyxb==1.2.5 graphviz isodate astropy packaging django-debug-toolbar pymysql astroplan SimpleWebSocketServer websocket_client drf-flex-fields django-property-filter cx_Oracle cachetools
#Viewflow package
RUN pip3 install django-material django-viewflow
......
import requests
import logging
import cachetools.func
import os
import json
import re
logger = logging.Logger(__name__)
KEYCLOAK_TOKEN_URL = os.environ.get('KEYCLOAK_TOKEN_URL', 'https://sdc-dev.astron.nl/auth/realms/master/protocol/openid-connect/token')
KEYCLOAK_ADMIN_USER = os.environ.get('KEYCLOAK_ADMIN_USER', 'secret')
KEYCLOAK_ADMIN_PASSWORD = os.environ.get('KEYCLOAK_ADMIN_PASSWORD', 'secret')
KEYCLOAK_API_BASE_URL = os.environ.get('KEYCLOAK_API_BASE_URL', 'https://sdc-dev.astron.nl/auth/admin/realms/master')
class KeycloakAdminAPISession(requests.Session):
@cachetools.func.ttl_cache(ttl=30)
def get_token(self):
data = {'username': KEYCLOAK_ADMIN_USER, 'password': KEYCLOAK_ADMIN_PASSWORD, 'grant_type': 'password',
'client_id': 'admin-cli'}
response = self.post(url=KEYCLOAK_TOKEN_URL, data=data) # , headers={'Accept': 'text/plain'})
if response.status_code == 200:
response_dict = json.loads(response.content.decode('utf-8'))
token = response_dict['access_token']
logger.info('Obtained Keycloak API token')
return token
else:
raise Exception('Keycloak admin API token could not be obtained: %s' % response.text)
def get(self, *args, **kwargs):
token = self.get_token()
headers = kwargs.pop('headers', {})
headers["Authorization"] = "Bearer %s" % token
response = super().get(*args, headers=headers, **kwargs)
if response.status_code == 200:
return json.loads(response.content.decode('utf-8'))
else:
raise Exception('Keycloak admin API query failed: %s' % response.text)
def get_users_by_role_in_project(role, project):
"""
returns the list of users that have the specified role in the specified project
"""
project_persons = get_project_persons()
return project_persons[project][role]
@cachetools.func.ttl_cache(ttl=600)
def get_project_persons():
"""
returns a mapping of projects names to a dict that contains the users that
have a particular role in that project.
"""
project_persons_map = {}
with KeycloakAdminAPISession() as ksession:
groups = ksession.get(url='%s/groups/' % KEYCLOAK_API_BASE_URL)
for group in groups:
if group['name'] == 'Project':
projects = group['subGroups']
for project in projects:
project_detail = ksession.get(url='%s/groups/%s/' % (KEYCLOAK_API_BASE_URL, project['id']))
attributes = project_detail.get('attributes', {})
role_keys = {'pi': 'lofarProjectPI',
'friend_of_project': 'lofarProjectFriend',
'contact': 'lofarProjectContactauthor'}
for role, key in role_keys.items():
users = attributes.get(key, [])
# convert user list to something we can use in TMSS
user_map = get_user_mapping()
# todo: find a way to replicate the exact string representation Keycloak uses (where to get the title from?), instead of the following unsafe hack
unmappable_users = [user for user in users if user not in user_map]
mapped_users = [user_map[user] for user in users if user in user_map]
for unmappable_user in unmappable_users:
unmappable_user_fixed = re.sub('Dr.', '', unmappable_user)
unmappable_user_fixed = re.sub('Prof.', '', unmappable_user_fixed)
unmappable_user_fixed = re.sub('ir.', '', unmappable_user_fixed)
unmappable_user_fixed = re.sub('Ir.', '', unmappable_user_fixed)
unmappable_user_fixed = re.sub('apl.', '', unmappable_user_fixed)
unmappable_user_fixed = re.sub(' +', ' ', unmappable_user_fixed)
if unmappable_user_fixed in user_map:
users.append(user_map[unmappable_user_fixed])
else:
logger.warning("Could not match Keycloak user reference '%s' to a known user" % unmappable_user)
project_persons_map.setdefault(project['name'], {})[role] = mapped_users
return project_persons_map
@cachetools.func.ttl_cache(ttl=600)
def get_user_mapping():
"""
returns a mapping of both the string ('Project, Tobitha') or LDAP ('cn=to_project,ou=Users,o=lofartest,c=eu')
representations of users that are returned by Keycloak to a reference that we can use to identify a user
in TMSS, i.e. email.
# todo: consider looking up / creating / returning user objects directly (but that generates a ton of lookups and unnecessary users)
# todo: we need to review that all used references are actually unique, especially Keycloak's reference by string representation does not look safe!
"""
user_map = {}
with KeycloakAdminAPISession() as ksession:
users = ksession.get(url='%s/users/' % KEYCLOAK_API_BASE_URL)
# pprint.pprint(ksession.get(url='%s/groups/70b064d6-44a1-48be-bc39-98f8f5cd3d7e/' % KEYCLOAK_API_BASE_URL))
for user in users:
if 'attributes' in user:
for ldap_dn in user['attributes'].get('LDAP_ENTRY_DN', []):
user_map[ldap_dn] = user['email']
for ldap_dn in user['attributes'].get('KEYCLOAK_DN', []):
user_map[ldap_dn] = user['email']
if 'firstName' in user and 'lastName' in user:
user_map['%s, %s' % (user['lastName'], user['firstName'])] = user['email']
return user_map
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment