Select Git revision
test_utils.py
TMSS-719: claim project roles from OIDC/Keycloak and adapt user model to hold them
Jörn Künsemöller authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_utils.py 55.64 KiB
#!/usr/bin/env python3
# Copyright (C) 2018 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
# $Id: $
import os
import time
import datetime
from multiprocessing import Process, Event
import django
import logging
logger = logging.getLogger(__name__)
import threading
from lofar.common.testing.postgres import PostgresTestMixin, PostgresTestDatabaseInstance
from lofar.common.dbcredentials import Credentials, DBCredentials
from lofar.common.util import find_free_port, waitForInterrupt
from lofar.sas.tmss.test.ldap_test_service import TestLDAPServer
from lofar.sas.tmss.tmss.exceptions import TMSSException
from lofar.messaging.config import DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.messaging.messagebus import BusListenerJanitor
from lofar.common.testing.dbcredentials import TemporaryCredentials
from lofar.sas.tmss.client.tmss_http_rest_client import TMSSsession
from lofar.sas.resourceassignment.resourceassigner.test.ra_test_environment import RATestEnvironment
def assertDataWithUrls(self, data, expected):
    """
    Assert that the (REST) response ``data`` matches the ``expected`` values.

    Object instances get returned as urls, so for Django model instances only check that the
    instance's primary key is part of that url. Datetimes are compared via their ISO-8601 string,
    all other values must be equal.

    :param self: the test case providing assertTrue/assertEqual
    :param data: the response data to check, indexable by the keys of ``expected``
    :param expected: dict mapping key -> expected value
    """
    # TODO: Make this smarter, this only checks for matching pk!
    from django.db import models
    for k, v in expected.items():
        if isinstance(v, models.Model):
            # urls are quoted, so spaces in the pk show up as %20
            v = str(v.pk).replace(' ', '%20')
            err_msg = "The value '%s' (key is %s) is not in %s" % (v, k, str(data[k]))
            self.assertTrue(v in data[k], err_msg)
        elif isinstance(v, datetime.datetime):
            # URL (data[k]) is string but the test_data object (v) is datetime format, convert latter to string format to compare
            self.assertEqual(v.isoformat(), data[k])
        else:
            self.assertEqual(v, data[k])
def assertUrlList(self, url_list, expected_objects):
    """
    Assert that ``url_list`` references the ``expected_objects``.

    Object instances get returned as urls. This checks that both collections have the same
    length, and that each expected object's primary key occurs in at least one of the urls.

    :param self: the test case providing assertTrue/assertEqual
    :param url_list: the urls returned by the API
    :param expected_objects: the Django model instances that should be referenced
    :raises ValueError: when an expected item is not a Django model instance
    """
    # TODO: Make this smarter, this only checks for matching pk!
    from django.db import models
    self.assertEqual(len(url_list), len(expected_objects))
    for obj in expected_objects:
        if not isinstance(obj, models.Model):
            raise ValueError('Expected item is not a Django model instance: %s' % obj)
        # urls are quoted, so spaces in the pk show up as %20
        pk_fragment = str(obj.pk).replace(' ', '%20')
        found_in_some_url = any(pk_fragment in url for url in url_list)
        self.assertTrue(found_in_some_url)
class TMSSTestDatabaseInstance(PostgresTestDatabaseInstance):
    '''
    An isolated postgres test database instance which is initialized via the TMSS django migrations.
    The isolated postgres database instance is destroyed automagically upon exit.
    '''
    def __init__(self, dbcreds_id: str=None) -> None:
        super().__init__(user='test_tmss_user', dbcreds_id=dbcreds_id)

    def apply_database_schema(self):
        '''Apply the TMSS schema by running the django migrations in a separate process.
        :raises TMSSException: when the migration process does not exit cleanly'''
        logger.info('applying TMSS sql schema to %s', self.dbcreds)

        # a TMSSTestDatabaseInstance needs to run in a clean env,
        # so point django's settings and database credentials at this test instance.
        os.environ.update({"TMSS_DBCREDENTIALS": self.dbcreds_id,
                           "DJANGO_SETTINGS_MODULE": "lofar.sas.tmss.tmss.settings"})

        def _run_django_migrations():
            # django.setup() is only done in this child process, so our current apps environment stays unpolluted
            import django
            django.setup()
            django.core.management.call_command('migrate')

        migration_process = Process(target=_run_django_migrations, daemon=True)
        migration_process.start()
        migration_process.join()

        if migration_process.exitcode != 0:
            raise TMSSException("Could not initialize TMSS database with django migrations")
def minimal_json_schema(title:str="my title", description:str="my description", id:str="http://example.com/foo/bar.json", properties:dict=None, required:list=None):
    """
    Create a minimal (draft-06) JSON schema for an object, for use in tests.

    :param title: the schema's "title"
    :param description: the schema's "description"
    :param id: the schema's "$id"
    :param properties: the schema's "properties"; a new empty dict when not given
    :param required: the schema's "required" list; a new empty list when not given
    :return: the schema as a dict
    """
    # Create fresh containers on every call. Shared mutable defaults would leak any modification
    # of one returned schema into all later schemas created without explicit properties/required.
    return {"$schema": "http://json-schema.org/draft-06/schema#",
            "$id": id,
            "title": title,
            "description": description,
            "type": "object",
            "properties": {} if properties is None else properties,
            "required": [] if required is None else required,
            "default": {}
            }
class TMSSPostgresTestMixin(PostgresTestMixin):
    '''
    Test mixin providing a freshly setup postgres testing instance with the latest TMSS sql schema.
    Derive your test class from it to use it.
    '''
    @classmethod
    def create_test_db_instance(cls) -> TMSSTestDatabaseInstance:
        '''Create the TMSS flavoured test database instance used by this mixin.'''
        test_db_instance = TMSSTestDatabaseInstance()
        return test_db_instance
class TMSSDjangoServerInstance():
    ''' Creates a running django TMSS server at the requested port with the requested database credentials.
    '''
    def __init__(self, db_dbcreds_id: str="TMSS", ldap_dbcreds_id: str="TMSS_LDAP", host: str='127.0.0.1', port: int=8000, public_host: str=None, skip_startup_checks: bool=True):
        self._db_dbcreds_id = db_dbcreds_id
        self._ldap_dbcreds_id = ldap_dbcreds_id
        self.host = host
        self.port = port
        self.public_host = public_host or host
        self._skip_startup_checks = skip_startup_checks
        self._server_process = None

    @property
    def host_address(self):
        ''':returns the address and port of the django server'''
        return "%s:%d" % (self.host, self.port)

    @property
    def address(self):
        ''':returns the public address and port of the django server'''
        return "%s:%d" % (self.public_host, self.port)

    @property
    def url(self):
        ''':returns the http url to the django server's REST API'''
        return "http://%s/api/" % self.address

    @property
    def oidc_url(self):
        ''':returns the http url to the OIDC endpoints of the django server'''
        return "http://%s/oidc/" % self.address

    @property
    def database_dbcreds_id(self) -> str:
        ''':returns the uuid of the temporary database credentials'''
        return self._db_dbcreds_id

    @property
    def database_dbcreds(self) -> Credentials:
        ''':returns the temporary database Credentials'''
        return DBCredentials().get(self._db_dbcreds_id)

    @property
    def ldap_dbcreds_id(self) -> str:
        ''':returns the uuid of the temporary LDAP server credentials'''
        return self._ldap_dbcreds_id

    @property
    def ldap_dbcreds(self) -> Credentials:
        ''':returns the temporary LDAP Credentials'''
        return DBCredentials().get(self._ldap_dbcreds_id)

    def setup_django(self):
        '''Run the (tmss) django setup in the current process against this instance's database and LDAP credentials.'''
        # (tmss)django is initialized via many environment variables.
        # set these here, run django setup, and start the server
        os.environ["TMSS_LDAPCREDENTIALS"] = self.ldap_dbcreds_id
        from lofar.sas.tmss.tmss import setup_tmss_django
        setup_tmss_django(self.database_dbcreds_id)

    def start(self):
        '''
        Start the Django server in a background process, and wait until it responds.
        The LDAP server referred to by the LDAP credentials is NOT started here; it should already be running.
        Best used in a 'with'-context
        :raises TimeoutError: if the server does not give a valid response within 60 seconds
        '''
        def _helper_runserver_loop():
            logger.info("Starting Django server at port=%d with database: %s and LDAP: %s",
                        self.port, self.database_dbcreds, self.ldap_dbcreds)

            self.setup_django()

            try:
                if self._skip_startup_checks:
                    # quick start a simple WSGIServer and don't do any checks.
                    # This saves startup time, but assumes the settings, database and migrations are valid.
                    from django.core.servers.basehttp import WSGIServer, get_internal_wsgi_application, run
                    run(self.host, self.port, get_internal_wsgi_application(), ipv6=False, threading=True, server_cls=WSGIServer)
                else:
                    # start the django server via the "normal" django runserver command, including many startup checks
                    # (import explicitly instead of relying on django.core.management being imported as a side effect)
                    from django.core.management import call_command
                    call_command('runserver', use_reloader=False, addrport=self.host_address)
            except KeyboardInterrupt:
                logger.info("Exiting django TMSS server loop...")

        self._server_process = Process(target=_helper_runserver_loop, daemon=True)
        self._server_process.start()

        # wait for server to be up and running....
        # or exit via TimeoutError
        self.check_running_server(timeout=60)

    def stop(self):
        '''
        Stop the running Django server process (if any), and wait for it to exit.
        '''
        if self._server_process is not None:
            logger.info("Stopping Django server...")
            try:
                self._server_process.kill() # new in python 3.7
            except AttributeError:
                self._server_process.terminate() # < python 3.7
            # reap the process, so it does not linger around as a zombie
            self._server_process.join(timeout=10)
            self._server_process = None
            logger.info("Django server stopped.")

    def check_running_server(self, timeout: float = 10) -> bool:
        '''
        Poll the running django server until it gives a valid response.
        A 200 counts as valid, and so do 401/403: the server is up, but did not accept our LDAP credentials.
        :param timeout: maximum number of seconds to keep polling
        :returns True once the server gave a valid response
        :raises TimeoutError: if no valid response was received within timeout seconds
        '''
        import requests
        # monotonic clock, so the deadline is not affected by wall-clock adjustments
        deadline = time.monotonic() + timeout
        while True:
            try:
                logger.info("Checking if TMSS Django server is up and running at %s with database: %s and LDAP: %s ....",
                            self.url, self.database_dbcreds, self.ldap_dbcreds)
                response = requests.get(self.url, auth=(self.ldap_dbcreds.user, self.ldap_dbcreds.password), timeout=max(1, timeout/10))

                if response.status_code in [200, 401, 403]:
                    logger.info("TMSS Django server is up and running at %s with database: %s and LDAP: %s",
                                self.url, self.database_dbcreds, self.ldap_dbcreds)

                    if response.status_code in [401, 403]:
                        logger.warning("TMSS Django server at %s could not autenticate with LDAP creds: %s", self.url, self.ldap_dbcreds)

                    # TODO: logout, otherwise django remembers our login session.
                    return True

                logger.debug("TMSS Django server at %s responded with status code %s, retrying...", self.url, response.status_code)
            except Exception as e:
                # most likely the server is not accepting connections yet; keep polling until the deadline.
                logger.debug("TMSS Django server at %s is not reachable yet: %s", self.url, e)

            # Back off between attempts, also after an unexpected status code (avoids busy polling),
            # and always enforce the timeout, not only when the request raised.
            time.sleep(0.5)
            if time.monotonic() > deadline:
                raise TimeoutError("Could not get a valid response from the django server at %s within %s seconds" % (self.url,timeout))

    def __enter__(self):
        try:
            self.start()
        except Exception as e:
            logger.error(e)
            self.stop()
            raise
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()
class TMSSTestEnvironment:
    '''Create and run a test django TMSS server against a newly created test database and a test ldap server (and cleanup automagically)'''
    def __init__(self, host: str='127.0.0.1', preferred_django_port: int=8000, public_host: str=None, skip_startup_checks: bool=True,
                 exchange: str=os.environ.get("TMSS_EXCHANGE", DEFAULT_BUSNAME), broker: str=os.environ.get("TMSS_BROKER", DEFAULT_BROKER),
                 populate_schemas:bool=False, populate_test_data:bool=False, populate_permissions=False,
                 start_ra_test_environment: bool=False, start_postgres_listener: bool=False,
                 start_subtask_scheduler: bool=False, start_dynamic_scheduler: bool=False,
                 start_pipeline_control: bool=False, start_websocket: bool=False,
                 start_feedback_service: bool=False,
                 start_workflow_service: bool=False, enable_viewflow: bool=False,
                 start_precalculations_service: bool=False,
                 ldap_dbcreds_id: str=None, db_dbcreds_id: str=None, client_dbcreds_id: str=None):
        self._exchange = exchange
        self._broker = broker
        self._populate_schemas = populate_schemas or populate_test_data
        self._populate_test_data = populate_test_data
        self.ldap_server = TestLDAPServer(user='test', password='test', dbcreds_id=ldap_dbcreds_id)
        self.database = TMSSTestDatabaseInstance(dbcreds_id=db_dbcreds_id)
        self._populate_permissions = populate_permissions
        self.django_server = TMSSDjangoServerInstance(db_dbcreds_id=self.database.dbcreds_id,
                                                      ldap_dbcreds_id=self.ldap_server.dbcreds_id,
                                                      host=host,
                                                      port=find_free_port(preferred_django_port),
                                                      public_host=public_host,
                                                      skip_startup_checks=skip_startup_checks)
        self.client_credentials = TemporaryCredentials(user=self.ldap_server.dbcreds.user,
                                                       password=self.ldap_server.dbcreds.password, dbcreds_id=client_dbcreds_id)

        # the ra_test_environment is needed by some depending services, so start it when any depending service is started, even if start_ra_test_environment==False
        self._start_ra_test_environment = start_ra_test_environment or start_subtask_scheduler or start_dynamic_scheduler
        self.ra_test_environment = None

        # the postgres_listener is needed by some depending services, so start it when any depending service is started, even if start_postgres_listener==False
        # (the websocket service pushes the database changes which the postgres_listener posts on the bus, so it needs the listener as well)
        self._start_postgres_listener = start_postgres_listener or start_subtask_scheduler or start_dynamic_scheduler or start_websocket
        self.postgres_listener = None

        self._start_subtask_scheduler = start_subtask_scheduler
        self.subtask_scheduler = None

        self._start_dynamic_scheduler = start_dynamic_scheduler
        self.dynamic_scheduler = None

        # NOTE(review): stored, but no pipeline control service is started in start() - confirm whether that is intended.
        self._start_pipeline_control = start_pipeline_control
        self.pipeline_control = None

        self._start_websocket = start_websocket
        self.websocket_service = None

        self._start_feedback_service = start_feedback_service
        self.feedback_service = None

        self.enable_viewflow = enable_viewflow or start_workflow_service
        self._start_workflow_service = start_workflow_service
        self.workflow_service = None
        os.environ['TMSS_ENABLE_VIEWFLOW'] = str(bool(self.enable_viewflow))

        self._start_precalculations_service = start_precalculations_service
        self.precalculations_service = None

        # Check for correct Django version, should be at least 3.0
        if django.VERSION[0] < 3:
            print("\nWARNING: YOU ARE USING DJANGO VERSION '%s', WHICH WILL NOT SUPPORT ALL FEATURES IN TMSS!\n" %
                  django.get_version())

    def start(self):
        '''Start ldap, database and django, then all requested services, and populate the requested schemas/data/permissions.'''
        starttime = datetime.datetime.utcnow()
        # start the ldap server and the database in parallel in the background (because the two are independent of each other, and this saves startup wait time)
        ldap_server_thread = threading.Thread(target=self.ldap_server.start)
        ldap_server_thread.start()

        database_thread = threading.Thread(target=self.database.create)
        database_thread.start()

        # wait until both are started/created
        ldap_server_thread.join()
        database_thread.join()

        # now start the django_server
        self.django_server.start()

        # store client credentials in the TemporaryCredentials file...
        self.client_credentials.dbcreds.host = self.django_server.public_host
        self.client_credentials.dbcreds.port = self.django_server.port
        self.client_credentials.dbcreds.type = "http"
        self.client_credentials.create_if_not_existing()
        # ... and set TMSS_CLIENT_DBCREDENTIALS environment variable, so anybody or anything (any test) can use it automagically
        os.environ['TMSS_CLIENT_DBCREDENTIALS'] = self.client_credentials.dbcreds_id

        # apart from the running django server with a REST API,
        # it is also convenient to provide a working django setup for the 'normal' django API (via models.objects)
        # so: do setup_django
        self.django_server.setup_django()

        # now that the ldap and django server are running, and the django setup has been done,
        # we can announce our test user as superuser, so the test user can do anything via the API.
        # (there are also other tests, using other (on the fly created) users with restricted permissions, which is fine but not part of this generic setup.
        from django.contrib.auth import get_user_model
        User = get_user_model()
        user, _ = User.objects.get_or_create(username=self.ldap_server.dbcreds.user)
        user.is_superuser = True
        user.save()

        logger.info("started TMSSTestEnvironment ldap/database/django in %.1fs", (datetime.datetime.utcnow()-starttime).total_seconds())

        # start all (needed) services in background threads, keep track of them.
        service_threads = []

        def start_in_background(target):
            '''start the given callable in a new background thread, and keep track of that thread so we can wait for it below'''
            thread = threading.Thread(target=target)
            service_threads.append(thread)
            thread.start()

        if self._start_ra_test_environment:
            self.ra_test_environment = RATestEnvironment(exchange=self._exchange, broker=self._broker)
            start_in_background(self.ra_test_environment.start)

        if self._start_postgres_listener:
            # start the TMSSPGListener, so the changes in the database are posted as EventMessages on the bus
            from lofar.sas.tmss.services.tmss_postgres_listener import TMSSPGListener
            self.postgres_listener = TMSSPGListener(exchange=self._exchange, broker=self._broker, dbcreds=self.database.dbcreds)
            start_in_background(self.postgres_listener.start)

        if self._start_websocket:
            # start the websocket service, so the changes in the database are posted (via the messagebus) to an http web socket
            # this relies on the postgres listener, which is enabled in __init__ whenever start_websocket is set.
            from lofar.sas.tmss.services.websocket_service import create_service as create_websocket_service, DEFAULT_WEBSOCKET_PORT
            self.websocket_service = create_websocket_service(exchange=self._exchange, broker=self._broker, websocket_port=find_free_port(DEFAULT_WEBSOCKET_PORT))
            start_in_background(self.websocket_service.start_listening)

        if self._start_subtask_scheduler:
            from lofar.sas.tmss.services.scheduling.subtask_scheduling import create_subtask_scheduling_service
            self.subtask_scheduler = create_subtask_scheduling_service(exchange=self._exchange, broker=self._broker, tmss_client_credentials_id=self.client_credentials.dbcreds_id)
            # pass the bound method itself (not its result), so it really runs in the background thread
            start_in_background(self.subtask_scheduler.start_listening)

        if self._start_dynamic_scheduler:
            from lofar.sas.tmss.services.scheduling.dynamic_scheduling import create_dynamic_scheduling_service
            # beware: by default, dynamic scheduling is disabled in TMSS.
            # so, even if we start the service, even then the dynamic scheduling is disabled in the settings.
            self.dynamic_scheduler = create_dynamic_scheduling_service(exchange=self._exchange, broker=self._broker)
            start_in_background(self.dynamic_scheduler.start_listening)

        if self._start_workflow_service:
            from lofar.sas.tmss.services.workflow_service import create_workflow_service
            self.workflow_service = create_workflow_service(exchange=self._exchange, broker=self._broker)
            start_in_background(self.workflow_service.start_listening)

        if self._start_feedback_service:
            # best effort: a failing feedback service is logged, but does not prevent the environment from starting
            try:
                from lofar.sas.tmss.services.feedback_handling import create_service as create_feedback_service
                self.feedback_service = create_feedback_service(exchange=self._exchange, broker=self._broker)
                start_in_background(self.feedback_service.start_listening)
            except Exception as e:
                logger.exception(e)

        # wait for all services to be fully started in their background threads
        for thread in service_threads:
            thread.join()

        logger.info("started TMSSTestEnvironment ldap/database/django + services in %.1fs", (datetime.datetime.utcnow()-starttime).total_seconds())

        if self._populate_schemas or self._populate_test_data:
            self.populate_schemas()

        if self._populate_test_data:
            self.populate_test_data()

        if self._populate_permissions:
            self.populate_permissions()

        logger.info("started TMSSTestEnvironment ldap/database/django + services + schemas + data in %.1fs", (datetime.datetime.utcnow()-starttime).total_seconds())

        # next service does not have a buslistener, it is just a simple time scheduler and currently relies on
        # the populated stations schema to retrieve all stations
        if self._start_precalculations_service:
            from lofar.sas.tmss.services.precalculations_service import create_service_job_for_sunrise_and_sunset_calculations
            # For testpurposes we can use a smaller range and higher interval frequency
            self.precalculations_service = \
                create_service_job_for_sunrise_and_sunset_calculations(wait_time_seconds=60, nbr_days_calculate_ahead=3, nbr_days_before_today=1)
            self.precalculations_service.start()

    def stop(self):
        '''Stop all started services and servers, destroy the test database and the temporary client credentials.'''
        if self.workflow_service is not None:
            BusListenerJanitor.stop_listening_and_delete_queue(self.workflow_service)
            self.workflow_service = None

        if self.postgres_listener is not None:
            self.postgres_listener.stop()
            self.postgres_listener = None

        if self.feedback_service is not None:
            self.feedback_service.stop_listening()
            self.feedback_service = None

        if self.websocket_service is not None:
            self.websocket_service.stop_listening()
            self.websocket_service = None

        if self.subtask_scheduler is not None:
            BusListenerJanitor.stop_listening_and_delete_queue(self.subtask_scheduler)
            self.subtask_scheduler = None

        if self.dynamic_scheduler is not None:
            BusListenerJanitor.stop_listening_and_delete_queue(self.dynamic_scheduler)
            self.dynamic_scheduler = None

        if self.ra_test_environment is not None:
            self.ra_test_environment.stop()
            self.ra_test_environment = None

        if self.precalculations_service is not None:
            self.precalculations_service.stop()
            self.precalculations_service = None

        self.django_server.stop()
        self.ldap_server.stop()
        self.database.destroy()

        self.client_credentials.destroy_if_not_existing_upon_creation()

    def __enter__(self):
        try:
            self.start()
        except Exception as e:
            logger.error(e)
            self.stop()
            raise
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()

    def populate_schemas(self):
        # populate the items that rely on a running REST API server (which cannot be populated via the django model.objects API)
        from lofar.sas.tmss.client.populate import populate_schemas
        populate_schemas()

        # the connectors rely on the schemas to be populated first (above)
        from lofar.sas.tmss.tmss.tmssapp.populate import populate_connectors
        populate_connectors()

    def populate_test_data(self):
        from lofar.sas.tmss.tmss.tmssapp.populate import populate_test_data
        populate_test_data()

    def populate_permissions(self):
        from lofar.sas.tmss.tmss.tmssapp.populate import populate_permissions
        populate_permissions()

    def create_tmss_client(self):
        return TMSSsession.create_from_dbcreds_for_ldap(self.client_credentials.dbcreds_id)

    def create_test_data_creator(self):
        from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator
        return TMSSRESTTestDataCreator(self.django_server.url, (self.django_server.ldap_dbcreds.user, self.django_server.ldap_dbcreds.password))
