Skip to content
Snippets Groups Projects
Commit a0536cfd authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-139: enhanced the ldap test server routines into a reusable class that...

TMSS-139: enhanced the ldap test server routines into a reusable class that starts/stops the ldapserver in a with-context, ready to be used in unittests or from the commandline
parent 4df34122
No related branches found
No related tags found
1 merge request!96Resolve TMSS-139
import ldap3
from ldap_test import LdapServer
import time
import os import os
from optparse import OptionParser
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
from lofar.common import dbcredentials
import signal
from ldap_test import LdapServer
# the 3rd party ldap_test module erroneously does a logging.basicConfig upon module import...
# logging.basicConfig should only happen in 'main'-like functions.
# so, undo the basicConfig by removing the root handlers.
# See: https://docs.python.org/3/library/logging.html#logging.basicConfig
for h in logging.root.handlers:
logging.root.removeHandler(h)
def setup_test_ldap_server(ldap_creds_name): from optparse import OptionParser
ldap_credentials = dbcredentials.DBCredentials().get(ldap_creds_name)
logger.info("Using dbcreds '%s' to start ldap server: %s",
ldap_creds_name, ldap_credentials.stringWithHiddenPassword())
logger.info('Configuring test LDAP server...') from lofar.common.util import waitForInterrupt, find_free_port
server = LdapServer({ from lofar.common.testing.dbcredentials import TemporaryCredentials
'port': ldap_credentials.port,
class TestLDAPServer():
''' A helper class which instantiates a running LDAP server (not interfering with any other test/production LDAP servers)
Best used in a 'with'-context so the server is stoped automagically.
'''
def __init__(self, user: str = 'test', password: str = 'test') -> None:
self._tmp_creds = TemporaryCredentials(user=user, password=password)
self._server = None
def __enter__(self):
'''create/instantiate the LDAP server'''
try:
self.start()
except Exception as e:
logger.error(e)
self.stop()
raise
return self
def __exit__(self, exc_type, exc_val, exc_tb):
'''stop the running LDAP server'''
self.stop()
@property
def dbcreds_id(self):
# return 'tmss_ldap_test'
return self._tmp_creds.dbcreds_id
@property
def dbcreds(self):
# return dbcredentials.DBCredentials().get('tmss_ldap_test')
return self._tmp_creds.dbcreds
def start(self):
'''instantiate the isolated postgres server'''
logger.info('creating test-LDAP instance...')
self._tmp_creds.dbcreds.type = 'LDAP'
self._tmp_creds.dbcreds.host = '127.0.0.1'
self._tmp_creds.dbcreds.port = find_free_port()
self._tmp_creds.create()
logger.info("Using dbcreds '%s' to start and configure LDAP server: %s",
self.dbcreds_id, self.dbcreds.stringWithHiddenPassword())
self._server = LdapServer({'port': self.dbcreds.port,
'base': {'objectclass': ['domain'], 'base': {'objectclass': ['domain'],
'dn': 'o=lofar,c=eu', 'dn': 'o=lofar,c=eu',
'attributes': {'o': 'lofar'}}, 'attributes': {'o': 'lofar'}},
...@@ -26,91 +70,66 @@ def setup_test_ldap_server(ldap_creds_name): ...@@ -26,91 +70,66 @@ def setup_test_ldap_server(ldap_creds_name):
'dn': 'ou=Users,o=lofar,c=eu', 'dn': 'ou=Users,o=lofar,c=eu',
'attributes': {'ou': 'Users'}}, 'attributes': {'ou': 'Users'}},
{'objectclass': 'lofarPerson', {'objectclass': 'lofarPerson',
'dn': 'cn=paulus,ou=users,o=lofar,c=eu', 'dn': 'cn=%s,ou=users,o=lofar,c=eu' % self.dbcreds.user,
'attributes': {'cn': 'paulus', 'attributes': {'cn': self.dbcreds.user,
'userPassword': 'pauluspass', 'userPassword': self.dbcreds.password,
'mail': 'paulus@boskabouter.nl',
'givenName': 'Paulus',
'sn': 'Boskabouter'}},
{'objectclass': 'lofarPerson',
'dn': 'cn=paula,ou=users,o=lofar,c=eu',
'attributes': {'cn': 'paula',
'userPassword': 'paulapass',
'mail': 'paulus@boskabouter.nl',
'givenName': 'Paulus',
'sn': 'Boskabouter'}},
{'objectclass': 'lofarPerson',
'dn': 'cn=%s,ou=users,o=lofar,c=eu' % ldap_credentials.user,
'attributes': {'cn': ldap_credentials.user,
'userPassword': ldap_credentials.password,
'mail': 'gen@eric.nl', 'mail': 'gen@eric.nl',
'givenName': 'Gen', 'givenName': 'Gen',
'sn': 'Eric'}} 'sn': 'Eric'}}]})
]
})
logger.info('LDAP server listens on port %s...' % server.config['port'])
return server
self._server.start()
os.environ["TMSS_LDAPCREDENTIALS"] = self.dbcreds_id
logger.info('LDAP server running and listening on port %s...', self.dbcreds.port)
logger.info('LDAP test user/pass: %s %s...', self.dbcreds.user, self.dbcreds.password)
def dump_ldap(host, port): def stop(self):
logger.debug('Dumping LDAP contents...') '''stop the running postgres server'''
srv = ldap3.Server(host, port=port)
conn = ldap3.Connection(srv, auto_bind=True)
conn.search(search_base='o=lofar,c=eu', search_filter='(objectclass=*)')
logger.debug(conn.response)
# raise normal Exception when a signal is caught so we can stop the ldap server including the spawned Java process
class SigTermException(Exception):
pass
def signal_handler(_s,_f):
raise SigTermException("signal %s received..." % (_s,))
for s in [signal.SIGHUP, signal.SIGTERM, signal.SIGINT]:
signal.signal(s, signal_handler)
def start_test_ldap_server_and_wait(server):
try: try:
logger.info('Starting test LDAP service...') if self._server:
server.start() logger.debug('stopping LDAP server (%s) ...', self.dbcreds.stringWithHiddenPassword())
dump_ldap(host='localhost', port=server.config['port']) self._server.stop()
logger.info('Waiting for interrupt to stop...') self._server = None
while(True): logger.info('LDAP server stopped (%s)', self.dbcreds.stringWithHiddenPassword())
time.sleep(1) except Exception as e:
except SigTermException as e: logger.error('error while removing LDAP Server at %s: %s', self.dbcreds.stringWithHiddenPassword(), e)
logger.info("interrupted... %s", e)
except KeyboardInterrupt: self._tmp_creds.destroy()
logger.info("interrupted by Ctrl-C...")
server.stop() def dump_ldap(self):
logger.info("Stopped test LDAP server.") import ldap3
logger.info('Dumping LDAP contents...')
srv = ldap3.Server(self.dbcreds.host, port=self.dbcreds.port)
with ldap3.Connection(srv, auto_bind=True) as connection:
connection.search(search_base='o=lofar,c=eu', search_filter='(objectclass=*)')
logger.info(connection.response)
def main(): def main():
""" """
Start Django test database and keep it alive until interrupted. Start an isolated LDAP server it alive until interrupted by Ctrl-C.
It will use the database credentials found in ~/.lofar/dbcredentials It will save its config in a database credentials found in ~/.lofar/dbcredentials
for the name found in environment variable TMSS_DBCREDENTIALS, or 'tmss' if the env var is not set. for the name found in environment variable TMSS_DBCREDENTIALS, or 'tmss' if the env var is not set.
See also settings.py for the lsrm django setup See also settings.py for the lsrm django setup
""" """
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG) logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
parser = OptionParser('%prog [options]', parser = OptionParser('%prog [options]',
description='run a test ldap server') description='run a test ldap server')
parser.add_option("-C", "--ldapcredentials", dest="ldapcredentials", type="string",
default=os.environ.get('TMSS_LDAPCREDENTIALS', "tmss_ldap_test"),
help="Name of ldap credential set to use [default=%default]")
(options, args) = parser.parse_args() (options, args) = parser.parse_args()
os.environ['TMSS_LDAPCREDENTIALS'] = options.ldapcredentials
# create and run the test ldap server. # start and run the test ldap server.
server = setup_test_ldap_server(options.ldapcredentials) with TestLDAPServer() as server:
start_test_ldap_server_and_wait(server) print()
print("Test-LDAP-Server up and running.")
print("LDAP Credentials ID: %s (for example to run tmms against this ldapserver, call 'tmss -L %s')" % (server.dbcreds_id, server.dbcreds_id))
print()
print("Press Ctrl-C to exit (and remove the test LDAP server automatically...)")
waitForInterrupt()
if __name__ == "__main__": if __name__ == "__main__":
main() main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment