Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
ldap_test_service.py 8.98 KiB
import os
import logging
logger = logging.getLogger(__name__)

logging_already_configured = len(logging.root.handlers)>0

from ldap_test import LdapServer
from ldap_test.server import DEFAULT_GATEWAY_PORT, DEFAULT_PYTHON_PROXY_PORT
from py4j.java_gateway import Py4JNetworkError
from datetime import datetime, timedelta

if not logging_already_configured:
    # the 3rd party ldap_test module erroneously does a logging.basicConfig upon module import...
    # logging.basicConfig should only happen in 'main'-like functions.
    # so, undo the basicConfig by removing the root handlers.
    # See: https://docs.python.org/3/library/logging.html#logging.basicConfig
    for h in logging.root.handlers:
        logging.root.removeHandler(h)

from optparse import OptionParser

from lofar.common.util import waitForInterrupt, find_free_port
from lofar.common.testing.dbcredentials import TemporaryCredentials
from lofar.common.locking import NamedAtomicLock

class TestLDAPServer():
    ''' A helper class which instantiates a running LDAP server (not interfering with any other test/production LDAP servers)
    Best used in a 'with'-context so the server is stoped automagically.
    '''
    _named_lock = NamedAtomicLock('TestLDAPServer')

    def __init__(self, user: str = 'test', password: str = 'test') -> None:
        self._tmp_creds = TemporaryCredentials(user=user, password=password)
        self._server = None

    def __enter__(self):
        '''create/instantiate the LDAP server'''
        try:
            self.start()
        except Exception as e:
            logger.error(e)
            self.stop()
            raise
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        '''stop the running LDAP server'''
        self.stop()

    @property
    def dbcreds_id(self):
        # return 'tmss_ldap_test'
        return self._tmp_creds.dbcreds_id

    @property
    def dbcreds(self):
        return self._tmp_creds.dbcreds

    def start(self):
        '''instantiate the isolated postgres server'''
        logger.info('creating test-LDAP instance...')

        with self._named_lock:
            self._tmp_creds.dbcreds.type = 'LDAP'
            self._tmp_creds.dbcreds.host = '127.0.0.1'
            self._tmp_creds.dbcreds.port = find_free_port()
            self._tmp_creds.create()

            logger.info("Using dbcreds '%s' to start and configure LDAP server: %s",
                        self.dbcreds_id, self.dbcreds.stringWithHiddenPassword())

            start_time = datetime.utcnow()
            while datetime.utcnow()-start_time < timedelta(minutes=1):
                try:
                    self._server = LdapServer(java_gateway_port=find_free_port(DEFAULT_GATEWAY_PORT),
                                              python_proxy_port=find_free_port(DEFAULT_PYTHON_PROXY_PORT),
                                              config={'port': self.dbcreds.port,
                                               'base': {'objectclass': ['domain'],
                                                        'dn': 'o=lofar,c=eu',
                                                        'attributes': {'o': 'lofar'}},
                                               'entries': [
                                                   {'objectclass': 'organizationUnit',
                                                    'dn': 'ou=Users,o=lofar,c=eu',
                                                    'attributes': {'ou': 'Users'}},
                                                   {'objectclass': 'lofarPerson',
                                                    'dn': 'cn=paulus,ou=users,o=lofar,c=eu',
                                                    'attributes': {'cn': 'paulus',
                                                                   'userPassword': 'pauluspass',
                                                                   'mail': 'paulus@boskabouter.nl',
                                                                   'givenName': 'Paulus',
                                                                   'sn': 'Boskabouter',
                                                                   'lofarPersonSystemrole': 'cn=support,ou=Roles,o=lofar,c=eu'}},
                                                   {'objectclass': 'lofarPerson',
                                                    'dn': 'cn=paula,ou=users,o=lofar,c=eu',
                                                    'attributes': {'cn': 'paula',
                                                                   'userPassword': 'paulapass',
                                                                   'mail': 'paulus@boskabouter.nl',
                                                                   'givenName': 'Paulus',
                                                                   'sn': 'Boskabouter',
                                                                   'lofarPersonSystemrole': 'cn=user,ou=Roles,o=lofar,c=eu'}},
                                                   {'objectclass': 'lofarPerson',
                                                    'dn': 'cn=%s,ou=users,o=lofar,c=eu' % self.dbcreds.user,
                                                    'attributes': {'cn': self.dbcreds.user,
                                                                   'userPassword': self.dbcreds.password,
                                                                   'mail': '%s@lofar.test' % self.dbcreds.user,
                                                                   'givenName': self.dbcreds.user,
                                                                   'sn': 'lofar_test'}},
                                                   {'objectclass': 'organizationUnit',
                                                    'dn': 'ou=Roles,o=lofar,c=eu',
                                                    'attributes': {'ou': 'Roles'}},
                                                   {'objectclass': 'lofarSystemrole',
                                                    'dn': 'cn=user,ou=roles,o=lofar,c=eu',
                                                    'attributes': {'cn': 'user'}},
                                                   {'objectclass': 'lofarSystemrole',
                                                    'dn': 'cn=support,ou=roles,o=lofar,c=eu',
                                                    'attributes': {'cn': 'support'}},
                                               ]
                                               })

                    self._server.start()
                    os.environ["TMSS_LDAPCREDENTIALS"] = self.dbcreds_id
                    logger.info('LDAP server running and listening on port %s...', self.dbcreds.port)
                    logger.info('LDAP test user/pass: %s %s...', self.dbcreds.user, self.dbcreds.password)
                    return
                except Py4JNetworkError as e:
                    logger.warning("TestLDAPServer could not be started, retrying with next free port. Error: %s", e)
            raise TimeoutError("%s could not be started within 60 seconds. bailing out..." % self.__class__.__name__)

    def stop(self):
        '''stop the running postgres server'''
        try:
            if self._server:
                logger.debug('stopping LDAP server (%s) ...', self.dbcreds.stringWithHiddenPassword())
                self._server.stop()
                self._server = None
                logger.info('LDAP server stopped (%s)', self.dbcreds.stringWithHiddenPassword())
        except Exception as e:
            logger.error('error while removing LDAP Server at %s: %s', self.dbcreds.stringWithHiddenPassword(), e)

        self._tmp_creds.destroy()

    def dump_ldap(self):
        import ldap3
        logger.info('Dumping LDAP contents...')
        srv = ldap3.Server(self.dbcreds.host, port=self.dbcreds.port)
        with ldap3.Connection(srv, auto_bind=True) as connection:
            connection.search(search_base='o=lofar,c=eu', search_filter='(objectclass=*)')
            logger.info(connection.response)

def main():
    """
    Start an isolated LDAP server it alive until interrupted by Ctrl-C.
    It will save its config in a database credentials found in ~/.lofar/dbcredentials
    for the name found in environment variable TMSS_DBCREDENTIALS, or 'tmss' if the env var is not set.
    See also settings.py for the lsrm django setup
    """
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)

    parser = OptionParser('%prog [options]',
                          description='run a test ldap server')
    (options, args) = parser.parse_args()


    # start and run the test ldap server.
    with TestLDAPServer() as server:
        print()
        print("Test-LDAP-Server up and running.")
        print("LDAP Credentials ID: %s (for example to run tmms against this ldapserver, call 'tmss -L %s')" % (server.dbcreds_id, server.dbcreds_id))

        print()
        print("Press Ctrl-C to exit (and remove the test LDAP server automatically...)")
        waitForInterrupt()


if __name__ == "__main__":
    main()