def main_test_database():
    """instantiate, run and destroy a test postgres django database"""
    os.environ['TZ'] = 'UTC'
    logging.basicConfig(format = '%(asctime)s %(levelname)s %(message)s', level = logging.INFO)

    from optparse import OptionParser, OptionGroup
    parser = OptionParser('%prog [options]',
                          description='setup/run/teardown a full fresh, unique and isolated TMSS test database.')

    # note the trailing spaces: the sentences below are concatenated into one description
    group = OptionGroup(parser, 'Credentials options', description="By default a unique ID is created for the Postgres DB credentials to ensure that this TMSSTestDatabaseInstance is isolated and unique. "
                                                            "There are however also some use cases where we want to refer to a constant ID. These options enable that. "
                                                            "Please mind that these given credentials are still stored in a temporary credentials file which are deleted upon exit.")
    parser.add_option_group(group)
    group.add_option('-D', '--DB_ID', dest='DB_ID', type='string', default=None, help='Use this ID for the Postgres database instead of a generated unique id if None given. default: %default')
    options, _ = parser.parse_args()

    with TMSSTestDatabaseInstance(dbcreds_id=options.DB_ID) as db:
        # print some nice info for the user to use the test servers...
        # use print instead of log for clean lines.
        for h in logging.root.handlers:
            h.flush()
        print()
        print()
        print("**********************************")
        print("Test-TMSS database up and running.")
        print("**********************************")
        print("DB Credentials ID: %s (for example to run tmss against this test db, call 'tmss -C %s')" % (db.dbcreds_id, db.dbcreds_id))
        print()
        print("Press Ctrl-C to exit (and remove the test database automatically)")
        waitForInterrupt()
