diff --git a/SAS/TMSS/test/ldap_test_service.py b/SAS/TMSS/test/ldap_test_service.py index faa907f0c095e843e646b15b27cc82048f70adc7..b760be53c8aee496a5f57ca9fae9bf37b496d33c 100644 --- a/SAS/TMSS/test/ldap_test_service.py +++ b/SAS/TMSS/test/ldap_test_service.py @@ -1,116 +1,135 @@ -import ldap3 - -from ldap_test import LdapServer -import time import os -from optparse import OptionParser import logging logger = logging.getLogger(__name__) -from lofar.common import dbcredentials -import signal - - -def setup_test_ldap_server(ldap_creds_name): - ldap_credentials = dbcredentials.DBCredentials().get(ldap_creds_name) - logger.info("Using dbcreds '%s' to start ldap server: %s", - ldap_creds_name, ldap_credentials.stringWithHiddenPassword()) - - logger.info('Configuring test LDAP server...') - server = LdapServer({ - 'port': ldap_credentials.port, - 'base': {'objectclass': ['domain'], - 'dn': 'o=lofar,c=eu', - 'attributes': {'o': 'lofar'}}, - 'entries': [ - {'objectclass': 'organizationUnit', - 'dn': 'ou=Users,o=lofar,c=eu', - 'attributes': {'ou': 'Users'}}, - {'objectclass': 'lofarPerson', - 'dn': 'cn=paulus,ou=users,o=lofar,c=eu', - 'attributes': {'cn': 'paulus', - 'userPassword': 'pauluspass', - 'mail': 'paulus@boskabouter.nl', - 'givenName': 'Paulus', - 'sn': 'Boskabouter'}}, - {'objectclass': 'lofarPerson', - 'dn': 'cn=paula,ou=users,o=lofar,c=eu', - 'attributes': {'cn': 'paula', - 'userPassword': 'paulapass', - 'mail': 'paulus@boskabouter.nl', - 'givenName': 'Paulus', - 'sn': 'Boskabouter'}}, - {'objectclass': 'lofarPerson', - 'dn': 'cn=%s,ou=users,o=lofar,c=eu' % ldap_credentials.user, - 'attributes': {'cn': ldap_credentials.user, - 'userPassword': ldap_credentials.password, - 'mail': 'gen@eric.nl', - 'givenName': 'Gen', - 'sn': 'Eric'}} - ] - }) - logger.info('LDAP server listens on port %s...' % server.config['port']) - return server - - -def dump_ldap(host, port): - logger.debug('Dumping LDAP contents...') - srv = ldap3.Server(host, port=port) - conn = ldap3.Connection(srv, auto_bind=True) - conn.search(search_base='o=lofar,c=eu', search_filter='(objectclass=*)') - logger.debug(conn.response) - - -# raise normal Exception when a signal is caught so we can stop the ldap server including the spawned Java process -class SigTermException(Exception): - pass - - -def signal_handler(_s,_f): - raise SigTermException("signal %s received..." % (_s,)) - -for s in [signal.SIGHUP, signal.SIGTERM, signal.SIGINT]: - signal.signal(s, signal_handler) - - -def start_test_ldap_server_and_wait(server): - try: - logger.info('Starting test LDAP service...') - server.start() - dump_ldap(host='localhost', port=server.config['port']) - logger.info('Waiting for interrupt to stop...') - while(True): - time.sleep(1) - except SigTermException as e: - logger.info("interrupted... %s", e) - except KeyboardInterrupt: - logger.info("interrupted by Ctrl-C...") - - server.stop() - logger.info("Stopped test LDAP server.") + +from ldap_test import LdapServer + +# the 3rd party ldap_test module erroneously does a logging.basicConfig upon module import... +# logging.basicConfig should only happen in 'main'-like functions. +# so, undo the basicConfig by removing the root handlers. +# See: https://docs.python.org/3/library/logging.html#logging.basicConfig +for h in logging.root.handlers: + logging.root.removeHandler(h) + +from optparse import OptionParser + +from lofar.common.util import waitForInterrupt, find_free_port +from lofar.common.testing.dbcredentials import TemporaryCredentials + +class TestLDAPServer(): + ''' A helper class which instantiates a running LDAP server (not interfering with any other test/production LDAP servers) + Best used in a 'with'-context so the server is stoped automagically. + ''' + + def __init__(self, user: str = 'test', password: str = 'test') -> None: + self._tmp_creds = TemporaryCredentials(user=user, password=password) + self._server = None + + def __enter__(self): + '''create/instantiate the LDAP server''' + try: + self.start() + except Exception as e: + logger.error(e) + self.stop() + raise + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + '''stop the running LDAP server''' + self.stop() + + @property + def dbcreds_id(self): + # return 'tmss_ldap_test' + return self._tmp_creds.dbcreds_id + + @property + def dbcreds(self): + # return dbcredentials.DBCredentials().get('tmss_ldap_test') + return self._tmp_creds.dbcreds + + def start(self): + '''instantiate the isolated postgres server''' + logger.info('creating test-LDAP instance...') + + self._tmp_creds.dbcreds.type = 'LDAP' + self._tmp_creds.dbcreds.host = '127.0.0.1' + self._tmp_creds.dbcreds.port = find_free_port() + self._tmp_creds.create() + + logger.info("Using dbcreds '%s' to start and configure LDAP server: %s", + self.dbcreds_id, self.dbcreds.stringWithHiddenPassword()) + + self._server = LdapServer({'port': self.dbcreds.port, + 'base': {'objectclass': ['domain'], + 'dn': 'o=lofar,c=eu', + 'attributes': {'o': 'lofar'}}, + 'entries': [ + {'objectclass': 'organizationUnit', + 'dn': 'ou=Users,o=lofar,c=eu', + 'attributes': {'ou': 'Users'}}, + {'objectclass': 'lofarPerson', + 'dn': 'cn=%s,ou=users,o=lofar,c=eu' % self.dbcreds.user, + 'attributes': {'cn': self.dbcreds.user, + 'userPassword': self.dbcreds.password, + 'mail': 'gen@eric.nl', + 'givenName': 'Gen', + 'sn': 'Eric'}}]}) + + self._server.start() + os.environ["TMSS_LDAPCREDENTIALS"] = self.dbcreds_id + logger.info('LDAP server running and listening on port %s...', self.dbcreds.port) + logger.info('LDAP test user/pass: %s %s...', self.dbcreds.user, self.dbcreds.password) + + def stop(self): + '''stop the running postgres server''' + try: + if self._server: + logger.debug('stopping LDAP server (%s) ...', self.dbcreds.stringWithHiddenPassword()) + self._server.stop() + self._server = None + logger.info('LDAP server stopped (%s)', self.dbcreds.stringWithHiddenPassword()) + except Exception as e: + logger.error('error while removing LDAP Server at %s: %s', self.dbcreds.stringWithHiddenPassword(), e) + + self._tmp_creds.destroy() + + def dump_ldap(self): + import ldap3 + logger.info('Dumping LDAP contents...') + srv = ldap3.Server(self.dbcreds.host, port=self.dbcreds.port) + with ldap3.Connection(srv, auto_bind=True) as connection: + connection.search(search_base='o=lofar,c=eu', search_filter='(objectclass=*)') + logger.info(connection.response) def main(): """ - Start Django test database and keep it alive until interrupted. - It will use the database credentials found in ~/.lofar/dbcredentials + Start an isolated LDAP server it alive until interrupted by Ctrl-C. + It will save its config in a database credentials found in ~/.lofar/dbcredentials for the name found in environment variable TMSS_DBCREDENTIALS, or 'tmss' if the env var is not set. See also settings.py for the lsrm django setup """ - logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG) + logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) parser = OptionParser('%prog [options]', description='run a test ldap server') - parser.add_option("-C", "--ldapcredentials", dest="ldapcredentials", type="string", - default=os.environ.get('TMSS_LDAPCREDENTIALS', "tmss_ldap_test"), - help="Name of ldap credential set to use [default=%default]") (options, args) = parser.parse_args() - os.environ['TMSS_LDAPCREDENTIALS'] = options.ldapcredentials - # create and run the test ldap server. - server = setup_test_ldap_server(options.ldapcredentials) - start_test_ldap_server_and_wait(server) + # start and run the test ldap server. + with TestLDAPServer() as server: + print() + print("Test-LDAP-Server up and running.") + print("LDAP Credentials ID: %s (for example to run tmms against this ldapserver, call 'tmss -L %s')" % (server.dbcreds_id, server.dbcreds_id)) + + print() + print("Press Ctrl-C to exit (and remove the test LDAP server automatically...)") + waitForInterrupt() if __name__ == "__main__": main() + +