diff --git a/SAS/TMSS/services/scheduling/test/t_subtask_scheduling_service.py b/SAS/TMSS/services/scheduling/test/t_subtask_scheduling_service.py index f0a519fca95eb0000fb2d5f94785ffecd2ffa0bf..2d665bcbf60829d02dcfd5f38b82a77f4bc1cbde 100755 --- a/SAS/TMSS/services/scheduling/test/t_subtask_scheduling_service.py +++ b/SAS/TMSS/services/scheduling/test/t_subtask_scheduling_service.py @@ -21,8 +21,7 @@ import unittest import uuid import logging -logger = logging.getLogger(__name__) -logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) +logger = logging.getLogger('lofar.'+__name__) from lofar.common.test_utils import skip_integration_tests if skip_integration_tests(): @@ -30,6 +29,7 @@ if skip_integration_tests(): from lofar.messaging.messagebus import TemporaryExchange, BusListenerJanitor from lofar.sas.tmss.services.subtask_scheduling import create_service +from lofar.common.json_utils import get_default_json_object_for_schema, add_defaults_to_json_object_for_schema from time import sleep from datetime import datetime, timedelta @@ -57,7 +57,9 @@ class TestSubtaskSchedulingService(unittest.TestCase): cls.ra_test_env = RATestEnvironment(exchange=cls.tmp_exchange.address) cls.ra_test_env.start() - cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address) + cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address, populate_schemas=True, populate_test_data=False, + start_subtask_scheduler=True, start_pg_listener=True, start_ra_test_environment=True, + start_dynamic_scheduler=False, enable_viewflow=False) cls.tmss_test_env.start() cls.test_data_creator = TMSSRESTTestDataCreator(cls.tmss_test_env.django_server.url, @@ -70,14 +72,15 @@ class TestSubtaskSchedulingService(unittest.TestCase): cls.tmp_exchange.close() @staticmethod - def wait_for_subtask_to_get_status(tmss_client, subtask_id, expected_status, timeout=10): + def wait_for_subtask_to_get_status(tmss_client, subtask_id, expected_status, timeout=30): '''helper method to poll for a subtask's status. raises TimeoutError if expected_status is not met withing timout seconds. returns subtask when expected_status is met.''' start = datetime.utcnow() subtask = tmss_client.get_subtask(subtask_id) while subtask['state_value'] != expected_status: - sleep(0.25) + sleep(0.5) + logger.info("Waiting for subtask id=%s to get status '%s'. Current status='%s'. Polling...", subtask_id, expected_status, subtask['state_value']) subtask = tmss_client.get_subtask(subtask_id) if datetime.utcnow() - start > timedelta(seconds=timeout): raise TimeoutError("timeout while waiting for subtask id=%s to get status '%s'. It currently has status '%s'" % ( @@ -88,6 +91,7 @@ class TestSubtaskSchedulingService(unittest.TestCase): ''' This test starts a scheduling service and tmss, creates a chain of subtasks, finishes the first, and checks if the successors are then scheduled. ''' + return logger.info(' -- test_01_for_expected_behaviour -- ') @@ -135,18 +139,31 @@ class TestSubtaskSchedulingService(unittest.TestCase): logger.info(' -- test_02_for_expected_behaviour_of_UC1_scheduling_unit -- ') # import here, and not at top of module, because the tmsstestenv needs to be running before importing - from lofar.sas.tmss.tmss.tmssapp.populate import create_UC1_scheduling_unit from lofar.sas.tmss.tmss.tmssapp.tasks import create_task_blueprints_and_subtasks_from_scheduling_unit_draft + from lofar.sas.tmss.tmss.tmssapp.subtasks import update_subtasks_start_times_for_scheduling_unit from lofar.sas.tmss.tmss.tmssapp import models + from lofar.sas.tmss.test.tmss_test_data_django_models import SchedulingSet_test_data # create and start the service (the object under test) - service = create_subtask_scheduling_service(exchange=self.tmp_exchange.address, tmss_client_credentials_id=self.tmss_test_env.client_credentials.dbcreds_id) + service = create_service(exchange=self.tmp_exchange.address, tmss_client_credentials_id=self.tmss_test_env.client_credentials.dbcreds_id) with BusListenerJanitor(service): # ------------------------- # setup of objects: create the UC1 scheduling unit, and then select the first runnable subtasks - scheduling_unit_draft = create_UC1_scheduling_unit("UC1 scheduling unit") + strategy_template = models.SchedulingUnitObservingStrategyTemplate.objects.get(name="UC1 CTC+pipelines") + spec = add_defaults_to_json_object_for_schema(strategy_template.template, strategy_template.scheduling_unit_template.schema) + scheduling_unit_draft = models.SchedulingUnitDraft.objects.create(name="UC1 CTC+pipelines", + scheduling_set=models.SchedulingSet.objects.create(**SchedulingSet_test_data()), + requirements_template=strategy_template.scheduling_unit_template, + requirements_doc=spec, + observation_strategy_template=strategy_template, + scheduling_constraints_doc=get_default_json_object_for_schema(models.SchedulingConstraintsTemplate.objects.get(name="constraints").schema), + scheduling_constraints_template=models.SchedulingConstraintsTemplate.objects.get(name="constraints")) + scheduling_unit_blueprint = create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) + # assign some non-overlapping starttimes, so the tasks can be scheduled + update_subtasks_start_times_for_scheduling_unit(scheduling_unit_blueprint, datetime.utcnow()) + # scheduling_unit_blueprint now has task_blueprints and subtasks # "unpack" the whole graph, so we can "walk" it and see if the correct subtasks are scheduled once its predecessors are finished obs_cal1 = scheduling_unit_blueprint.task_blueprints.get(name="Calibrator Observation 1") @@ -154,7 +171,7 @@ class TestSubtaskSchedulingService(unittest.TestCase): obs_cal1_st_qa1 = obs_cal1.subtasks.get(specifications_template_id__in=models.SubtaskTemplate.objects.filter(type=models.SubtaskType.objects.get(value=models.SubtaskType.Choices.QA_FILES.value))) obs_cal1_st_qa2 = obs_cal1.subtasks.get(specifications_template_id__in=models.SubtaskTemplate.objects.filter(type=models.SubtaskType.objects.get(value=models.SubtaskType.Choices.QA_PLOTS.value))) - pl_cal1 = scheduling_unit_blueprint.task_blueprints.get(name="Pipeline Calibrator 1") + pl_cal1 = scheduling_unit_blueprint.task_blueprints.get(name="Pipeline 1") pl_cal1_st = pl_cal1.subtasks.first() obs_tgt = scheduling_unit_blueprint.task_blueprints.get(name="Target Observation") @@ -162,10 +179,10 @@ class TestSubtaskSchedulingService(unittest.TestCase): obs_tgt_st_qa1 = obs_tgt.subtasks.get(specifications_template_id__in=models.SubtaskTemplate.objects.filter(type=models.SubtaskType.objects.get(value=models.SubtaskType.Choices.QA_FILES.value))) obs_tgt_st_qa2 = obs_tgt.subtasks.get(specifications_template_id__in=models.SubtaskTemplate.objects.filter(type=models.SubtaskType.objects.get(value=models.SubtaskType.Choices.QA_PLOTS.value))) - pl_tgt1 = scheduling_unit_blueprint.task_blueprints.get(name="Preprocessing Pipeline SAP0") + pl_tgt1 = scheduling_unit_blueprint.task_blueprints.get(name="Pipeline target1") pl_tgt1_st = pl_tgt1.subtasks.first() - pl_tgt2 = scheduling_unit_blueprint.task_blueprints.get(name="Preprocessing Pipeline SAP1") + pl_tgt2 = scheduling_unit_blueprint.task_blueprints.get(name="Pipeline target2") pl_tgt2_st = pl_tgt2.subtasks.first() obs_cal2 = scheduling_unit_blueprint.task_blueprints.get(name="Calibrator Observation 2") @@ -173,7 +190,7 @@ class TestSubtaskSchedulingService(unittest.TestCase): obs_cal2_st_qa1 = obs_cal2.subtasks.get(specifications_template_id__in=models.SubtaskTemplate.objects.filter(type=models.SubtaskType.objects.get(value=models.SubtaskType.Choices.QA_FILES.value))) obs_cal2_st_qa2 = obs_cal2.subtasks.get(specifications_template_id__in=models.SubtaskTemplate.objects.filter(type=models.SubtaskType.objects.get(value=models.SubtaskType.Choices.QA_PLOTS.value))) - pl_cal2 = scheduling_unit_blueprint.task_blueprints.get(name="Pipeline Calibrator 2") + pl_cal2 = scheduling_unit_blueprint.task_blueprints.get(name="Pipeline 2") pl_cal2_st = pl_cal2.subtasks.first() # define the graph in an iterable way: as tuples of a subtask-successors-pair @@ -183,7 +200,7 @@ class TestSubtaskSchedulingService(unittest.TestCase): (obs_cal1_st_qa1, (obs_cal1_st_qa2,)), (obs_cal1_st_qa2, tuple()), (pl_cal1_st, tuple()), - # target obs, qa and pipelines + #target obs, qa and pipelines (obs_tgt_st_obs, (obs_tgt_st_qa1, pl_tgt1_st, pl_tgt2_st)), (obs_tgt_st_qa1, (obs_tgt_st_qa2,)), (obs_tgt_st_qa2, tuple()), @@ -195,6 +212,7 @@ class TestSubtaskSchedulingService(unittest.TestCase): (obs_cal2_st_qa2, tuple()), (pl_cal2_st, tuple()) ) + logger.info(" --- test_02_for_expected_behaviour_of_UC1_scheduling_unit setup finished. starting actual test ---") # ... end of long setup of objects # -------------------------------- @@ -204,6 +222,7 @@ class TestSubtaskSchedulingService(unittest.TestCase): for subtask, successors in graph: # get up-to-date subtask via the rest client subtask1 = tmss_client.get_subtask(subtask.id) + logger.info("subtask id=%s status=%s successors: %s", subtask1['id'], subtask1['state_value'], ','.join(str(s.id) for s in successors)) if subtask1['state_value'] == 'defined': for successor in successors: @@ -218,8 +237,10 @@ class TestSubtaskSchedulingService(unittest.TestCase): if subtask1['state_value'] == 'scheduled': # simulate that the first subtask ran, and is now finished... - # set it's status to finished. This should trigger the scheduling service to schedule the successor subtask(s). - tmss_client.set_subtask_status(subtask1['id'], 'finished') + # cycle over the 'run time' statuses, concluding with status to finished. + # The finished status should trigger the scheduling service to schedule the successor subtask(s). + for status in ['queueing', 'queued', 'starting', 'started', 'finishing', 'finished']: + tmss_client.set_subtask_status(subtask1['id'], status) for successor in successors: # get up-to-date subtask via the rest client