Skip to content
Snippets Groups Projects
keycloak.py 9.15 KiB
Newer Older
import requests
import logging
import cachetools.func
import os
import json
import re
from lofar.sas.tmss.tmss.exceptions import TMSSException
from lofar.sas.tmss.tmss.tmssapp import models
# Module-level logger; the session class and the helper functions in this module log through it.
logger = logging.getLogger(__name__)

# Keycloak connection settings. Each value is read from the environment variable of the
# same name and falls back to the default given here when that variable is not set.
# URL of the OpenID-Connect token endpoint that is used to obtain an admin API token:
KEYCLOAK_TOKEN_URL = os.environ.get('KEYCLOAK_TOKEN_URL', 'https://keycloak.astron.nl/auth/realms/SDC/protocol/openid-connect/token')
# Credentials for the password grant against the token endpoint.
# NOTE(review): the 'secret' fallbacks look like placeholders - confirm that deployments always set these variables.
KEYCLOAK_ADMIN_USER = os.environ.get('KEYCLOAK_ADMIN_USER', 'secret')
KEYCLOAK_ADMIN_PASSWORD = os.environ.get('KEYCLOAK_ADMIN_PASSWORD', 'secret')
# Base URL of the Keycloak admin REST API for the realm (groups and users are queried below it):
KEYCLOAK_API_BASE_URL = os.environ.get('KEYCLOAK_API_BASE_URL', 'https://keycloak.astron.nl/auth/admin/realms/SDC')


class KeycloakAdminAPISession(requests.Session):
    """
    A requests session for the Keycloak admin API.
    Every GET request is authorized with a bearer token that is obtained with the configured
    admin credentials, and the JSON body of the response is returned already decoded.
    """

    # The cache is keyed on the call arguments, i.e. on the session instance,
    # so a token is re-used for at most 30 seconds within one session.
    @cachetools.func.ttl_cache(ttl=30)
    def get_token(self):
        """
        Request an access token from KEYCLOAK_TOKEN_URL using the password grant
        for the 'admin-cli' client.

        :return: the access token string
        :raises TMSSException: if the token endpoint does not answer with status 200
        """
        data = {'username': KEYCLOAK_ADMIN_USER, 'password': KEYCLOAK_ADMIN_PASSWORD, 'grant_type': 'password',
                'client_id': 'admin-cli'}
        response = self.post(url=KEYCLOAK_TOKEN_URL, data=data)
        if response.status_code != 200:
            raise TMSSException('Keycloak admin API token could not be obtained: %s' % response.text)

        response_dict = json.loads(response.content.decode('utf-8'))
        token = response_dict['access_token']
        logger.info('Obtained Keycloak API token')
        return token

    def get(self, *args, **kwargs):
        """
        Perform an authorized GET request and return the decoded JSON response.
        Accepts the same arguments as requests.Session.get; an 'Authorization' header
        is added to (a copy of) the given headers.

        :return: the decoded JSON body of the response
        :raises TMSSException: if the token cannot be obtained or the response status is not 200
        """
        token = self.get_token()
        # work on a copy, so the caller's dict is not modified, and tolerate an explicit headers=None
        headers = dict(kwargs.pop('headers', None) or {})
        headers["Authorization"] = "Bearer %s" % token
        response = super().get(*args, headers=headers, **kwargs)
        if response.status_code != 200:
            raise TMSSException('Keycloak admin API query failed: %s' % response.text)
        return json.loads(response.content.decode('utf-8'))


def get_users_by_role_in_project(role, project):
    """
    Look up the users that hold the given role in the given project.

    :param role: the project role to look up
    :param project: the name of the project
    :return: the list of users stored for that role, or an empty list if the project is not in the lookup result
    """
    # The lookup is done for all projects known to TMSS in one go, rather than per project,
    # so the same request can serve calls for any TMSS project. Legacy projects that only
    # exist in Keycloak are of no interest here and are left out.
    # Asking for just this one project, get_project_persons(include_projects=(project,)),
    # works as well, but is a little less efficient (for a relatively small number of projects).
    tmss_project_names = tuple(models.Project.objects.values_list('name', flat=True))
    persons_by_project = get_project_persons(include_projects=tmss_project_names)
    if project not in persons_by_project:
        return []
    return persons_by_project[project][role]
def get_project_persons(include_projects: tuple = None):
    """
    returns a mapping of projects names to a dict that contains the users that
    have a particular role in that project.
    Optionally filter for a list of projects to include in the response.

    The projects are taken from the sub-groups of the Keycloak group named 'Project'. For each
    project role, the user references in the project's attributes are translated to email
    addresses via get_user_mapping; references that cannot be translated are logged and skipped.

    :param include_projects: tuple of project names to include, or None to include all projects
    :return: dict of the form {project_name: {role: [email, ...]}}; empty if there is no 'Project' group
    :raises TMSSException: if a request to the Keycloak admin API fails
    """
    # attribute names that legacy projects use instead of the role name itself
    legacy_role_keys = {'pi': 'lofarProjectPI',
                        'friend_of_project': 'lofarProjectFriend',
                        'contact': 'lofarProjectContactauthor'}
    project_persons_map = {}
    with KeycloakAdminAPISession() as ksession:
        groups = ksession.get(url='%s/groups/' % KEYCLOAK_API_BASE_URL)
        projects = []  # stays empty when Keycloak does not report a 'Project' group
        for group in groups:
            if group['name'] == 'Project':
                projects = group['subGroups']
        for project in projects:
            if include_projects is not None and project['name'] not in include_projects:
                continue
            project_detail = ksession.get(url='%s/groups/%s/' % (KEYCLOAK_API_BASE_URL, project['id']))
            attributes = project_detail.get('attributes', {})
            for project_role in models.ProjectRole.Choices:
                # get role attribute from project:
                role = project_role.value
                users = attributes.get(role, [])
                # fall back to legacy-style attribute:
                if not users and role in legacy_role_keys:
                    users = attributes.get(legacy_role_keys[role], [])
                # convert user list (LDAP DNs) to something we can use in TMSS (email).
                # guess the likely username from LDAP DN (to query a sensible amount of data, unfortunately we cannot filter for LDAP attributes directly)
                # (Note: this is not exact since the Keycloak filter returns partial matches, but that does not harm us.
                #        Also, there is a theoretical risk that the Keycloak username differs from the LDAP cn, in
                #        which case we do not request the required detailed info of that user.
                #        But since get_user_mapping does match on LDAP DN, the matches we get should always be correct.)
                usernames = tuple(user.split('cn=')[1].split(',')[0] for user in users if 'cn=' in user)
                # then request the email addresses of these users and map their LDAP DN to their email address
                user_map = get_user_mapping(include_usernames=usernames)
                mapped_users = [user_map[user] for user in users if user in user_map]  # email list of referenced users
                unmappable_users = [user for user in users if user not in user_map]  # list of references for which no account was found
                for unmappable_user in unmappable_users:
                    logger.warning("Could not match Keycloak user reference '%s' to a known user.", unmappable_user)
                    if not unmappable_user.startswith('cn='):
                        logger.warning("LOFAR allowed to reference a person by a freeform string instead of a user account. '%s' seems to be such a legacy reference. This needs to be fixed in the identity management.", unmappable_user)
                project_persons_map.setdefault(project['name'], {})[role] = mapped_users
    # hand the collected mapping back to the caller (callers test membership on it and index into it)
    return project_persons_map
@cachetools.func.ttl_cache(ttl=600)
def get_user_mapping(include_usernames: tuple = None, include_email: tuple = None):
    """
    returns a mapping of both the string ('Project, Tobitha') or LDAP ('cn=to_project,ou=Users,o=lofartest,c=eu')
    representations of users that are returned by Keycloak to a reference that we can use to identify a user
    in TMSS, i.e. email.
    Optionally filter for email address and/or username since querying all users is very expensive.
    Results are cached for 600 seconds per combination of arguments, which is why the filters are
    passed as (hashable) tuples.

    :param include_usernames: tuple of usernames to query, or None
    :param include_email: tuple of email addresses to query, or None
    :return: dict that maps each known user reference (LDAP DN or 'lastName, firstName') to the user's email.
             If both filters are None, all users are queried.
    :raises TMSSException: if a request to the Keycloak admin API fails
    """
    # todo: consider looking up / creating / returning user objects directly (but that generates a ton of lookups and unnecessary users)
    # todo: we need to review that all used references are actually unique, especially Keycloak's reference by string representation does not look safe!
    users_url = '%s/users/' % KEYCLOAK_API_BASE_URL
    with KeycloakAdminAPISession() as ksession:
        if include_usernames is None and include_email is None:
            users = ksession.get(url='%s/users/?max=99999' % KEYCLOAK_API_BASE_URL)
        else:
            users = []
            # the filter values are passed as query parameters, so that they get URL-encoded properly
            # (e.g. a '+' in an email address would otherwise be read as a space)
            for username in include_usernames or []:
                users += ksession.get(url=users_url, params={'username': username})
            for email in include_email or []:
                users += ksession.get(url=users_url, params={'email': email})

    user_map = {}
    for user in users:
        # users without an email address cannot be referenced in TMSS, so they are left out
        if 'email' not in user:
            continue
        if 'attributes' in user:
            for ldap_dn in user['attributes'].get('LDAP_ENTRY_DN', []):
                user_map[ldap_dn] = user['email']
            for ldap_dn in user['attributes'].get('KEYCLOAK_DN', []):
                user_map[ldap_dn] = user['email']
        if 'firstName' in user and 'lastName' in user:
            user_map['%s, %s' % (user['lastName'], user['firstName'])] = user['email']

    return user_map
def get_email_to_name_mapping(include_email: tuple = None):
    """
    returns a mapping of email address to one name for that user, based on get_user_mapping.
    If several names map to the same email address, the first name that does not contain 'cn='
    is preferred; if all of them contain 'cn=', the first name is used.

    :param include_email: tuple of email addresses that is passed on to get_user_mapping, or None
    :return: dict that maps each email address to a single name
    """
    # group all names that refer to the same (unique) email address
    names_by_email = {}
    for name, email in get_user_mapping(include_email=include_email).items():
        names_by_email.setdefault(email, []).append(name)

    email_to_name_map = {}
    for email, candidates in names_by_email.items():
        # the 'ugly' local ldap names are only used if there is nothing else to choose from
        readable = [candidate for candidate in candidates if 'cn=' not in candidate]
        email_to_name_map[email] = readable[0] if readable else candidates[0]
    return email_to_name_map


def get_names_by_role_in_project(role, project):
    """
    Look up the display names of the users that hold the given role in the given project.

    :param role: the project role to look up
    :param project: the name of the project
    :return: list with one entry per user: the mapped name, or the email address itself if no name is known for it
    """
    role_emails = get_users_by_role_in_project(role, project)
    name_by_email = get_email_to_name_mapping(include_email=tuple(role_emails))
    display_names = []
    for address in role_emails:
        # fall back to the plain email address when there is no name for it
        display_names.append(name_by_email.get(address, address))
    return display_names