import os
import logging
from datetime import datetime, timedelta
from optparse import OptionParser
from typing import Optional

logger = logging.getLogger(__name__)

# Remember whether logging was configured *before* importing ldap_test,
# so we only undo a configuration that ldap_test itself introduced.
logging_already_configured = len(logging.root.handlers) > 0

from ldap_test import LdapServer
from ldap_test.server import DEFAULT_GATEWAY_PORT, DEFAULT_PYTHON_PROXY_PORT
from py4j.java_gateway import Py4JNetworkError

if not logging_already_configured:
    # the 3rd party ldap_test module erroneously does a logging.basicConfig upon module import...
    # logging.basicConfig should only happen in 'main'-like functions.
    # so, undo the basicConfig by removing the root handlers.
    # See: https://docs.python.org/3/library/logging.html#logging.basicConfig
    # Iterate over a copy: removeHandler mutates logging.root.handlers, and
    # removing while iterating the live list would skip every other handler.
    for h in list(logging.root.handlers):
        logging.root.removeHandler(h)

from lofar.common.util import waitForInterrupt, find_free_port
from lofar.common.testing.dbcredentials import TemporaryCredentials
from lofar.common.locking import NamedAtomicLock


class TestLDAPServer():
    ''' A helper class which instantiates a running LDAP server on a free local port
    (so it does not interfere with any other test/production LDAP servers).
    Best used in a 'with'-context so the server is stopped automatically.
    '''
    # Lock held while picking ports and starting the server.
    # NOTE(review): presumably serialises start-up across processes -- confirm in lofar.common.locking.
    _named_lock = NamedAtomicLock('TestLDAPServer')

    def __init__(self, user: str = 'test', password: str = 'test', dbcreds_id: Optional[str] = None) -> None:
        '''Prepare (but do not yet start) a test LDAP server.

        :param user: name of the extra test user that is added to the LDAP directory.
        :param password: password of that test user.
        :param dbcreds_id: optional credentials id, passed on to TemporaryCredentials.
        '''
        self._tmp_creds = TemporaryCredentials(user=user, password=password, dbcreds_id=dbcreds_id)
        self._server = None

    def __enter__(self):
        '''create/instantiate the LDAP server'''
        try:
            self.start()
        except Exception as e:
            logger.error(e)
            # clean up whatever was created before the failure, then propagate.
            self.stop()
            raise
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        '''stop the running LDAP server'''
        self.stop()

    @property
    def dbcreds_id(self):
        '''the id of the temporary credentials describing this LDAP server'''
        return self._tmp_creds.dbcreds_id

    @property
    def dbcreds(self):
        '''the temporary credentials (host/port/user/password) describing this LDAP server'''
        return self._tmp_creds.dbcreds

    def _ldap_config(self) -> dict:
        '''build the LdapServer config: port, base dn, and the users/roles test entries'''
        # NOTE(review): the 'paula' entry reuses paulus' mail and givenName. This looks like
        # copy/paste, but it is left untouched because tests may depend on the exact fixture data.
        return {'port': self.dbcreds.port,
                'base': {'objectclass': ['domain'],
                         'dn': 'o=lofar,c=eu',
                         'attributes': {'o': 'lofar'}},
                'entries': [
                    {'objectclass': 'organizationUnit',
                     'dn': 'ou=Users,o=lofar,c=eu',
                     'attributes': {'ou': 'Users'}},
                    {'objectclass': 'lofarPerson',
                     'dn': 'cn=paulus,ou=users,o=lofar,c=eu',
                     'attributes': {'cn': 'paulus',
                                    'userPassword': 'pauluspass',
                                    'mail': 'paulus@boskabouter.nl',
                                    'givenName': 'Paulus',
                                    'sn': 'Boskabouter',
                                    'lofarPersonSystemrole': 'cn=support,ou=Roles,o=lofar,c=eu'}},
                    {'objectclass': 'lofarPerson',
                     'dn': 'cn=paula,ou=users,o=lofar,c=eu',
                     'attributes': {'cn': 'paula',
                                    'userPassword': 'paulapass',
                                    'mail': 'paulus@boskabouter.nl',
                                    'givenName': 'Paulus',
                                    'sn': 'Boskabouter',
                                    'lofarPersonSystemrole': 'cn=user,ou=Roles,o=lofar,c=eu'}},
                    {'objectclass': 'lofarPerson',
                     'dn': 'cn=%s,ou=users,o=lofar,c=eu' % self.dbcreds.user,
                     'attributes': {'cn': self.dbcreds.user,
                                    'userPassword': self.dbcreds.password,
                                    'mail': '%s@lofar.test' % self.dbcreds.user,
                                    'givenName': self.dbcreds.user,
                                    'sn': 'lofar_test'}},
                    {'objectclass': 'organizationUnit',
                     'dn': 'ou=Roles,o=lofar,c=eu',
                     'attributes': {'ou': 'Roles'}},
                    {'objectclass': 'lofarSystemrole',
                     'dn': 'cn=user,ou=roles,o=lofar,c=eu',
                     'attributes': {'cn': 'user'}},
                    {'objectclass': 'lofarSystemrole',
                     'dn': 'cn=support,ou=roles,o=lofar,c=eu',
                     'attributes': {'cn': 'support'}},
                ]}

    def start(self):
        '''instantiate and start the isolated LDAP server.

        Creates the temporary credentials, then keeps trying to start the server
        (each attempt with freshly picked gateway/proxy ports) for up to one minute.
        On success the environment variable TMSS_LDAPCREDENTIALS is set to the credentials id.

        :raises TimeoutError: when the server could not be started within one minute.
        '''
        logger.info('creating test-LDAP instance...')

        with self._named_lock:
            self._tmp_creds.dbcreds.type = 'LDAP'
            self._tmp_creds.dbcreds.host = '127.0.0.1'
            self._tmp_creds.dbcreds.port = find_free_port()
            self._tmp_creds.create()

            logger.info("Using dbcreds '%s' to start and configure LDAP server: %s",
                        self.dbcreds_id, self.dbcreds.stringWithHiddenPassword())

            start_time = datetime.utcnow()
            while datetime.utcnow()-start_time < timedelta(minutes=1):
                try:
                    self._server = LdapServer(java_gateway_port=find_free_port(DEFAULT_GATEWAY_PORT),
                                              python_proxy_port=find_free_port(DEFAULT_PYTHON_PROXY_PORT),
                                              config=self._ldap_config())

                    self._server.start()
                    os.environ["TMSS_LDAPCREDENTIALS"] = self.dbcreds_id
                    logger.info('LDAP server running and listening on port %s...', self.dbcreds.port)
                    logger.info('LDAP test user/pass: %s %s...', self.dbcreds.user, self.dbcreds.password)
                    return
                except Py4JNetworkError as e:
                    # only the py4j gateway/proxy ports are re-picked on the next attempt.
                    logger.warning("TestLDAPServer could not be started, retrying with next free port. Error: %s", e)

            raise TimeoutError("%s could not be started within 60 seconds. bailing out..." % self.__class__.__name__)

    def stop(self):
        '''stop the running LDAP server (if any) and always destroy the temporary credentials.

        Errors while stopping the server are logged, not raised.
        '''
        try:
            if self._server:
                logger.debug('stopping LDAP server (%s) ...', self.dbcreds.stringWithHiddenPassword())
                self._server.stop()
                self._server = None
                logger.info('LDAP server stopped (%s)', self.dbcreds.stringWithHiddenPassword())
        except Exception as e:
            logger.exception('error while removing LDAP Server at %s: %s', self.dbcreds.stringWithHiddenPassword(), e)
        finally:
            self._tmp_creds.destroy()

    def dump_ldap(self):
        '''log (at info level) the result of searching all entries below o=lofar,c=eu'''
        # imported locally so that ldap3 is only needed when dumping.
        import ldap3
        logger.info('Dumping LDAP contents...')
        srv = ldap3.Server(self.dbcreds.host, port=self.dbcreds.port)
        with ldap3.Connection(srv, auto_bind=True) as connection:
            connection.search(search_base='o=lofar,c=eu', search_filter='(objectclass=*)')
            logger.info(connection.response)


def main():
    """
    Start an isolated test LDAP server and keep it alive until interrupted by Ctrl-C.

    The server's connection details are stored in temporary credentials; their id is
    printed and exported in the environment variable TMSS_LDAPCREDENTIALS.
    The server and its credentials are removed again on exit.
    """
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)

    # No options are defined; parsing still provides --help and rejects unknown options.
    parser = OptionParser('%prog [options]',
                          description='run a test ldap server')
    parser.parse_args()

    # start and run the test ldap server.
    with TestLDAPServer() as server:
        print()
        print("Test-LDAP-Server up and running.")
        print("LDAP Credentials ID: %s (for example to run tmms against this ldapserver, call 'tmss -L %s')" % (server.dbcreds_id, server.dbcreds_id))
        print()
        print("Press Ctrl-C to exit (and remove the test LDAP server automatically...)")
        waitForInterrupt()


if __name__ == "__main__":
    main()