#!/usr/bin/env python3
# Copyright (C) 2018 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
# $Id: $
import os
import time
import datetime
from multiprocessing import Process, Event
import django
import logging
logger = logging.getLogger(__name__)
import threading
from lofar.common.testing.postgres import PostgresTestMixin, PostgresTestDatabaseInstance
from lofar.common.dbcredentials import Credentials, DBCredentials
from lofar.common.util import find_free_port, waitForInterrupt
from lofar.sas.tmss.test.ldap_test_service import TestLDAPServer
from lofar.sas.tmss.tmss.exceptions import TMSSException
from lofar.messaging.config import DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.messaging.messagebus import BusListenerJanitor
from lofar.common.testing.dbcredentials import TemporaryCredentials
from lofar.sas.tmss.client.tmss_http_rest_client import TMSSsession
from lofar.sas.resourceassignment.resourceassigner.test.ra_test_environment import RATestEnvironment
def assertDataWithUrls(self, data, expected):
"""
object instances get returned as urls, check that the value is part of that url
"""
# TODO: Make this smarter, this only checks for matching pk!
from django.db import models
for k, v in expected.items():
if isinstance(v, models.Model):
v = str(v.pk)
v = v.replace(' ', '%20')
            err_msg = "The expected value '%s' for key '%s' is not part of the returned value '%s'" % (str(v), k, str(data[k]))
            self.assertTrue(str(v) in data[k], err_msg)
elif isinstance(v, datetime.datetime):
            # the URL (data[k]) is a string, but the test_data value (v) is a datetime; convert the latter to an isoformat string to compare
self.assertEqual(v.isoformat(), data[k])
else:
self.assertEqual(v, data[k])
def assertUrlList(self, url_list, expected_objects):
"""
    object instances get returned as urls, check that the expected objects are in that list
"""
# TODO: Make this smarter, this only checks for matching pk!
from django.db import models
self.assertEqual(len(url_list), len(expected_objects))
for v in expected_objects:
if isinstance(v, models.Model):
v = str(v.pk)
v = v.replace(' ', '%20')
self.assertTrue(any(str(v) in myurl for myurl in url_list))
else:
raise ValueError('Expected item is not a Django model instance: %s' % v)
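# A hedged usage sketch for the two assert helpers above: they take 'self' as first argument,
# so a test can bind them onto a unittest.TestCase subclass and call them like regular assert
# methods (the TestCase, model and data below are hypothetical):
#
#   import unittest
#
#   class MyRESTApiTest(unittest.TestCase):
#       assertDataWithUrls = assertDataWithUrls
#       assertUrlList = assertUrlList
#
#       def test_project_response(self):
#           project = models.Project.objects.create(name='my_project')                         # hypothetical model instance
#           response_data = {'project': 'http://localhost:8000/api/project/%s/' % project.pk}  # hypothetical REST response
#           self.assertDataWithUrls(response_data, {'project': project})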
class TMSSTestDatabaseInstance(PostgresTestDatabaseInstance):
'''
Creates an isolated postgres database instance and initializes the database with a django tmss migration.
Destroys the isolated postgres database instance upon exit automagically.
'''
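    # A hedged usage sketch: the instance is a context manager (see also main_test_database() below),
    # so the database is created on enter and destroyed again on exit:
    #
    #   with TMSSTestDatabaseInstance() as db:
    #       print(db.dbcreds)   # credentials of the freshly created and migrated database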
def __init__(self) -> None:
super().__init__(user='test_tmss_user')
def apply_database_schema(self):
logger.info('applying TMSS sql schema to %s', self.dbcreds)
# a TMSSTestDatabaseInstance needs to run in a clean env,
# with these variables set to the current test values.
import os
os.environ["TMSS_DBCREDENTIALS"] = self.dbcreds_id
os.environ["DJANGO_SETTINGS_MODULE"] = "lofar.sas.tmss.tmss.settings"
        # run migrate in a separate process so the needed django setup does not pollute our current apps environment
def _migrate_helper():
            # use the django management command to apply the database schema via the initial migration
            import django
            from django.core.management import call_command  # explicit import; 'import django' alone does not import django.core.management
            django.setup()
            call_command('migrate')
migrate_process = Process(target=_migrate_helper, daemon=True)
migrate_process.start()
migrate_process.join()
if migrate_process.exitcode != 0:
raise TMSSException("Could not initialize TMSS database with django migrations")
def minimal_json_schema(title:str="my title", description:str="my description", id:str="http://example.com/foo/bar.json", properties:dict=None, required:list=None):
    # use None as default and convert here, to avoid the mutable-default-argument pitfall
    return {"$schema": "http://json-schema.org/draft-06/schema#",
            "$id": id,
            "title": title,
            "description": description,
            "type": "object",
            "properties": properties if properties is not None else {},
            "required": required if required is not None else [],
            "default": {}
            }
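# A hedged example call of minimal_json_schema above (the 'duration' property is illustrative):
#
#   schema = minimal_json_schema(title="my observation",
#                                properties={"duration": {"type": "number", "default": 600}},
#                                required=["duration"])
#
# which yields a draft-06 json schema dict with the given title, properties and required fields.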
class TMSSPostgresTestMixin(PostgresTestMixin):
'''
A common test mixin class from which you can derive to get a freshly setup postgres testing instance with the latest TMSS sql schema.
'''
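    # A hedged usage sketch (the TestCase below is hypothetical): derive from this mixin plus
    # unittest.TestCase to get one fresh TMSS database for the whole test case, presumably set up
    # and torn down by the mixin's class-level setup/teardown:
    #
    #   import unittest
    #
    #   class MyTMSSDatabaseTest(TMSSPostgresTestMixin, unittest.TestCase):
    #       def test_something(self):
    #           ...   # talk to the test database created by the mixin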
@classmethod
def create_test_db_instance(cls) -> TMSSTestDatabaseInstance:
return TMSSTestDatabaseInstance()
class TMSSDjangoServerInstance():
''' Creates a running django TMSS server at the requested port with the requested database credentials.
'''
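    # A hedged usage sketch, assuming database and LDAP test credentials have already been created
    # (TMSSTestEnvironment below does exactly this wiring); the instance is a context manager:
    #
    #   with TMSSDjangoServerInstance(db_dbcreds_id=database.dbcreds_id,
    #                                 ldap_dbcreds_id=ldap_server.dbcreds_id,
    #                                 port=find_free_port(8000)) as django_server:
    #       print(django_server.url)   # REST API url of the running test server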
def __init__(self, db_dbcreds_id: str="TMSS", ldap_dbcreds_id: str="TMSS_LDAP", host: str='127.0.0.1', port: int=8000, public_host: str=None,
                 exchange: str=os.environ.get("TMSS_EXCHANGE", DEFAULT_BUSNAME), broker: str=os.environ.get("TMSS_BROKER", DEFAULT_BROKER)):
self._db_dbcreds_id = db_dbcreds_id
self._ldap_dbcreds_id = ldap_dbcreds_id
self.host = host
self.port = port
self.public_host = public_host or host
self._server_process = None
@property
def host_address(self):
''':returns the address and port of the django server'''
return "%s:%d" % (self.host, self.port)
@property
def address(self):
''':returns the public address and port of the django server'''
return "%s:%d" % (self.public_host, self.port)
@property
def url(self):
''':returns the http url to the django server'''
return "http://%s/api/" % self.address
@property
def oidc_url(self):
        ''':returns the http url to the OIDC endpoint of the django server'''
return "http://%s/oidc/" % self.address
@property
def database_dbcreds_id(self) -> str:
''':returns the uuid of the temporary database credentials'''
return self._db_dbcreds_id
@property
def database_dbcreds(self) -> Credentials:
''':returns the temporary database Credentials'''
return DBCredentials().get(self._db_dbcreds_id)
@property
def ldap_dbcreds_id(self) -> str:
''':returns the uuid of the temporary LDAP server credentials'''
return self._ldap_dbcreds_id
@property
def ldap_dbcreds(self) -> Credentials:
''':returns the temporary LDAP Credentials'''
return DBCredentials().get(self._ldap_dbcreds_id)
def setup_django(self):
# (tmss)django is initialized via many environment variables.
# set these here, run django setup, and start the server
os.environ["TMSS_LDAPCREDENTIALS"] = self.ldap_dbcreds_id
os.environ["TMSS_DBCREDENTIALS"] = self.database_dbcreds_id
os.environ["DJANGO_SETTINGS_MODULE"] = "lofar.sas.tmss.tmss.settings"
django.setup()
def start(self):
'''
Start the Django server with a test-LDAP server in the background.
Best used in a 'with'-context
'''
def _helper_runserver_loop():
logger.info("Starting Django server at port=%d with database: %s and LDAP: %s",
self.port, self.database_dbcreds, self.ldap_dbcreds)
self.setup_django()
            from django.core.management import call_command  # explicit import; 'import django' alone does not import django.core.management
            call_command('runserver',
                         use_reloader=False,
                         addrport=self.host_address)
self._server_process = Process(target=_helper_runserver_loop, daemon=True)
self._server_process.start()
# wait for server to be up and running....
# or exit via TimeoutError
self.check_running_server(timeout=60)
def stop(self):
'''
Stop the running Django and LDAP servers.
'''
if self._server_process is not None:
logger.info("Stopping Django server...")
try:
self._server_process.kill() # new in python 3.7
except AttributeError:
self._server_process.terminate() # < python 3.7
self._server_process = None
logger.info("Django server stopped.")
def check_running_server(self, timeout: float = 10) -> bool:
'''Check the running django server for a valid response'''
import requests
        from datetime import datetime, timedelta
start = datetime.utcnow()
while True:
try:
logger.info("Checking if TMSS Django server is up and running at %s with database: %s and LDAP: %s ....",
self.url, self.database_dbcreds, self.ldap_dbcreds)
response = requests.get(self.url, auth=(self.ldap_dbcreds.user, self.ldap_dbcreds.password), timeout=max(1, timeout/10))
if response.status_code in [200, 401, 403]:
logger.info("TMSS Django server is up and running at %s with database: %s and LDAP: %s",
self.url, self.database_dbcreds, self.ldap_dbcreds)
if response.status_code in [401, 403]:
                        logger.warning("TMSS Django server at %s could not authenticate with LDAP creds: %s", self.url, self.ldap_dbcreds)
# TODO: logout, otherwise django remembers our login session.
return True
except Exception as e:
time.sleep(0.5)
if datetime.utcnow() - start > timedelta(seconds=timeout):
raise TimeoutError("Could not get a valid response from the django server at %s within %s seconds" % (self.url,timeout))
def __enter__(self):
try:
self.start()
except Exception as e:
logger.error(e)
self.stop()
raise
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()
class TMSSTestEnvironment:
'''Create and run a test django TMSS server against a newly created test database and a test ldap server (and cleanup automagically)'''
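    # A hedged usage sketch: the environment is a context manager, so all servers are started on
    # enter and stopped/removed again on exit (see also main_test_environment() below):
    #
    #   with TMSSTestEnvironment(populate_schemas=True) as tmss_test_env:
    #       client = tmss_test_env.create_tmss_client()    # a TMSSsession for the REST API
    #       print(tmss_test_env.django_server.url)         # url of the running test REST server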
def __init__(self, host: str='127.0.0.1', preferred_django_port: int=8000, public_host: str=None,
exchange: str=os.environ.get("TMSS_EXCHANGE", DEFAULT_BUSNAME), broker: str=os.environ.get("TMSS_BROKER", DEFAULT_BROKER),
populate_schemas:bool=False, populate_test_data:bool=False,
start_ra_test_environment: bool=False, start_postgres_listener: bool=False,
start_subtask_scheduler: bool=False, start_dynamic_scheduler: bool=False,
start_workflow_service: bool=False, enable_viewflow: bool=False):
self._exchange = exchange
self._broker = broker
self._populate_schemas = populate_schemas or populate_test_data
self._populate_test_data = populate_test_data
self.ldap_server = TestLDAPServer(user='test', password='test')
self.database = TMSSTestDatabaseInstance()
self.django_server = TMSSDjangoServerInstance(db_dbcreds_id=self.database.dbcreds_id,
ldap_dbcreds_id=self.ldap_server.dbcreds_id,
host=host,
port=find_free_port(preferred_django_port),
public_host=public_host)
self.client_credentials = TemporaryCredentials(user=self.ldap_server.dbcreds.user,
password=self.ldap_server.dbcreds.password)
        # the ra_test_environment is needed by some depending services, so start it when any such service is started, even if start_ra_test_environment==False
self._start_ra_test_environment = start_ra_test_environment or start_subtask_scheduler or start_dynamic_scheduler
self.ra_test_environment = None
# the postgres_listener is needed by some depending services, so start it when any depending service is started, even if start_postgres_listener==False
self._start_postgres_listener = start_postgres_listener or start_subtask_scheduler or start_dynamic_scheduler
self.postgres_listener = None
self._start_subtask_scheduler = start_subtask_scheduler
self.subtask_scheduler = None
self._start_dynamic_scheduler = start_dynamic_scheduler
self.dynamic_scheduler = None
self.enable_viewflow = enable_viewflow or start_workflow_service
self._start_workflow_service = start_workflow_service
self.workflow_service = None
os.environ['TMSS_ENABLE_VIEWFLOW'] = str(bool(self.enable_viewflow))
# Check for correct Django version, should be at least 3.0
if django.VERSION[0] < 3:
print("\nWARNING: YOU ARE USING DJANGO VERSION '%s', WHICH WILL NOT SUPPORT ALL FEATURES IN TMSS!\n" %
django.get_version())
def start(self):
self.ldap_server.start()
self.database.create()
self.django_server.start()
# store client credentials in the TemporaryCredentials file...
self.client_credentials.dbcreds.host = self.django_server.public_host
self.client_credentials.dbcreds.port = self.django_server.port
self.client_credentials.dbcreds.type = "http"
self.client_credentials.create()
        # ... and set the TMSS_CLIENT_DBCREDENTIALS environment variable, so anybody or anything (any test) can use it automagically
os.environ['TMSS_CLIENT_DBCREDENTIALS'] = self.client_credentials.dbcreds_id
# apart from the running django server with a REST API,
# it is also convenient to provide a working django setup for the 'normal' django API (via models.objects)
# so: do setup_django
self.django_server.setup_django()
        # now that the ldap and django servers are running, and the django setup has been done,
        # we can announce our test user as superuser, so the test user can do anything via the API.
        # (there are also other tests, using other (on the fly created) users with restricted permissions, which is fine but not part of this generic setup.)
from django.contrib.auth.models import User
user, _ = User.objects.get_or_create(username=self.ldap_server.dbcreds.user)
user.is_superuser = True
user.save()
if self._start_ra_test_environment:
self.ra_test_environment = RATestEnvironment(exchange=self._exchange, broker=self._broker)
self.ra_test_environment.start()
if self._start_postgres_listener:
# start the TMSSPGListener, so the changes in the database are posted as EventMessages on the bus
from lofar.sas.tmss.services.tmss_postgres_listener import TMSSPGListener
self.postgres_listener = TMSSPGListener(exchange=self._exchange, broker=self._broker, dbcreds=self.database.dbcreds)
self.postgres_listener.start()
if self._start_subtask_scheduler:
from lofar.sas.tmss.services.scheduling.subtask_scheduling import create_subtask_scheduling_service
self.subtask_scheduler = create_subtask_scheduling_service(exchange=self._exchange, broker=self._broker)
self.subtask_scheduler.start_listening()
if self._start_dynamic_scheduler:
from lofar.sas.tmss.services.scheduling.dynamic_scheduling import create_dynamic_scheduling_service, models
# by default, dynamic scheduling is disabled in TMSS.
            # In this test environment we do want it enabled; why else would we want to start this service?
setting = models.Setting.objects.get(name=models.Flag.Choices.DYNAMIC_SCHEDULING_ENABLED.value)
setting.value = True
setting.save()
self.dynamic_scheduler = create_dynamic_scheduling_service(exchange=self._exchange, broker=self._broker)
self.dynamic_scheduler.start_listening()
if self._start_workflow_service:
from lofar.sas.tmss.services.workflow_service import create_workflow_service
self.workflow_service = create_workflow_service(exchange=self._exchange, broker=self._broker)
self.workflow_service.start_listening()
if self._populate_schemas or self._populate_test_data:
self.populate_schemas()
if self._populate_test_data:
self.populate_test_data()
def stop(self):
if self.workflow_service is not None:
BusListenerJanitor.stop_listening_and_delete_queue(self.workflow_service)
self.workflow_service = None
if self.postgres_listener is not None:
self.postgres_listener.stop()
self.postgres_listener = None
if self.subtask_scheduler is not None:
BusListenerJanitor.stop_listening_and_delete_queue(self.subtask_scheduler)
self.subtask_scheduler = None
if self.dynamic_scheduler is not None:
BusListenerJanitor.stop_listening_and_delete_queue(self.dynamic_scheduler)
self.dynamic_scheduler = None
if self.ra_test_environment is not None:
self.ra_test_environment.stop()
self.ra_test_environment = None
self.django_server.stop()
self.ldap_server.stop()
self.database.destroy()
self.client_credentials.destroy()
def __enter__(self):
try:
self.start()
except Exception as e:
logger.error(e)
self.stop()
raise
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()
def populate_schemas(self):
# populate the items that rely on a running REST API server (which cannot be populated via the django model.objects API)
from lofar.sas.tmss.client.populate import populate_schemas
populate_schemas()
# the connectors rely on the schemas to be populated first (above)
from lofar.sas.tmss.tmss.tmssapp.populate import populate_connectors
populate_connectors()
def populate_test_data(self):
from lofar.sas.tmss.tmss.tmssapp.populate import populate_test_data
populate_test_data()
def create_tmss_client(self):
return TMSSsession.create_from_dbcreds_for_ldap(self.client_credentials.dbcreds_id)
def main_test_database():
    """instantiate, run and destroy a test postgres django database"""
os.environ['TZ'] = 'UTC'
logging.basicConfig(format = '%(asctime)s %(levelname)s %(message)s', level = logging.INFO)
with TMSSTestDatabaseInstance() as db:
# print some nice info for the user to use the test servers...
# use print instead of log for clean lines.
for h in logging.root.handlers:
h.flush()
print()
print()
print("**********************************")
print("Test-TMSS database up and running.")
print("**********************************")
        print("DB Credentials ID: %s (for example to run tmss against this test db, call 'tmss -C %s')" % (db.dbcreds_id, db.dbcreds_id))
print()
print("Press Ctrl-C to exit (and remove the test database automatically)")
waitForInterrupt()
def main_test_environment():
    """instantiate, run and destroy a full tmss test environment (postgres database, ldap server, django server)"""
from optparse import OptionParser, OptionGroup
os.environ['TZ'] = 'UTC'
parser = OptionParser('%prog [options]',
description='setup/run/teardown a full TMSS test environment including a fresh and isolated database, LDAP server and DJANGO REST server.')
group = OptionGroup(parser, 'Network')
parser.add_option_group(group)
group.add_option("-H", "--host", dest="host", type="string", default='0.0.0.0',
help="serve the TMSS Django REST API server via this host. [default=%default]")
group.add_option("-p", "--port", dest="port", type="int", default=find_free_port(8000),
help="try to use this port for the DJANGO REST API. If not available, then a random free port is used and logged. [default=%default]")
group.add_option("-P", "--public_host", dest="public_host", type="string", default='127.0.0.1',
help="expose the TMSS Django REST API via this host. [default=%default]")
group = OptionGroup(parser, 'Example/Test data, schemas and services',
description='Options to enable/create example/test data, schemas and services. ' \
'Without these options you get a lean and mean TMSS test environment, but then you need to run the background services and create test data yourself. ' \
'For standalone commissioning/testing/playing around you need all these options, use --all for that as a convenience.')
parser.add_option_group(group)
group.add_option('-d', '--data', dest='data', action='store_true', help='populate the test-database with test/example data. This implies -s/--schemas because these schemas are needed to create test data.')
group.add_option('-s', '--schemas', dest='schemas', action='store_true', help='populate the test-database with the TMSS JSON schemas')
group.add_option('-m', '--eventmessages', dest='eventmessages', action='store_true', help='Send event messages over the messagebus for changes in the TMSS database (for (sub)tasks/scheduling_units etc).')
group.add_option('-r', '--ra_test_environment', dest='ra_test_environment', action='store_true', help='start the Resource Assigner test environment which enables scheduling.')
    group.add_option('-S', '--scheduling', dest='scheduling', action='store_true', help='start the TMSS background scheduling services: dynamic scheduling of scheduling units and subtask scheduling of chains of dependent subtasks.')
group.add_option('-v', '--viewflow', dest='viewflow', action='store_true', help='Enable the viewflow app for workflows on top of TMSS')
group.add_option('--all', dest='all', action='store_true', help='Enable/Start all the services, upload schemas and testdata')
group = OptionGroup(parser, 'Messaging options')
parser.add_option_group(group)
group.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the message broker, default: %default')
group.add_option('-e', "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME, help="Bus or queue where the TMSS messages are published. [default: %default]")
(options, args) = parser.parse_args()
logging.basicConfig(format = '%(asctime)s %(levelname)s %(message)s', level = logging.INFO)
with TMSSTestEnvironment(host=options.host, preferred_django_port=options.port, public_host=options.public_host,
exchange=options.exchange, broker=options.broker,
populate_schemas=options.schemas or options.data or options.all,
populate_test_data=options.data or options.all,
start_ra_test_environment=options.ra_test_environment or options.all,
start_postgres_listener=options.eventmessages or options.scheduling or options.all,
start_subtask_scheduler=options.scheduling or options.all,
start_dynamic_scheduler=options.scheduling or options.all,
start_workflow_service=options.viewflow or options.all,
enable_viewflow=options.viewflow or options.all) as tmss_test_env:
# print some nice info for the user to use the test servers...
# use print instead of log for clean lines.
for h in logging.root.handlers:
h.flush()
print()
print()
print("*****************************************************")
print("Test-TMSS database, LDAP and Django up and running...")
print("*****************************************************")
print("DB Credentials ID: %s" % (tmss_test_env.database.dbcreds_id, ))
print("LDAP Credentials ID: %s" % (tmss_test_env.django_server.ldap_dbcreds_id, ))
print("TMSS Client Credentials ID: %s" % (tmss_test_env.client_credentials.dbcreds_id, ))
print("Django URL: %s" % (tmss_test_env.django_server.url))
print()
print("Example cmdlines to run tmss or tmss_manage_django:")
print("TMSS_DBCREDENTIALS=%s TMSS_LDAPCREDENTIALS=%s tmss" % (tmss_test_env.database.dbcreds_id, tmss_test_env.django_server.ldap_dbcreds_id))
print("TMSS_DBCREDENTIALS=%s TMSS_LDAPCREDENTIALS=%s tmss_manage_django" % (tmss_test_env.database.dbcreds_id, tmss_test_env.django_server.ldap_dbcreds_id))
print()
print("Example cmdline to run tmss client call:")
print("TMSS_CLIENT_DBCREDENTIALS=%s tmss_set_subtask_state <id> <state>" % (tmss_test_env.client_credentials.dbcreds_id, ))
print()
print("Press Ctrl-C to exit (and remove the test database and django server automatically)")
waitForInterrupt()
def create_scheduling_unit_blueprint_simulator(scheduling_unit_blueprint_id: int, stop_event: threading.Event,
handle_observations: bool = True, handle_pipelines: bool = True,
handle_QA: bool = True, handle_ingest: bool = True,
delay: float=1, duration: float=5,
create_output_dataproducts: bool=False,
exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
'''
create a "simulator" which sets the correct events in the correct order upon receiving status change events,
and which uploads simulated feedback upon finishing. Can be used to simulate a 'run' of a scheduling_unit without
doing the actual observation/pipeline/QA/ingest.
'''
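    # A hedged usage sketch (mirroring main_scheduling_unit_blueprint_simulator below): wrap the
    # returned TMSSBusListener in a BusListenerJanitor and wait until the scheduling unit finishes:
    #
    #   stop_event = threading.Event()
    #   with BusListenerJanitor(create_scheduling_unit_blueprint_simulator(su_blueprint_id, stop_event)):  # su_blueprint_id is hypothetical
    #       stop_event.wait()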
from lofar.sas.tmss.client.tmssbuslistener import TMSSEventMessageHandler, TMSSBusListener
from lofar.sas.tmss.tmss.tmssapp import models
from lofar.sas.tmss.tmss.tmssapp.subtasks import schedule_subtask_and_update_successor_start_times, update_start_time_and_shift_successors_until_after_stop_time
from lofar.common.json_utils import get_default_json_object_for_schema
from lofar.sas.tmss.tmss.exceptions import SubtaskSchedulingException
from datetime import datetime, timedelta
from time import sleep
from uuid import uuid4
class SimulationEventHandler(TMSSEventMessageHandler):
def __init__(self, scheduling_unit_blueprint_id: int, stop_event: threading.Event,
handle_observations: bool = True, handle_pipelines: bool = True,
handle_QA: bool = True, handle_ingest: bool = True,
delay: float = 1, duration: float = 10,
create_output_dataproducts: bool=False) -> None:
super().__init__(log_event_messages=False)
self.scheduling_unit_blueprint_id = scheduling_unit_blueprint_id
self.stop_event = stop_event
self.handle_observations = handle_observations
self.handle_pipelines = handle_pipelines
self.handle_QA = handle_QA
self.handle_ingest = handle_ingest
self.delay = delay
self.duration = duration
self.create_output_dataproducts = create_output_dataproducts
def need_to_handle(self, subtask: models.Subtask) -> bool:
if subtask.task_blueprint.scheduling_unit_blueprint.id != self.scheduling_unit_blueprint_id:
return False
if subtask.specifications_template.type.value == models.SubtaskType.Choices.OBSERVATION.value and not self.handle_observations:
return False
if subtask.specifications_template.type.value == models.SubtaskType.Choices.PIPELINE.value and not self.handle_pipelines:
return False
if subtask.specifications_template.type.value in [models.SubtaskType.Choices.QA_FILES.value,
                                                              models.SubtaskType.Choices.QA_PLOTS.value] and not self.handle_QA:
return False
if subtask.specifications_template.type.value == models.SubtaskType.Choices.INGEST.value and not self.handle_ingest:
return False
return True
def start_handling(self):
from lofar.common import isProductionEnvironment
if isProductionEnvironment():
raise RuntimeError("Do not use this tool to simulate running a scheduling_unit in a production environment!")
super().start_handling()
try:
# exit if already finished
scheduling_unit = models.SchedulingUnitBlueprint.objects.get(id=self.scheduling_unit_blueprint_id)
if scheduling_unit.status in ["finished", "error"]:
logger.info("scheduling_unit id=%s name='%s' has status=%s -> not simulating", scheduling_unit.id, scheduling_unit.name, scheduling_unit.status)
self.stop_event.set()
return
except models.SchedulingUnitBlueprint.DoesNotExist:
pass
# trick: trigger any already scheduled subtasks, cascading in events simulating the run
subtasks = models.Subtask.objects.filter(task_blueprint__scheduling_unit_blueprint_id=self.scheduling_unit_blueprint_id)
for subtask in subtasks.filter(state__value=models.SubtaskState.Choices.SCHEDULED.value):
self.onSubTaskStatusChanged(subtask.id, "scheduled")
# schedule the defined subtasks, cascading in events simulating the run
self.schedule_independend_defined_subtasks_if_needed()
def schedule_independend_defined_subtasks_if_needed(self):
try:
scheduling_unit = models.SchedulingUnitBlueprint.objects.get(id=self.scheduling_unit_blueprint_id)
for task_blueprint in scheduling_unit.task_blueprints.all():
for subtask in task_blueprint.subtasks.filter(inputs=None,
state__value=models.SubtaskState.Choices.DEFINED.value).all():
if self.need_to_handle(subtask):
subtask.start_time = datetime.utcnow() + task_blueprint.relative_start_time
while subtask.state.value != models.SubtaskState.Choices.SCHEDULED.value:
try:
schedule_subtask_and_update_successor_start_times(subtask)
except SubtaskSchedulingException as e:
# try again, a bit later
subtask.state = models.SubtaskState.objects.get(value=models.SubtaskState.Choices.DEFINED.value)
update_start_time_and_shift_successors_until_after_stop_time(subtask, subtask.start_time + timedelta(hours=3))
if subtask.start_time - datetime.utcnow() > timedelta(days=1):
raise
except models.SchedulingUnitBlueprint.DoesNotExist:
pass
def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str):
if id == self.scheduling_unit_blueprint_id:
scheduling_unit = models.SchedulingUnitBlueprint.objects.get(id=id)
logger.info("scheduling_unit_blueprint id=%s name='%s' now has status='%s'", id, scheduling_unit.name,
status)
if status == "schedulable":
self.schedule_independend_defined_subtasks_if_needed()
if status in ["finished", "error"]:
self.stop_event.set()
        def onTaskBlueprintStatusChanged(self, id: int, status: str):
            task = models.TaskBlueprint.objects.get(id=id)
            # only log task_blueprints belonging to the scheduling_unit we are simulating
            if task.scheduling_unit_blueprint.id == self.scheduling_unit_blueprint_id:
                logger.info("task_blueprint id=%s name='%s' now has status='%s'", id, task.name, status)
def onSubTaskStatusChanged(self, id: int, status: str):
subtask = models.Subtask.objects.get(id=id)
if not self.need_to_handle(subtask):
return
logger.info("subtask id=%s type='%s' now has status='%s'", id, subtask.specifications_template.type.value,
status)
next_state = None
if status == models.SubtaskState.Choices.SCHEDULED.value:
next_state = models.SubtaskState.objects.get(value=models.SubtaskState.Choices.QUEUEING.value)
elif status == models.SubtaskState.Choices.QUEUEING.value:
next_state = models.SubtaskState.objects.get(value=models.SubtaskState.Choices.QUEUED.value)
elif status == models.SubtaskState.Choices.QUEUED.value:
next_state = models.SubtaskState.objects.get(value=models.SubtaskState.Choices.STARTING.value)
elif status == models.SubtaskState.Choices.STARTING.value:
next_state = models.SubtaskState.objects.get(value=models.SubtaskState.Choices.STARTED.value)
elif status == models.SubtaskState.Choices.STARTED.value:
sleep(self.duration - self.delay) # mimic a running duration
next_state = models.SubtaskState.objects.get(value=models.SubtaskState.Choices.FINISHING.value)
elif status == models.SubtaskState.Choices.FINISHING.value:
next_state = models.SubtaskState.objects.get(value=models.SubtaskState.Choices.FINISHED.value)
if subtask.specifications_template.type.value in [models.SubtaskType.Choices.OBSERVATION.value,
models.SubtaskType.Choices.PIPELINE.value]:
if self.create_output_dataproducts:
for output_dp in subtask.output_dataproducts.all():
os.makedirs(output_dp.directory, exist_ok=True)
logger.info('writing 1KB test dataproduct for subtask id=%s %s', subtask.id, output_dp.filepath)
with open(output_dp.filepath, 'w') as file:
file.write(1024 * 'a')
# create some nice default (and thus correct although not scientifically meaningful) feedback
template = models.DataproductFeedbackTemplate.objects.get(name="feedback")
feedback_doc = get_default_json_object_for_schema(template.schema)
feedback_doc['frequency']['subbands'] = [0]
feedback_doc['frequency']['central_frequencies'] = [1]
for output_dp in subtask.output_dataproducts:
output_dp.feedback_template = template
output_dp.feedback_doc = feedback_doc
output_dp.save()
elif subtask.specifications_template.type.value == models.SubtaskType.Choices.INGEST.value:
project_name = subtask.task_blueprint.draft.scheduling_unit_draft.scheduling_set.project.name
for output_dp in subtask.output_dataproducts:
try:
# copy feedback from ingest-subtask-input-dp
input_dp = subtask.get_transformed_input_dataproduct(output_dp.id)
feedback_template = input_dp.feedback_template
feedback_doc = input_dp.feedback_doc
except models.Subtask.DoesNotExist:
feedback_template = models.DataproductFeedbackTemplate.objects.get(name="empty")
feedback_doc = get_default_json_object_for_schema(feedback_template.schema)
output_dp.size = 1024
output_dp.directory = "srm://some.lta.site/project/%s/%s/" % (project_name, subtask.id)
output_dp.feedback_template = feedback_template
output_dp.feedback_doc = feedback_doc
output_dp.save()
models.DataproductArchiveInfo.objects.create(dataproduct=output_dp, storage_ticket=uuid4())
for algo in models.Algorithm.objects.all():
models.DataproductHash.objects.create(dataproduct=output_dp, algorithm=algo, hash=uuid4())
if next_state:
sleep(self.delay) # mimic a little 'processing' delay
logger.info("Simulating subtask id=%d type='%s' by proceeding from state='%s' to state='%s'...",
subtask.id, subtask.specifications_template.type.value, subtask.state.value, next_state)
subtask.state = next_state
subtask.save()
return TMSSBusListener(SimulationEventHandler,
handler_kwargs={'scheduling_unit_blueprint_id': scheduling_unit_blueprint_id,
'stop_event': stop_event,
'handle_observations': handle_observations, 'handle_pipelines': handle_pipelines,
'handle_QA': handle_QA, 'handle_ingest': handle_ingest,
'create_output_dataproducts': create_output_dataproducts,
'delay': delay, 'duration': duration},
exchange=exchange, broker=broker)
def main_scheduling_unit_blueprint_simulator():
'''run a "simulator" which sets the correct events in the correct order upon receiving status change events,
and which uploads simulated feedback upon finishing. Can be used to simulate a 'run' of a scheduling_unit without
doing the actual observation/pipeline/QA/ingest.
'''
# make sure we run in UTC timezone
os.environ['TZ'] = 'UTC'
from optparse import OptionParser, OptionGroup
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
# Check the invocation arguments
parser = OptionParser('%prog [options] <scheduling_unit_blueprint_id>',
                          description='Mimic running a scheduling unit by scheduling/queueing/running/finishing all its (sub)tasks in the correct order and creating default feedback.')
    group = OptionGroup(parser, 'Subtask Types', description="Simulate the events for the following types, or all if no specific type is specified.")
parser.add_option_group(group)
group.add_option('-o', '--observation', dest='observation', action='store_true', help='simulate events for observation subtasks')
group.add_option('-p', '--pipeline', dest='pipeline', action='store_true', help='simulate events for pipeline subtasks')
group.add_option('-Q', '--QA', dest='QA', action='store_true', help='simulate events for QA subtasks')
group.add_option('-i', '--ingest', dest='ingest', action='store_true', help='simulate events for ingest subtasks')
group = OptionGroup(parser, 'Simulation parameters')
parser.add_option_group(group)
group.add_option('-e', '--event_delay', dest='event_delay', type='float', default=1.0, help='wait <event_delay> seconds between simulating events to mimic real-world behaviour, default: %default')
group.add_option('-d', '--duration', dest='duration', type='float', default=60.0, help='wait <duration> seconds while "observing"/"processing" between started and finishing state to mimic real-world behaviour, default: %default')
group = OptionGroup(parser, 'Messaging options')
parser.add_option_group(group)
group.add_option('--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the messaging broker, default: %default')
group.add_option('--exchange', dest='exchange', type='string', default=DEFAULT_BUSNAME, help='Name of the exchange on the messaging broker, default: %default')
group = OptionGroup(parser, 'Django options')
parser.add_option_group(group)
group.add_option('-C', '--credentials', dest='dbcredentials', type='string', default=os.environ.get('TMSS_DBCREDENTIALS', 'TMSS'), help='django dbcredentials name, default: %default')
(options, args) = parser.parse_args()
if len(args) != 1:
parser.print_usage()
exit(1)
scheduling_unit_blueprint_id = int(args[0])
if not (options.observation or options.pipeline or options.QA or options.ingest):
options.observation = True
options.pipeline = True
options.QA = True
options.ingest = True
# setup django
os.environ["TMSS_DBCREDENTIALS"] = options.dbcredentials
os.environ["DJANGO_SETTINGS_MODULE"] = "lofar.sas.tmss.tmss.settings"
import django
django.setup()
dbcreds = DBCredentials().get(options.dbcredentials)
logger.info("Using dbcreds: %s" % dbcreds.stringWithHiddenPassword())
stop_event = threading.Event()
with BusListenerJanitor(create_scheduling_unit_blueprint_simulator(scheduling_unit_blueprint_id, stop_event=stop_event,
delay=options.event_delay, duration=options.duration,
handle_observations=bool(options.observation), handle_pipelines=bool(options.pipeline),
handle_QA=bool(options.QA), handle_ingest=bool(options.ingest),
exchange=options.exchange, broker=options.broker)):
stop_event.wait()
if __name__ == '__main__':
main_test_environment()