def main_test_environment():
    """instantiate, run and destroy a full tmss test environment (postgres database, ldap server, django server)"""
    from optparse import OptionParser, OptionGroup
    os.environ['TZ'] = 'UTC'

    parser = OptionParser('%prog [options]',
                          description='setup/run/teardown a full TMSS test environment including a fresh and isolated database, LDAP server and DJANGO REST server.')

    parser.add_option('--skip_startup_checks', dest='skip_startup_checks', action='store_true', help='skip startup checks, assuming your settings/database/migrations are valid.')

    group = OptionGroup(parser, 'Network')
    parser.add_option_group(group)
    group.add_option("-H", "--host", dest="host", type="string", default='0.0.0.0',
                      help="serve the TMSS Django REST API server via this host. [default=%default]")
    group.add_option("-p", "--port", dest="port", type="int", default=find_free_port(8000),
                      help="try to use this port for the DJANGO REST API. If not available, then a random free port is used and logged. [default=%default]")
    group.add_option("-P", "--public_host", dest="public_host", type="string", default='127.0.0.1',
                      help="expose the TMSS Django REST API via this host. [default=%default]")

    group = OptionGroup(parser, 'Example/Test data, schemas and services',
                        description='Options to enable/create example/test data, schemas and services. '
                                    'Without these options you get a lean and mean TMSS test environment, but then you need to run the background services and create test data yourself. '
                                    'For standalone commissioning/testing/playing around you need all these options, use --all for that as a convenience.')
    parser.add_option_group(group)
    group.add_option('-d', '--data', dest='data', action='store_true', help='populate the test-database with test/example data. This implies -s/--schemas because these schemas are needed to create test data.')
    group.add_option('-s', '--schemas', dest='schemas', action='store_true', help='populate the test-database with the TMSS JSON schemas')
    group.add_option('-M', '--permissions', dest='permissions', action='store_true', help='populate the test-database with the TMSS permissions')
    group.add_option('-m', '--eventmessages', dest='eventmessages', action='store_true', help='Send event messages over the messagebus for changes in the TMSS database (for (sub)tasks/scheduling_units etc).')
    group.add_option('-r', '--ra_test_environment', dest='ra_test_environment', action='store_true', help='start the Resource Assigner test environment which enables scheduling.')
    group.add_option('-S', '--scheduling', dest='scheduling', action='store_true', help='start the TMSS background scheduling services for dynamic scheduling of schedulingunits and subtask scheduling of chains of dependend subtasks.')
    group.add_option('-v', '--viewflow_app', dest='viewflow_app', action='store_true', help='Enable the viewflow app for workflows on top of TMSS')
    group.add_option('-V', '--viewflow_service', dest='viewflow_service', action='store_true', help='Enable the viewflow service. Implies --viewflow_app and --eventmessages')
    group.add_option('-w', '--websockets', dest='websockets', action='store_true', help='Enable json updates pushed via websockets')
    group.add_option('-f', '--feedbackservice', dest='feedbackservice', action='store_true', help='Enable feedbackservice to handle feedback from observations/pipelines which comes in via the (old qpid) otdb messagebus.')
    group.add_option('-C', '--precalculations_service', dest='precalculations_service', action='store_true', help='Enable the PreCalculations service')
    group.add_option('--all', dest='all', action='store_true', help='Enable/Start all the services, upload schemas and testdata')
    group.add_option('--simulate', dest='simulate', action='store_true', help='Simulate a run of the first example scheduling_unit (implies --data and --eventmessages and --ra_test_environment)')

    group = OptionGroup(parser, 'Messaging options')
    parser.add_option_group(group)
    group.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the message broker, default: %default')
    group.add_option('-e', "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME, help="Bus or queue where the TMSS messages are published. [default: %default]")

    # note the trailing spaces: the sentences below are concatenated into one description
    group = OptionGroup(parser, 'Credentials options', description="By default a unique ID is created for the LDAP and Postgres DB credentials to ensure that this TMSSTestEnvironment is isolated and unique. "
                                                            "There are however also some use cases where we want to refer to a constant ID. These options enable that. "
                                                            "Please mind that these given credentials are still stored in a temporary credentials file which are deleted upon exit.")
    parser.add_option_group(group)
    group.add_option('-L', '--LDAP_ID', dest='LDAP_ID', type='string', default=None, help='Use this ID for the LDAP service instead of a generated unique id if None given. default: %default')
    group.add_option('-D', '--DB_ID', dest='DB_ID', type='string', default=None, help='Use this ID for the Postgres database instead of a generated unique id if None given. default: %default')
    group.add_option('-R', '--REST_CLIENT_ID', dest='REST_CLIENT_ID', type='string', default=None, help='Use this ID for the http REST client API instead of a generated unique id if None given. default: %default')

    options, _ = parser.parse_args()

    if options.simulate:
        options.data = True
        options.eventmessages = True
        options.ra_test_environment = True

    logging.basicConfig(format = '%(asctime)s %(levelname)s %(message)s', level = logging.INFO)

    with TMSSTestEnvironment(host=options.host, preferred_django_port=options.port, public_host=options.public_host,
                             skip_startup_checks=options.skip_startup_checks,
                             exchange=options.exchange, broker=options.broker,
                             populate_schemas=options.schemas or options.data or options.all,
                             populate_test_data=options.data or options.all,
                             populate_permissions=options.permissions or options.all,
                             start_ra_test_environment=options.ra_test_environment or options.all,
                             start_postgres_listener=options.eventmessages or options.scheduling or options.viewflow_service or options.all,
                             start_subtask_scheduler=options.scheduling or options.all,
                             start_dynamic_scheduler=options.scheduling or options.all,
                             start_websocket=options.websockets or options.all,
                             start_feedback_service=options.feedbackservice or options.all,
                             enable_viewflow=options.viewflow_app or options.viewflow_service or options.all,
                             start_workflow_service=options.viewflow_service or options.all,
                             start_precalculations_service=options.precalculations_service or options.all,
                             ldap_dbcreds_id=options.LDAP_ID, db_dbcreds_id=options.DB_ID, client_dbcreds_id=options.REST_CLIENT_ID) as tmss_test_env:

        # print some nice info for the user to use the test servers...
        # use print instead of log for clean lines.
        for h in logging.root.handlers:
            h.flush()
        print()
        print()
        print("*****************************************************")
        print("Test-TMSS database, LDAP and Django up and running...")
        print("*****************************************************")
        print("DB Credentials ID: %s" % (tmss_test_env.database.dbcreds_id, ))
        print("LDAP Credentials ID: %s" % (tmss_test_env.django_server.ldap_dbcreds_id, ))
        print("TMSS Client Credentials ID: %s" % (tmss_test_env.client_credentials.dbcreds_id, ))
        print("Django URL: %s" % (tmss_test_env.django_server.url))
        print()
        print("Example cmdlines to run tmss or tmss_manage_django:")
        print("TMSS_DBCREDENTIALS=%s TMSS_LDAPCREDENTIALS=%s tmss" % (tmss_test_env.database.dbcreds_id, tmss_test_env.django_server.ldap_dbcreds_id))
        print("TMSS_DBCREDENTIALS=%s TMSS_LDAPCREDENTIALS=%s tmss_manage_django" % (tmss_test_env.database.dbcreds_id, tmss_test_env.django_server.ldap_dbcreds_id))
        print()
        print("Example cmdline to run tmss client call:")
        print("TMSS_CLIENT_DBCREDENTIALS=%s tmss_set_subtask_state <id> <state>" % (tmss_test_env.client_credentials.dbcreds_id, ))
        print()
        print("Press Ctrl-C to exit (and remove the test database and django server automatically)")

        if options.simulate:
            stop_event = threading.Event()
            with create_scheduling_unit_blueprint_simulator(1, stop_event=stop_event,
                                                            exchange=options.exchange, broker=options.broker):
                try:
                    stop_event.wait()
                except KeyboardInterrupt:
                    return

        waitForInterrupt()
def create_scheduling_unit_blueprint_simulator(scheduling_unit_blueprint_id: int, stop_event: threading.Event,
                                                handle_observations: bool = True, handle_pipelines: bool = True,
                                                handle_QA: bool = True, handle_ingest: bool = True,
                                                auto_grant_ingest_permission: bool = True,
                                                delay: float=1, duration: float=5,
                                                create_output_dataproducts: bool=False,
                                                exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
    '''
    create a "simulator" which sets the correct events in the correct order upon receiving status change events,
    and which uploads simulated feedback upon finishing. Can be used to simulate a 'run' of a scheduling_unit without
    doing the actual observation/pipeline/QA/ingest.

    :param scheduling_unit_blueprint_id: id of the scheduling_unit_blueprint whose subtasks are simulated.
    :param stop_event: is set by the simulator once the scheduling_unit reaches status 'finished' or 'error'.
    :param handle_observations: simulate observation subtasks.
    :param handle_pipelines: simulate pipeline subtasks.
    :param handle_QA: simulate QA (files and plots) subtasks.
    :param handle_ingest: simulate ingest subtasks.
    :param auto_grant_ingest_permission: automatically grant ingest permission when an ingest subtask needs it.
    :param delay: seconds to wait before each simulated state transition.
    :param duration: total seconds a subtask spends between 'started' and 'finishing' (the last `delay` included).
    :param create_output_dataproducts: write 1KB dummy files for observation/pipeline output dataproducts.
    :param exchange: name of the messaging exchange to listen on.
    :param broker: address of the messaging broker.
    :return: a BusListenerJanitor around a TMSSBusListener; use it as a context manager to run the simulation.
    '''
    from lofar.sas.tmss.client.tmssbuslistener import TMSSEventMessageHandler, TMSSBusListener
    from lofar.sas.tmss.tmss.tmssapp import models
    from lofar.sas.tmss.tmss.tmssapp.subtasks import schedule_subtask_and_update_successor_start_times, update_start_time_and_shift_successors_until_after_stop_time
    from lofar.common.json_utils import get_default_json_object_for_schema
    from lofar.sas.tmss.tmss.exceptions import SubtaskSchedulingException
    from django.core.exceptions import ObjectDoesNotExist
    from datetime import datetime, timedelta
    from time import sleep
    from uuid import uuid4

    class SimulationEventHandler(TMSSEventMessageHandler):
        '''Moves the subtasks of one scheduling_unit_blueprint through their lifecycle upon TMSS status-change events.'''

        # The simulated lifecycle: a subtask reported in state <key> is moved on to state <value>.
        NEXT_STATE_VALUES = {
            models.SubtaskState.Choices.SCHEDULED.value: models.SubtaskState.Choices.QUEUEING.value,
            models.SubtaskState.Choices.QUEUEING.value: models.SubtaskState.Choices.QUEUED.value,
            models.SubtaskState.Choices.QUEUED.value: models.SubtaskState.Choices.STARTING.value,
            models.SubtaskState.Choices.STARTING.value: models.SubtaskState.Choices.STARTED.value,
            models.SubtaskState.Choices.STARTED.value: models.SubtaskState.Choices.FINISHING.value,
            models.SubtaskState.Choices.FINISHING.value: models.SubtaskState.Choices.FINISHED.value,
        }

        def __init__(self, scheduling_unit_blueprint_id: int, stop_event: threading.Event,
                     handle_observations: bool = True, handle_pipelines: bool = True,
                     handle_QA: bool = True, handle_ingest: bool = True,
                     delay: float = 1, duration: float = 10,
                     create_output_dataproducts: bool=False,
                     auto_grant_ingest_permission: bool = True) -> None:
            super().__init__(log_event_messages=False)
            self.scheduling_unit_blueprint_id = scheduling_unit_blueprint_id
            self.stop_event = stop_event
            self.handle_observations = handle_observations
            self.handle_pipelines = handle_pipelines
            self.handle_QA = handle_QA
            self.handle_ingest = handle_ingest
            self.auto_grant_ingest_permission = auto_grant_ingest_permission
            self.delay = delay
            self.duration = duration
            self.create_output_dataproducts = create_output_dataproducts

        def need_to_handle(self, subtask: models.Subtask) -> bool:
            '''return True if the subtask belongs to our scheduling_unit and its type is selected for simulation.'''
            if subtask.task_blueprint.scheduling_unit_blueprint.id != self.scheduling_unit_blueprint_id:
                return False

            subtask_type = subtask.specifications_template.type.value
            if subtask_type == models.SubtaskType.Choices.OBSERVATION.value:
                return bool(self.handle_observations)
            if subtask_type == models.SubtaskType.Choices.PIPELINE.value:
                return bool(self.handle_pipelines)
            if subtask_type in (models.SubtaskType.Choices.QA_FILES.value,
                                models.SubtaskType.Choices.QA_PLOTS.value):
                return bool(self.handle_QA)
            if subtask_type == models.SubtaskType.Choices.INGEST.value:
                return bool(self.handle_ingest)
            return True

        def start_handling(self):
            '''start listening, and kick off the simulation for already scheduled/defined subtasks.'''
            from lofar.common import isProductionEnvironment
            if isProductionEnvironment():
                raise RuntimeError("Do not use this tool to simulate running a scheduling_unit in a production environment!")

            logger.info("starting to simulate a run for scheduling_unit id=%s ...", self.scheduling_unit_blueprint_id)

            super().start_handling()

            try:
                # exit if already finished
                scheduling_unit = models.SchedulingUnitBlueprint.objects.get(id=self.scheduling_unit_blueprint_id)
                if scheduling_unit.status in ["finished", "error"]:
                    logger.info("scheduling_unit id=%s name='%s' has status=%s -> not simulating", scheduling_unit.id, scheduling_unit.name, scheduling_unit.status)
                    self.stop_event.set()
                    return
            except models.SchedulingUnitBlueprint.DoesNotExist:
                pass

            # trick: trigger any already scheduled subtasks, cascading in events simulating the run
            subtasks = models.Subtask.objects.filter(task_blueprint__scheduling_unit_blueprint_id=self.scheduling_unit_blueprint_id)
            for subtask in subtasks.filter(state__value=models.SubtaskState.Choices.SCHEDULED.value):
                self.onSubTaskStatusChanged(subtask.id, "scheduled")

            # schedule the defined subtasks, cascading in events simulating the run
            self.schedule_independend_defined_subtasks_if_needed()

        def schedule_independend_defined_subtasks_if_needed(self):
            '''schedule the 'defined' subtasks without inputs, retrying 3 hours later until scheduling succeeds.
            Gives up (re-raising the SubtaskSchedulingException) once the start_time is more than 1 day ahead.'''
            try:
                scheduling_unit = models.SchedulingUnitBlueprint.objects.get(id=self.scheduling_unit_blueprint_id)

                for task_blueprint in scheduling_unit.task_blueprints.all():
                    for subtask in task_blueprint.subtasks.filter(inputs=None,
                                                                  state__value=models.SubtaskState.Choices.DEFINED.value).all():

                        if self.need_to_handle(subtask):
                            subtask.start_time = datetime.utcnow() + task_blueprint.relative_start_time

                            while subtask.state.value != models.SubtaskState.Choices.SCHEDULED.value:
                                try:
                                    schedule_subtask_and_update_successor_start_times(subtask)
                                except SubtaskSchedulingException as e:
                                    # try again, a bit later
                                    subtask.state = models.SubtaskState.objects.get(value=models.SubtaskState.Choices.DEFINED.value)
                                    update_start_time_and_shift_successors_until_after_stop_time(subtask, subtask.start_time + timedelta(hours=3))
                                    if subtask.start_time - datetime.utcnow() > timedelta(days=1):
                                        raise
            except models.SchedulingUnitBlueprint.DoesNotExist:
                pass

        def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str):
            '''log our scheduling_unit's status; schedule when schedulable, stop when finished or in error.'''
            if id == self.scheduling_unit_blueprint_id:
                scheduling_unit = models.SchedulingUnitBlueprint.objects.get(id=id)
                logger.info("scheduling_unit_blueprint id=%s name='%s' now has status='%s'", id, scheduling_unit.name,
                            status)
                if status == "schedulable":
                    self.schedule_independend_defined_subtasks_if_needed()

                if status in ["finished", "error"]:
                    self.stop_event.set()

        def onTaskBlueprintStatusChanged(self, id: int, status: str):
            '''log the status of task_blueprints which belong to our scheduling_unit.'''
            # Note: `id` is a task_blueprint id, so it cannot be compared to the scheduling_unit_blueprint id directly;
            # look up the task and compare its parent scheduling_unit instead.
            try:
                task = models.TaskBlueprint.objects.get(id=id)
            except models.TaskBlueprint.DoesNotExist:
                return

            if task.scheduling_unit_blueprint.id == self.scheduling_unit_blueprint_id:
                logger.info("task_blueprint_id id=%s name='%s' now has status='%s'", id, task.name, status)

        def onSubTaskStatusChanged(self, id: int, status: str):
            '''simulate the next lifecycle step of the given subtask, if it is one we handle.'''
            subtask = models.Subtask.objects.get(id=id)
            if not self.need_to_handle(subtask):
                return

            logger.info("subtask id=%s type='%s' now has status='%s'", id, subtask.specifications_template.type.value,
                        status)

            if status == models.SubtaskState.Choices.STARTED.value:
                # mimic a running duration; the remaining self.delay is slept below, just before the transition.
                # clamp at zero: time.sleep raises ValueError for negative values (i.e. when delay > duration).
                sleep(max(0.0, self.duration - self.delay))
            elif status == models.SubtaskState.Choices.FINISHING.value:
                self._simulate_output(subtask)
            elif status == models.SubtaskState.Choices.DEFINED.value:
                self._handle_subtask_returned_to_defined(subtask)

            next_state_value = self.NEXT_STATE_VALUES.get(status)
            if next_state_value is None:
                return

            next_state = models.SubtaskState.objects.get(value=next_state_value)
            sleep(self.delay) # mimic a little 'processing' delay
            logger.info("Simulating subtask id=%d type='%s' by proceeding from state='%s' to state='%s'...",
                        subtask.id, subtask.specifications_template.type.value, subtask.state.value, next_state.value)

            if next_state_value == models.SubtaskState.Choices.STARTED.value:
                subtask.start_time = datetime.utcnow()
            elif next_state_value == models.SubtaskState.Choices.FINISHING.value:
                subtask.stop_time = datetime.utcnow()

            subtask.state = next_state
            subtask.save()

        def _simulate_output(self, subtask: models.Subtask):
            '''dispatch output simulation for a finishing subtask, depending on its type.'''
            subtask_type = subtask.specifications_template.type.value
            if subtask_type in (models.SubtaskType.Choices.OBSERVATION.value,
                                models.SubtaskType.Choices.PIPELINE.value):
                self._simulate_observation_or_pipeline_output(subtask)
            elif subtask_type == models.SubtaskType.Choices.INGEST.value:
                self._simulate_ingest_output(subtask)

        def _simulate_observation_or_pipeline_output(self, subtask: models.Subtask):
            '''optionally write 1KB dummy files, and attach default feedback to all output dataproducts.'''
            if self.create_output_dataproducts:
                for output_dp in subtask.output_dataproducts.all():
                    os.makedirs(output_dp.directory, exist_ok=True)
                    logger.info('writing 1KB test dataproduct for subtask id=%s %s', subtask.id, output_dp.filepath)
                    with open(output_dp.filepath, 'w') as file:
                        file.write(1024 * 'a')

            # create some nice default (and thus correct although not scientifically meaningful) feedback
            template = models.DataproductFeedbackTemplate.objects.get(name="feedback")
            feedback_doc = get_default_json_object_for_schema(template.schema)
            feedback_doc['frequency']['subbands'] = [0]
            feedback_doc['frequency']['central_frequencies'] = [1]

            for output_dp in subtask.output_dataproducts.all():
                output_dp.feedback_template = template
                output_dp.feedback_doc = feedback_doc
                output_dp.save()

        def _simulate_ingest_output(self, subtask: models.Subtask):
            '''give each ingest output dataproduct an LTA location, copied feedback, an archive ticket and hashes.'''
            project_name = subtask.task_blueprint.draft.scheduling_unit_draft.scheduling_set.project.name

            for output_dp in subtask.output_dataproducts.all():
                try:
                    # copy feedback from ingest-subtask-input-dp
                    input_dp = subtask.get_transformed_input_dataproduct(output_dp.id)
                    feedback_template = input_dp.feedback_template
                    feedback_doc = input_dp.feedback_doc
                except ObjectDoesNotExist:
                    # any model's DoesNotExist (this includes the previously caught Subtask.DoesNotExist)
                    feedback_template = models.DataproductFeedbackTemplate.objects.get(name="empty")
                    feedback_doc = get_default_json_object_for_schema(feedback_template.schema)

                output_dp.size = 1024
                output_dp.directory = "srm://some.lta.site/project/%s/%s/" % (project_name, subtask.id)
                output_dp.feedback_template = feedback_template
                output_dp.feedback_doc = feedback_doc
                output_dp.save()

                models.DataproductArchiveInfo.objects.create(dataproduct=output_dp, storage_ticket=uuid4())

                for algo in models.Algorithm.objects.all():
                    models.DataproductHash.objects.create(dataproduct=output_dp, algorithm=algo, hash=uuid4())

        def _handle_subtask_returned_to_defined(self, subtask: models.Subtask):
            '''if scheduling failed within the last hour, and it is an ingest subtask, optionally grant ingest permission.'''
            state_transition = models.SubtaskStateLog.objects.filter(subtask__id=subtask.id,
                                                                     old_state__value=models.SubtaskState.Choices.SCHEDULING.value,
                                                                     new_state__value=models.SubtaskState.Choices.DEFINED.value).order_by('-updated_at').first()
            if state_transition and datetime.utcnow() - state_transition.updated_at < timedelta(hours=1):
                logger.info("subtask id=%d type='%s' returned to state 'defined' while scheduling... (which means that scheduling did not succeed)",
                            subtask.id, subtask.specifications_template.type.value)

                if subtask.specifications_template.type.value == 'ingest':
                    logger.info("subtask id=%d is an ingest task which requires permission in order to be scheduled", subtask.id)
                    if self.auto_grant_ingest_permission and subtask.task_blueprint.scheduling_unit_blueprint.ingest_permission_required:
                        # just granting the permission triggers the scheduling_service to check and schedulable ingest subtasks,
                        # resulting in a scheduled ingest subtask.
                        logger.info("granting ingest subtask id=%d ingest_permission", subtask.id)
                        subtask.task_blueprint.scheduling_unit_blueprint.ingest_permission_granted_since = datetime.utcnow()
                        subtask.task_blueprint.scheduling_unit_blueprint.save()

    # the SimulationEventHandler is meant to run for a single scheduling_unit_blueprint,
    # so no need to keep the created designated queue existing. So, use a BusListenerJanitor to cleanup the queue after use.
    return BusListenerJanitor(TMSSBusListener(SimulationEventHandler,
                                              handler_kwargs={'scheduling_unit_blueprint_id': scheduling_unit_blueprint_id,
                                                              'stop_event': stop_event,
                                                              'handle_observations': handle_observations,
                                                              'handle_pipelines': handle_pipelines,
                                                              'handle_QA': handle_QA,
                                                              'handle_ingest': handle_ingest,
                                                              'auto_grant_ingest_permission': auto_grant_ingest_permission,
                                                              'create_output_dataproducts': create_output_dataproducts,
                                                              'delay': delay, 'duration': duration},
                                              exchange=exchange, broker=broker))
def main_scheduling_unit_blueprint_simulator():
    '''run a "simulator" which sets the correct events in the correct order upon receiving status change events,
    and which uploads simulated feedback upon finishing. Can be used to simulate a 'run' of a scheduling_unit without
    doing the actual observation/pipeline/QA/ingest.

    Command line: <prog> [options] <scheduling_unit_blueprint_id>
    Exits with code 1 when not exactly one (integer) scheduling_unit_blueprint_id is given.
    If no subtask type option is given, all subtask types are simulated.
    '''
    import sys
    import time
    from optparse import OptionParser, OptionGroup

    # make sure we run in UTC timezone
    os.environ['TZ'] = 'UTC'
    if hasattr(time, 'tzset'):
        # changing os.environ['TZ'] only takes effect for this process after tzset() (not available on Windows)
        time.tzset()

    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)

    # Check the invocation arguments
    parser = OptionParser('%prog [options] <scheduling_unit_blueprint_id>',
                          description='Mimic running a scheduling unit through all the scheduling->queueing->started->finished states for all its (sub)tasks in the correct order and creating default feedback.')

    group = OptionGroup(parser, 'Subtask Types', description="Simulate the event for the following types, or all if no specific type is specified.")
    parser.add_option_group(group)
    group.add_option('-o', '--observation', dest='observation', action='store_true', help='simulate events for observation subtasks')
    group.add_option('-p', '--pipeline', dest='pipeline', action='store_true', help='simulate events for pipeline subtasks')
    group.add_option('-Q', '--QA', dest='QA', action='store_true', help='simulate events for QA subtasks')
    group.add_option('-i', '--ingest', dest='ingest', action='store_true', help='simulate events for ingest subtasks')

    group = OptionGroup(parser, 'Simulation parameters')
    parser.add_option_group(group)
    group.add_option('-e', '--event_delay', dest='event_delay', type='float', default=1.0, help='wait <event_delay> seconds between simulating events to mimic real-world behaviour, default: %default')
    group.add_option('-d', '--duration', dest='duration', type='float', default=60.0, help='wait <duration> seconds while "observing"/"processing" between started and finishing state to mimic real-world behaviour, default: %default')
    group.add_option('-g', '--grant_ingest_permission', dest='grant_ingest_permission', action='store_true', help='automatically grant ingest permission for ingest subtasks if needed')

    group = OptionGroup(parser, 'Messaging options')
    parser.add_option_group(group)
    group.add_option('--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the messaging broker, default: %default')
    group.add_option('--exchange', dest='exchange', type='string', default=DEFAULT_BUSNAME, help='Name of the exchange on the messaging broker, default: %default')

    group = OptionGroup(parser, 'Django options')
    parser.add_option_group(group)
    group.add_option('-C', '--credentials', dest='dbcredentials', type='string', default=os.environ.get('TMSS_DBCREDENTIALS', 'TMSS'), help='django dbcredentials name, default: %default')

    (options, args) = parser.parse_args()

    if len(args) != 1:
        parser.print_usage()
        # use sys.exit: the builtin exit() is injected by the site module and is not guaranteed to exist
        sys.exit(1)

    try:
        scheduling_unit_blueprint_id = int(args[0])
    except ValueError:
        print("scheduling_unit_blueprint_id should be an integer, got: %r" % (args[0],), file=sys.stderr)
        parser.print_usage()
        sys.exit(1)

    # no specific subtask type requested -> simulate them all
    if not (options.observation or options.pipeline or options.QA or options.ingest):
        options.observation = True
        options.pipeline = True
        options.QA = True
        options.ingest = True

    from lofar.sas.tmss.tmss import setup_and_check_tmss_django_database_connection_and_exit_on_error
    setup_and_check_tmss_django_database_connection_and_exit_on_error(options.dbcredentials)

    stop_event = threading.Event()
    with create_scheduling_unit_blueprint_simulator(scheduling_unit_blueprint_id, stop_event=stop_event,
                                                    delay=options.event_delay, duration=options.duration,
                                                    handle_observations=bool(options.observation), handle_pipelines=bool(options.pipeline),
                                                    handle_QA=bool(options.QA), handle_ingest=bool(options.ingest),
                                                    auto_grant_ingest_permission=bool(options.grant_ingest_permission),
                                                    exchange=options.exchange, broker=options.broker):
        print("Press Ctrl-C to exit")
        try:
            # the simulator sets stop_event when the scheduling_unit is finished or in error
            stop_event.wait()
        except KeyboardInterrupt:
            pass
if __name__ == '__main__':
    # when this module is executed directly (not imported), run main_test_environment()
    main_test_environment()