diff --git a/SAS/TMSS/backend/test/t_scheduling.py b/SAS/TMSS/backend/test/t_scheduling.py index 8fbad583b7a9edcc14053d35f79af2dfd1b9f1a2..2c7a7c33c75c3029b695390bbf1d15452dd1089f 100755 --- a/SAS/TMSS/backend/test/t_scheduling.py +++ b/SAS/TMSS/backend/test/t_scheduling.py @@ -24,12 +24,22 @@ import unittest from unittest import mock import logging -logger = logging.getLogger('lofar'+__name__) +logger = logging.getLogger('lofar.'+__name__) logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) from lofar.common.test_utils import exit_with_skipped_code_if_skip_integration_tests exit_with_skipped_code_if_skip_integration_tests() +# create a module-wide TemporaryExchange, and use it in all communications between TMSSTestEnvironment, RA and ObservationControl +from lofar.messaging.messagebus import TemporaryExchange +tmp_exchange = TemporaryExchange('t_scheduling') +tmp_exchange.open() + +# override DEFAULT_BUSNAME with tmp exchange, some modules import from lofar.messaging others from lofar.messaging.config... +import lofar +lofar.messaging.DEFAULT_BUSNAME = tmp_exchange.address +lofar.messaging.config.DEFAULT_BUSNAME = tmp_exchange.address + # before we import any django modules the DJANGO_SETTINGS_MODULE, TMSS_LDAPCREDENTIALS and TMSS_DBCREDENTIALS need to be known/set. # import and start an isolated RATestEnvironment and TMSSTestEnvironment (with fresh database and attached django and ldap server on free ports) # this automagically sets the required DJANGO_SETTINGS_MODULE, TMSS_LDAPCREDENTIALS and TMSS_DBCREDENTIALS envvars. @@ -37,7 +47,8 @@ from lofar.sas.tmss.test.test_environment import TMSSTestEnvironment tmss_test_env = TMSSTestEnvironment(populate_schemas=True, populate_test_data=False, start_ra_test_environment=True, start_postgres_listener=False, start_subtask_scheduler=False, start_dynamic_scheduler=False, - enable_viewflow=False) + enable_viewflow=False, + exchange=tmp_exchange.address) try: tmss_test_env.start() @@ -45,11 +56,13 @@ except Exception as e: logger.exception(e) tmss_test_env.stop() + tmp_exchange.close() exit(1) # tell unittest to stop (and automagically cleanup) the test database once all testing is done. def tearDownModule(): tmss_test_env.stop() + tmp_exchange.close() from lofar.sas.tmss.test.tmss_test_data_django_models import * @@ -62,7 +75,8 @@ from lofar.sas.resourceassignment.resourceassigner.rarpc import RARPC from lofar.sas.tmss.tmss.tmssapp import models from lofar.sas.tmss.tmss.tmssapp.subtasks import * from lofar.sas.tmss.tmss.tmssapp.tasks import * - +from lofar.messaging.rpc import RPCService, ServiceMessageHandler +import threading def create_subtask_object_for_testing(subtask_type_value, subtask_state_value): """ @@ -212,6 +226,61 @@ class SchedulingTest(unittest.TestCase): subtask = client.get_subtask(subtask_id) self.assertEqual('cancelled', subtask['state_value']) + def test_cancel_scheduled_observation_subtask(self): + with tmss_test_env.create_tmss_client() as client: + subtask_template = client.get_subtask_template("observation control") + spec = get_default_json_object_for_schema(subtask_template['schema']) + spec['stations']['digital_pointings'][0]['subbands'] = [0] + subtask = self._create_target_observation_subtask(spec) + subtask_id = subtask['id'] + client.set_subtask_status(subtask_id, 'defined') + # scheduling should succeed + subtask = client.schedule_subtask(subtask_id) + self.assertEqual('scheduled', subtask['state_value']) + + # cancel it... + subtask = client.cancel_subtask(subtask_id) + self.assertEqual('cancelled', subtask['state_value']) + + def test_cancel_started_observation_subtask(self): + with tmss_test_env.create_tmss_client() as client: + subtask_template = client.get_subtask_template("observation control") + spec = get_default_json_object_for_schema(subtask_template['schema']) + spec['stations']['digital_pointings'][0]['subbands'] = [0] + subtask = self._create_target_observation_subtask(spec) + subtask_id = subtask['id'] + client.set_subtask_status(subtask_id, 'defined') + # scheduling should succeed + subtask = client.schedule_subtask(subtask_id) + self.assertEqual('scheduled', subtask['state_value']) + + # mimic that the obs was started and is now running + client.set_subtask_status(subtask_id, 'starting') + client.set_subtask_status(subtask_id, 'started') + + observation_killed = threading.Event() + class MockObsControlMessageHandler(ServiceMessageHandler): + def __init__(self): + super(MockObsControlMessageHandler, self).__init__() + self.register_service_method("AbortObservation", self.abort_observation) + + def abort_observation(self, sas_id): + observation_killed.set() + return {'aborted': True} + + with RPCService(service_name=lofar.mac.config.DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME, + handler_type=MockObsControlMessageHandler, + exchange=tmp_exchange.address): + + # cancel observation subtask... should kill the running observation + # check that ObservationControlRPCClient.abort_observation was called + subtask = client.cancel_subtask(subtask_id) + self.assertEqual('cancelled', subtask['state_value']) + + observation_killed.wait(10) + self.assertTrue(observation_killed.is_set()) + + def test_schedule_observation_subtask_with_one_blocking_reservation_failed(self): """ Set (Resource Assigner) station CS001 to reserved