Skip to content
Snippets Groups Projects
keycloak.py 6.91 KiB
Newer Older
import requests
import logging
import cachetools.func
import os
import json
import re
from lofar.sas.tmss.tmss.exceptions import TMSSException
from lofar.sas.tmss.tmss.tmssapp import models
logger = logging.Logger(__name__)

KEYCLOAK_TOKEN_URL = os.environ.get('KEYCLOAK_TOKEN_URL', 'https://sdc-dev.astron.nl/auth/realms/master/protocol/openid-connect/token')
KEYCLOAK_ADMIN_USER = os.environ.get('KEYCLOAK_ADMIN_USER', 'secret')
KEYCLOAK_ADMIN_PASSWORD = os.environ.get('KEYCLOAK_ADMIN_PASSWORD', 'secret')
KEYCLOAK_API_BASE_URL = os.environ.get('KEYCLOAK_API_BASE_URL', 'https://sdc-dev.astron.nl/auth/admin/realms/master')


class KeycloakAdminAPISession(requests.Session):

    @cachetools.func.ttl_cache(ttl=30)
    def get_token(self):
        data = {'username': KEYCLOAK_ADMIN_USER, 'password': KEYCLOAK_ADMIN_PASSWORD, 'grant_type': 'password',
                'client_id': 'admin-cli'}
        response = self.post(url=KEYCLOAK_TOKEN_URL, data=data)  # , headers={'Accept': 'text/plain'})
        if response.status_code == 200:
            response_dict = json.loads(response.content.decode('utf-8'))
            token = response_dict['access_token']
            logger.info('Obtained Keycloak API token')
            return token
        else:
            raise TMSSException('Keycloak admin API token could not be obtained: %s' % response.text)

    def get(self, *args, **kwargs):
        token = self.get_token()
        headers = kwargs.pop('headers', {})
        headers["Authorization"] = "Bearer %s" % token
        response = super().get(*args, headers=headers, **kwargs)
        if response.status_code == 200:
            return json.loads(response.content.decode('utf-8'))
        else:
            raise TMSSException('Keycloak admin API query failed: %s' % response.text)


def get_users_by_role_in_project(role, project):
    """
    returns the list of users that have the specified role in the specified project
    """
    project_persons = get_project_persons()
    if project in project_persons:
        return project_persons[project][role]
    else:
        []


@cachetools.func.ttl_cache(ttl=600)
def get_project_persons():
    """
    returns a mapping of projects names to a dict that contains the users that
    have a particular role in that project.
    """
    project_persons_map = {}
    with KeycloakAdminAPISession() as ksession:
        groups = ksession.get(url='%s/groups/' % KEYCLOAK_API_BASE_URL)
        for group in groups:
            if group['name'] == 'Project':
                projects = group['subGroups']
        for project in projects:
            project_detail = ksession.get(url='%s/groups/%s/' % (KEYCLOAK_API_BASE_URL, project['id']))
            attributes = project_detail.get('attributes', {})

            legacy_role_keys = {'pi': 'lofarProjectPI',
                                'friend_of_project': 'lofarProjectFriend',
                                'contact': 'lofarProjectContactauthor'}

            for project_role in models.ProjectRole.Choices:
                # get role attribute from project:
                role = project_role.value
                users = attributes.get(role, [])
                # fall back to legacy-style attribute:
                if not users and role in legacy_role_keys:
                    users = attributes.get(legacy_role_keys[role], [])

                # convert user list (LDAP DNs) to something we can use in TMSS (email)
                mapped_users = [user_map[user] for user in users if user in user_map]  # email list of referenced users
                unmappable_users = [user for user in users if user not in user_map]  # list of references for which no account was found
                    # Note: Usually Keycloak should return DN references to user accounts. For PI's, someone had the
                    # great idea to allow to specify a freeform string instead, to refer to people who may or may not
                    # have an account. Even if the person has a user account, there is no way to replicate the exact
                    # string 'representation' Keycloak returns, since the string may contain typos, or info that is not
                    # stored in the user accounts (like titles).
                    # The following unsafe hack tries to determine whether there is a user account that matches the
                    # name given in the string (ignore titles since they are not part of the user account):
                    # unmappable_user_fixed = re.sub('Dr\.', '', unmappable_user)
                    # unmappable_user_fixed = re.sub('Prof\.', '', unmappable_user_fixed)
                    # unmappable_user_fixed = re.sub('ir\.', '', unmappable_user_fixed)
                    # unmappable_user_fixed = re.sub('Ir\.', '', unmappable_user_fixed)
                    # unmappable_user_fixed = re.sub('apl\.', '', unmappable_user_fixed)
                    # unmappable_user_fixed = re.sub(' +', ' ', unmappable_user_fixed)
                    #
                    # if unmappable_user_fixed in user_map:
                    #     mapped_users.append(user_map[unmappable_user_fixed])
                    # else:
                        logger.warning("Could not match Keycloak user reference '%s' to a known user." % unmappable_user)
                        if not unmappable_user.startswith('cn='):
                            logger.warning("LOFAR allowed to reference a person by a freeform string instead of a user account. '%s' seems to be such a legacy reference. This needs to be fixed in the identity management." % unmappable_user)
                project_persons_map.setdefault(project['name'], {})[role] = mapped_users

    return project_persons_map


@cachetools.func.ttl_cache(ttl=600)
def get_user_mapping():
    """
    returns a mapping of both the string ('Project, Tobitha') or LDAP ('cn=to_project,ou=Users,o=lofartest,c=eu')
    representations of users that are returned by Keycloak to a reference that we can use to identify a user
    in TMSS, i.e. email.
    # todo: consider looking up / creating / returning user objects directly (but that generates a ton of lookups and unnecessary users)
    # todo: we need to review that all used references are actually unique, especially Keycloak's reference by string representation does not look safe!
    """
    user_map = {}
    with KeycloakAdminAPISession() as ksession:
        users = ksession.get(url='%s/users/' % KEYCLOAK_API_BASE_URL)
        for user in users:
            if 'attributes' in user:
                for ldap_dn in user['attributes'].get('LDAP_ENTRY_DN', []):
                    user_map[ldap_dn] = user['email']
                for ldap_dn in user['attributes'].get('KEYCLOAK_DN', []):
                    user_map[ldap_dn] = user['email']
            if 'firstName' in user and 'lastName' in user:
                user_map['%s, %s' % (user['lastName'], user['firstName'])] = user['email']

    return user_map