Skip to content
Snippets Groups Projects
Select Git revision
  • 75183b0c969c50d2036ce814b5c493a9d800c8ff
  • main default protected
  • SDC-1435-add-WEBB-collection
  • SDCP-132-connect-lta-to-ldvspec
  • SDC-1435-add-ALMA-collection
  • oracle_and_lta
  • SDC-890-activities
  • SDC-854__implement_python_package_template
  • SDC-926-add-focus-connectors
  • add-unittests-for-connectors
  • improve-test-coverage
  • toying_with_connectors
  • ancillary_dps
13 results

beam.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_utils.py 42.22 KiB
    #!/usr/bin/env python3
    
    # Copyright (C) 2018    ASTRON (Netherlands Institute for Radio Astronomy)
    # P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
    #
    # This file is part of the LOFAR software suite.
    # The LOFAR software suite is free software: you can redistribute it and/or
    # modify it under the terms of the GNU General Public License as published
    # by the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    #
    # The LOFAR software suite is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.    See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License along
    # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
    
    # $Id:  $
    
    import os
    import time
    import datetime
    from multiprocessing import Process, Event
    import django
    
    import logging
    logger = logging.getLogger(__name__)
    
    import threading
    from lofar.common.testing.postgres import PostgresTestMixin, PostgresTestDatabaseInstance
    from lofar.common.dbcredentials import Credentials, DBCredentials
    from lofar.common.util import find_free_port, waitForInterrupt
    from lofar.sas.tmss.test.ldap_test_service import TestLDAPServer
    from lofar.sas.tmss.tmss.exceptions import TMSSException
    from lofar.messaging.config import DEFAULT_BROKER, DEFAULT_BUSNAME
    from lofar.messaging.messagebus import BusListenerJanitor
    from lofar.common.testing.dbcredentials import TemporaryCredentials
    from lofar.sas.tmss.client.tmss_http_rest_client import TMSSsession
    from lofar.sas.resourceassignment.resourceassigner.test.ra_test_environment import RATestEnvironment
    
    
    def assertDataWithUrls(self, data, expected):
        """
        Assert that the REST API response `data` matches the `expected` values.

        Object instances get returned as urls by the REST API, so for Django model instances in `expected`
        only check that the (url-encoded) pk of the instance is part of the returned url.
        Datetimes are compared via their isoformat string; all other values must be equal.

        :param self: the unittest.TestCase instance providing the assert methods
        :param data: the (json-decoded) REST API response, indexable by the keys of `expected`
        :param expected: dict of key -> expected value
        """
        # TODO: Make this smarter, this only checks for matching pk!

        from django.db import models

        for k, v in expected.items():
            if isinstance(v, models.Model):
                # spaces in a pk show up url-encoded in the returned url
                v = str(v.pk).replace(' ', '%20')
                err_msg = "The value '%s' (key is %s) is not in expected %s" % (v, k, data[k])
                self.assertIn(v, data[k], err_msg)

            elif isinstance(v, datetime.datetime):
                # URL (data[k]) is string but the test_data object (v) is datetime format, convert latter to string format to compare
                self.assertEqual(v.isoformat(), data[k])
            else:
                self.assertEqual(v, data[k])
    
    
    def assertUrlList(self, url_list, expected_objects):
        """
        Assert that `url_list` references exactly the `expected_objects`.

        The REST API returns object instances as urls. This asserts that both lists have the same length,
        and that for every expected Django model instance its (url-encoded) pk occurs in at least one url.

        :param self: the unittest.TestCase instance providing the assert methods
        :raises ValueError: when one of the expected objects is not a Django model instance
        """
        # TODO: Make this smarter, this only checks for matching pk!
        from django.db import models

        self.assertEqual(len(url_list), len(expected_objects))

        for expected_object in expected_objects:
            if not isinstance(expected_object, models.Model):
                raise ValueError('Expected item is not a Django model instance: %s' % expected_object)

            encoded_pk = str(expected_object.pk).replace(' ', '%20')
            pk_found = any(encoded_pk in url for url in url_list)
            self.assertTrue(pk_found)
    
    
    class TMSSTestDatabaseInstance(PostgresTestDatabaseInstance):
        '''
        Creates an isolated postgres database instance and initializes the database with a django tmss migration.
        Destroys the isolated postgres database instance upon exit automagically.
        '''
        def __init__(self) -> None:
            super().__init__(user='test_tmss_user')

        def apply_database_schema(self):
            '''
            Apply the TMSS database schema by running the django migrations in a separate process.

            :raises TMSSException: if the migration process did not exit with exitcode 0
            '''
            logger.info('applying TMSS sql schema to %s', self.dbcreds)

            # a TMSSTestDatabaseInstance needs to run in a clean env,
            # with these variables set to the current test values.
            # (they are set in this process' environment, and are inherited by the migrate process below)
            os.environ["TMSS_DBCREDENTIALS"] = self.dbcreds_id
            os.environ["DJANGO_SETTINGS_MODULE"] = "lofar.sas.tmss.tmss.settings"

            # run migrate in a separate process so the needed django setup does not pollute our current apps environment
            def _migrate_helper():
                # use django management modules to apply database schema via initial migration
                # (import the management submodule explicitly, so django.core.management is guaranteed to be loaded)
                import django.core.management
                django.setup()
                django.core.management.call_command('migrate')

            migrate_process = Process(target=_migrate_helper, daemon=True)
            migrate_process.start()
            migrate_process.join()

            if migrate_process.exitcode != 0:
                raise TMSSException("Could not initialize TMSS database with django migrations (exitcode=%s)" % (migrate_process.exitcode,))
    
    
    def minimal_json_schema(title:str="my title", description:str="my description", id:str="http://example.com/foo/bar.json", properties:dict=None, required:list=None):
        '''
        Create a minimal draft-06 JSON schema of type 'object', for use in tests.

        :param title: the title of the schema
        :param description: the description of the schema
        :param id: the $id url of the schema
        :param properties: the schema's properties; when omitted, a new empty dict is used
        :param required: the names of the required properties; when omitted, a new empty list is used
        :returns: the schema as a dict
        '''
        # Create fresh containers per call: they end up inside the returned schema, so shared
        # mutable defaults would leak any mutation of one result into all later results.
        return {"$schema": "http://json-schema.org/draft-06/schema#",
                "$id": id,
                "title": title,
                "description": description,
                "type": "object",
                "properties": {} if properties is None else properties,
                "required": [] if required is None else required,
                "default": {}
                }
    
    class TMSSPostgresTestMixin(PostgresTestMixin):
        '''
        Test mixin giving each deriving test class a freshly created, isolated postgres test database
        with the latest TMSS sql schema applied.
        '''
        @classmethod
        def create_test_db_instance(cls) -> TMSSTestDatabaseInstance:
            '''Override of the PostgresTestMixin factory method: returns a new TMSSTestDatabaseInstance.'''
            db_instance = TMSSTestDatabaseInstance()
            return db_instance
    
    
    class TMSSDjangoServerInstance():
        ''' Creates a running django TMSS server at the requested port with the requested database credentials.
        Best used in a 'with'-context, which starts the server upon enter and stops it upon exit.
        '''
        def __init__(self, db_dbcreds_id: str="TMSS", ldap_dbcreds_id: str="TMSS_LDAP", host: str='127.0.0.1', port: int=8000, public_host: str=None,
                     exchange: str=os.environ.get("TMSS_EXCHANGE", DEFAULT_BUSNAME), broker: str=os.environ.get("TMSS_BROKER", DEFAULT_BROKER)):
            '''
            :param db_dbcreds_id: id of the database credentials which the django server uses
            :param ldap_dbcreds_id: id of the LDAP credentials which the django server uses
            :param host: the host (address) the django server binds to
            :param port: the port the django server listens on
            :param public_host: the host via which clients reach the django server; defaults to `host`
            :param exchange: messagebus exchange; accepted for interface compatibility, currently not used by this class
            :param broker: messagebus broker; accepted for interface compatibility, currently not used by this class
            '''
            self._db_dbcreds_id = db_dbcreds_id
            self._ldap_dbcreds_id = ldap_dbcreds_id
            self.host = host
            self.port = port
            self.public_host = public_host or host
            self._server_process = None

        @property
        def host_address(self):
            ''':returns the address and port the django server binds to'''
            return "%s:%d" % (self.host, self.port)

        @property
        def address(self):
            ''':returns the public address and port of the django server'''
            return "%s:%d" % (self.public_host, self.port)

        @property
        def url(self):
            ''':returns the http url to the REST API of the django server'''
            return "http://%s/api/" % self.address

        @property
        def oidc_url(self):
            ''':returns the http url to the OIDC endpoint of the django server'''
            return "http://%s/oidc/" % self.address

        @property
        def database_dbcreds_id(self) -> str:
            ''':returns the uuid of the temporary database credentials'''
            return self._db_dbcreds_id

        @property
        def database_dbcreds(self) -> Credentials:
            ''':returns the temporary database Credentials'''
            return DBCredentials().get(self._db_dbcreds_id)

        @property
        def ldap_dbcreds_id(self) -> str:
            ''':returns the uuid of the temporary LDAP server credentials'''
            return self._ldap_dbcreds_id

        @property
        def ldap_dbcreds(self) -> Credentials:
            ''':returns the temporary LDAP Credentials'''
            return DBCredentials().get(self._ldap_dbcreds_id)

        def setup_django(self):
            '''Set the TMSS django environment variables for this server's credentials, and run django.setup() in the current process.'''
            # (tmss)django is initialized via many environment variables.
            # set these here, run django setup, and start the server
            os.environ["TMSS_LDAPCREDENTIALS"] = self.ldap_dbcreds_id
            os.environ["TMSS_DBCREDENTIALS"] = self.database_dbcreds_id
            os.environ["DJANGO_SETTINGS_MODULE"] = "lofar.sas.tmss.tmss.settings"
            django.setup()

        def start(self):
            '''
            Start the Django server in a background (daemon) process, and wait until it responds.
            This does not start an LDAP server itself; the django server is configured with the LDAP credentials of ldap_dbcreds_id.
            Best used in a 'with'-context
            :raises TimeoutError: if the server does not give a valid response within 60 seconds
            '''
            def _helper_runserver_loop():
                logger.info("Starting Django server at port=%d with database: %s and LDAP: %s",
                            self.port, self.database_dbcreds, self.ldap_dbcreds)

                self.setup_django()
                django.core.management.call_command('runserver',
                                                    use_reloader=False,
                                                    addrport=self.host_address)

            self._server_process = Process(target=_helper_runserver_loop, daemon=True)
            self._server_process.start()

            # wait for server to be up and running....
            # or exit via TimeoutError
            self.check_running_server(timeout=60)

        def stop(self):
            '''
            Stop the running Django server process (if any), and wait for it to end.
            '''
            if self._server_process is not None:
                logger.info("Stopping Django server...")
                try:
                    self._server_process.kill() # new in python 3.7
                except AttributeError:
                    self._server_process.terminate() # < python 3.7

                # reap the stopped process, so it does not linger as a zombie
                self._server_process.join(timeout=10)
                self._server_process = None
                logger.info("Django server stopped.")

        def check_running_server(self, timeout: float = 10) -> bool:
            '''
            Poll the running django server until it gives a valid response.

            Status 200, 401 and 403 count as 'up and running'; for 401/403 a warning is logged
            because the LDAP credentials were not accepted.

            :param timeout: maximum number of seconds to keep polling
            :returns: True as soon as a valid response was received
            :raises TimeoutError: if no valid response was received within `timeout` seconds
            '''
            import requests
            # use a monotonic clock for the deadline, so wall-clock adjustments cannot affect the timeout
            deadline = time.monotonic() + timeout
            while True:
                try:
                    logger.info("Checking if TMSS Django server is up and running at %s with database: %s and LDAP: %s ....",
                                self.url, self.database_dbcreds, self.ldap_dbcreds)
                    response = requests.get(self.url, auth=(self.ldap_dbcreds.user, self.ldap_dbcreds.password), timeout=max(1, timeout/10))

                    if response.status_code in [200, 401, 403]:
                        logger.info("TMSS Django server is up and running at %s with database: %s and LDAP: %s",
                                    self.url, self.database_dbcreds, self.ldap_dbcreds)

                        if response.status_code in [401, 403]:
                            logger.warning("TMSS Django server at %s could not autenticate with LDAP creds: %s", self.url, self.ldap_dbcreds)

                        # TODO: logout, otherwise django remembers our login session.
                        return True
                except Exception as e:
                    # server not reachable (yet); keep polling until the deadline
                    logger.debug("TMSS Django server at %s not reachable yet: %s", self.url, e)

                if time.monotonic() > deadline:
                    raise TimeoutError("Could not get a valid response from the django server at %s within %s seconds" % (self.url,timeout))

                # wait before the next attempt, also after an unexpected status code, to avoid hammering the server
                time.sleep(0.5)

        def __enter__(self):
            try:
                self.start()
            except Exception as e:
                logger.error(e)
                self.stop()
                raise
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            self.stop()
    
    
    class TMSSTestEnvironment:
        '''Create and run a test django TMSS server against a newly created test database and a test ldap server (and cleanup automagically).

        Depending on the constructor flags it also starts the RA test environment, the postgres listener, the scheduling
        services and/or the workflow service, and populates the database with schemas and/or test data.
        Best used in a 'with'-context, which calls start() upon enter and stop() upon exit.
        '''
        def __init__(self, host: str='127.0.0.1', preferred_django_port: int=8000, public_host: str=None,
                     exchange: str=os.environ.get("TMSS_EXCHANGE", DEFAULT_BUSNAME), broker: str=os.environ.get("TMSS_BROKER", DEFAULT_BROKER),
                     populate_schemas:bool=False, populate_test_data:bool=False,
                     start_ra_test_environment: bool=False, start_postgres_listener: bool=False,
                     start_subtask_scheduler: bool=False, start_dynamic_scheduler: bool=False,
                     start_workflow_service: bool=False, enable_viewflow: bool=False):
            '''
            Configure the test environment. Nothing is started yet, except that TMSS_ENABLE_VIEWFLOW is set in os.environ.

            :param host: the host the django server binds to
            :param preferred_django_port: passed to find_free_port to choose the django server port (presumably another free port is used when this one is taken)
            :param public_host: the host via which clients reach the django server (the django server instance falls back to `host`)
            :param exchange: messagebus exchange for the optional services
            :param broker: messagebus broker for the optional services
            :param populate_schemas: upload the TMSS JSON schemas and connectors upon start
            :param populate_test_data: also populate test data upon start; implies populate_schemas
            :param start_ra_test_environment: start a RATestEnvironment (also started when a scheduler is started)
            :param start_postgres_listener: start the TMSSPGListener (also started when a scheduler is started)
            :param start_subtask_scheduler: start the subtask scheduling service
            :param start_dynamic_scheduler: start the dynamic scheduling service, and enable dynamic scheduling in the database
            :param start_workflow_service: start the workflow service; implies enable_viewflow
            :param enable_viewflow: enable viewflow via the TMSS_ENABLE_VIEWFLOW environment variable
            '''
            self._exchange = exchange
            self._broker = broker
            # test data needs the schemas, so populating test data implies populating the schemas
            self._populate_schemas = populate_schemas or populate_test_data
            self._populate_test_data = populate_test_data
            self.ldap_server = TestLDAPServer(user='test', password='test')
            self.database = TMSSTestDatabaseInstance()
            self.django_server = TMSSDjangoServerInstance(db_dbcreds_id=self.database.dbcreds_id,
                                                          ldap_dbcreds_id=self.ldap_server.dbcreds_id,
                                                          host=host,
                                                          port=find_free_port(preferred_django_port),
                                                          public_host=public_host)
            # the REST client authenticates with the same user/password as the test LDAP server
            self.client_credentials = TemporaryCredentials(user=self.ldap_server.dbcreds.user,
                                                           password=self.ldap_server.dbcreds.password)


            # the ra_test_environment is needed by some depending services, so start it when any depending service is started, even if start_ra_test_environment==False
            self._start_ra_test_environment = start_ra_test_environment or start_subtask_scheduler or start_dynamic_scheduler
            self.ra_test_environment = None

            # the postgres_listener is needed by some depending services, so start it when any depending service is started, even if start_postgres_listener==False
            self._start_postgres_listener = start_postgres_listener or start_subtask_scheduler or start_dynamic_scheduler
            self.postgres_listener = None

            self._start_subtask_scheduler = start_subtask_scheduler
            self.subtask_scheduler = None

            self._start_dynamic_scheduler = start_dynamic_scheduler
            self.dynamic_scheduler = None

            # the workflow service needs viewflow, so starting it implies enabling viewflow
            self.enable_viewflow = enable_viewflow or start_workflow_service
            self._start_workflow_service = start_workflow_service
            self.workflow_service = None
            # note: this sets the environment variable for the whole (current) process
            os.environ['TMSS_ENABLE_VIEWFLOW'] = str(bool(self.enable_viewflow))

            # Check for correct Django version, should be at least 3.0
            if django.VERSION[0] < 3:
                print("\nWARNING: YOU ARE USING DJANGO VERSION '%s', WHICH WILL NOT SUPPORT ALL FEATURES IN TMSS!\n" %
                      django.get_version())

        def start(self):
            '''
            Start the test environment: the LDAP server, the test database and the django server first,
            then store the client credentials, set up django in the current process, make the test user a superuser,
            and finally start the optional services and populate the optional schemas/data as configured in __init__.
            '''
            self.ldap_server.start()
            self.database.create()
            self.django_server.start()

            # store client credentials in the TemporaryCredentials file...
            self.client_credentials.dbcreds.host = self.django_server.public_host
            self.client_credentials.dbcreds.port = self.django_server.port
            self.client_credentials.dbcreds.type = "http"
            self.client_credentials.create()
            # ... and set TMSS_CLIENT_DBCREDENTIALS environment variable, so anybody or anything (any test) can use it automagically
            os.environ['TMSS_CLIENT_DBCREDENTIALS'] = self.client_credentials.dbcreds_id

            # apart from the running django server with a REST API,
            # it is also convenient to provide a working django setup for the 'normal' django API (via models.objects)
            # so: do setup_django
            self.django_server.setup_django()

            # now that the ldap and django server are running, and the django setup has been done,
            # we can announce our test user as superuser, so the test user can do anything via the API.
            # (there are also other tests, using other (on the fly created) users with restricted permissions, which is fine but not part of this generic setup.)
            from django.contrib.auth.models import User
            user, _ = User.objects.get_or_create(username=self.ldap_server.dbcreds.user)
            user.is_superuser = True
            user.save()

            if self._start_ra_test_environment:
                self.ra_test_environment = RATestEnvironment(exchange=self._exchange, broker=self._broker)
                self.ra_test_environment.start()

            if self._start_postgres_listener:
                # start the TMSSPGListener, so the changes in the database are posted as EventMessages on the bus
                from lofar.sas.tmss.services.tmss_postgres_listener import TMSSPGListener
                self.postgres_listener = TMSSPGListener(exchange=self._exchange, broker=self._broker, dbcreds=self.database.dbcreds)
                self.postgres_listener.start()

            if self._start_subtask_scheduler:
                from lofar.sas.tmss.services.scheduling.subtask_scheduling import create_subtask_scheduling_service
                self.subtask_scheduler = create_subtask_scheduling_service(exchange=self._exchange, broker=self._broker)
                self.subtask_scheduler.start_listening()

            if self._start_dynamic_scheduler:
                from lofar.sas.tmss.services.scheduling.dynamic_scheduling import create_dynamic_scheduling_service, models
                # by default, dynamic scheduling is disabled in TMSS.
                # In this test environment, we do want to have it enabled. Why else would we wanna start this service?
                setting = models.Setting.objects.get(name=models.Flag.Choices.DYNAMIC_SCHEDULING_ENABLED.value)
                setting.value = True
                setting.save()
                self.dynamic_scheduler = create_dynamic_scheduling_service(exchange=self._exchange, broker=self._broker)
                self.dynamic_scheduler.start_listening()

            if self._start_workflow_service:
                from lofar.sas.tmss.services.workflow_service import create_workflow_service
                self.workflow_service = create_workflow_service(exchange=self._exchange, broker=self._broker)
                self.workflow_service.start_listening()

            # (_populate_schemas already includes _populate_test_data, see __init__)
            if self._populate_schemas or self._populate_test_data:
                self.populate_schemas()

            if self._populate_test_data:
                self.populate_test_data()


        def stop(self):
            '''
            Stop all started services, stop the django and LDAP servers, destroy the test database and the client credentials.
            Services which were not started (still None) are skipped.
            NOTE(review): an exception in one of the stop/destroy calls prevents cleanup of the components after it.
            '''
            if self.workflow_service is not None:
                BusListenerJanitor.stop_listening_and_delete_queue(self.workflow_service)
                self.workflow_service = None

            if self.postgres_listener is not None:
                self.postgres_listener.stop()
                self.postgres_listener = None

            if self.subtask_scheduler is not None:
                BusListenerJanitor.stop_listening_and_delete_queue(self.subtask_scheduler)
                self.subtask_scheduler = None

            if self.dynamic_scheduler is not None:
                BusListenerJanitor.stop_listening_and_delete_queue(self.dynamic_scheduler)
                self.dynamic_scheduler = None

            if self.ra_test_environment is not None:
                self.ra_test_environment.stop()
                self.ra_test_environment = None

            self.django_server.stop()
            self.ldap_server.stop()
            self.database.destroy()
            self.client_credentials.destroy()

        def __enter__(self):
            # when start fails halfway, stop whatever was started before re-raising
            try:
                self.start()
            except Exception as e:
                logger.error(e)
                self.stop()
                raise
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            self.stop()

        def populate_schemas(self):
            '''Upload the TMSS JSON schemas via the client, and then populate the connectors (which rely on those schemas).'''
            # populate the items that rely on a running REST API server (which cannot be populated via the django model.objects API)
            from lofar.sas.tmss.client.populate import populate_schemas
            populate_schemas()

            # the connectors rely on the schemas to be populated first (above)
            from lofar.sas.tmss.tmss.tmssapp.populate import populate_connectors
            populate_connectors()

        def populate_test_data(self):
            '''Populate the test database with test/example data via tmssapp.populate.populate_test_data.'''
            from lofar.sas.tmss.tmss.tmssapp.populate import populate_test_data
            populate_test_data()

        def create_tmss_client(self):
            '''Create a TMSSsession (via create_from_dbcreds_for_ldap) using this environment's client credentials.'''
            return TMSSsession.create_from_dbcreds_for_ldap(self.client_credentials.dbcreds_id)
    
    def main_test_database():
        """Create an isolated TMSS test postgres database, keep it available until Ctrl-C is pressed, then destroy it."""
        os.environ['TZ'] = 'UTC'
        logging.basicConfig(format = '%(asctime)s %(levelname)s %(message)s', level = logging.INFO)

        with TMSSTestDatabaseInstance() as db:
            # flush pending log output first, then use print (instead of log) so the user gets clean lines.
            for handler in logging.root.handlers:
                handler.flush()

            banner = "**********************************"
            creds_id = db.dbcreds_id
            info_lines = ["",
                          "",
                          banner,
                          "Test-TMSS database up and running.",
                          banner,
                          "DB Credentials ID: %s (for example to run tmms against this test db, call 'tmss -C %s')" % (creds_id, creds_id),
                          "",
                          "Press Ctrl-C to exit (and remove the test database automatically)"]
            for line in info_lines:
                print(line)
            waitForInterrupt()
    
    def main_test_environment():
        """Set up a full TMSS test environment (postgres database, LDAP server, django server and optional services)
        as configured on the commandline, keep it running until Ctrl-C is pressed, and then tear it down again."""
        from optparse import OptionParser, OptionGroup
        os.environ['TZ'] = 'UTC'

        parser = OptionParser('%prog [options]',
                              description='setup/run/teardown a full TMSS test environment including a fresh and isolated database, LDAP server and DJANGO REST server.')

        network_options = OptionGroup(parser, 'Network')
        parser.add_option_group(network_options)
        network_options.add_option("-H", "--host", dest="host", type="string", default='0.0.0.0',
                                   help="serve the TMSS Django REST API server via this host. [default=%default]")
        network_options.add_option("-p", "--port", dest="port", type="int", default=find_free_port(8000),
                                   help="try to use this port for the DJANGO REST API. If not available, then a random free port is used and logged. [default=%default]")
        network_options.add_option("-P", "--public_host", dest="public_host", type="string", default='127.0.0.1',
                                   help="expose the TMSS Django REST API via this host. [default=%default]")

        service_options = OptionGroup(parser, 'Example/Test data, schemas and services',
                                      description='Options to enable/create example/test data, schemas and services. '
                                                  'Without these options you get a lean and mean TMSS test environment, but then you need to run the background services and create test data yourself. '
                                                  'For standalone commissioning/testing/playing around you need all these options, use --all for that as a convenience.')
        parser.add_option_group(service_options)
        # boolean switches as (short flag, long flag, help); each option's dest is its long flag without the leading '--'
        switches = [('-d', '--data', 'populate the test-database with test/example data. This implies -s/--schemas because these schemas are needed to create test data.'),
                    ('-s', '--schemas', 'populate the test-database with the TMSS JSON schemas'),
                    ('-m', '--eventmessages', 'Send event messages over the messagebus for changes in the TMSS database (for (sub)tasks/scheduling_units etc).'),
                    ('-r', '--ra_test_environment', 'start the Resource Assigner test environment which enables scheduling.'),
                    ('-S', '--scheduling', 'start the TMSS background scheduling services for dynamic scheduling of schedulingunits and subtask scheduling of chains of dependend subtasks.'),
                    ('-v', '--viewflow', 'Enable the viewflow app for workflows on top of TMSS')]
        for short_flag, long_flag, help_text in switches:
            service_options.add_option(short_flag, long_flag, dest=long_flag[2:], action='store_true', help=help_text)
        service_options.add_option('--all', dest='all', action='store_true', help='Enable/Start all the services, upload schemas and testdata')

        messaging_options = OptionGroup(parser, 'Messaging options')
        parser.add_option_group(messaging_options)
        messaging_options.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the message broker, default: %default')
        messaging_options.add_option('-e', "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME, help="Bus or queue where the TMSS messages are published. [default: %default]")

        options, _ = parser.parse_args()

        logging.basicConfig(format = '%(asctime)s %(levelname)s %(message)s', level = logging.INFO)

        everything = options.all
        with TMSSTestEnvironment(host=options.host, preferred_django_port=options.port, public_host=options.public_host,
                                 exchange=options.exchange, broker=options.broker,
                                 populate_schemas=options.schemas or options.data or everything,
                                 populate_test_data=options.data or everything,
                                 start_ra_test_environment=options.ra_test_environment or everything,
                                 start_postgres_listener=options.eventmessages or options.scheduling or everything,
                                 start_subtask_scheduler=options.scheduling or everything,
                                 start_dynamic_scheduler=options.scheduling or everything,
                                 start_workflow_service=options.viewflow or everything,
                                 enable_viewflow=options.viewflow or everything) as tmss_test_env:
            # flush pending log output first, then use print (instead of log) so the user gets clean lines.
            for handler in logging.root.handlers:
                handler.flush()

            db_creds_id = tmss_test_env.database.dbcreds_id
            ldap_creds_id = tmss_test_env.django_server.ldap_dbcreds_id
            client_creds_id = tmss_test_env.client_credentials.dbcreds_id
            banner = "*****************************************************"

            info_lines = ["",
                          "",
                          banner,
                          "Test-TMSS database, LDAP and Django up and running...",
                          banner,
                          "DB Credentials ID: %s" % (db_creds_id, ),
                          "LDAP Credentials ID: %s" % (ldap_creds_id, ),
                          "TMSS Client Credentials ID: %s" % (client_creds_id, ),
                          "Django URL: %s" % (tmss_test_env.django_server.url, ),
                          "",
                          "Example cmdlines to run tmss or tmss_manage_django:",
                          "TMSS_DBCREDENTIALS=%s TMSS_LDAPCREDENTIALS=%s tmss" % (db_creds_id, ldap_creds_id),
                          "TMSS_DBCREDENTIALS=%s TMSS_LDAPCREDENTIALS=%s tmss_manage_django" % (db_creds_id, ldap_creds_id),
                          "",
                          "Example cmdline to run tmss client call:",
                          "TMSS_CLIENT_DBCREDENTIALS=%s tmss_set_subtask_state <id> <state>" % (client_creds_id, ),
                          "",
                          "Press Ctrl-C to exit (and remove the test database and django server automatically)"]
            print("\n".join(info_lines))

            waitForInterrupt()
    
    
    def create_scheduling_unit_blueprint_simulator(scheduling_unit_blueprint_id: int, stop_event: threading.Event,
                                                   handle_observations: bool = True, handle_pipelines: bool = True,
                                                   handle_QA: bool = True, handle_ingest: bool = True,
                                                   delay: float=1, duration: float=5,
                                                   create_output_dataproducts: bool=False,
                                                   exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
        '''
        create a "simulator" which sets the correct events in the correct order upon receiving status change events,
        and which uploads simulated feedback upon finishing. Can be used to simulate a 'run' of a scheduling_unit without
        doing the actual observation/pipeline/QA/ingest.
        '''
        from lofar.sas.tmss.client.tmssbuslistener import TMSSEventMessageHandler, TMSSBusListener
        from lofar.sas.tmss.tmss.tmssapp import models
        from lofar.sas.tmss.tmss.tmssapp.subtasks import schedule_subtask_and_update_successor_start_times, update_start_time_and_shift_successors_until_after_stop_time
        from lofar.common.json_utils import get_default_json_object_for_schema
        from lofar.sas.tmss.tmss.exceptions import SubtaskSchedulingException
        from datetime import datetime, timedelta
        from time import sleep
        from uuid import uuid4
    
        class SimulationEventHandler(TMSSEventMessageHandler):
            '''TMSS event handler which simulates a 'run' of one scheduling unit blueprint.

            For every status-change event of a handled subtask it waits a little and then moves the subtask to the next
            state in the sequence scheduled -> queueing -> queued -> starting -> started -> finishing -> finished.
            Saving the new state produces the next status-change event, so the simulation cascades through the run.
            When an observation/pipeline subtask finishes, default feedback (and optionally 1KB dummy files) is created
            for its output dataproducts; when an ingest subtask finishes, its output dataproducts are made to look archived.
            The stop_event is set once the scheduling unit reaches status 'finished' or 'error'.
            '''
            def __init__(self, scheduling_unit_blueprint_id: int, stop_event: threading.Event,
                         handle_observations: bool = True, handle_pipelines: bool = True,
                         handle_QA: bool = True, handle_ingest: bool = True,
                         delay: float = 1, duration: float = 10,
                         create_output_dataproducts: bool=False) -> None:
                '''
                :param scheduling_unit_blueprint_id: id of the scheduling unit blueprint whose subtasks are simulated.
                :param stop_event: set when the simulation is done (scheduling unit 'finished' or 'error').
                :param handle_observations: simulate events for observation subtasks.
                :param handle_pipelines: simulate events for pipeline subtasks.
                :param handle_QA: simulate events for QA (files and plots) subtasks.
                :param handle_ingest: simulate events for ingest subtasks.
                :param delay: seconds to wait before each simulated state transition.
                :param duration: seconds between entering 'started' and moving on to 'finishing' (including the delay).
                :param create_output_dataproducts: if True, write 1KB dummy files for observation/pipeline output dataproducts.
                '''
                super().__init__(log_event_messages=False)
                self.scheduling_unit_blueprint_id = scheduling_unit_blueprint_id
                self.stop_event = stop_event
                self.handle_observations = handle_observations
                self.handle_pipelines = handle_pipelines
                self.handle_QA = handle_QA
                self.handle_ingest = handle_ingest
                self.delay = delay
                self.duration = duration
                self.create_output_dataproducts = create_output_dataproducts

            def need_to_handle(self, subtask: models.Subtask) -> bool:
                '''Return True if events of the given subtask should be simulated.

                The subtask must belong to the simulated scheduling unit, and its type must be enabled via the
                handle_* flags. Subtask types without a flag are always handled.'''
                if subtask.task_blueprint.scheduling_unit_blueprint.id != self.scheduling_unit_blueprint_id:
                    return False

                subtask_types = models.SubtaskType.Choices
                # map each type *value* (a string, as stored on the template) to its enable-flag.
                # Both QA types use .value so the string-to-string comparison can actually match.
                handled_per_type = {subtask_types.OBSERVATION.value: self.handle_observations,
                                    subtask_types.PIPELINE.value: self.handle_pipelines,
                                    subtask_types.QA_FILES.value: self.handle_QA,
                                    subtask_types.QA_PLOTS.value: self.handle_QA,
                                    subtask_types.INGEST.value: self.handle_ingest}
                return bool(handled_per_type.get(subtask.specifications_template.type.value, True))

            def start_handling(self):
                '''Start listening for events and kick off the simulation.

                :raises RuntimeError: when run in a production environment.
                If the scheduling unit already has status 'finished' or 'error', stop_event is set and nothing is simulated.
                Otherwise already-scheduled subtasks are pushed into their first simulated transition, and the
                defined subtasks without inputs are scheduled.'''
                from lofar.common import isProductionEnvironment
                if isProductionEnvironment():
                    raise RuntimeError("Do not use this tool to simulate running a scheduling_unit in a production environment!")

                super().start_handling()

                try:
                    scheduling_unit = models.SchedulingUnitBlueprint.objects.get(id=self.scheduling_unit_blueprint_id)
                except models.SchedulingUnitBlueprint.DoesNotExist:
                    pass
                else:
                    # exit if already finished
                    if scheduling_unit.status in ["finished", "error"]:
                        logger.info("scheduling_unit id=%s name='%s' has status=%s -> not simulating", scheduling_unit.id, scheduling_unit.name, scheduling_unit.status)
                        self.stop_event.set()
                        return

                # trick: trigger any already scheduled subtasks, cascading in events simulating the run
                subtasks = models.Subtask.objects.filter(task_blueprint__scheduling_unit_blueprint_id=self.scheduling_unit_blueprint_id)
                for subtask in subtasks.filter(state__value=models.SubtaskState.Choices.SCHEDULED.value):
                    self.onSubTaskStatusChanged(subtask.id, "scheduled")

                # schedule the defined subtasks, cascading in events simulating the run
                self.schedule_independend_defined_subtasks_if_needed()

            def schedule_independend_defined_subtasks_if_needed(self):
                '''Schedule every handled subtask in state 'defined' that has no inputs.

                Each such subtask gets start_time = now (UTC) + its task's relative_start_time. When scheduling raises
                a SubtaskSchedulingException, the subtask is reset to 'defined', shifted 3 hours later, and retried.
                Once the start time is more than one day in the future the exception is re-raised.
                Does nothing if the scheduling unit does not exist.'''
                try:
                    scheduling_unit = models.SchedulingUnitBlueprint.objects.get(id=self.scheduling_unit_blueprint_id)
                except models.SchedulingUnitBlueprint.DoesNotExist:
                    return

                defined_value = models.SubtaskState.Choices.DEFINED.value
                scheduled_value = models.SubtaskState.Choices.SCHEDULED.value

                for task_blueprint in scheduling_unit.task_blueprints.all():
                    for subtask in task_blueprint.subtasks.filter(inputs=None, state__value=defined_value).all():
                        if not self.need_to_handle(subtask):
                            continue

                        subtask.start_time = datetime.utcnow() + task_blueprint.relative_start_time

                        while subtask.state.value != scheduled_value:
                            try:
                                schedule_subtask_and_update_successor_start_times(subtask)
                            except SubtaskSchedulingException:
                                # try again, a bit later: back to 'defined' and shift (with successors) by 3 hours
                                subtask.state = models.SubtaskState.objects.get(value=defined_value)
                                update_start_time_and_shift_successors_until_after_stop_time(subtask, subtask.start_time + timedelta(hours=3))
                                # give up when we have been pushed more than a day into the future
                                if subtask.start_time - datetime.utcnow() > timedelta(days=1):
                                    raise

            def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str):
                '''Log status changes of the simulated scheduling unit and react to them.

                On 'schedulable' the independent defined subtasks are scheduled; on 'finished' or 'error' stop_event is set.'''
                if id != self.scheduling_unit_blueprint_id:
                    return

                scheduling_unit = models.SchedulingUnitBlueprint.objects.get(id=id)
                logger.info("scheduling_unit_blueprint id=%s name='%s' now has status='%s'", id, scheduling_unit.name,
                            status)
                if status == "schedulable":
                    self.schedule_independend_defined_subtasks_if_needed()

                if status in ["finished", "error"]:
                    self.stop_event.set()

            def onTaskBlueprintStatusChanged(self, id: int, status: str):
                '''Log status changes of task blueprints belonging to the simulated scheduling unit.'''
                # `id` is a task_blueprint id, so the filter must be on the task's scheduling unit, not on the id itself
                try:
                    task = models.TaskBlueprint.objects.get(id=id)
                except models.TaskBlueprint.DoesNotExist:
                    return

                if task.scheduling_unit_blueprint.id == self.scheduling_unit_blueprint_id:
                    logger.info("task_blueprint_id id=%s name='%s' now has status='%s'", id, task.name, status)

            def onSubTaskStatusChanged(self, id: int, status: str):
                '''Simulate the next step for a handled subtask which just reached `status`.

                After sleeping `delay` seconds the subtask is moved to the next state and saved. In 'started' the
                handler first sleeps (duration - delay) seconds to mimic the running time. When moving from
                'finishing' to 'finished' it creates the simulated output/feedback. Other states are ignored.'''
                subtask = models.Subtask.objects.get(id=id)
                if not self.need_to_handle(subtask):
                    return

                subtask_type = subtask.specifications_template.type.value
                logger.info("subtask id=%s type='%s' now has status='%s'", id, subtask_type,
                            status)

                choices = models.SubtaskState.Choices
                # simulated state machine: current state value -> next state value
                next_state_values = {choices.SCHEDULED.value: choices.QUEUEING.value,
                                     choices.QUEUEING.value: choices.QUEUED.value,
                                     choices.QUEUED.value: choices.STARTING.value,
                                     choices.STARTING.value: choices.STARTED.value,
                                     choices.STARTED.value: choices.FINISHING.value,
                                     choices.FINISHING.value: choices.FINISHED.value}
                next_state_value = next_state_values.get(status)
                if next_state_value is None:
                    return  # no simulated transition out of this state (e.g. finished)

                if status == choices.STARTED.value:
                    # mimic a running duration (the remaining `delay` is slept below);
                    # clamped at 0 because time.sleep() raises ValueError for negative values
                    sleep(max(0.0, self.duration - self.delay))
                elif status == choices.FINISHING.value:
                    self._simulate_finished_output(subtask)

                next_state = models.SubtaskState.objects.get(value=next_state_value)

                sleep(max(0.0, self.delay))  # mimic a little 'processing' delay
                logger.info("Simulating subtask id=%d type='%s' by proceeding from state='%s' to state='%s'...",
                            subtask.id, subtask_type, subtask.state.value, next_state.value)

                subtask.state = next_state
                subtask.save()

            def _simulate_finished_output(self, subtask: models.Subtask) -> None:
                '''Create the simulated results of a subtask which is about to become 'finished'.'''
                subtask_types = models.SubtaskType.Choices
                subtask_type = subtask.specifications_template.type.value

                if subtask_type in (subtask_types.OBSERVATION.value, subtask_types.PIPELINE.value):
                    if self.create_output_dataproducts:
                        self._write_dummy_output_files(subtask)
                    self._set_default_feedback(subtask)
                elif subtask_type == subtask_types.INGEST.value:
                    self._simulate_ingest_output(subtask)

            def _write_dummy_output_files(self, subtask: models.Subtask) -> None:
                '''Write a 1KB dummy file for every output dataproduct of the subtask, creating directories as needed.'''
                for output_dp in subtask.output_dataproducts.all():
                    os.makedirs(output_dp.directory, exist_ok=True)
                    logger.info('writing 1KB test dataproduct for subtask id=%s %s', subtask.id, output_dp.filepath)
                    with open(output_dp.filepath, 'w') as dp_file:
                        dp_file.write(1024 * 'a')

            def _set_default_feedback(self, subtask: models.Subtask) -> None:
                '''Attach default "feedback" feedback to every output dataproduct of the subtask.'''
                # create some nice default (and thus correct although not scientifically meaningful) feedback
                template = models.DataproductFeedbackTemplate.objects.get(name="feedback")
                feedback_doc = get_default_json_object_for_schema(template.schema)
                feedback_doc['frequency']['subbands'] = [0]
                feedback_doc['frequency']['central_frequencies'] = [1]

                for output_dp in subtask.output_dataproducts.all():
                    output_dp.feedback_template = template
                    output_dp.feedback_doc = feedback_doc
                    output_dp.save()

            def _simulate_ingest_output(self, subtask: models.Subtask) -> None:
                '''Make the output dataproducts of an ingest subtask look archived.

                Each output dataproduct gets size 1024 and an srm:// directory under the project. It also gets the
                feedback of its transformed input dataproduct (or default "empty" feedback if that cannot be found),
                an archive-info record with a random storage ticket, and a random hash per known algorithm.'''
                from django.core.exceptions import ObjectDoesNotExist

                project_name = subtask.task_blueprint.draft.scheduling_unit_draft.scheduling_set.project.name

                for output_dp in subtask.output_dataproducts.all():
                    try:
                        # copy feedback from ingest-subtask-input-dp
                        input_dp = subtask.get_transformed_input_dataproduct(output_dp.id)
                        feedback_template = input_dp.feedback_template
                        feedback_doc = input_dp.feedback_doc
                    except ObjectDoesNotExist:
                        # ObjectDoesNotExist is the base of every Model.DoesNotExist, so a missing input/transform
                        # record of any model falls back to empty feedback instead of aborting the simulation
                        feedback_template = models.DataproductFeedbackTemplate.objects.get(name="empty")
                        feedback_doc = get_default_json_object_for_schema(feedback_template.schema)

                    output_dp.size = 1024
                    output_dp.directory = "srm://some.lta.site/project/%s/%s/" % (project_name, subtask.id)
                    output_dp.feedback_template = feedback_template
                    output_dp.feedback_doc = feedback_doc
                    output_dp.save()

                    models.DataproductArchiveInfo.objects.create(dataproduct=output_dp, storage_ticket=uuid4())

                    for algo in models.Algorithm.objects.all():
                        models.DataproductHash.objects.create(dataproduct=output_dp, algorithm=algo, hash=uuid4())
    
        return TMSSBusListener(SimulationEventHandler,
                               handler_kwargs={'scheduling_unit_blueprint_id': scheduling_unit_blueprint_id,
                                               'stop_event': stop_event,
                                               'handle_observations': handle_observations, 'handle_pipelines': handle_pipelines,
                                               'handle_QA': handle_QA, 'handle_ingest': handle_ingest,
                                               'create_output_dataproducts': create_output_dataproducts,
                                               'delay': delay, 'duration': duration},
                               exchange=exchange, broker=broker)
    
    
    def main_scheduling_unit_blueprint_simulator():
        '''run a "simulator" which sets the correct events in the correct order upon receiving status change events,
        and which uploads simulated feedback upon finishing. Can be used to simulate a 'run' of a scheduling_unit without
        doing the actual observation/pipeline/QA/ingest.

        Command line: [options] <scheduling_unit_blueprint_id>
        Exits with code 1 on a missing/extra/non-integer id. Otherwise blocks until the simulator sets its stop_event
        (presumably when the scheduling unit is 'finished' or in 'error').
        '''
        import sys
        import time

        # make sure we run in UTC timezone
        os.environ['TZ'] = 'UTC'
        if hasattr(time, 'tzset'):  # tzset() only exists on Unix
            # a changed TZ environment variable only affects local-time functions (e.g. log timestamps) after tzset()
            time.tzset()
        from optparse import OptionParser, OptionGroup

        logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)

        # Check the invocation arguments
        parser = OptionParser('%prog [options] <scheduling_unit_blueprint_id>',
                              description='Mimic running a scheduling unit by scheduling/queueing/running/finishing all its (sub)tasks in the correct order and creating default feedback.')

        group = OptionGroup(parser, 'Subtask Types', description="Simulate the events for the following types, or all if no specific type is specified.")
        parser.add_option_group(group)
        group.add_option('-o', '--observation', dest='observation', action='store_true', help='simulate events for observation subtasks')
        group.add_option('-p', '--pipeline', dest='pipeline', action='store_true', help='simulate events for pipeline subtasks')
        group.add_option('-Q', '--QA', dest='QA', action='store_true', help='simulate events for QA subtasks')
        group.add_option('-i', '--ingest', dest='ingest', action='store_true', help='simulate events for ingest subtasks')

        group = OptionGroup(parser, 'Simulation parameters')
        parser.add_option_group(group)
        group.add_option('-e', '--event_delay', dest='event_delay', type='float', default=1.0, help='wait <event_delay> seconds between simulating events to mimic real-world behaviour, default: %default')
        group.add_option('-d', '--duration', dest='duration', type='float', default=60.0, help='wait <duration> seconds while "observing"/"processing" between started and finishing state to mimic real-world behaviour, default: %default')

        group = OptionGroup(parser, 'Messaging options')
        parser.add_option_group(group)
        group.add_option('--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the messaging broker, default: %default')
        group.add_option('--exchange', dest='exchange', type='string', default=DEFAULT_BUSNAME, help='Name of the exchange on the messaging broker, default: %default')

        group = OptionGroup(parser, 'Django options')
        parser.add_option_group(group)
        group.add_option('-C', '--credentials', dest='dbcredentials', type='string', default=os.environ.get('TMSS_DBCREDENTIALS', 'TMSS'), help='django dbcredentials name, default: %default')

        (options, args) = parser.parse_args()
        if len(args) != 1:
            parser.print_usage()
            sys.exit(1)

        try:
            scheduling_unit_blueprint_id = int(args[0])
        except ValueError:
            parser.print_usage()
            print("error: scheduling_unit_blueprint_id must be an integer, got '%s'" % (args[0],), file=sys.stderr)
            sys.exit(1)

        # no specific subtask type requested -> simulate all types
        if not any((options.observation, options.pipeline, options.QA, options.ingest)):
            options.observation = True
            options.pipeline = True
            options.QA = True
            options.ingest = True

        # setup django
        os.environ["TMSS_DBCREDENTIALS"] = options.dbcredentials
        os.environ["DJANGO_SETTINGS_MODULE"] = "lofar.sas.tmss.tmss.settings"
        import django
        django.setup()

        dbcreds = DBCredentials().get(options.dbcredentials)
        logger.info("Using dbcreds: %s", dbcreds.stringWithHiddenPassword())

        stop_event = threading.Event()
        with BusListenerJanitor(create_scheduling_unit_blueprint_simulator(scheduling_unit_blueprint_id, stop_event=stop_event,
                                                                           delay=options.event_delay, duration=options.duration,
                                                                           handle_observations=bool(options.observation), handle_pipelines=bool(options.pipeline),
                                                                           handle_QA=bool(options.QA), handle_ingest=bool(options.ingest),
                                                                           exchange=options.exchange, broker=options.broker)):
            # block until the simulation signals it is done
            stop_event.wait()
    
    
    if __name__ == '__main__':
        # executed as a script (not imported as a module): start the test environment
        main_test_environment()