Skip to content
Snippets Groups Projects
Commit 424cbfec authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

L2SS-1189: implemented a quick 'skeleton' for the L2Station & TMSS integration...

L2SS-1189: implemented a quick 'skeleton' for the L2Station & TMSS integration test. Useable for now, to be fully implemented in L2SS-1193
parent 7decd22d
No related branches found
No related tags found
1 merge request!1067L2SS-1189
...@@ -8,3 +8,9 @@ lofar_find_package(Python 3.4 REQUIRED) ...@@ -8,3 +8,9 @@ lofar_find_package(Python 3.4 REQUIRED)
lofar_add_test(tPipelineControl) lofar_add_test(tPipelineControl)
lofar_add_test(tPipelineControlService) lofar_add_test(tPipelineControlService)
IF(BUILD_TMSSBackend)
lofar_add_test(t_l2station_tmss_integration_test)
ELSE()
message(WARNING "Skipping t_l2station_tmss_integration_test because it depends on the TMSSBackend package which is not included in the build")
ENDIF(BUILD_TMSSBackend)
#!/usr/bin/env python3
import copy
import time
import unittest
import logging
logger = logging.getLogger('lofar.'+__name__)
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
from lofar.sas.tmss.test.test_environment import TMSSTestEnvironment
from lofar.messaging.messagebus import TemporaryExchange, BusListenerJanitor
from lofar.common.test_utils import integration_test
try:
from lofar.mac.L2TMSSObservationControl import create_service, L2TMSSObservationControlBusListener
except ModuleNotFoundError:
print("Skipping test. could not import lobster L2TMSSObservationControl service. Did you build it?")
exit(3)
from datetime import datetime, timedelta
from uuid import uuid4
import threading
import os
from unittest import mock
@integration_test
class TestL2StationTMSSIntegration(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
cls.TEST_UUID = uuid4()
cls.tmp_exchange = TemporaryExchange()
cls.tmp_exchange.open()
cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address, populate_schemas=True, start_postgres_listener=True, populate_test_data=False, enable_viewflow=False, start_dynamic_scheduler=False, start_subtask_scheduler=True, start_workflow_service=False)
cls.tmss_test_env.start()
def setUp(self) -> None:
# we should not depend on "previous" data
self.tmss_test_env.delete_scheduling_unit_drafts_cascade()
@classmethod
def tearDownClass(cls) -> None:
cls.tmss_test_env.stop()
cls.tmp_exchange.close()
def test_simple_lofar2_observation(self):
from lofar.common.json_utils import add_defaults_to_json_object_for_schema
from lofar.sas.tmss.test.test_utils import set_subtask_state_following_allowed_transitions
from lofar.sas.tmss.test.tmss_test_data_django_models import Project_test_data, SchedulingSet_test_data
from lofar.sas.tmss.tmss.tmssapp import models
from lofar.sas.tmss.tmss.tmssapp.tasks import create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit_draft, update_task_graph_from_specifications_doc, mark_task_blueprint_as_obsolete
from lofar.sas.tmss.tmss.tmssapp.subtasks import wait_for_subtask_status, schedule_subtask
# Setup
project = models.Project.objects.create(**Project_test_data(auto_ingest=True))
scheduling_set = models.SchedulingSet.objects.create(**SchedulingSet_test_data(project=project))
# use a simple observation strategy with only one target imaging observation
strategy_template = models.SchedulingUnitObservingStrategyTemplate.get_latest(name="Simple Observation")
scheduling_unit_spec = copy.deepcopy(strategy_template.template)
# overrule the normal stations for this template with just the one CS001 LOFAR2 station
scheduling_unit_spec['tasks']['Observation']['specifications_doc']['station_configuration']['station_groups'] = [{'stations': ['CS001'], 'max_nr_missing': 0}]
# create a scheduling unit draft & blueprint
su_draft = models.SchedulingUnitDraft.objects.create(name=str(uuid4()),
scheduling_set=scheduling_set,
specifications_template=strategy_template.scheduling_unit_template,
observation_strategy_template=strategy_template)
su_draft = update_task_graph_from_specifications_doc(su_draft, scheduling_unit_spec)
su_blueprint = create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit_draft(su_draft)
# get the observation subtask
observation_subtask = su_blueprint.subtasks.get(specifications_template__type__value=models.SubtaskType.Choices.OBSERVATION.value)
# schedule it 1 minute from now
observation_subtask = schedule_subtask(observation_subtask, datetime.utcnow()+timedelta(minutes=1))
self.assertEqual(models.SubtaskState.Choices.SCHEDULED.value, observation_subtask.state.value)
# start the lobster buslistener
# this should pick up the scheduled observation and send instructions to the (mocked) station.
# as a result, the observation subtask should become STARTED
with L2TMSSObservationControlBusListener(self.tmp_exchange.address, self.tmp_exchange.broker, self.tmss_test_env.client_credentials.dbcreds_id) as lobster_buslistener:
# wait for station client to do it's work...
try:
wait_for_subtask_status(observation_subtask, models.SubtaskState.Choices.STARTED.value)
except TimeoutError:
# ToDo: remove this try/except TimeoutError when the station client did it's work and started the observation
pass
# ToDo: add asserts to check if (mocked) station was indeed initialized with this observation's specs.
self.assertTrue(True)
if __name__ == '__main__':
unittest.main()
#!/bin/bash
python3 t_l2station_tmss_integration_test.py
#!/bin/sh
./runctest.sh t_l2station_tmss_integration_test
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment