Skip to content
Snippets Groups Projects
Select Git revision
  • b6f83742a75890c859f6088b028fc40c6f0c55df
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

test_utils.py

Blame
  • Jörn Künsemöller's avatar
    TMSS-719: claim project roles from OIDC/Keycloak and adapt user model to hold them
    Jörn Künsemöller authored
    b6f83742
    History
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_utils.py 55.64 KiB
    #!/usr/bin/env python3
    
    # Copyright (C) 2018    ASTRON (Netherlands Institute for Radio Astronomy)
    # P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
    #
    # This file is part of the LOFAR software suite.
    # The LOFAR software suite is free software: you can redistribute it and/or
    # modify it under the terms of the GNU General Public License as published
    # by the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    #
    # The LOFAR software suite is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.    See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License along
    # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
    
    # $Id:  $
    
    import os
    import time
    import datetime
    from multiprocessing import Process, Event
    import django
    
    import logging
    logger = logging.getLogger(__name__)
    
    import threading
    from lofar.common.testing.postgres import PostgresTestMixin, PostgresTestDatabaseInstance
    from lofar.common.dbcredentials import Credentials, DBCredentials
    from lofar.common.util import find_free_port, waitForInterrupt
    from lofar.sas.tmss.test.ldap_test_service import TestLDAPServer
    from lofar.sas.tmss.tmss.exceptions import TMSSException
    from lofar.messaging.config import DEFAULT_BROKER, DEFAULT_BUSNAME
    from lofar.messaging.messagebus import BusListenerJanitor
    from lofar.common.testing.dbcredentials import TemporaryCredentials
    from lofar.sas.tmss.client.tmss_http_rest_client import TMSSsession
    from lofar.sas.resourceassignment.resourceassigner.test.ra_test_environment import RATestEnvironment
    
    
    def assertDataWithUrls(self, data, expected):
        """
        object instances get returned as urls, check that the value is part of that url
        """
        # TODO: Make this smarter, this only checks for matching pk!
    
        from django.db import models
    
        for k, v in expected.items():
            if isinstance(v, models.Model):
                v = str(v.pk)
                v = v.replace(' ', '%20')
                err_msg = "The value '%s' (key is %s) is not in expected %s" % (str(v), str(data[k]), k)
                self.assertTrue(str(v) in data[k], err_msg)
    
            elif isinstance(v, datetime.datetime):
                # URL (data[k]) is string but the test_data object (v) is datetime format, convert latter to string format to compare
                self.assertEqual(v.isoformat(), data[k])
            else:
                self.assertEqual(v, data[k])
    
    
    def assertUrlList(self, url_list, expected_objects):
        """
        object instances get returned as urls, check that the expected projects are in that list
        """
    
        # TODO: Make this smarter, this only checks for matching pk!
    
        from django.db import models
        self.assertEqual(len(url_list), len(expected_objects))
        for v in expected_objects:
            if isinstance(v, models.Model):
                v = str(v.pk)
                v = v.replace(' ', '%20')
                self.assertTrue(any(str(v) in myurl for myurl in url_list))
            else:
                raise ValueError('Expected item is not a Django model instance: %s' % v)
    
    
    class TMSSTestDatabaseInstance(PostgresTestDatabaseInstance):
        '''
        Creates an isolated postgres database instance and initializes the database with a django tmss migration.
        Destroys the isolated postgres database instance upon exit automagically.
        '''
        def __init__(self, dbcreds_id: str=None) -> None:
            super().__init__(user='test_tmss_user', dbcreds_id=dbcreds_id)
    
        def apply_database_schema(self):
            logger.info('applying TMSS sql schema to %s', self.dbcreds)
    
            # a TMSSTestDatabaseInstance needs to run in a clean env,
            # with these variables set to the current test values.
            import os
            os.environ["TMSS_DBCREDENTIALS"] = self.dbcreds_id
            os.environ["DJANGO_SETTINGS_MODULE"] = "lofar.sas.tmss.tmss.settings"
    
            # run migrate in a seperate process so the needed django setup does not pollute our current apps environment
            def _migrate_helper():
                # use django management modules to apply database schema via initial migration
                import django
                django.setup()
                django.core.management.call_command('migrate')
    
            migrate_process = Process(target=_migrate_helper, daemon=True)
            migrate_process.start()
            migrate_process.join()
    
            if migrate_process.exitcode != 0:
                raise TMSSException("Could not initialize TMSS database with django migrations")
    
    
    def minimal_json_schema(title:str="my title", description:str="my description", id:str="http://example.com/foo/bar.json", properties:dict={}, required=[]):
        return {"$schema": "http://json-schema.org/draft-06/schema#",
                "$id": id,
                "title": title,
                "description": description,
                "type": "object",
                "properties": properties,
                "required": required,
                "default": {}
                }
    
    class TMSSPostgresTestMixin(PostgresTestMixin):
        '''
        A common test mixin class from which you can derive to get a freshly setup postgres testing instance with the latest TMSS sql schema.
        '''
        @classmethod
        def create_test_db_instance(cls) -> TMSSTestDatabaseInstance:
            return TMSSTestDatabaseInstance()
    
    
    class TMSSDjangoServerInstance():
        ''' Creates a running django TMSS server at the requested port with the requested database credentials.
        '''
        def __init__(self, db_dbcreds_id: str="TMSS", ldap_dbcreds_id: str="TMSS_LDAP", host: str='127.0.0.1', port: int=8000, public_host: str=None, skip_startup_checks: bool=True):
            self._db_dbcreds_id = db_dbcreds_id
            self._ldap_dbcreds_id = ldap_dbcreds_id
            self.host = host
            self.port = port
            self.public_host = public_host or host
            self._skip_startup_checks = skip_startup_checks
            self._server_process = None
    
        @property
        def host_address(self):
            ''':returns the address and port of the django server'''
            return "%s:%d" % (self.host, self.port)
    
        @property
        def address(self):
            ''':returns the public address and port of the django server'''
            return "%s:%d" % (self.public_host, self.port)
    
        @property
        def url(self):
            ''':returns the http url to the django server'''
            return "http://%s/api/" % self.address
    
        @property
        def oidc_url(self):
            ''':returns the http url to the django server'''
            return "http://%s/oidc/" % self.address
    
        @property
        def database_dbcreds_id(self) -> str:
            ''':returns the uuid of the temporary database credentials'''
            return self._db_dbcreds_id
    
        @property
        def database_dbcreds(self) -> Credentials:
            ''':returns the temporary database Credentials'''
            return DBCredentials().get(self._db_dbcreds_id)
    
        @property
        def ldap_dbcreds_id(self) -> str:
            ''':returns the uuid of the temporary LDAP server credentials'''
            return self._ldap_dbcreds_id
    
        @property
        def ldap_dbcreds(self) -> Credentials:
            ''':returns the temporary LDAP Credentials'''
            return DBCredentials().get(self._ldap_dbcreds_id)
    
        def setup_django(self):
            # (tmss)django is initialized via many environment variables.
            # set these here, run django setup, and start the server
            os.environ["TMSS_LDAPCREDENTIALS"] = self.ldap_dbcreds_id
    
            from lofar.sas.tmss.tmss import setup_tmss_django
            setup_tmss_django(self.database_dbcreds_id)
    
        def start(self):
            '''
            Start the Django server with a test-LDAP server in the background.
            Best used in a 'with'-context
            '''
            def _helper_runserver_loop():
                logger.info("Starting Django server at port=%d with database: %s and LDAP: %s",
                            self.port, self.database_dbcreds, self.ldap_dbcreds)
    
                self.setup_django()
    
                try:
                    if self._skip_startup_checks:
                        # quick start a simple WSGIServer and don't do any checks.
                        # This saves startup time, but assumes the settings, database and migrations are valid.
                        from django.core.servers.basehttp import WSGIServer, get_internal_wsgi_application, run
                        run(self.host, self.port, get_internal_wsgi_application(), ipv6=False, threading=True, server_cls=WSGIServer)
                    else:
                        # start the django server via the "normal" django runserver command, including many startup checks
                        django.core.management.call_command('runserver', use_reloader=False,  addrport=self.host_address)
                except KeyboardInterrupt:
                    logger.info("Exiting django TMSS server loop...")
    
            self._server_process = Process(target=_helper_runserver_loop, daemon=True)
            self._server_process.start()
    
            # wait for server to be up and running....
            # or exit via TimeoutError
            self.check_running_server(timeout=60)
    
        def stop(self):
            '''
            Stop the running Django and LDAP servers.
            '''
            if self._server_process is not None:
                logger.info("Stopping Django server...")
                try:
                    self._server_process.kill() # new in python 3.7
                except AttributeError:
                    self._server_process.terminate() # < python 3.7
    
                self._server_process = None
                logger.info("Django server stopped.")
    
        def check_running_server(self, timeout: float = 10) -> bool:
            '''Check the running django server for a valid response'''
            import requests
            from _datetime import datetime, timedelta
            start = datetime.utcnow()
            while True:
                try:
                    logger.info("Checking if TMSS Django server is up and running at %s with database: %s and LDAP: %s ....",
                                self.url, self.database_dbcreds, self.ldap_dbcreds)
                    response = requests.get(self.url, auth=(self.ldap_dbcreds.user, self.ldap_dbcreds.password), timeout=max(1, timeout/10))
    
                    if response.status_code in [200, 401, 403]:
                        logger.info("TMSS Django server is up and running at %s with database: %s and LDAP: %s",
                                    self.url, self.database_dbcreds, self.ldap_dbcreds)
    
                        if response.status_code in [401, 403]:
                            logger.warning("TMSS Django server at %s could not autenticate with LDAP creds: %s", self.url, self.ldap_dbcreds)
    
                        # TODO: logout, otherwise django remembers our login session.
                        return True
                except Exception as e:
                    time.sleep(0.5)
    
                if datetime.utcnow() - start > timedelta(seconds=timeout):
                    raise TimeoutError("Could not get a valid response from the django server at %s within %s seconds" % (self.url,timeout))
    
        def __enter__(self):
            try:
                self.start()
            except Exception as e:
                logger.error(e)
                self.stop()
                raise
            return self
    
        def __exit__(self, exc_type, exc_val, exc_tb):
            self.stop()
    
    
    class TMSSTestEnvironment:
        '''Create and run a test django TMSS server against a newly created test database and a test ldap server (and cleanup automagically)'''
        def __init__(self, host: str='127.0.0.1', preferred_django_port: int=8000, public_host: str=None, skip_startup_checks: bool=True,
                     exchange: str=os.environ.get("TMSS_EXCHANGE", DEFAULT_BUSNAME), broker: str=os.environ.get("TMSS_BROKER", DEFAULT_BROKER),
                     populate_schemas:bool=False, populate_test_data:bool=False, populate_permissions=False,
                     start_ra_test_environment: bool=False, start_postgres_listener: bool=False,
                     start_subtask_scheduler: bool=False, start_dynamic_scheduler: bool=False,
                     start_pipeline_control: bool=False, start_websocket: bool=False,
                     start_feedback_service: bool=False,
                     start_workflow_service: bool=False, enable_viewflow: bool=False,
                     start_precalculations_service: bool=False,
                     ldap_dbcreds_id: str=None, db_dbcreds_id: str=None, client_dbcreds_id: str=None):
            self._exchange = exchange
            self._broker = broker
            self._populate_schemas = populate_schemas or populate_test_data
            self._populate_test_data = populate_test_data
            self.ldap_server = TestLDAPServer(user='test', password='test', dbcreds_id=ldap_dbcreds_id)
            self.database = TMSSTestDatabaseInstance(dbcreds_id=db_dbcreds_id)
            self._populate_permissions = populate_permissions
            self.django_server = TMSSDjangoServerInstance(db_dbcreds_id=self.database.dbcreds_id,
                                                          ldap_dbcreds_id=self.ldap_server.dbcreds_id,
                                                          host=host,
                                                          port=find_free_port(preferred_django_port),
                                                          public_host=public_host,
                                                          skip_startup_checks=skip_startup_checks)
            self.client_credentials = TemporaryCredentials(user=self.ldap_server.dbcreds.user,
                                                           password=self.ldap_server.dbcreds.password, dbcreds_id=client_dbcreds_id)
    
    
            # the ra_test_environment is needed by some depending services, so start it when any depending service is started, even if start_postgres_listener==False
            self._start_ra_test_environment = start_ra_test_environment or start_subtask_scheduler or start_dynamic_scheduler
            self.ra_test_environment = None
    
            # the postgres_listener is needed by some depending services, so start it when any depending service is started, even if start_postgres_listener==False
            self._start_postgres_listener = start_postgres_listener or start_subtask_scheduler or start_dynamic_scheduler
            self.postgres_listener = None
    
            self._start_subtask_scheduler = start_subtask_scheduler
            self.subtask_scheduler = None
    
            self._start_dynamic_scheduler = start_dynamic_scheduler
            self.dynamic_scheduler = None
    
            self._start_pipeline_control = start_pipeline_control
            self.pipeline_control = None
    
            self._start_websocket = start_websocket
            self.websocket_service = None
    
            self._start_feedback_service = start_feedback_service
            self.feedback_service = None
    
            self.enable_viewflow = enable_viewflow or start_workflow_service
            self._start_workflow_service = start_workflow_service
            self.workflow_service = None
            os.environ['TMSS_ENABLE_VIEWFLOW'] = str(bool(self.enable_viewflow))
    
            self._start_precalculations_service = start_precalculations_service
            self.precalculations_service = None
    
            # Check for correct Django version, should be at least 3.0
            if django.VERSION[0] < 3:
                print("\nWARNING: YOU ARE USING DJANGO VERSION '%s', WHICH WILL NOT SUPPORT ALL FEATURES IN TMSS!\n" %
                      django.get_version())
    
        def start(self):
            starttime = datetime.datetime.utcnow()
            #start ldapserver and database in parallel in the background (because to are independent of each other, and this saves startup wait time)
            ldap_server_thread = threading.Thread(target=self.ldap_server.start)
            ldap_server_thread.start()
    
            database_thread = threading.Thread(target=self.database.create)
            database_thread.start()
    
            # wait until both are started/created
            ldap_server_thread.join()
            database_thread.join()
    
            # now start the django_server
            self.django_server.start()
    
            # store client credentials in the TemporaryCredentials file...
            self.client_credentials.dbcreds.host = self.django_server.public_host
            self.client_credentials.dbcreds.port = self.django_server.port
            self.client_credentials.dbcreds.type = "http"
            self.client_credentials.create_if_not_existing()
            # ... and set TMSS_CLIENT_DBCREDENTIALS environment variable, sp anybody or anything (any test) can use it automagically
            os.environ['TMSS_CLIENT_DBCREDENTIALS'] = self.client_credentials.dbcreds_id
    
            # apart from the running django server with a REST API,
            # it is also convenient to provide a working django setup for the 'normal' django API (via models.objects)
            # so: do setup_django
            self.django_server.setup_django()
    
            # now that the ldap and django server are running, and the django set has been done,
            # we can announce our test user as superuser, so the test user can do anythin via the API.
            # (there are also other tests, using other (on the fly created) users with restricted permissions, which is fine but not part of this generic setup.
            from django.contrib.auth import get_user_model
            User = get_user_model()
            user, _ = User.objects.get_or_create(username=self.ldap_server.dbcreds.user)
            user.is_superuser = True
            user.save()
    
            logger.info("started TMSSTestEnvironment ldap/database/django in %.1fs", (datetime.datetime.utcnow()-starttime).total_seconds())
    
            # start all (needed) services in background threads, keep track of them.
            service_threads = []
    
            if self._start_ra_test_environment:
                self.ra_test_environment = RATestEnvironment(exchange=self._exchange, broker=self._broker)
                service_threads.append(threading.Thread(target=self.ra_test_environment.start))
                service_threads[-1].start()
    
            if self._start_postgres_listener:
                # start the TMSSPGListener, so the changes in the database are posted as EventMessages on the bus
                from lofar.sas.tmss.services.tmss_postgres_listener import TMSSPGListener
                self.postgres_listener = TMSSPGListener(exchange=self._exchange, broker=self._broker, dbcreds=self.database.dbcreds)
                service_threads.append(threading.Thread(target=self.postgres_listener.start))
                service_threads[-1].start()
    
            if self._start_websocket:
                # start the websocket service, so the changes in the database are posted (via the messagebus) to an http web socket
                # this implies that _start_pg_listener should be true as well
                self._start_pg_listener = True
                from lofar.sas.tmss.services.websocket_service import create_service as create_websocket_service, DEFAULT_WEBSOCKET_PORT
                self.websocket_service = create_websocket_service(exchange=self._exchange, broker=self._broker, websocket_port=find_free_port(DEFAULT_WEBSOCKET_PORT))
                service_threads.append(threading.Thread(target=self.websocket_service.start_listening))
                service_threads[-1].start()
    
    
            if self._start_subtask_scheduler:
                from lofar.sas.tmss.services.scheduling.subtask_scheduling import create_subtask_scheduling_service
                self.subtask_scheduler = create_subtask_scheduling_service(exchange=self._exchange, broker=self._broker, tmss_client_credentials_id=self.client_credentials.dbcreds_id)
                service_threads.append(threading.Thread(target=self.subtask_scheduler.start_listening()))
                service_threads[-1].start()
    
            if self._start_dynamic_scheduler:
                from lofar.sas.tmss.services.scheduling.dynamic_scheduling import create_dynamic_scheduling_service, models
                # beware: by default, dynamic scheduling is disabled in TMSS.
                # so, even if we start the service, even then the dynamic scheduling is disable in the settings.
                self.dynamic_scheduler = create_dynamic_scheduling_service(exchange=self._exchange, broker=self._broker)
                service_threads.append(threading.Thread(target=self.dynamic_scheduler.start_listening))
                service_threads[-1].start()
    
            if self._start_workflow_service:
                from lofar.sas.tmss.services.workflow_service import create_workflow_service
                self.workflow_service = create_workflow_service(exchange=self._exchange, broker=self._broker)
                service_threads.append(threading.Thread(target=self.workflow_service.start_listening))
                service_threads[-1].start()
    
            if self._start_feedback_service:
                try:
                    from lofar.sas.tmss.services.feedback_handling import create_service as create_feedback_service
                    self.feedback_service = create_feedback_service(exchange=self._exchange, broker=self._broker)
                    service_threads.append(threading.Thread(target=self.feedback_service.start_listening))
                    service_threads[-1].start()
                except Exception as e:
                    logger.exception(e)
    
    
    
            # wait for all services to be fully started in their background threads
            for thread in service_threads:
                thread.join()
    
            logger.info("started TMSSTestEnvironment ldap/database/django + services in %.1fs", (datetime.datetime.utcnow()-starttime).total_seconds())
    
            if self._populate_schemas or self._populate_test_data:
                self.populate_schemas()
    
            if self._populate_test_data:
                self.populate_test_data()
    
            if self._populate_permissions:
                self.populate_permissions()
    
            logger.info("started TMSSTestEnvironment ldap/database/django + services + schemas + data in %.1fs", (datetime.datetime.utcnow()-starttime).total_seconds())
    
            # next service does not have a buslistener, it is just a simple time scheduler and currently rely and
            # populated stations schema to retrieve all stations
            if self._start_precalculations_service:
                from lofar.sas.tmss.services.precalculations_service import create_service_job_for_sunrise_and_sunset_calculations
                # For testpurposes we can use a smaller range and higher interval frequency
                self.precalculations_service = \
                    create_service_job_for_sunrise_and_sunset_calculations(wait_time_seconds=60, nbr_days_calculate_ahead=3, nbr_days_before_today=1)
                self.precalculations_service.start()
    
        def stop(self):
            if self.workflow_service is not None:
                BusListenerJanitor.stop_listening_and_delete_queue(self.workflow_service)
                self.workflow_service = None
    
            if self.postgres_listener is not None:
                self.postgres_listener.stop()
                self.postgres_listener = None
    
            if self.feedback_service is not None:
                self.feedback_service.stop_listening()
                self.feedback_service = None
    
            if self.websocket_service is not None:
                self.websocket_service.stop_listening()
                self.websocket_service = None
    
            if self.subtask_scheduler is not None:
                BusListenerJanitor.stop_listening_and_delete_queue(self.subtask_scheduler)
                self.subtask_scheduler = None
    
            if self.dynamic_scheduler is not None:
                BusListenerJanitor.stop_listening_and_delete_queue(self.dynamic_scheduler)
                self.dynamic_scheduler = None
    
            if self.ra_test_environment is not None:
                self.ra_test_environment.stop()
                self.ra_test_environment = None
    
            if self.precalculations_service is not None:
                self.precalculations_service.stop()
                self.precalculations_service = None
    
            self.django_server.stop()
            self.ldap_server.stop()
            self.database.destroy()
            self.client_credentials.destroy_if_not_existing_upon_creation()
    
        def __enter__(self):
            try:
                self.start()
            except Exception as e:
                logger.error(e)
                self.stop()
                raise
            return self
    
        def __exit__(self, exc_type, exc_val, exc_tb):
            self.stop()
    
        def populate_schemas(self):
            # populate the items that rely on a running REST API server (which cannot be populated via the django model.objects API)
            from lofar.sas.tmss.client.populate import populate_schemas
            populate_schemas()
    
            # the connectors rely on the schemas to be populated first (above)
            from lofar.sas.tmss.tmss.tmssapp.populate import populate_connectors
            populate_connectors()
    
        def populate_test_data(self):
            from lofar.sas.tmss.tmss.tmssapp.populate import populate_test_data
            populate_test_data()
    
        def populate_permissions(self):
            from lofar.sas.tmss.tmss.tmssapp.populate import populate_permissions
            populate_permissions()
    
        def create_tmss_client(self):
            return TMSSsession.create_from_dbcreds_for_ldap(self.client_credentials.dbcreds_id)
    
        def create_test_data_creator(self):
            from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator
            return TMSSRESTTestDataCreator(self.django_server.url, (self.django_server.ldap_dbcreds.user, self.django_server.ldap_dbcreds.password))
    
    
    def main_test_database():
        """instantiate, run and destroy a test postgress django database"""
        os.environ['TZ'] = 'UTC'
        logging.basicConfig(format = '%(asctime)s %(levelname)s %(message)s', level = logging.INFO)
    
        from optparse import OptionParser, OptionGroup
        parser = OptionParser('%prog [options]',
                              description='setup/run/teardown a full fresh, unique and isolated TMSS test database.')
    
        group = OptionGroup(parser, 'Credentials options', description="By default a unique ID is created for the Postgres DB credentials to ensure that this TMSSTestDatabaseInstance is isolated and unique." \
                                                                       "There are however also some use cases where we want to refer to a constant ID. These options enable that." \
                                                                       "Please mind that these given credentials are still stored in a temporary credentials file which are deleted upon exit.")
        parser.add_option_group(group)
        group.add_option('-D', '--DB_ID', dest='DB_ID', type='string', default=None, help='Use this ID for the Postgres database instead of a generated unique id if None given. default: %default')
    
        (options, args) = parser.parse_args()
    
        with TMSSTestDatabaseInstance(dbcreds_id=options.DB_ID) as db:
            # print some nice info for the user to use the test servers...
            # use print instead of log for clean lines.
            for h in logging.root.handlers:
                h.flush()
            print()
            print()
            print("**********************************")
            print("Test-TMSS database up and running.")
            print("**********************************")
            print("DB Credentials ID: %s (for example to run tmms against this test db, call 'tmss -C %s')" % (db.dbcreds_id, db.dbcreds_id))
            print()
            print("Press Ctrl-C to exit (and remove the test database automatically)")
            waitForInterrupt()
    
    
    def main_test_environment():
        """instantiate, run and destroy a full tmss test environment (postgress database, ldap server, django server)"""
        from optparse import OptionParser, OptionGroup
        os.environ['TZ'] = 'UTC'
    
        parser = OptionParser('%prog [options]',
                              description='setup/run/teardown a full TMSS test environment including a fresh and isolated database, LDAP server and DJANGO REST server.')
        parser.add_option('--skip_startup_checks', dest='skip_startup_checks', action='store_true', help='skip startup checks, assuming your settings/database/migrations are valid.')
    
        group = OptionGroup(parser, 'Network')
        parser.add_option_group(group)
        group.add_option("-H", "--host", dest="host", type="string", default='0.0.0.0',
                          help="serve the TMSS Django REST API server via this host. [default=%default]")
        group.add_option("-p", "--port", dest="port", type="int", default=find_free_port(8000),
                          help="try to use this port for the DJANGO REST API. If not available, then a random free port is used and logged. [default=%default]")
        group.add_option("-P", "--public_host", dest="public_host", type="string", default='127.0.0.1',
                          help="expose the TMSS Django REST API via this host. [default=%default]")
    
        group = OptionGroup(parser, 'Example/Test data, schemas and services',
                            description='Options to enable/create example/test data, schemas and services. ' \
                                        'Without these options you get a lean and mean TMSS test environment, but then you need to run the background services and create test data yourself. ' \
                                        'For standalone commissioning/testing/playing around you need all these options, use --all for that as a convenience.')
        parser.add_option_group(group)
        group.add_option('-d', '--data', dest='data', action='store_true', help='populate the test-database with test/example data. This implies -s/--schemas because these schemas are needed to create test data.')
        group.add_option('-s', '--schemas', dest='schemas', action='store_true', help='populate the test-database with the TMSS JSON schemas')
        group.add_option('-M', '--permissions', dest='permissions', action='store_true', help='populate the test-database with the TMSS permissions')
        group.add_option('-m', '--eventmessages', dest='eventmessages', action='store_true', help='Send event messages over the messagebus for changes in the TMSS database (for (sub)tasks/scheduling_units etc).')
        group.add_option('-r', '--ra_test_environment', dest='ra_test_environment', action='store_true', help='start the Resource Assigner test environment which enables scheduling.')
        group.add_option('-S', '--scheduling', dest='scheduling', action='store_true', help='start the TMSS background scheduling services for dynamic scheduling of schedulingunits and subtask scheduling of chains of dependend subtasks.')
        group.add_option('-v', '--viewflow_app', dest='viewflow_app', action='store_true', help='Enable the viewflow app for workflows on top of TMSS')
        group.add_option('-V', '--viewflow_service', dest='viewflow_service', action='store_true', help='Enable the viewflow service. Implies --viewflow_app and --eventmessages')
        group.add_option('-w', '--websockets', dest='websockets', action='store_true', help='Enable json updates pushed via websockets')
        group.add_option('-f', '--feedbackservice', dest='feedbackservice', action='store_true', help='Enable feedbackservice to handle feedback from observations/pipelines which comes in via the (old qpid) otdb messagebus.')
        group.add_option('-C', '--precalculations_service', dest='precalculations_service', action='store_true', help='Enable the PreCalculations service')
        group.add_option('--all', dest='all', action='store_true', help='Enable/Start all the services, upload schemas and testdata')
        group.add_option('--simulate', dest='simulate', action='store_true', help='Simulate a run of the first example scheduling_unit (implies --data and --eventmessages and --ra_test_environment)')
    
        group = OptionGroup(parser, 'Messaging options')
        parser.add_option_group(group)
        group.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the message broker, default: %default')
        group.add_option('-e', "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME, help="Bus or queue where the TMSS messages are published. [default: %default]")
    
        group = OptionGroup(parser, 'Credentials options', description="By default a unique ID is created for the LDAP and Postgres DB credentials to ensure that this TMSSTestEnvironment is isolated and unique." \
                                                                       "There are however also some use cases where we want to refer to a constant ID. These options enable that." \
                                                                       "Please mind that these given credentials are still stored in a temporary credentials file which are deleted upon exit.")
        parser.add_option_group(group)
        group.add_option('-L', '--LDAP_ID', dest='LDAP_ID', type='string', default=None, help='Use this ID for the LDAP service instead of a generated unique id if None given. default: %default')
        group.add_option('-D', '--DB_ID', dest='DB_ID', type='string', default=None, help='Use this ID for the Postgres database instead of a generated unique id if None given. default: %default')
        group.add_option('-R', '--REST_CLIENT_ID', dest='REST_CLIENT_ID', type='string', default=None, help='Use this ID for the http REST client API instead of a generated unique id if None given. default: %default')
    
        (options, args) = parser.parse_args()
    
        if options.simulate:
            options.data = True
            options.eventmessages = True
            options.ra_test_environment = True
    
        logging.basicConfig(format = '%(asctime)s %(levelname)s %(message)s', level = logging.INFO)
    
        with TMSSTestEnvironment(host=options.host, preferred_django_port=options.port, public_host=options.public_host,
                                 skip_startup_checks=options.skip_startup_checks,
                                 exchange=options.exchange, broker=options.broker,
                                 populate_schemas=options.schemas or options.data or options.all,
                                 populate_test_data=options.data or options.all,
                                 populate_permissions=options.permissions or options.all,
                                 start_ra_test_environment=options.ra_test_environment or options.all,
                                 start_postgres_listener=options.eventmessages or options.scheduling or options.viewflow_service or options.all,
                                 start_subtask_scheduler=options.scheduling or options.all,
                                 start_dynamic_scheduler=options.scheduling or options.all,
                                 start_websocket=options.websockets or options.all,
                                 start_feedback_service=options.feedbackservice or options.all,
                                 enable_viewflow=options.viewflow_app or options.viewflow_service or options.all,
                                 start_workflow_service=options.viewflow_service or options.all,
                                 start_precalculations_service=options.precalculations_service or options.all,
                                 ldap_dbcreds_id=options.LDAP_ID, db_dbcreds_id=options.DB_ID, client_dbcreds_id=options.REST_CLIENT_ID) as tmss_test_env:
    
                # print some nice info for the user to use the test servers...
                # use print instead of log for clean lines.
                for h in logging.root.handlers:
                    h.flush()
                print()
                print()
                print("*****************************************************")
                print("Test-TMSS database, LDAP and Django up and running...")
                print("*****************************************************")
                print("DB Credentials ID: %s" % (tmss_test_env.database.dbcreds_id, ))
                print("LDAP Credentials ID: %s" % (tmss_test_env.django_server.ldap_dbcreds_id, ))
                print("TMSS Client Credentials ID: %s" % (tmss_test_env.client_credentials.dbcreds_id, ))
                print("Django URL: %s" % (tmss_test_env.django_server.url))
                print()
                print("Example cmdlines to run tmss or tmss_manage_django:")
                print("TMSS_DBCREDENTIALS=%s TMSS_LDAPCREDENTIALS=%s tmss" % (tmss_test_env.database.dbcreds_id, tmss_test_env.django_server.ldap_dbcreds_id))
                print("TMSS_DBCREDENTIALS=%s TMSS_LDAPCREDENTIALS=%s tmss_manage_django" % (tmss_test_env.database.dbcreds_id, tmss_test_env.django_server.ldap_dbcreds_id))
                print()
                print("Example cmdline to run tmss client call:")
                print("TMSS_CLIENT_DBCREDENTIALS=%s tmss_set_subtask_state <id> <state>" % (tmss_test_env.client_credentials.dbcreds_id, ))
                print()
                print("Press Ctrl-C to exit (and remove the test database and django server automatically)")
    
                if options.simulate:
                    stop_event = threading.Event()
                    with create_scheduling_unit_blueprint_simulator(1, stop_event=stop_event,
                                                                    exchange=options.exchange, broker=options.broker):
                        try:
                            stop_event.wait()
                        except KeyboardInterrupt:
                            return
    
                waitForInterrupt()
    
    
    def create_scheduling_unit_blueprint_simulator(scheduling_unit_blueprint_id: int, stop_event: threading.Event,
                                                   handle_observations: bool = True, handle_pipelines: bool = True,
                                                   handle_QA: bool = True, handle_ingest: bool = True,
                                                   auto_grant_ingest_permission: bool = True,
                                                   delay: float=1, duration: float=5,
                                                   create_output_dataproducts: bool=False,
                                                   exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
        '''
        create a "simulator" which sets the correct events in the correct order upon receiving status change events,
        and which uploads simulated feedback upon finishing. Can be used to simulate a 'run' of a scheduling_unit without
        doing the actual observation/pipeline/QA/ingest.
        '''
        from lofar.sas.tmss.client.tmssbuslistener import TMSSEventMessageHandler, TMSSBusListener
        from lofar.sas.tmss.tmss.tmssapp import models
        from lofar.sas.tmss.tmss.tmssapp.subtasks import schedule_subtask_and_update_successor_start_times, update_start_time_and_shift_successors_until_after_stop_time
        from lofar.common.json_utils import get_default_json_object_for_schema
        from lofar.sas.tmss.tmss.exceptions import SubtaskSchedulingException
        from datetime import datetime, timedelta
        from time import sleep
        from uuid import uuid4
    
        class SimulationEventHandler(TMSSEventMessageHandler):
            def __init__(self, scheduling_unit_blueprint_id: int, stop_event: threading.Event,
                         handle_observations: bool = True, handle_pipelines: bool = True,
                         handle_QA: bool = True, handle_ingest: bool = True,
                         delay: float = 1, duration: float = 10,
                         create_output_dataproducts: bool=False) -> None:
                super().__init__(log_event_messages=False)
                self.scheduling_unit_blueprint_id = scheduling_unit_blueprint_id
                self.stop_event = stop_event
                self.handle_observations = handle_observations
                self.handle_pipelines = handle_pipelines
                self.handle_QA = handle_QA
                self.handle_ingest = handle_ingest
                self.auto_grant_ingest_permission = auto_grant_ingest_permission
                self.delay = delay
                self.duration = duration
                self.create_output_dataproducts = create_output_dataproducts
    
            def need_to_handle(self, subtask: models.Subtask) -> bool:
                if subtask.task_blueprint.scheduling_unit_blueprint.id != self.scheduling_unit_blueprint_id:
                    return False
    
                if subtask.specifications_template.type.value == models.SubtaskType.Choices.OBSERVATION.value and not self.handle_observations:
                    return False
    
                if subtask.specifications_template.type.value == models.SubtaskType.Choices.PIPELINE.value and not self.handle_pipelines:
                    return False
    
                if subtask.specifications_template.type.value in [models.SubtaskType.Choices.QA_FILES.value,
                                                                  models.SubtaskType.Choices.QA_PLOTS] and not self.handle_QA:
                    return False
    
                if subtask.specifications_template.type.value == models.SubtaskType.Choices.INGEST.value and not self.handle_ingest:
                    return False
    
                return True
    
            def start_handling(self):
                from lofar.common import isProductionEnvironment
                if isProductionEnvironment():
                    raise RuntimeError("Do not use this tool to simulate running a scheduling_unit in a production environment!")
    
                logger.info("starting to simulate a run for scheduling_unit id=%s ...", self.scheduling_unit_blueprint_id)
    
                super().start_handling()
    
                try:
                    # exit if already finished
                    scheduling_unit = models.SchedulingUnitBlueprint.objects.get(id=self.scheduling_unit_blueprint_id)
                    if scheduling_unit.status in ["finished", "error"]:
                        logger.info("scheduling_unit id=%s name='%s' has status=%s -> not simulating", scheduling_unit.id, scheduling_unit.name, scheduling_unit.status)
                        self.stop_event.set()
                        return
                except models.SchedulingUnitBlueprint.DoesNotExist:
                    pass
    
                # trick: trigger any already scheduled subtasks, cascading in events simulating the run
                subtasks = models.Subtask.objects.filter(task_blueprint__scheduling_unit_blueprint_id=self.scheduling_unit_blueprint_id)
                for subtask in subtasks.filter(state__value=models.SubtaskState.Choices.SCHEDULED.value):
                    self.onSubTaskStatusChanged(subtask.id, "scheduled")
    
                # schedule the defined subtasks, cascading in events simulating the run
                self.schedule_independend_defined_subtasks_if_needed()
    
    
            def schedule_independend_defined_subtasks_if_needed(self):
                try:
                    scheduling_unit = models.SchedulingUnitBlueprint.objects.get(id=self.scheduling_unit_blueprint_id)
    
                    for task_blueprint in scheduling_unit.task_blueprints.all():
                        for subtask in task_blueprint.subtasks.filter(inputs=None,
                                                                      state__value=models.SubtaskState.Choices.DEFINED.value).all():
    
                            if self.need_to_handle(subtask):
                                subtask.start_time = datetime.utcnow() + task_blueprint.relative_start_time
    
                                while subtask.state.value != models.SubtaskState.Choices.SCHEDULED.value:
                                    try:
                                        schedule_subtask_and_update_successor_start_times(subtask)
                                    except SubtaskSchedulingException as e:
                                        # try again, a bit later
                                        subtask.state = models.SubtaskState.objects.get(value=models.SubtaskState.Choices.DEFINED.value)
                                        update_start_time_and_shift_successors_until_after_stop_time(subtask, subtask.start_time + timedelta(hours=3))
                                        if subtask.start_time - datetime.utcnow() > timedelta(days=1):
                                            raise
                except models.SchedulingUnitBlueprint.DoesNotExist:
                    pass
    
            def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str):
                if id == self.scheduling_unit_blueprint_id:
                    scheduling_unit = models.SchedulingUnitBlueprint.objects.get(id=id)
                    logger.info("scheduling_unit_blueprint id=%s name='%s' now has status='%s'", id, scheduling_unit.name,
                                status)
                    if status == "schedulable":
                        self.schedule_independend_defined_subtasks_if_needed()
    
                    if status in ["finished", "error"]:
                        self.stop_event.set()
    
            def onTaskBlueprintStatusChanged(self, id: int, status: str):
                if id == self.scheduling_unit_blueprint_id:
                    task = models.TaskBlueprint.objects.get(id=id)
                    if task.scheduling_unit_blueprint.id == self.scheduling_unit_blueprint_id:
                        logger.info("task_blueprint_id id=%s name='%s' now has status='%s'", id, task.name, status)
    
            def onSubTaskStatusChanged(self, id: int, status: str):
                subtask = models.Subtask.objects.get(id=id)
                if not self.need_to_handle(subtask):
                    return
    
                logger.info("subtask id=%s type='%s' now has status='%s'", id, subtask.specifications_template.type.value,
                            status)
    
                next_state = None
                if status == models.SubtaskState.Choices.SCHEDULED.value:
                    next_state = models.SubtaskState.objects.get(value=models.SubtaskState.Choices.QUEUEING.value)
                elif status == models.SubtaskState.Choices.QUEUEING.value:
                    next_state = models.SubtaskState.objects.get(value=models.SubtaskState.Choices.QUEUED.value)
                elif status == models.SubtaskState.Choices.QUEUED.value:
                    next_state = models.SubtaskState.objects.get(value=models.SubtaskState.Choices.STARTING.value)
                elif status == models.SubtaskState.Choices.STARTING.value:
                    next_state = models.SubtaskState.objects.get(value=models.SubtaskState.Choices.STARTED.value)
                elif status == models.SubtaskState.Choices.STARTED.value:
                    sleep(self.duration - self.delay)  # mimic a running duration
                    next_state = models.SubtaskState.objects.get(value=models.SubtaskState.Choices.FINISHING.value)
                elif status == models.SubtaskState.Choices.FINISHING.value:
                    next_state = models.SubtaskState.objects.get(value=models.SubtaskState.Choices.FINISHED.value)
    
                    if subtask.specifications_template.type.value in [models.SubtaskType.Choices.OBSERVATION.value,
                                                                      models.SubtaskType.Choices.PIPELINE.value]:
                        if self.create_output_dataproducts:
                            for output_dp in subtask.output_dataproducts.all():
                                os.makedirs(output_dp.directory, exist_ok=True)
                                logger.info('writing 1KB test dataproduct for subtask id=%s %s', subtask.id, output_dp.filepath)
                                with open(output_dp.filepath, 'w') as file:
                                    file.write(1024 * 'a')
    
                        # create some nice default (and thus correct although not scientifically meaningful) feedback
                        template = models.DataproductFeedbackTemplate.objects.get(name="feedback")
                        feedback_doc = get_default_json_object_for_schema(template.schema)
                        feedback_doc['frequency']['subbands'] = [0]
                        feedback_doc['frequency']['central_frequencies'] = [1]
    
                        for output_dp in subtask.output_dataproducts:
                            output_dp.feedback_template = template
                            output_dp.feedback_doc = feedback_doc
                            output_dp.save()
                    elif subtask.specifications_template.type.value == models.SubtaskType.Choices.INGEST.value:
                        project_name = subtask.task_blueprint.draft.scheduling_unit_draft.scheduling_set.project.name
    
                        for output_dp in subtask.output_dataproducts:
                            try:
                                # copy feedback from ingest-subtask-input-dp
                                input_dp = subtask.get_transformed_input_dataproduct(output_dp.id)
                                feedback_template = input_dp.feedback_template
                                feedback_doc = input_dp.feedback_doc
                            except models.Subtask.DoesNotExist:
                                feedback_template = models.DataproductFeedbackTemplate.objects.get(name="empty")
                                feedback_doc = get_default_json_object_for_schema(feedback_template.schema)
    
                            output_dp.size = 1024
                            output_dp.directory = "srm://some.lta.site/project/%s/%s/" % (project_name, subtask.id)
                            output_dp.feedback_template = feedback_template
                            output_dp.feedback_doc = feedback_doc
                            output_dp.save()
    
                            models.DataproductArchiveInfo.objects.create(dataproduct=output_dp, storage_ticket=uuid4())
    
                            for algo in models.Algorithm.objects.all():
                                models.DataproductHash.objects.create(dataproduct=output_dp, algorithm=algo, hash=uuid4())
                elif status == models.SubtaskState.Choices.DEFINED.value:
                    state_transition = models.SubtaskStateLog.objects.filter(subtask__id=subtask.id,
                                                                             old_state__value=models.SubtaskState.Choices.SCHEDULING.value,
                                                                             new_state__value=models.SubtaskState.Choices.DEFINED.value).order_by('-updated_at').first()
                    if state_transition and datetime.utcnow() - state_transition.updated_at < timedelta(hours=1):
                        logger.info("subtask id=%d type='%s' returned to state 'defined' while scheduling... (which means that scheduling did not succeed)",
                                    subtask.id, subtask.specifications_template.type.value)
    
                        if subtask.specifications_template.type.value == 'ingest':
                            logger.info("subtask id=%d is an ingest task which requires permission in order to be scheduled", subtask.id)
                            if self.auto_grant_ingest_permission and subtask.task_blueprint.scheduling_unit_blueprint.ingest_permission_required:
                                # just granting the permission triggers the scheduling_service to check and schedulable ingest subtasks,
                                # resulting in a scheduled ingest subtask.
                                logger.info("granting ingest subtask id=%d ingest_permission", subtask.id)
                                subtask.task_blueprint.scheduling_unit_blueprint.ingest_permission_granted_since = datetime.utcnow()
                                subtask.task_blueprint.scheduling_unit_blueprint.save()
    
                if next_state:
                    sleep(self.delay)  # mimic a little 'processing' delay
                    logger.info("Simulating subtask id=%d type='%s' by proceeding from state='%s' to state='%s'...",
                                subtask.id, subtask.specifications_template.type.value, subtask.state.value, next_state)
    
                    if next_state == models.SubtaskState.objects.get(value=models.SubtaskState.Choices.STARTED.value):
                        subtask.start_time = datetime.utcnow()
                    if next_state == models.SubtaskState.objects.get(value=models.SubtaskState.Choices.FINISHING.value):
                        subtask.stop_time = datetime.utcnow()
    
                    subtask.state = next_state
                    subtask.save()
    
        # the SimulationEventHandler is meant to run for a single scheduling_unit_blueprint,
        # so no need to keep the created designated queue existing. So, use a BusListenerJanitor to cleanup the queue after use.
        return BusListenerJanitor(TMSSBusListener(SimulationEventHandler, handler_kwargs={'scheduling_unit_blueprint_id': scheduling_unit_blueprint_id,
                                                                                          'stop_event': stop_event,
                                                                                          'handle_observations': handle_observations, 'handle_pipelines': handle_pipelines,
                                                                                          'handle_QA': handle_QA, 'handle_ingest': handle_ingest,
                                                                                          'create_output_dataproducts': create_output_dataproducts,
                                                                                          'delay': delay, 'duration': duration},
                                                                           exchange=exchange, broker=broker))
    
    
    def main_scheduling_unit_blueprint_simulator():
        '''run a "simulator" which sets the correct events in the correct order upon receiving status change events,
        and which uploads simulated feedback upon finishing. Can be used to simulate a 'run' of a scheduling_unit without
        doing the actual observation/pipeline/QA/ingest.
        '''
        # make sure we run in UTC timezone
        os.environ['TZ'] = 'UTC'
        from optparse import OptionParser, OptionGroup
    
        logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
    
        # Check the invocation arguments
        parser = OptionParser('%prog [options] <scheduling_unit_blueprint_id>',
                              description='Mimic runnning a scheduling unit through all the scheduling->queueing->started->finished states for all its (sub)tasks in the correct order and creating default feedback.')
    
        group = OptionGroup(parser, 'Subtask Types', description="Simulate the event for the folling types, or all if no specific type is specified.")
        parser.add_option_group(group)
        group.add_option('-o', '--observation', dest='observation', action='store_true', help='simulate events for observation subtasks')
        group.add_option('-p', '--pipeline', dest='pipeline', action='store_true', help='simulate events for pipeline subtasks')
        group.add_option('-Q', '--QA', dest='QA', action='store_true', help='simulate events for QA subtasks')
        group.add_option('-i', '--ingest', dest='ingest', action='store_true', help='simulate events for ingest subtasks')
    
        group = OptionGroup(parser, 'Simulation parameters')
        parser.add_option_group(group)
        group.add_option('-e', '--event_delay', dest='event_delay', type='float', default=1.0, help='wait <event_delay> seconds between simulating events to mimic real-world behaviour, default: %default')
        group.add_option('-d', '--duration', dest='duration', type='float', default=60.0, help='wait <duration> seconds while "observing"/"processing" between started and finishing state to mimic real-world behaviour, default: %default')
        group.add_option('-g', '--grant_ingest_permission', dest='grant_ingest_permission', action='store_true', help='automatically grant ingest permission for ingest subtasks if needed')
    
        group = OptionGroup(parser, 'Messaging options')
        parser.add_option_group(group)
        group.add_option('--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the messaging broker, default: %default')
        group.add_option('--exchange', dest='exchange', type='string', default=DEFAULT_BUSNAME, help='Name of the exchange on the messaging broker, default: %default')
    
        group = OptionGroup(parser, 'Django options')
        parser.add_option_group(group)
        group.add_option('-C', '--credentials', dest='dbcredentials', type='string', default=os.environ.get('TMSS_DBCREDENTIALS', 'TMSS'), help='django dbcredentials name, default: %default')
    
        (options, args) = parser.parse_args()
        if len(args) != 1:
            parser.print_usage()
            exit(1)
    
        scheduling_unit_blueprint_id = int(args[0])
    
        if not (options.observation or options.pipeline or options.QA or options.ingest):
            options.observation = True
            options.pipeline = True
            options.QA = True
            options.ingest = True
    
        from lofar.sas.tmss.tmss import setup_and_check_tmss_django_database_connection_and_exit_on_error
        setup_and_check_tmss_django_database_connection_and_exit_on_error(options.dbcredentials)
    
        stop_event = threading.Event()
        with create_scheduling_unit_blueprint_simulator(scheduling_unit_blueprint_id, stop_event=stop_event,
                                                        delay=options.event_delay, duration=options.duration,
                                                        handle_observations=bool(options.observation), handle_pipelines=bool(options.pipeline),
                                                        handle_QA=bool(options.QA), handle_ingest=bool(options.ingest),
                                                        auto_grant_ingest_permission=bool(options.grant_ingest_permission),
                                                        exchange=options.exchange, broker=options.broker):
            print("Press Ctrl-C to exit")
            try:
                stop_event.wait()
            except KeyboardInterrupt:
                pass
    
    
    
    
    if __name__ == '__main__':
        main_test_environment()