#!/usr/bin/env python3 # Copyright (C) 2018 ASTRON (Netherlands Institute for Radio Astronomy) # P.O. Box 2, 7990 AA Dwingeloo, The Netherlands # # This file is part of the LOFAR software suite. # The LOFAR software suite is free software: you can redistribute it and/or # modify it under the terms of the GNU General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # The LOFAR software suite is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. # $Id: $ import os import unittest from unittest import mock import logging logger = logging.getLogger('lofar.'+__name__) logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) from lofar.common.test_utils import exit_with_skipped_code_if_skip_integration_tests exit_with_skipped_code_if_skip_integration_tests() # create a module-wide TemporaryExchange, and use it in all communications between TMSSTestEnvironment, RA and ObservationControl from lofar.messaging.messagebus import TemporaryExchange tmp_exchange = TemporaryExchange('t_scheduling') tmp_exchange.open() # override DEFAULT_BUSNAME with tmp exchange, some modules import from lofar.messaging others from lofar.messaging.config... import lofar lofar.messaging.DEFAULT_BUSNAME = tmp_exchange.address lofar.messaging.config.DEFAULT_BUSNAME = tmp_exchange.address # before we import any django modules the DJANGO_SETTINGS_MODULE, TMSS_LDAPCREDENTIALS and TMSS_DBCREDENTIALS need to be known/set. # import and start an isolated RATestEnvironment and TMSSTestEnvironment (with fresh database and attached django and ldap server on free ports) # this automagically sets the required DJANGO_SETTINGS_MODULE, TMSS_LDAPCREDENTIALS and TMSS_DBCREDENTIALS envvars. from lofar.sas.tmss.test.test_environment import TMSSTestEnvironment tmss_test_env = TMSSTestEnvironment(populate_schemas=True, populate_test_data=False, start_ra_test_environment=True, start_postgres_listener=False, start_subtask_scheduler=False, start_dynamic_scheduler=False, enable_viewflow=False, exchange=tmp_exchange.address) try: tmss_test_env.start() except Exception as e: logger.exception(e) tmss_test_env.stop() tmp_exchange.close() exit(1) # tell unittest to stop (and automagically cleanup) the test database once all testing is done. def tearDownModule(): tmss_test_env.stop() tmp_exchange.close() from lofar.sas.tmss.test.tmss_test_data_django_models import * # import and setup rest test data creator from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator test_data_creator = TMSSRESTTestDataCreator(tmss_test_env.django_server.url, (tmss_test_env.ldap_server.dbcreds.user, tmss_test_env.ldap_server.dbcreds.password)) from datetime import datetime, timedelta from lofar.sas.resourceassignment.resourceassigner.rarpc import RARPC from lofar.sas.tmss.tmss.tmssapp import models from lofar.sas.tmss.tmss.tmssapp.subtasks import * from lofar.sas.tmss.tmss.tmssapp.tasks import * from lofar.sas.tmss.test.test_utils import set_subtask_state_following_allowed_transitions from lofar.messaging.rpc import RPCService, ServiceMessageHandler import threading import dateutil.parser def create_subtask_object_for_testing(subtask_type_value, subtask_state_value): """ Helper function to create a subtask object for testing with given subtask value and subtask state value as string (no object) For these testcases 'preprocessing pipeline' and 'observation control' is relevant """ task_blueprint = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data(specifications_template=models.TaskTemplate.objects.get(name='target observation' if subtask_type_value=='observation' else 'preprocessing pipeline'))) subtask_template_obj = models.SubtaskTemplate.objects.get(name='observation control' if subtask_type_value=='observation' else 'preprocessing pipeline') subtask_data = Subtask_test_data(subtask_template=subtask_template_obj, task_blueprint=task_blueprint) subtask = models.Subtask.objects.create(**subtask_data) if subtask.state.value != subtask_state_value: set_subtask_state_following_allowed_transitions(subtask, subtask_state_value) return subtask def create_reserved_stations_for_testing(station_list): """ Helper function to create stations in reservation, in other words assigned in Resource Assigner :param station_list: List of station names to assign """ with RARPC.create() as rarpc: ra_spec = {'task_type': 'reservation', 'task_subtype': 'maintenance', 'status': 'prescheduled', 'starttime': datetime.utcnow() - timedelta(hours=1), 'endtime': datetime.utcnow() + timedelta(hours=2), 'cluster': None, 'specification': {}} inner_spec = {'Observation.VirtualInstrument.stationList': station_list, 'Observation.startTime': ra_spec['starttime'], 'Observation.endTime': ra_spec['starttime']} ra_spec['specification'] = inner_spec assigned = rarpc.do_assignment(ra_spec) return assigned def duplicates(l: list) -> list: # O(n^2), but that's good enough. uniques = [] dupes = [] for e in l: if e not in uniques: uniques.append(e) elif e not in dupes: dupes.append(e) return dupes class SchedulingTest(unittest.TestCase): def setUp(self): # clean all specs/tasks/claims in RADB (cascading delete) for spec in tmss_test_env.ra_test_environment.radb.getSpecifications(): tmss_test_env.ra_test_environment.radb.deleteSpecification(spec['id']) DataproductTransform.objects.all().delete() Dataproduct.objects.all().delete() SubtaskInput.objects.all().delete() SubtaskOutput.objects.all().delete() Subtask.objects.all().delete() test_data_creator.wipe_cache() @staticmethod def _create_target_observation_subtask(specification_doc: dict=None) -> dict: '''create a target observation subtask in defined state and return the subtask as json dict. if the given specification_doc is None, then the defaults are used.''' if specification_doc is None: specification_doc = {'stations': {'digital_pointings': [{'name': 'target0', 'subbands': [0]}], 'station_list': ['CS001', 'CS002', 'CS003']}} with tmss_test_env.create_tmss_client() as client: task_blueprint_data = test_data_creator.TaskBlueprint(template_url=client.get_task_template(name="target observation")['url']) task_blueprint_data['specifications_doc']['SAPs'][0]['name'] = specification_doc['stations']['digital_pointings'][0]['name'] task_blueprint = test_data_creator.post_data_and_get_response_as_json_object(task_blueprint_data, '/task_blueprint/') subtask_template = client.get_subtask_template("observation control") specification_doc = add_defaults_to_json_object_for_schema(specification_doc, subtask_template['schema']) cluster_url = client.get_path_as_json_object('/cluster/1')['url'] subtask_data = test_data_creator.Subtask(specifications_template_url=subtask_template['url'], specifications_doc=specification_doc, cluster_url=cluster_url, scheduled_on_sky_start_time=datetime.utcnow()+timedelta(minutes=5), task_blueprint_url=task_blueprint['url']) subtask = test_data_creator.post_data_and_get_response_as_json_object(subtask_data, '/subtask/') test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=subtask['url']), '/subtask_output/') client.set_subtask_status(subtask['id'], 'defined') return subtask def _test_schedule_observation_subtask_with_enough_resources_available(self, observation_specification_doc): with tmss_test_env.create_tmss_client() as client: subtask = self._create_target_observation_subtask(observation_specification_doc) subtask_id = subtask['id'] subtask = client.schedule_subtask(subtask_id) self.assertEqual('scheduled', subtask['state_value']) self.assertEqual('scheduled', tmss_test_env.ra_test_environment.radb.getTask(tmss_id=subtask_id)['status']) # test whether all dataproduct specifications are unique outputs = Subtask.objects.get(pk=subtask_id).outputs.all() dataproduct_specifications_docs = [dp.specifications_doc for output in outputs for dp in output.dataproducts.all()] duplicate_dataproduct_specification_docs = duplicates(dataproduct_specifications_docs) self.assertEqual([], duplicate_dataproduct_specification_docs) def test_schedule_observation_subtask_with_enough_resources_available(self): spec = { "stations": { "digital_pointings": [ { "name": "target0", "subbands": [0] } ] }, "COBALT": { "correlator": { "enabled": True } } } self._test_schedule_observation_subtask_with_enough_resources_available(spec) @unittest.skip("TODO: add missing coherent stokes settings") def test_schedule_beamformer_observation_subtask_with_enough_resources_available(self): spec = { "stations": { "digital_pointings": [ { "name": "target0", "subbands": [0] } ] }, "COBALT": { "version": 1, "correlator": { "enabled": False }, "beamformer": { "tab_pipelines": [ { "SAPs": [ { "name": "target0", "tabs": [ { "coherent": False }, { "coherent": True } ] } ] } ] } } } self._test_schedule_observation_subtask_with_enough_resources_available(spec) def test_schedule_cancelled_observation_subtask_failes(self): with tmss_test_env.create_tmss_client() as client: subtask_template = client.get_subtask_template("observation control") spec = get_default_json_object_for_schema(subtask_template['schema']) spec['stations']['digital_pointings'][0]['subbands'] = [0] subtask = self._create_target_observation_subtask(spec) subtask_id = subtask['id'] client.set_subtask_status(subtask_id, 'defined') # cancel it... subtask = client.cancel_subtask(subtask_id) self.assertEqual('cancelled', subtask['state_value']) # scheduling should fail with self.assertRaises(Exception): client.schedule_subtask(subtask_id) # and status should still be cancelled subtask = client.get_subtask(subtask_id) self.assertEqual('cancelled', subtask['state_value']) # mark it as obsolete... (the user thereby states that the cancelled subtask will is not to be used again) self.assertIsNone(subtask['obsolete_since']) before = datetime.utcnow() subtask = client.mark_subtask_as_obsolete(subtask_id) after = datetime.utcnow() obsolete_since = dateutil.parser.parse(subtask['obsolete_since'], ignoretz=True) self.assertIsNotNone(obsolete_since) self.assertLess(before, obsolete_since) self.assertGreater(after, obsolete_since) # scheduling should fail with self.assertRaises(Exception): client.schedule_subtask(subtask_id) # marking an obsolete subtask as obsolete again should be prevented with self.assertRaises(Exception) as context: subtask = client.mark_subtask_as_obsolete(subtask_id) self.assertIn("has been marked obsolete on %s" % obsolete_since, str(context.exception)) # and obsolete_since timestamp should still be the same as before subtask = client.get_subtask(subtask_id) obsolete_since_new = dateutil.parser.parse(subtask['obsolete_since'], ignoretz=True) self.assertEqual(obsolete_since, obsolete_since_new) def test_cancel_scheduled_observation_subtask(self): with tmss_test_env.create_tmss_client() as client: subtask_template = client.get_subtask_template("observation control") spec = get_default_json_object_for_schema(subtask_template['schema']) spec['stations']['digital_pointings'][0]['subbands'] = [0] subtask = self._create_target_observation_subtask(spec) subtask_id = subtask['id'] client.set_subtask_status(subtask_id, 'defined') # scheduling should succeed subtask = client.schedule_subtask(subtask_id) self.assertEqual('scheduled', subtask['state_value']) # cancel it... subtask = client.cancel_subtask(subtask_id) self.assertEqual('cancelled', subtask['state_value']) def test_cancel_started_observation_subtask(self): with tmss_test_env.create_tmss_client() as client: subtask_template = client.get_subtask_template("observation control") spec = get_default_json_object_for_schema(subtask_template['schema']) spec['stations']['digital_pointings'][0]['name'] = 'target0' spec['stations']['digital_pointings'][0]['subbands'] = [0] subtask = self._create_target_observation_subtask(spec) subtask_id = subtask['id'] client.set_subtask_status(subtask_id, 'defined') # scheduling should succeed subtask = client.schedule_subtask(subtask_id) self.assertEqual('scheduled', subtask['state_value']) # mimic that the obs was started and is now running client.set_subtask_status(subtask_id, 'starting') client.set_subtask_status(subtask_id, 'started') observation_killed = threading.Event() class MockObsControlMessageHandler(ServiceMessageHandler): def __init__(self): super(MockObsControlMessageHandler, self).__init__() self.register_service_method("AbortObservation", self.abort_observation) def abort_observation(self, sas_id): observation_killed.set() return {'aborted': True} with RPCService(service_name=lofar.mac.config.DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME, handler_type=MockObsControlMessageHandler, exchange=tmp_exchange.address): # cancel observation subtask... should kill the running observation # check that ObservationControlRPCClient.abort_observation was called subtask = client.cancel_subtask(subtask_id) self.assertEqual('cancelled', subtask['state_value']) observation_killed.wait(10) self.assertTrue(observation_killed.is_set()) def test_schedule_observation_subtask_with_one_blocking_reservation_failed(self): """ Set (Resource Assigner) station CS001 to reserved Schedule subtask with station CS001 Check if schedule of the subtask fail """ self.assertTrue(create_reserved_stations_for_testing(['CS001'])) with tmss_test_env.create_tmss_client() as client: subtask_template = client.get_subtask_template("observation control") spec = get_default_json_object_for_schema(subtask_template['schema']) spec['COBALT']['correlator']['enabled'] = True spec['stations']['digital_pointings'][0]['name'] = 'target0' spec['stations']['digital_pointings'][0]['subbands'] = [0] subtask = self._create_target_observation_subtask(spec) subtask_id = subtask['id'] with self.assertRaises(Exception): client.schedule_subtask(subtask_id) subtask = client.get_subtask(subtask_id) self.assertEqual('unschedulable', subtask['state_value']) self.assertEqual('conflict', tmss_test_env.ra_test_environment.radb.getTask(tmss_id=subtask_id)['status']) def test_schedule_observation_subtask_with_blocking_reservations_failed(self): """ Set (Resource Assigner) station CS001, CS002, CS401, CS501 to reserved Schedule subtask with stations CS001, CS002, CS401 Check if schedule of the subtask fail """ self.assertTrue(create_reserved_stations_for_testing(['CS001','CS002','CS501','CS401' ])) with tmss_test_env.create_tmss_client() as client: subtask_template = client.get_subtask_template("observation control") spec = get_default_json_object_for_schema(subtask_template['schema']) spec['COBALT']['correlator']['enabled'] = True spec['stations']['digital_pointings'][0]['name'] = 'target0' spec['stations']['digital_pointings'][0]['subbands'] = [0] spec['stations']['station_list'] = ['CS001', 'CS002', 'CS401'] subtask = self._create_target_observation_subtask(spec) subtask_id = subtask['id'] with self.assertRaises(Exception): client.schedule_subtask(subtask_id) subtask = client.get_subtask(subtask_id) self.assertEqual('unschedulable', subtask['state_value']) ra_task = tmss_test_env.ra_test_environment.radb.getTask(tmss_id=subtask_id) self.assertIsNotNone(ra_task) self.assertEqual('conflict', ra_task['status']) def test_schedule_observation_subtask_with_blocking_reservation_ok(self): """ Set (Resource Assigner) station CS001, CS003 to reserved Schedule subtask with station CS001, CS002, CS003 Check if schedule of the subtasks do not fail (it can schedule with station CS002) """ self.assertTrue(create_reserved_stations_for_testing(['CS001','CS003'])) with tmss_test_env.create_tmss_client() as client: subtask_template = client.get_subtask_template("observation control") spec = get_default_json_object_for_schema(subtask_template['schema']) spec['COBALT']['correlator']['enabled'] = True spec['stations']['digital_pointings'][0]['name'] = 'target0' spec['stations']['digital_pointings'][0]['subbands'] = [0] spec['stations']['station_list'] = ['CS001', 'CS002', 'CS003'] subtask = self._create_target_observation_subtask(spec) subtask_id = subtask['id'] subtask = client.schedule_subtask(subtask_id) self.assertEqual('scheduled', subtask['state_value']) self.assertEqual('scheduled', tmss_test_env.ra_test_environment.radb.getTask(tmss_id=subtask_id)['status']) def _setup_observation_and_pipeline(self, client, obs_spec, dataproduct_properties, pipeline_task_template_name, pipeline_subtask_template_name, pipeline_subtask_spec): cluster_url = client.get_path_as_json_object('/cluster/1')['url'] # setup: first create an observation, so the pipeline can have input. obs_task_blueprint_data = test_data_creator.TaskBlueprint(template_url=client.get_task_template(name="target observation")['url']) obs_task_blueprint = test_data_creator.post_data_and_get_response_as_json_object(obs_task_blueprint_data, '/task_blueprint/') obs_subtask_template = client.get_subtask_template("observation control") obs_subtask_data = test_data_creator.Subtask(specifications_template_url=obs_subtask_template['url'], specifications_doc=obs_spec, cluster_url=cluster_url, task_blueprint_url=obs_task_blueprint['url']) obs_subtask = test_data_creator.post_data_and_get_response_as_json_object(obs_subtask_data, '/subtask/') obs_subtask_output_url = test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=obs_subtask['url']), '/subtask_output/') test_data_creator.post_data_and_get_url(test_data_creator.Dataproduct(**dataproduct_properties, subtask_output_url=obs_subtask_output_url), '/dataproduct/') client.set_subtask_status(obs_subtask['id'], 'defined') # now create the pipeline... pipe_task_blueprint_data = test_data_creator.TaskBlueprint(template_url=client.get_task_template(name=pipeline_task_template_name)['url']) pipe_task_blueprint = test_data_creator.post_data_and_get_response_as_json_object(pipe_task_blueprint_data, '/task_blueprint/') pipe_subtask_template = client.get_subtask_template(pipeline_subtask_template_name) pipe_spec = add_defaults_to_json_object_for_schema(pipeline_subtask_spec, pipe_subtask_template['schema']) pipe_subtask_data = test_data_creator.Subtask(specifications_template_url=pipe_subtask_template['url'], specifications_doc=pipe_spec, task_blueprint_url=pipe_task_blueprint['url'], cluster_url=cluster_url) pipe_subtask = test_data_creator.post_data_and_get_response_as_json_object(pipe_subtask_data, '/subtask/') # ...and connect it to the observation test_data_creator.post_data_and_get_url(test_data_creator.SubtaskInput(subtask_url=pipe_subtask['url'], subtask_output_url=obs_subtask_output_url), '/subtask_input/') test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=pipe_subtask['url']), '/subtask_output/') client.set_subtask_status(pipe_subtask['id'], 'defined') return obs_subtask, pipe_subtask def test_schedule_preprocessing_pipeline_subtask_with_enough_resources_available(self): with tmss_test_env.create_tmss_client() as client: obs_subtask_template = client.get_subtask_template("observation control") obs_spec = get_default_json_object_for_schema(obs_subtask_template['schema']) obs_spec['stations']['digital_pointings'][0]['name'] = 'target0' obs_spec['stations']['digital_pointings'][0]['subbands'] = [0] obs_spec['COBALT']['correlator']['enabled'] = True obs_subtask, pipe_subtask = self._setup_observation_and_pipeline(client, obs_spec, {"filename": "L123456_SB000.MS", "specifications_doc": {"sap": "target0", "subband": 0 } }, "preprocessing pipeline", "preprocessing pipeline", {}) # make sure that pipeline's predecessor (the obs_subtask) is finished for state in ('defined', 'scheduling', 'scheduled', 'starting', 'started', 'finishing', 'finished'): client.set_subtask_status(obs_subtask['id'], state) subtask = client.schedule_subtask(pipe_subtask['id']) self.assertEqual('scheduled', subtask['state_value']) self.assertEqual('scheduled', tmss_test_env.ra_test_environment.radb.getTask(tmss_id=pipe_subtask['id'])['status']) def test_schedule_pipeline_for_cancelled_observation(self): '''scheduling a pipeline as a successor of a cancelled observation should fail, except when the cancelled observation is also obsolete''' with tmss_test_env.create_tmss_client() as client: obs_subtask_template = client.get_subtask_template("observation control") obs_spec = get_default_json_object_for_schema(obs_subtask_template['schema']) obs_spec['stations']['digital_pointings'][0]['name'] = 'target0' obs_spec['stations']['digital_pointings'][0]['subbands'] = [0] obs_subtask, pipe_subtask = self._setup_observation_and_pipeline(client, obs_spec, {"filename": "L123456_SB000.MS", "specifications_doc": {"sap": "target0", "subband": 0 } }, "preprocessing pipeline", "preprocessing pipeline", {}) # cancel the observation obs_subtask = client.cancel_subtask(obs_subtask['id']) # check, should be cancelled, but not obsolete self.assertEqual('cancelled', obs_subtask['state_value']) self.assertIsNone(obs_subtask['obsolete_since']) # scheduling pipeline should fail with self.assertRaises(Exception) as context: pipe_subtask = client.schedule_subtask(pipe_subtask['id']) self.assertTrue('Cannot schedule subtask' in str(context.exception)) self.assertTrue('not FINISHED but state=cancelled' in str(context.exception)) # now mark the cancelled observation as obsolete obs_subtask = client.mark_subtask_as_obsolete(obs_subtask['id']) # check, should (still) be cancelled, and now obsolete self.assertEqual('cancelled', obs_subtask['state_value']) self.assertIsNotNone(obs_subtask['obsolete_since']) # scheduling pipeline should now be a success pipe_subtask = client.schedule_subtask(pipe_subtask['id']) self.assertEqual('scheduled', pipe_subtask['state_value']) self.assertEqual('scheduled', tmss_test_env.ra_test_environment.radb.getTask(tmss_id=pipe_subtask['id'])['status']) @unittest.skip("TODO: add missing coherent stokes settings") def test_schedule_pulsar_pipeline_subtask_with_enough_resources_available(self): with tmss_test_env.create_tmss_client() as client: obs_subtask_template = client.get_subtask_template("observation control") obs_spec = { "stations": { "digital_pointings": [ { "name": "target0", "subbands": [0] } ] }, "COBALT": { "version": 1, "correlator": { "enabled": False }, "beamformer": { "tab_pipelines": [ { "SAPs": [ { "name": "target0", "tabs": [ { "coherent": False }, { "coherent": True } ] } ] } ] } } } obs_spec = add_defaults_to_json_object_for_schema(obs_spec,obs_subtask_template['schema']) pulp_dataproduct_specifications_template = client.get_dataproduct_specifications_template("pulp summary") pipe_subtask = self._setup_observation_and_pipeline(client, obs_spec, {"filename": "L123456_SAP000_B000_S0_P000.h5", "specifications_template_url": pulp_dataproduct_specifications_template['url'], "specifications_doc": { "sap": "target0", "coherent": True, "stokes_set": "I", "identifiers": { "sap_index": 0, "tab_index": 0, "pipeline_index": 0, "part_index": 0, "stokes_index": 0 } } }, "pulsar pipeline", "pulsar pipeline", {}) # make sure that pipeline's predecessors are finished for predecessor in client.get_subtask_predecessors(pipe_subtask['id']): for state in ('defined', 'scheduling', 'scheduled', 'starting', 'started', 'finishing', 'finished'): client.set_subtask_status(predecessor['id'], state) subtask = client.schedule_subtask(pipe_subtask['id']) self.assertEqual('scheduled', subtask['state_value']) self.assertEqual('scheduled', tmss_test_env.ra_test_environment.radb.getTask(tmss_id=pipe_subtask['id'])['status']) def test_schedule_ingest_subtask(self): with tmss_test_env.create_tmss_client() as client: cluster_url = client.get_path_as_json_object('/cluster/1')['url'] # setup: first create an observation, so the ingest can have input. subtask_template = client.get_subtask_template("observation control") obs_spec = get_default_json_object_for_schema(subtask_template['schema']) obs_spec['stations']['digital_pointings'][0]['name'] = 'target0' obs_spec['stations']['digital_pointings'][0]['subbands'] = [0] obs_subtask = self._create_target_observation_subtask(obs_spec) obs_subtask_id = obs_subtask['id'] obs_subtask_output_url = client.get_path_as_json_object('/subtask_output?subtask=%s'%obs_subtask_id)[0]['url'] test_data_creator.post_data_and_get_url(test_data_creator.Dataproduct(filename="L%s_SB000.MS"%obs_subtask['id'], specifications_doc={"sap": "target0", "subband": 0}, subtask_output_url=obs_subtask_output_url), '/dataproduct/') # now create the ingest... ingest_subtask_template = client.get_subtask_template("ingest control") ingest_spec = get_default_json_object_for_schema(ingest_subtask_template['schema']) ingest_subtask_data = test_data_creator.Subtask(specifications_template_url=ingest_subtask_template['url'], specifications_doc=ingest_spec, task_blueprint_url=obs_subtask['task_blueprint'], primary=False, cluster_url=cluster_url) ingest_subtask = test_data_creator.post_data_and_get_response_as_json_object(ingest_subtask_data, '/subtask/') # ...and connect it to the observation test_data_creator.post_data_and_get_url(test_data_creator.SubtaskInput(subtask_url=ingest_subtask['url'], subtask_output_url=obs_subtask_output_url), '/subtask_input/') test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=ingest_subtask['url']), '/subtask_output/') # our subtask here has only one known related task for predecessor in client.get_subtask_predecessors(ingest_subtask['id']): for state in ('defined', 'scheduling', 'scheduled', 'starting', 'started', 'finishing', 'finished'): client.set_subtask_status(predecessor['id'], state) client.set_subtask_status(ingest_subtask['id'], 'defined') task_blueprint = client.get_url_as_json_object(ingest_subtask['task_blueprint']) # our subtask here has only one known related task schedulingunit_blueprint = client.get_url_as_json_object(task_blueprint['scheduling_unit_blueprint']) # first, make sure we need but do not have ingest persmission... client.session.patch(schedulingunit_blueprint['url'], json={'ingest_permission_required': True, 'ingest_permission_granted_since': None}) with self.assertRaises(Exception) as context: subtask = client.schedule_subtask(ingest_subtask['id']) self.assertTrue('permission' in str(context.exception)) subtask = client.get_subtask(ingest_subtask['id']) self.assertEqual('defined', subtask['state_value']) # now grant permission... client.session.patch(schedulingunit_blueprint['url'], json={'ingest_permission_required': True, 'ingest_permission_granted_since': datetime.utcnow().isoformat()}) subtask = client.schedule_subtask(ingest_subtask['id']) self.assertEqual('scheduled', subtask['state_value']) self.assertEqual(models.Subtask.objects.get(id=ingest_subtask['id']).inputs.first().dataproducts.count(), 1) def test_schedule_schedulingunit_enough_resources_available(self): '''similar test as test_schedule_pipeline_subtask_with_enough_resources_available, but now created from a scheduling_unit''' with tmss_test_env.create_tmss_client() as client: scheduling_unit_template = client.get_schedulingunit_template("scheduling unit") scheduling_unit_doc = get_default_json_object_for_schema(scheduling_unit_template['schema']) # define an observation without QA obs_task = get_default_json_object_for_schema(client.get_task_template(name="target observation")['schema']) obs_task['QA']['plots']['enabled'] = False obs_task['QA']['file_conversion']['enabled'] = False obs_task['SAPs'] = [{ 'subbands': [0,1] }] scheduling_unit_doc['tasks']["Observation"] = {"specifications_doc": obs_task, "specifications_template": {"name": "target observation"}} # define a pipeline scheduling_unit_doc['tasks']["Pipeline"] = { "specifications_doc": get_default_json_object_for_schema(client.get_task_template(name="preprocessing pipeline")['schema']), "specifications_template": {"name": "preprocessing pipeline"}} # connect obs to pipeline scheduling_unit_doc['task_relations'].append({"producer": "Observation", "consumer": "Pipeline", "input": { "role": "any", "datatype": "visibilities", "dataformat": "MeasurementSet"}, "output": { "role": "correlator", "datatype": "visibilities", "dataformat": "MeasurementSet"}, "selection_doc": {}, "selection_template": "all" }) # submit scheduling_unit_draft_data = test_data_creator.SchedulingUnitDraft(template_url=scheduling_unit_template['url'], specifications_doc=scheduling_unit_doc) scheduling_unit_draft = test_data_creator.post_data_and_get_response_as_json_object(scheduling_unit_draft_data, '/scheduling_unit_draft/') # create the whole blueprints tree... scheduling_unit_blueprint = client.create_scheduling_unit_blueprint_and_tasks_and_subtasks_tree(scheduling_unit_draft['id']) # fetch the created task_blueprints task_blueprints = [client.get_url_as_json_object(task_blueprint['url']) for task_blueprint in scheduling_unit_blueprint['task_blueprints']] self.assertEqual(2, len(task_blueprints)) # and make sure they are ordered correctly if "Pipeline" in task_blueprints[0]['name']: task_blueprints.reverse() for task_blueprint in task_blueprints: self.assertEqual(1, len(task_blueprint['subtasks'])) subtask = client.get_url_as_json_object(task_blueprint['subtasks'][0]) client.session.patch(subtask['url'], {'scheduled_on_sky_start_time': datetime.utcnow() + timedelta(minutes=5)}) client.set_subtask_status(subtask['id'], 'defined') subtask = client.schedule_subtask(subtask['id']) self.assertEqual('scheduled', subtask['state_value']) self.assertEqual('scheduled', tmss_test_env.ra_test_environment.radb.getTask(tmss_id=subtask['id'])['status']) for state in ('starting', 'started', 'finishing', 'finished'): client.set_subtask_status(subtask['id'], state) class SubtaskInputOutputTest(unittest.TestCase): """ Subtask Input and Output test These testcases are located in the t_scheduling module, because during scheduling the output dataproducts are assigned """ def setUp(self) -> None: # make sure we're allowed to schedule setting = Setting.objects.get(name='dynamic_scheduling_enabled') setting.value = True setting.save() def test_specifications_doc_meets_selection_doc(self): # empty selection matches all self.assertTrue(specifications_doc_meets_selection_doc({'something else': 'target0'}, {})) # specification is a list? specification must be a subset of the selection self.assertTrue(specifications_doc_meets_selection_doc({'sap': ['target0']}, {'sap': ['target0']})) self.assertFalse(specifications_doc_meets_selection_doc({'sap': ['target0','target1','target2']}, {'sap': ['target0','target1']})) # specification is a value? it must appear in the selection self.assertTrue(specifications_doc_meets_selection_doc({'sap': 'target0'}, {'sap': ['target0']})) self.assertTrue(specifications_doc_meets_selection_doc({'sap': 'target0'}, {'sap': ['target0','target1']})) self.assertTrue(specifications_doc_meets_selection_doc({'sap': 'target0'}, {'sap': 'target0'})) # specification must contain the selection key self.assertFalse(specifications_doc_meets_selection_doc({'something else': 'target0'}, {'sap': 'target0'})) @mock.patch("lofar.sas.tmss.tmss.tmssapp.subtasks.assign_or_unassign_resources") def test_schedule_pipeline_subtask_filters_predecessor_output_dataproducts_for_input(self, assign_resources_mock): # setup: # create observation subtask and outputs and dataproducts obs_st = create_subtask_object_for_testing('observation', 'finished') obs_out1 = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=obs_st)) obs_out2 = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=obs_st)) # create connected pipeline subtask and inputs, specify input filtering pipe_st = create_subtask_object_for_testing('pipeline', 'defined') pipe_out = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=pipe_st)) # required by scheduling function pipe_in1 = models.SubtaskInput.objects.create(**SubtaskInput_test_data(subtask=pipe_st, producer=obs_out1, selection_doc={'sap': ['target0']})) pipe_in2 = models.SubtaskInput.objects.create(**SubtaskInput_test_data(subtask=pipe_st, producer=obs_out2, selection_doc={'sap': ['target1']})) # create obs output dataproducts with specs we can filter on dp1_1 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out1, specifications_doc={'sap': 'target0', 'subband': 0})) dp1_2 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out1, specifications_doc={'sap': 'target1', 'subband': 0})) dp1_3 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out1, specifications_doc={'sap': 'target0', 'subband': 1})) dp2_1 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out2, specifications_doc={'sap': 'target0', 'subband': 0})) dp2_2 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out2, specifications_doc={'sap': 'target1', 'subband': 0})) # trigger: # schedule pipeline, which should attach the correct subset of dataproducts to the pipeline inputs schedule_pipeline_subtask(pipe_st) # assert: # check correct input filtering self.assertEqual(set(pipe_in1.dataproducts.all()), {dp1_1, dp1_3}) self.assertEqual(set(pipe_in2.dataproducts.all()), {dp2_2}) class SAPTest(unittest.TestCase): """ SAP test These testcases are located in the t_scheduling module, because the SAP entries are created/assigned during scheduling """ def setUp(self) -> None: # make sure we're allowed to schedule setting = Setting.objects.get(name='dynamic_scheduling_enabled') setting.value = True setting.save() def test_schedule_observation_subtask_creates_sap_with_correct_pointing(self): with tmss_test_env.create_tmss_client() as client: task_blueprint_data = test_data_creator.TaskBlueprint(template_url=client.get_task_template(name="target observation")['url']) task_blueprint_data['specifications_doc']['SAPs'][0]['name'] = 'target0' task_blueprint = test_data_creator.post_data_and_get_response_as_json_object(task_blueprint_data, '/task_blueprint/') subtask_template = client.get_subtask_template("observation control") spec = get_default_json_object_for_schema(subtask_template['schema']) spec['stations']['digital_pointings'][0]['name'] = task_blueprint_data['specifications_doc']['SAPs'][0]['name'] spec['stations']['digital_pointings'][0]['subbands'] = [0] cluster_url = client.get_path_as_json_object('/cluster/1')['url'] pointing = {"angle1": 7.6, "angle2": 5.4, "direction_type": "J2000", "target": "target1"} spec['stations']['digital_pointings'][0]['pointing'] = pointing subtask_data = test_data_creator.Subtask(specifications_template_url=subtask_template['url'], specifications_doc=spec, cluster_url = cluster_url, task_blueprint_url=task_blueprint['url'], scheduled_on_sky_start_time=datetime.utcnow() + timedelta(minutes=5), scheduled_on_sky_stop_time=datetime.utcnow() + timedelta(minutes=15)) subtask = test_data_creator.post_data_and_get_response_as_json_object(subtask_data, '/subtask/') subtask_id = subtask['id'] test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=subtask['url']), '/subtask_output/') subtask_model = models.Subtask.objects.get(id=subtask_id) self.assertEqual(0, subtask_model.output_dataproducts.values('sap').count()) client.set_subtask_status(subtask_id, 'defined') subtask = client.schedule_subtask(subtask_id) self.assertEqual(1, subtask_model.output_dataproducts.count()) self.assertEqual(1, subtask_model.output_dataproducts.values('sap').count()) self.assertEqual(subtask_model.output_dataproducts.first().sap.specifications_doc['pointing']['angle1'], pointing['angle1']) self.assertEqual(subtask_model.output_dataproducts.first().sap.specifications_doc['pointing']['angle2'], pointing['angle2']) @mock.patch("lofar.sas.tmss.tmss.tmssapp.subtasks.assign_or_unassign_resources") def test_schedule_pipeline_subtask_copies_sap_from_input_to_output(self, assign_resources_mock): # setup: # create observation subtask and outputs and dataproducts obs_st = create_subtask_object_for_testing('observation', 'finished') obs_out = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=obs_st)) # create connected pipeline subtask and inputs, specify input filtering pipe_st = create_subtask_object_for_testing('pipeline', 'defined') pipe_out = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=pipe_st)) # required by scheduling function pipe_in = models.SubtaskInput.objects.create(**SubtaskInput_test_data(subtask=pipe_st, producer=obs_out)) # create obs output dataproducts dp1_in = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out, specifications_doc={"identifiers": { "sap_index": 0, "subband_index": 0 }})) dp2_in = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out, specifications_doc={"identifiers": { "sap_index": 0, "subband_index": 1 }})) # schedule pipeline, which should copy the SAP schedule_pipeline_subtask(pipe_st) # determine the newly created pipeline dataproducts dp1_out = DataproductTransform.objects.get(input=dp1_in).output dp2_out = DataproductTransform.objects.get(input=dp2_in).output # assert: self.assertEqual(dp1_in.sap, dp1_out.sap) self.assertEqual(dp2_in.sap, dp2_out.sap) class TestWithUC1Specifications(unittest.TestCase): """ The Setup will create Scheduling Unit Draft with UC1 strategy template It will use the function 'create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit_draft' which is then implicit tested. Create Task Blueprints and Subtasks: Observation Task 'Calibration 1' SubTask Observation Control SubTask QA File SubTask QA Plots Pipeline Task 'Pipeline 1' SubTask Pipeline Control Observation Task 'Target Observation' SubTask Observation Control SubTask QA File SubTask QA Plots Pipeline Task 'Pipeline target1' SubTask Pipeline Control Pipeline Task 'Pipeline target2' SubTask Pipeline Control Observation Task 'Calibration 2' SubTask Observation Control SubTask QA File SubTask QA Plots Pipeline Task 'Pipeline 2' SubTask Pipeline Control Note that this test requires Resource Assigner testenvironment being alive """ def setUp(self) -> None: # clean all specs/tasks/claims in RADB (cascading delete) for spec in tmss_test_env.ra_test_environment.radb.getSpecifications(): tmss_test_env.ra_test_environment.radb.deleteSpecification(spec['id']) strategy_template = models.SchedulingUnitObservingStrategyTemplate.objects.get(name="UC1 CTC+pipelines") scheduling_unit_draft = models.SchedulingUnitDraft.objects.create( name="Test Scheduling Unit UC1", specifications_template=strategy_template.scheduling_unit_template, observation_strategy_template=strategy_template, scheduling_set=models.SchedulingSet.objects.create(**SchedulingSet_test_data())) update_task_graph_from_specifications_doc(scheduling_unit_draft, strategy_template.template) create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) scheduling_unit_draft.refresh_from_db() self.task_drafts = scheduling_unit_draft.task_drafts.all() self.scheduling_unit_blueprints = scheduling_unit_draft.scheduling_unit_blueprints.all() self.scheduling_unit_blueprint = self.scheduling_unit_blueprints[0] self.task_blueprints = self.scheduling_unit_blueprint.task_blueprints.all() # SubtaskId of the first observation subtask observation_tbp = list(tb for tb in list(self.task_blueprints) if tb.specifications_template.type.value == TaskType.Choices.OBSERVATION.value) observation_tbp.sort(key=lambda tb: tb.relative_start_time) self.subtask_id_of_first_observation = list(st for st in observation_tbp[0].subtasks.all() if st.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value)[0].id # Unschedule subtask, setting it back to 'defined', removing all dataproducts. for tb in self.task_blueprints: for subtask in tb.subtasks.all(): # start_time to now (and no stoptime) subtask.scheduled_on_sky_stop_time = None subtask.scheduled_on_sky_start_time = datetime.utcnow() subtask.save() def _schedule_subtask_with_failure(self, station_reserved): with tmss_test_env.create_tmss_client() as client: with self.assertRaises(Exception) as context: client.schedule_subtask(self.subtask_id_of_first_observation) self.assertTrue("There are more stations in conflict than the specification is given" in str(context.exception).lower()) for station in station_reserved: self.assertTrue(station in str(context.exception).lower()) def test_create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit_draft(self): """ Create Task Blueprints and Subtasks (class setup) Check if tasks (8) are created: Calibration 1 : 1 Observation and 1 Pipeline task Target Observation: 1 Observation and 2 Pipeline tasks Calibration 2 : 1 Observation and 1 Pipeline task Ingest : 1 ingest task Check if subtasks (13) are created: Every Observation Task: 3 subtasks (1 control, 2 QA) Every Pipeline Task: 1 subtasks (1 control) Every Ingest Task: 1 subtasks (1 control) makes 3x3 + 4x1 = 13 """ self.assertEqual(8, len(self.task_drafts)) self.assertEqual(1, len(self.scheduling_unit_blueprints)) self.assertEqual(8, len(self.task_blueprints)) total_subtasks = 0 for task_blueprint in self.task_blueprints: total_subtasks += task_blueprint.subtasks.count() self.assertEqual(14, total_subtasks) def test_relative_times(self): """ Create Task Blueprints and Subtasks (class setup) Set start and stop times of taskBlueprint Set the subtask start/stop time equal to its taskBlueprint Set all subtask states to 'finished' Check the observed_end_time of the SchedulingUnitBlueprint Check the relative_start/stop_time of the SchedulingUnitBlueprint start = 0 stop = calculates like 8hours (Target) + 2x10min (calibrators) + 2*1min (offset between observations) = 8h22min """ DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S" test_timeschedule = { # name of taskBlueprint start_time stop_time "Calibrator Observation 1": ["2020-11-01 08:00:00", "2020-11-01 08:10:00"], "Pipeline 1": ["2020-11-01 08:20:00", "2020-11-01 08:22:00"], "Target Observation": ["2020-11-01 08:30:00", "2020-11-01 18:00:00"], "Pipeline target1": ["2020-11-01 18:30:00", "2020-11-01 18:35:00"], "Pipeline target2": ["2020-11-01 18:40:00", "2020-11-01 18:45:00"], "Calibrator Observation 2": ["2020-11-01 19:00:00", "2020-11-01 19:20:00"], "Pipeline 2": ["2020-11-01 19:30:00", "2020-11-01 19:40:00"] } # Set time_schedule, for name, times in test_timeschedule.items(): task_blueprint = list(filter(lambda x: x.name == name, self.task_blueprints))[0] for subtask in task_blueprint.subtasks.all(): subtask.scheduled_on_sky_stop_time = datetime.strptime(times[1], DATETIME_FORMAT) subtask.scheduled_on_sky_start_time = datetime.strptime(times[0], DATETIME_FORMAT) subtask.save() set_subtask_state_following_allowed_transitions(subtask, "finished") # Check times self.assertEqual("2020-11-01 08:00:00", self.scheduling_unit_blueprint.observed_start_time.strftime("%Y-%m-%d %H:%M:%S")) self.assertEqual("2020-11-01 19:20:00", self.scheduling_unit_blueprint.observed_end_time.strftime("%Y-%m-%d %H:%M:%S")) self.assertEqual(timedelta(0), self.scheduling_unit_blueprint.relative_start_time) self.assertEqual(timedelta(hours=8, minutes=22), self.scheduling_unit_blueprint.relative_stop_time) for task_blueprint in self.task_blueprints: if task_blueprint.name == "Calibrator Observation 1": self.assertEqual(timedelta(0), task_blueprint.relative_start_time) self.assertEqual(timedelta(minutes=10), task_blueprint.relative_stop_time) elif task_blueprint.name == "Target Observation": self.assertEqual(timedelta(minutes=11), task_blueprint.relative_start_time) self.assertEqual(timedelta(hours=8, minutes=11), task_blueprint.relative_stop_time) elif task_blueprint.name == "Calibrator Observation 2": self.assertEqual(timedelta(hours=8, minutes=12), task_blueprint.relative_start_time) self.assertEqual(timedelta(hours=8, minutes=22), task_blueprint.relative_stop_time) else: self.assertEqual(timedelta(0), task_blueprint.relative_start_time) self.assertEqual(timedelta(0), task_blueprint.relative_stop_time) def test_dutch_stations_conflicts_exception(self): """ Test conflict of 'Dutch' station which are have a default of max_nr_missing=4, Assign stations equal to max_nr_missing+1 before schedule it and check if it can NOT be scheduled Check the context of the Exception """ station_reserved = ['CS002', 'CS003', 'CS004', 'CS401', 'CS501'] self.assertTrue(create_reserved_stations_for_testing(station_reserved)) self._schedule_subtask_with_failure(station_reserved) def test_dutch_stations_conflicts_ok(self): """ Test conflict of 'Dutch' station which are have a default of max_nr_missing=4, Assign stations equal to max_nr_missing before schedule it and check if it can be scheduled """ station_reserved = ['CS002', 'CS003', 'CS004', 'CS401'] self.assertTrue(create_reserved_stations_for_testing(station_reserved)) with tmss_test_env.create_tmss_client() as client: client.schedule_subtask(self.subtask_id_of_first_observation) def test_international_stations_conflicts_failed(self): """ Test conflict of 'International' stations which are have a default of max_nr_missing=2, Assign stations equal to max_nr_missing+1 before schedule it and check if it can NOT be scheduled Check the context of the Exception """ station_reserved = ['SE607', 'PL610', 'PL612'] self.assertTrue(create_reserved_stations_for_testing(station_reserved)) self._schedule_subtask_with_failure(station_reserved) def test_international_stations_conflicts_ok(self): """ Test conflict of 'International' stations which are have a default of max_nr_missing=2, Assign stations equal to max_nr_missing before schedule it and check if it can be scheduled """ station_reserved = ['SE607', 'PL612'] self.assertTrue(create_reserved_stations_for_testing(station_reserved)) with tmss_test_env.create_tmss_client() as client: client.schedule_subtask(self.subtask_id_of_first_observation) def test_international_required_stations_conflicts_failed(self): """ Test conflict of 'International Required' stations which are have a default of max_nr_missing=1, Assign stations equal to max_nr_missing+1 before schedule it and check if it can NOT be scheduled Check the context of the Exception """ station_reserved = ['DE601', 'DE605'] self.assertTrue(create_reserved_stations_for_testing(station_reserved)) self._schedule_subtask_with_failure(station_reserved) def test_international_required_stations_conflicts_ok(self): """ Test conflict of 'International Required' stations which are have a default of max_nr_missing=1, Assign stations equal to max_nr_missing before schedule it and check if it can be scheduled """ station_reserved = ['DE605'] self.assertTrue(create_reserved_stations_for_testing(station_reserved)) with tmss_test_env.create_tmss_client() as client: client.schedule_subtask(self.subtask_id_of_first_observation) if __name__ == "__main__": os.environ['TZ'] = 'UTC' unittest.main()