"""Run a throw-away LDAP server pre-populated with LOFAR test users and roles."""

import logging
import os
import signal
import time
from optparse import OptionParser

import ldap3
from ldap_test import LdapServer

from lofar.common import dbcredentials

logger = logging.getLogger(__name__)


def setup_test_ldap_server(ldap_creds_name):
    """Create (but do not start) a test LDAP server with a fixed set of entries.

    The port, and the user/password of the generic account entry, are taken
    from the credential set named ``ldap_creds_name``.

    :param ldap_creds_name: name of the credential set to look up via
        ``dbcredentials.DBCredentials().get``.
    :return: the configured ``LdapServer`` instance.
    """
    ldap_credentials = dbcredentials.DBCredentials().get(ldap_creds_name)
    logger.info("Using dbcreds '%s' to start ldap server: %s",
                ldap_creds_name, ldap_credentials.stringWithHiddenPassword())

    logger.info('Configuring test LDAP server...')
    server = LdapServer({
        'port': ldap_credentials.port,
        'base': {'objectclass': ['domain'],
                 'dn': 'o=lofar,c=eu',
                 'attributes': {'o': 'lofar'}},
        'entries': [
            {'objectclass': 'organizationUnit',
             'dn': 'ou=Users,o=lofar,c=eu',
             'attributes': {'ou': 'Users'}},
            {'objectclass': 'lofarPerson',
             'dn': 'cn=paulus,ou=users,o=lofar,c=eu',
             'attributes': {'cn': 'paulus',
                            'userPassword': 'pauluspass',
                            'mail': 'paulus@boskabouter.nl',
                            'givenName': 'Paulus',
                            'sn': 'Boskabouter',
                            'lofarPersonSystemrole': 'cn=support,ou=Roles,o=lofar,c=eu'}},
            # NOTE(review): paula reuses paulus' mail/givenName/sn -- this looks
            # like copy-paste, but is left untouched because callers may depend
            # on the exact test data. Confirm before changing.
            {'objectclass': 'lofarPerson',
             'dn': 'cn=paula,ou=users,o=lofar,c=eu',
             'attributes': {'cn': 'paula',
                            'userPassword': 'paulapass',
                            'mail': 'paulus@boskabouter.nl',
                            'givenName': 'Paulus',
                            'sn': 'Boskabouter',
                            'lofarPersonSystemrole': 'cn=user,ou=Roles,o=lofar,c=eu'}},
            # Generic account whose name and password come from the credential set.
            {'objectclass': 'lofarPerson',
             'dn': 'cn=%s,ou=users,o=lofar,c=eu' % ldap_credentials.user,
             'attributes': {'cn': ldap_credentials.user,
                            'userPassword': ldap_credentials.password,
                            'mail': 'gen@eric.nl',
                            'givenName': 'Gen',
                            'sn': 'Eric'}},
            {'objectclass': 'organizationUnit',
             'dn': 'ou=Roles,o=lofar,c=eu',
             'attributes': {'ou': 'Roles'}},
            {'objectclass': 'lofarSystemrole',
             'dn': 'cn=user,ou=roles,o=lofar,c=eu',
             'attributes': {'cn': 'user'}},
            {'objectclass': 'lofarSystemrole',
             'dn': 'cn=support,ou=roles,o=lofar,c=eu',
             'attributes': {'cn': 'support'}},
        ]
    })
    logger.info('LDAP server listens on port %s...', server.config['port'])
    return server


def dump_ldap(host, port):
    """Log (at debug level) the result of searching everything below ``o=lofar,c=eu``.

    :param host: host name of the LDAP server to query.
    :param port: port of the LDAP server to query.
    """
    logger.debug('Dumping LDAP contents...')
    srv = ldap3.Server(host, port=port)
    conn = ldap3.Connection(srv, auto_bind=True)
    try:
        conn.search(search_base='o=lofar,c=eu', search_filter='(objectclass=*)')
        logger.debug(conn.response)
    finally:
        # Always release the connection, also when the search fails.
        conn.unbind()


# raise normal Exception when a signal is caught so we can stop the ldap server including the spawned Java process
class SigTermException(Exception):
    """Raised from the signal handler to unwind into the normal shutdown path."""


def signal_handler(_s, _f):
    """Translate a received signal into a ``SigTermException``."""
    raise SigTermException("signal %s received..." % (_s,))


for s in [signal.SIGHUP, signal.SIGTERM, signal.SIGINT]:
    signal.signal(s, signal_handler)


def start_test_ldap_server_and_wait(server):
    """Start ``server``, dump its contents, and block until interrupted.

    The server is stopped on every exit path: after a handled interrupt as
    well as when start-up or the initial dump raises an unexpected exception
    (which still propagates to the caller).

    :param server: the ``LdapServer`` returned by ``setup_test_ldap_server``.
    """
    try:
        logger.info('Starting test LDAP service...')
        server.start()
        dump_ldap(host='localhost', port=server.config['port'])
        logger.info('Waiting for interrupt to stop...')
        while True:
            time.sleep(1)
    except SigTermException as e:
        logger.info("interrupted... %s", e)
    except KeyboardInterrupt:
        logger.info("interrupted by Ctrl-C...")
    finally:
        # Stop in ``finally`` so a failure above cannot leave the server running.
        server.stop()
        logger.info("Stopped test LDAP server.")


def main():
    """Start a test LDAP server and keep it alive until interrupted.

    The credential set name is taken from the ``-C/--ldapcredentials`` option,
    which defaults to the environment variable ``TMSS_LDAPCREDENTIALS``, or to
    ``'tmss_ldap_test'`` if that variable is not set. The chosen name is
    written back to ``TMSS_LDAPCREDENTIALS`` in this process' environment.
    """
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG)

    parser = OptionParser('%prog [options]',
                          description='run a test ldap server')
    parser.add_option("-C", "--ldapcredentials", dest="ldapcredentials", type="string",
                      default=os.environ.get('TMSS_LDAPCREDENTIALS', "tmss_ldap_test"),
                      help="Name of ldap credential set to use [default=%default]")
    options, _args = parser.parse_args()

    os.environ['TMSS_LDAPCREDENTIALS'] = options.ldapcredentials

    # create and run the test ldap server.
    server = setup_test_ldap_server(options.ldapcredentials)
    start_test_ldap_server_and_wait(server)


if __name__ == "__main__":
    main()