From 6d4b4af46a31c6e97bf4b76f2887fdaaf273307b Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Mon, 12 Apr 2021 21:06:22 +0200 Subject: [PATCH] TMSS-725: fixed tests, and added convenience method to set a subtask to a desired state following the allowed transitions. --- .../test/t_feedback_handling_service.py | 3 +- .../test/t_tmss_postgres_listener_service.py | 14 +- .../websocket/test/t_websocket_service.py | 2 +- .../src/tmss/tmssapp/models/scheduling.py | 11 + SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py | 66 ++++- .../tests/t_workflow_qaworkflow.py | 41 ++-- SAS/TMSS/backend/test/t_adapter.py | 25 +- .../test/t_permissions_system_roles.py | 50 ++-- SAS/TMSS/backend/test/t_scheduling.py | 58 ++--- SAS/TMSS/backend/test/t_subtasks.py | 150 ++++++++++-- SAS/TMSS/backend/test/t_tasks.py | 229 ++++++++++-------- 11 files changed, 422 insertions(+), 227 deletions(-) diff --git a/SAS/TMSS/backend/services/feedback_handling/test/t_feedback_handling_service.py b/SAS/TMSS/backend/services/feedback_handling/test/t_feedback_handling_service.py index 525610174d5..cbb276339dc 100755 --- a/SAS/TMSS/backend/services/feedback_handling/test/t_feedback_handling_service.py +++ b/SAS/TMSS/backend/services/feedback_handling/test/t_feedback_handling_service.py @@ -99,7 +99,8 @@ Observation.DataProducts.Output_Correlated_[{subband}].subband={subband}""" self.assertEqual(empty_dataproduct_feedback_template['url'], dataproduct['feedback_template']) # TMSS only accepts feedback in finishing state - tmss_client.set_subtask_status(subtask_id=subtask_id, status='finishing') + from lofar.sas.tmss.tmss.tmssapp.subtasks import set_subtask_state_following_allowed_transitions, Subtask + set_subtask_state_following_allowed_transitions(Subtask.objects.get(id=subtask_id), 'finishing') # test handler busines logic without messagebuses # assume the old qpid messagebus just works, and delivers proper feedback chuncks in the payload. diff --git a/SAS/TMSS/backend/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.py b/SAS/TMSS/backend/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.py index 37fbe82b303..d12fa1d8eda 100755 --- a/SAS/TMSS/backend/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.py +++ b/SAS/TMSS/backend/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.py @@ -144,7 +144,7 @@ class TestSubtaskSchedulingService(unittest.TestCase): # update subtask status, use a nice tmss_client and the rest api. with self.tmss_test_env.create_tmss_client() as client: - client.set_subtask_status(subtask['id'], 'scheduled') + client.set_subtask_status(subtask['id'], 'defined') # ugly, but functional. Wait for all status updates: 1 object, 1 status. both per each object (3 types) => total 6 events. start_wait = datetime.utcnow() @@ -160,20 +160,20 @@ class TestSubtaskSchedulingService(unittest.TestCase): self.assertEqual(TMSS_SUBTASK_OBJECT_EVENT_PREFIX + '.Updated', service.subjects.popleft()) self.assertEqual({'id': subtask['id']}, service.contentDicts.popleft()) - self.assertEqual(TMSS_SUBTASK_STATUS_EVENT_PREFIX+'.Scheduled', service.subjects.popleft()) - self.assertEqual({'id': subtask['id'], 'status': 'scheduled'}, service.contentDicts.popleft()) + self.assertEqual(TMSS_SUBTASK_STATUS_EVENT_PREFIX+'.Defined', service.subjects.popleft()) + self.assertEqual({'id': subtask['id'], 'status': 'defined'}, service.contentDicts.popleft()) self.assertEqual(TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX+'.Updated', service.subjects.popleft()) self.assertEqual({'id': task_blueprint['id']}, service.contentDicts.popleft()) - self.assertEqual(TMSS_TASKBLUEPRINT_STATUS_EVENT_PREFIX+'.Scheduled', service.subjects.popleft()) - self.assertEqual({'id': task_blueprint['id'], 'status': 'scheduled'}, service.contentDicts.popleft()) + self.assertEqual(TMSS_TASKBLUEPRINT_STATUS_EVENT_PREFIX+'.Schedulable', service.subjects.popleft()) + self.assertEqual({'id': task_blueprint['id'], 'status': 'schedulable'}, service.contentDicts.popleft()) self.assertEqual(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX+'.Updated', service.subjects.popleft()) self.assertEqual({'id': su_blueprint['id']}, service.contentDicts.popleft()) - self.assertEqual(TMSS_SCHEDULINGUNITBLUEPRINT_STATUS_EVENT_PREFIX+'.Scheduled', service.subjects.popleft()) - self.assertEqual({'id': su_blueprint['id'], 'status': 'scheduled'}, service.contentDicts.popleft()) + self.assertEqual(TMSS_SCHEDULINGUNITBLUEPRINT_STATUS_EVENT_PREFIX+'.Schedulable', service.subjects.popleft()) + self.assertEqual({'id': su_blueprint['id'], 'status': 'schedulable'}, service.contentDicts.popleft()) # delete subtask, use direct http delete request on rest api requests.delete(subtask['url'], auth=self.test_data_creator.auth) diff --git a/SAS/TMSS/backend/services/websocket/test/t_websocket_service.py b/SAS/TMSS/backend/services/websocket/test/t_websocket_service.py index f3f8388cb9b..233a4992ec2 100755 --- a/SAS/TMSS/backend/services/websocket/test/t_websocket_service.py +++ b/SAS/TMSS/backend/services/websocket/test/t_websocket_service.py @@ -158,7 +158,7 @@ class TestSubtaskSchedulingService(unittest.TestCase): # Test updates with self.tmss_test_env.create_tmss_client() as client: # Test subtask update - client.set_subtask_status(subtask['id'], 'scheduled') + client.set_subtask_status(subtask['id'], 'defined') subtask = requests.get(subtask['url'], auth=self.test_data_creator.auth).json() test_object(subtask, self.ObjTypes.SUBTASK, self.ObjActions.UPDATE) diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py b/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py index 86c316ad604..be3474b8d8a 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py @@ -322,6 +322,17 @@ class SubtaskAllowedStateTransitions(Model): old_state = ForeignKey('SubtaskState', null=True, editable=False, on_delete=PROTECT, related_name='allowed_transition_from', help_text='Subtask state before update (see Subtask State Machine).') new_state = ForeignKey('SubtaskState', null=False, editable=False, on_delete=PROTECT, related_name='allowed_transition_to', help_text='Subtask state after update (see Subtask State Machine).') + @staticmethod + def allowed_new_states(old_state: SubtaskState) -> [SubtaskState]: + '''get a list of all states we are allowed to transition to from the given old_state''' + return [transition.new_state for transition in SubtaskAllowedStateTransitions.objects.filter(old_state=old_state).all()] + + @staticmethod + def illegal_new_states(old_state: SubtaskState) -> [SubtaskState]: + '''get a list of all states we are NOT allowed to transition to from the given old_state''' + allowed_new_states = SubtaskAllowedStateTransitions.allowed_new_states(old_state) + return list(SubtaskState.objects.exclude(value__in=[s.value for s in allowed_new_states]).exclude(pk=old_state.pk).all()) + class SubtaskStateLog(BasicCommon): """ diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py index 5c1513c8291..ca4b2df6e0e 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py @@ -463,10 +463,6 @@ def create_qafile_subtask_from_observation_subtask(observation_subtask: Subtask) SubtaskType.Choices.QA_FILES.value, observation_subtask.pk, observation_subtask.specifications_template.type, SubtaskType.Choices.OBSERVATION.value)) - if observation_subtask.state.value == SubtaskState.Choices.DEFINING.value: - raise ValueError("Cannot create %s subtask for subtask id=%d because it is not DEFINED" % ( - SubtaskType.Choices.QA_FILES.value, observation_subtask.pk)) - obs_task_spec = get_observation_task_specification_with_check_for_calibrator(observation_subtask) obs_task_qafile_spec = obs_task_spec.get("QA", {}).get("file_conversion", {}) @@ -1381,16 +1377,16 @@ def schedule_ingest_subtask(ingest_subtask: Subtask): ingest_subtask.specifications_template.type, SubtaskType.Choices.INGEST.value)) - # step 1: set state to SCHEDULING - ingest_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value) - ingest_subtask.save() - # check permission pre-requisites scheduling_unit_blueprint = ingest_subtask.task_blueprint.scheduling_unit_blueprint if scheduling_unit_blueprint.ingest_permission_required: if scheduling_unit_blueprint.ingest_permission_granted_since is None or scheduling_unit_blueprint.ingest_permission_granted_since > datetime.utcnow(): raise SubtaskSchedulingException("Cannot schedule ingest subtask id=%d because it requires explicit permission and the permission has not been granted (yet)" % (ingest_subtask.pk,)) + # step 1: set state to SCHEDULING + ingest_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value) + ingest_subtask.save() + # step 1a: set start/stop times # not very relevant for ingest subtasks, but it's nice for the user to see when the ingest task was scheduled. # please note that an ingest subtask may idle for some time while it is in the ingest queue. @@ -1617,3 +1613,57 @@ def get_observation_task_specification_with_check_for_calibrator(subtask): else: task_spec = subtask.task_blueprint.specifications_doc return task_spec + + +def set_subtask_state_following_allowed_transitions(subtask: Subtask, state_value:str) -> Subtask: + '''helper function to set subtask state following allowed transitions''' + while subtask.state.value != state_value and (subtask.state.value not in (SubtaskState.Choices.FINISHED.value, + SubtaskState.Choices.ERROR.value, + SubtaskState.Choices.CANCELLED.value)): + # handle "unsuccessful path" to cancelled/canceling end state + if state_value in (SubtaskState.Choices.CANCELLED.value, SubtaskState.Choices.CANCELLING.value) and \ + subtask.state.value not in (SubtaskState.Choices.DEFINING.value, + SubtaskState.Choices.QUEUEING.value, + SubtaskState.Choices.STARTING.value, + SubtaskState.Choices.FINISHING.value, + SubtaskState.Choices.CANCELLING.value): + subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.CANCELLING.value) + + # handle "unsuccessful path" to error end state + elif state_value == SubtaskState.Choices.ERROR.value and subtask.state.value in (SubtaskState.Choices.DEFINING.value, + SubtaskState.Choices.QUEUEING.value, + SubtaskState.Choices.STARTING.value, + SubtaskState.Choices.FINISHING.value, + SubtaskState.Choices.CANCELLING.value): + subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.ERROR.value) + + # handle reverse path to unscheduling + elif state_value == SubtaskState.Choices.UNSCHEDULING.value and subtask.state.value in (SubtaskState.Choices.SCHEDULED.value): + subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.UNSCHEDULING.value) + else: + # handle "normal successful path" + if subtask.state.value == SubtaskState.Choices.DEFINING.value: + subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value) + elif subtask.state.value == SubtaskState.Choices.DEFINED.value: + subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value) + elif subtask.state.value == SubtaskState.Choices.SCHEDULING.value: + subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value) + elif subtask.state.value == SubtaskState.Choices.SCHEDULED.value: + subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.QUEUEING.value) + elif subtask.state.value == SubtaskState.Choices.QUEUEING.value: + subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.QUEUED.value) + elif subtask.state.value == SubtaskState.Choices.QUEUED.value: + subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.STARTING.value) + elif subtask.state.value == SubtaskState.Choices.STARTING.value: + subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.STARTED.value) + elif subtask.state.value == SubtaskState.Choices.STARTED.value: + subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.FINISHING.value) + elif subtask.state.value == SubtaskState.Choices.FINISHING.value: + subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.FINISHED.value) + elif subtask.state.value == SubtaskState.Choices.CANCELLING.value: + subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.CANCELLED.value) + elif subtask.state.value == SubtaskState.Choices.UNSCHEDULING.value: + subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value) + + subtask.save() + return subtask diff --git a/SAS/TMSS/backend/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py b/SAS/TMSS/backend/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py index 4616f51c172..ad8b28bf153 100755 --- a/SAS/TMSS/backend/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py +++ b/SAS/TMSS/backend/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py @@ -56,7 +56,6 @@ class SchedulingUnitFlowTest(unittest.TestCase): cls.sync_event_bp_scheduled = Event() cls.sync_event_bp_cannot_proceed = Event() - class TestSchedulingUnitEventMessageHandler(SchedulingUnitEventMessageHandler): def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str): super().onSchedulingUnitBlueprintStatusChanged(id=id, status=status) @@ -151,11 +150,11 @@ class SchedulingUnitFlowTest(unittest.TestCase): #Change subtask status to scheduled + from lofar.sas.tmss.tmss.tmssapp.subtasks import set_subtask_state_following_allowed_transitions for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): if task_blueprint.specifications_template.type.value != TaskType.Choices.INGEST.value: for subtask in task_blueprint.subtasks.all(): - subtask.state = models.SubtaskState.objects.get(value='scheduled') - subtask.save() + set_subtask_state_following_allowed_transitions(subtask, 'scheduled') # wait until scheduling unit is scheduled if not sync_event_bp_scheduled.wait(timeout=10): @@ -183,13 +182,13 @@ class SchedulingUnitFlowTest(unittest.TestCase): self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].status, 'NEW') #Change subtask status to finished + from lofar.sas.tmss.tmss.tmssapp.subtasks import set_subtask_state_following_allowed_transitions for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): task_blueprint.output_pinned=True task_blueprint.save() for subtask in task_blueprint.subtasks.all(): - subtask.state = models.SubtaskState.objects.get(value='finished') - subtask.save() + set_subtask_state_following_allowed_transitions(subtask, 'finished') if not sync_event_bp_cannot_proceed.wait(timeout=10): logging.info("sync_event_bp_cannot_proceed event not received, raising TimeoutError") @@ -464,11 +463,11 @@ class SchedulingUnitFlowTest(unittest.TestCase): #Change subtask status to scheduled + from lofar.sas.tmss.tmss.tmssapp.subtasks import set_subtask_state_following_allowed_transitions for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): if task_blueprint.specifications_template.type.value != TaskType.Choices.INGEST.value: for subtask in task_blueprint.subtasks.all(): - subtask.state = models.SubtaskState.objects.get(value='scheduled') - subtask.save() + set_subtask_state_following_allowed_transitions(subtask, 'scheduled') # wait until scheduling unit is scheduled if not sync_event_bp_scheduled.wait(timeout=10): @@ -496,13 +495,13 @@ class SchedulingUnitFlowTest(unittest.TestCase): self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].status, 'NEW') #Change subtask status to finished + from lofar.sas.tmss.tmss.tmssapp.subtasks import set_subtask_state_following_allowed_transitions for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): task_blueprint.output_pinned=True task_blueprint.save() for subtask in task_blueprint.subtasks.all(): - subtask.state = models.SubtaskState.objects.get(value='finished') - subtask.save() + set_subtask_state_following_allowed_transitions(subtask, 'finished') if not sync_event_bp_cannot_proceed.wait(timeout=10): logging.info("sync_event_bp_cannot_proceed event not received, raising TimeoutError") @@ -675,11 +674,11 @@ class SchedulingUnitFlowTest(unittest.TestCase): #Change subtask status to scheduled + from lofar.sas.tmss.tmss.tmssapp.subtasks import set_subtask_state_following_allowed_transitions for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): if task_blueprint.specifications_template.type.value != TaskType.Choices.INGEST.value: for subtask in task_blueprint.subtasks.all(): - subtask.state = models.SubtaskState.objects.get(value='scheduled') - subtask.save() + set_subtask_state_following_allowed_transitions(subtask, 'scheduled') # wait until scheduling unit is scheduled if not sync_event_bp_scheduled.wait(timeout=10): @@ -707,13 +706,13 @@ class SchedulingUnitFlowTest(unittest.TestCase): self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].status, 'NEW') #Change subtask status to finished + from lofar.sas.tmss.tmss.tmssapp.subtasks import set_subtask_state_following_allowed_transitions for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): task_blueprint.output_pinned=True task_blueprint.save() for subtask in task_blueprint.subtasks.all(): - subtask.state = models.SubtaskState.objects.get(value='finished') - subtask.save() + set_subtask_state_following_allowed_transitions(subtask, 'finished') if not sync_event_bp_cannot_proceed.wait(timeout=10): logging.info("sync_event_bp_cannot_proceed event not received, raising TimeoutError") @@ -913,11 +912,11 @@ class SchedulingUnitFlowTest(unittest.TestCase): #Change subtask status to scheduled + from lofar.sas.tmss.tmss.tmssapp.subtasks import set_subtask_state_following_allowed_transitions for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): if task_blueprint.specifications_template.type.value != TaskType.Choices.INGEST.value: for subtask in task_blueprint.subtasks.all(): - subtask.state = models.SubtaskState.objects.get(value='scheduled') - subtask.save() + set_subtask_state_following_allowed_transitions(subtask, 'scheduled') # wait until scheduling unit is scheduled if not sync_event_bp_scheduled.wait(timeout=10): @@ -945,13 +944,13 @@ class SchedulingUnitFlowTest(unittest.TestCase): self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].status, 'NEW') #Change subtask status to finished + from lofar.sas.tmss.tmss.tmssapp.subtasks import set_subtask_state_following_allowed_transitions for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): task_blueprint.output_pinned=True task_blueprint.save() for subtask in task_blueprint.subtasks.all(): - subtask.state = models.SubtaskState.objects.get(value='finished') - subtask.save() + set_subtask_state_following_allowed_transitions(subtask, 'finished') if not sync_event_bp_cannot_proceed.wait(timeout=10): logging.info("sync_event_bp_cannot_proceed event not received, raising TimeoutError") @@ -1210,11 +1209,11 @@ class SchedulingUnitFlowTest(unittest.TestCase): #Change subtask status to scheduled + from lofar.sas.tmss.tmss.tmssapp.subtasks import set_subtask_state_following_allowed_transitions for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): if task_blueprint.specifications_template.type.value != TaskType.Choices.INGEST.value: for subtask in task_blueprint.subtasks.all(): - subtask.state = models.SubtaskState.objects.get(value='scheduled') - subtask.save() + set_subtask_state_following_allowed_transitions(subtask, 'scheduled') # wait until scheduling unit is scheduled if not sync_event_bp_scheduled.wait(timeout=10): @@ -1242,13 +1241,13 @@ class SchedulingUnitFlowTest(unittest.TestCase): self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].status, 'NEW') #Change subtask status to finished + from lofar.sas.tmss.tmss.tmssapp.subtasks import set_subtask_state_following_allowed_transitions for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): task_blueprint.output_pinned=False task_blueprint.save() for subtask in task_blueprint.subtasks.all(): - subtask.state = models.SubtaskState.objects.get(value='finished') - subtask.save() + set_subtask_state_following_allowed_transitions(subtask, 'finished') if not sync_event_bp_cannot_proceed.wait(timeout=10): logging.info("sync_event_bp_cannot_proceed event not received, raising TimeoutError") diff --git a/SAS/TMSS/backend/test/t_adapter.py b/SAS/TMSS/backend/test/t_adapter.py index 772a2d43ed7..44a249b8aa7 100755 --- a/SAS/TMSS/backend/test/t_adapter.py +++ b/SAS/TMSS/backend/test/t_adapter.py @@ -50,6 +50,7 @@ from lofar.sas.tmss.tmss.tmssapp.adapters.sip import generate_sip_for_dataproduc from lofar.sas.tmss.tmss.tmssapp.adapters.feedback import append_to_subtask_raw_feedback, process_feedback_into_subtask_dataproducts, process_feedback_for_subtask_and_set_to_finished_if_complete, reprocess_raw_feedback_for_subtask_and_set_to_finished_if_complete from lofar.lta.sip import constants from lofar.parameterset import parameterset +from lofar.sas.tmss.tmss.tmssapp.subtasks import set_subtask_state_following_allowed_transitions from lofar.sas.resourceassignment.resourceassignmentestimator.resource_estimators import ObservationResourceEstimator @@ -376,14 +377,14 @@ _isCobalt=T def test_generate_dataproduct_feedback_from_subtask_feedback_and_set_finished_fails_on_incomplete_feedback(self): - subtask_data = Subtask_test_data(state=models.SubtaskState.objects.get(value='finishing'), - subtask_template=models.SubtaskTemplate.objects.get(name='observation control')) + subtask_data = Subtask_test_data(subtask_template=models.SubtaskTemplate.objects.get(name='observation control')) subtask_obs:models.Subtask = models.Subtask.objects.create(**subtask_data) + set_subtask_state_following_allowed_transitions(subtask_obs, 'finishing') subtask_obs_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask_obs)) - subtask_data = Subtask_test_data(state=models.SubtaskState.objects.get(value='finishing'), - subtask_template=models.SubtaskTemplate.objects.get(name='pipeline control')) + subtask_data = Subtask_test_data(subtask_template=models.SubtaskTemplate.objects.get(name='pipeline control')) subtask_pipe: models.Subtask = models.Subtask.objects.create(**subtask_data) + set_subtask_state_following_allowed_transitions(subtask_pipe, 'finishing') subtask_pipe_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask_pipe)) test_dir = "/tmp/test/data/%s" % uuid.uuid4() @@ -410,14 +411,14 @@ _isCobalt=T self.assertEqual(self.feedback_pipe_incomplete.strip(), subtask_pipe.raw_feedback.strip()) def test_generate_dataproduct_feedback_from_subtask_feedback_and_set_finished_after_reprocessing(self): - subtask_data = Subtask_test_data(state=models.SubtaskState.objects.get(value='finishing'), - subtask_template=models.SubtaskTemplate.objects.get(name='observation control')) + subtask_data = Subtask_test_data(subtask_template=models.SubtaskTemplate.objects.get(name='observation control')) subtask_obs:models.Subtask = models.Subtask.objects.create(**subtask_data) + set_subtask_state_following_allowed_transitions(subtask_obs, 'finishing') subtask_obs_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask_obs)) - subtask_data = Subtask_test_data(state=models.SubtaskState.objects.get(value='finishing'), - subtask_template=models.SubtaskTemplate.objects.get(name='pipeline control')) + subtask_data = Subtask_test_data(subtask_template=models.SubtaskTemplate.objects.get(name='pipeline control')) subtask_pipe: models.Subtask = models.Subtask.objects.create(**subtask_data) + set_subtask_state_following_allowed_transitions(subtask_pipe, 'finishing') subtask_pipe_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask_pipe)) test_dir = "/tmp/test/data/%s" % uuid.uuid4() @@ -454,14 +455,14 @@ _isCobalt=T self.assertTrue(subtask_pipe.is_feedback_complete) def test_generate_dataproduct_feedback_from_subtask_feedback_and_set_finished(self): - subtask_data = Subtask_test_data(state=models.SubtaskState.objects.get(value='finishing'), - subtask_template=models.SubtaskTemplate.objects.get(name='observation control')) + subtask_data = Subtask_test_data(subtask_template=models.SubtaskTemplate.objects.get(name='observation control')) subtask_obs:models.Subtask = models.Subtask.objects.create(**subtask_data) + set_subtask_state_following_allowed_transitions(subtask_obs, 'finishing') subtask_obs_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask_obs)) - subtask_data = Subtask_test_data(state=models.SubtaskState.objects.get(value='finishing'), - subtask_template=models.SubtaskTemplate.objects.get(name='pipeline control')) + subtask_data = Subtask_test_data(subtask_template=models.SubtaskTemplate.objects.get(name='pipeline control')) subtask_pipe: models.Subtask = models.Subtask.objects.create(**subtask_data) + set_subtask_state_following_allowed_transitions(subtask_pipe, 'finishing') subtask_pipe_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask_pipe)) empty_feedback_template = models.DataproductFeedbackTemplate.objects.get(name='empty') diff --git a/SAS/TMSS/backend/test/t_permissions_system_roles.py b/SAS/TMSS/backend/test/t_permissions_system_roles.py index 5d05682bec0..4d6856642d5 100755 --- a/SAS/TMSS/backend/test/t_permissions_system_roles.py +++ b/SAS/TMSS/backend/test/t_permissions_system_roles.py @@ -67,7 +67,7 @@ test_data_creator = TMSSRESTTestDataCreator(BASE_URL, AUTH) from lofar.sas.tmss.tmss.tmssapp.viewsets.permissions import TMSSPermissions from lofar.sas.tmss.tmss.tmssapp.viewsets.scheduling import SubtaskViewSet from django.contrib.auth.models import User, Group - +from lofar.sas.tmss.tmss.tmssapp.subtasks import set_subtask_state_following_allowed_transitions, Subtask class SystemPermissionTestCase(unittest.TestCase): ''' @@ -75,9 +75,7 @@ class SystemPermissionTestCase(unittest.TestCase): ''' @classmethod - def setUpClass(cls) -> None: - super().setUpClass() - + def create_subtask(cls) -> int: # Create preparatory data with tmss_test_env.create_tmss_client() as client: cluster_url = client.get_path_as_json_object('/cluster/1')['url'] @@ -97,12 +95,17 @@ class SystemPermissionTestCase(unittest.TestCase): task_blueprint_url=obs_task_blueprint['url'], raw_feedback='Observation.Correlator.channelWidth=3051.7578125') obs_subtask = test_data_creator.post_data_and_get_response_as_json_object(obs_subtask_data, '/subtask/') - cls.obs_subtask_id = obs_subtask['id'] - obs_subtask_output_url = test_data_creator.post_data_and_get_url( - test_data_creator.SubtaskOutput(subtask_url=obs_subtask['url']), '/subtask_output/') - test_data_creator.post_data_and_get_url( - test_data_creator.Dataproduct(filename="L%s_SB000.MS" % obs_subtask['id'], - subtask_output_url=obs_subtask_output_url), '/dataproduct/') + obs_subtask_output_url = test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=obs_subtask['url']), '/subtask_output/') + test_data_creator.post_data_and_get_url(test_data_creator.Dataproduct(filename="L%s_SB000.MS" % obs_subtask['id'], + subtask_output_url=obs_subtask_output_url), '/dataproduct/') + + return obs_subtask['id'] + + @classmethod + def setUpClass(cls) -> None: + super().setUpClass() + + cls.obs_subtask_id = cls.create_subtask() # Create test_data_creator as regular user cls.test_data_creator = TMSSRESTTestDataCreator(BASE_URL, requests.auth.HTTPBasicAuth('paulus', 'pauluspass')) @@ -364,12 +367,12 @@ class SystemPermissionTestCase(unittest.TestCase): # Assert Paulus has the process_feedback_and_set_to_finished_if_complete_subtask permission self.assertTrue(user.has_perm('tmssapp.process_feedback_and_set_to_finished_if_complete_subtask')) + obs_subtask_id = self.create_subtask() # Set subtask status to finishing, so it can process feedback and set to finished. - with tmss_test_env.create_tmss_client() as client: - client.set_subtask_status(self.obs_subtask_id, 'finishing') + set_subtask_state_following_allowed_transitions(Subtask.objects.get(id=obs_subtask_id), 'finishing') # Try to process_feedback_and_set_to_finished_if_complete subtask and assert Paulus can do it within the TO observer group permissions. - response = POST_and_assert_expected_response(self, BASE_URL + '/subtask/%s/process_feedback_and_set_to_finished_if_complete/' % self.obs_subtask_id, + response = POST_and_assert_expected_response(self, BASE_URL + '/subtask/%s/process_feedback_and_set_to_finished_if_complete/' % obs_subtask_id, {}, 200, {}, auth=self.test_data_creator.auth) @@ -395,8 +398,7 @@ class SystemPermissionTestCase(unittest.TestCase): auth=self.test_data_creator.auth) - def test_Subtask_can_reprocess_raw_feedback_for_subtask_and_set_to_finished_if_complete_with_to_observer_group( - self): + def test_Subtask_can_reprocess_raw_feedback_for_subtask_and_set_to_finished_if_complete_with_to_observer_group(self): user = User.objects.get(username='paulus') user.groups.set([self.to_observer_group]) @@ -410,8 +412,12 @@ class SystemPermissionTestCase(unittest.TestCase): # Assert Paulus has the reprocess_raw_feedback_for_subtask_and_set_to_finished_if_complete_subtask permission self.assertTrue(user.has_perm('tmssapp.reprocess_raw_feedback_for_subtask_and_set_to_finished_if_complete_subtask')) + obs_subtask_id = self.create_subtask() + # Set subtask status to finishing, so we can reprocess feedback + set_subtask_state_following_allowed_transitions(Subtask.objects.get(id=obs_subtask_id), 'finishing') + # Try to reprocess_raw_feedback_for_subtask_and_set_to_finished_if_complete subtask and assert Paulus can do it within the TO observer group permissions. - response = GET_and_assert_equal_expected_code(self, BASE_URL + '/subtask/%s/reprocess_raw_feedback_for_subtask_and_set_to_finished_if_complete/' % self.obs_subtask_id, + response = GET_and_assert_equal_expected_code(self, BASE_URL + '/subtask/%s/reprocess_raw_feedback_for_subtask_and_set_to_finished_if_complete/' % obs_subtask_id, 200, auth=self.test_data_creator.auth) @@ -451,12 +457,12 @@ class SystemPermissionTestCase(unittest.TestCase): # Assert Paulus has the schedule_subtask permission self.assertTrue(user.has_perm('tmssapp.schedule_subtask')) + obs_subtask_id = self.create_subtask() # Set subtask status to defined, so it can be scheduled. - with tmss_test_env.create_tmss_client() as client: - client.set_subtask_status(self.obs_subtask_id, 'defined') + set_subtask_state_following_allowed_transitions(Subtask.objects.get(id=obs_subtask_id), 'defined') # Try to schedule subtask and assert Paulus can do it within the TO observer group permissions. - response = GET_and_assert_equal_expected_code(self, BASE_URL + '/subtask/%s/schedule/' % self.obs_subtask_id, + response = GET_and_assert_equal_expected_code(self, BASE_URL + '/subtask/%s/schedule/' % obs_subtask_id, 200, auth=self.test_data_creator.auth) @@ -658,12 +664,12 @@ class SystemPermissionTestCase(unittest.TestCase): # Assert Paulus has the unschedule_subtask permission self.assertTrue(user.has_perm('tmssapp.unschedule_subtask')) + obs_subtask_id = self.create_subtask() # Set subtask status to scheduled, so it can be unscheduled. - with tmss_test_env.create_tmss_client() as client: - client.set_subtask_status(self.obs_subtask_id, 'scheduled') + set_subtask_state_following_allowed_transitions(Subtask.objects.get(id=obs_subtask_id), 'scheduled') # Try to unschedule subtask and assert Paulus can do it within the TO observer group permissions. - response = GET_and_assert_equal_expected_code(self, BASE_URL + '/subtask/%s/unschedule/' % self.obs_subtask_id, + response = GET_and_assert_equal_expected_code(self, BASE_URL + '/subtask/%s/unschedule/' % obs_subtask_id, 200, auth=self.test_data_creator.auth) diff --git a/SAS/TMSS/backend/test/t_scheduling.py b/SAS/TMSS/backend/test/t_scheduling.py index 6a6ff816fce..bd38f74bb22 100755 --- a/SAS/TMSS/backend/test/t_scheduling.py +++ b/SAS/TMSS/backend/test/t_scheduling.py @@ -72,10 +72,11 @@ def create_subtask_object_for_testing(subtask_type_value, subtask_state_value): """ task_blueprint = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data(specifications_template=models.TaskTemplate.objects.get(name='target observation' if subtask_type_value=='observation' else 'preprocessing pipeline'))) subtask_template_obj = models.SubtaskTemplate.objects.get(name="%s control" % subtask_type_value) - subtask_state_obj = models.SubtaskState.objects.get(value=subtask_state_value) - subtask_data = Subtask_test_data(subtask_template=subtask_template_obj, state=subtask_state_obj, task_blueprint=task_blueprint) - return models.Subtask.objects.create(**subtask_data) - + subtask_data = Subtask_test_data(subtask_template=subtask_template_obj, task_blueprint=task_blueprint) + subtask = models.Subtask.objects.create(**subtask_data) + if subtask.state.value != subtask_state_value: + set_subtask_state_following_allowed_transitions(subtask, subtask_state_value) + return subtask def create_reserved_stations_for_testing(station_list): """ @@ -320,7 +321,8 @@ class SchedulingTest(unittest.TestCase): test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=pipe_subtask['url']), '/subtask_output/') for predecessor in client.get_subtask_predecessors(pipe_subtask['id']): - client.set_subtask_status(predecessor['id'], 'finished') + for state in ('defined', 'scheduling', 'scheduled', 'starting', 'started', 'finishing', 'finished'): + client.set_subtask_status(predecessor['id'], state) client.set_subtask_status(pipe_subtask['id'], 'defined') subtask = client.schedule_subtask(pipe_subtask['id']) @@ -362,7 +364,9 @@ class SchedulingTest(unittest.TestCase): test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=ingest_subtask['url']), '/subtask_output/') for predecessor in client.get_subtask_predecessors(ingest_subtask['id']): - client.set_subtask_status(predecessor['id'], 'finished') + for state in ('defined', 'scheduling', 'scheduled', 'starting', 'started', 'finishing', 'finished'): + client.set_subtask_status(predecessor['id'], state) + client.set_subtask_status(ingest_subtask['id'], 'defined') task_blueprint = client.get_url_as_json_object(ingest_subtask['task_blueprint']) @@ -441,7 +445,8 @@ class SchedulingTest(unittest.TestCase): self.assertEqual('scheduled', subtask['state_value']) self.assertEqual('scheduled', tmss_test_env.ra_test_environment.radb.getTask(tmss_id=subtask['id'])['status']) - client.set_subtask_status(subtask['id'], 'finished') + for state in ('starting', 'started', 'finishing', 'finished'): + client.set_subtask_status(subtask['id'], state) class SubtaskInputOutputTest(unittest.TestCase): @@ -608,8 +613,11 @@ class TestWithUC1Specifications(unittest.TestCase): Note that this test requires Resource Assigner testenvironment being alive """ - @classmethod - def setUpClass(cls) -> None: + def setUp(self) -> None: + # clean all specs/tasks/claims in RADB (cascading delete) + for spec in tmss_test_env.ra_test_environment.radb.getSpecifications(): + tmss_test_env.ra_test_environment.radb.deleteSpecification(spec['id']) + strategy_template = models.SchedulingUnitObservingStrategyTemplate.objects.get(name="UC1 CTC+pipelines") scheduling_unit_draft = models.SchedulingUnitDraft.objects.create( @@ -625,40 +633,24 @@ class TestWithUC1Specifications(unittest.TestCase): create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) scheduling_unit_draft.refresh_from_db() - cls.task_drafts = scheduling_unit_draft.task_drafts.all() - cls.scheduling_unit_blueprints = scheduling_unit_draft.scheduling_unit_blueprints.all() - cls.scheduling_unit_blueprint = cls.scheduling_unit_blueprints[0] - cls.task_blueprints = cls.scheduling_unit_blueprint.task_blueprints.all() + self.task_drafts = scheduling_unit_draft.task_drafts.all() + self.scheduling_unit_blueprints = scheduling_unit_draft.scheduling_unit_blueprints.all() + self.scheduling_unit_blueprint = self.scheduling_unit_blueprints[0] + self.task_blueprints = self.scheduling_unit_blueprint.task_blueprints.all() # SubtaskId of the first observation subtask - observation_tbp = list(tb for tb in list(cls.task_blueprints) if tb.specifications_template.type.value == TaskType.Choices.OBSERVATION.value) + observation_tbp = list(tb for tb in list(self.task_blueprints) if tb.specifications_template.type.value == TaskType.Choices.OBSERVATION.value) observation_tbp.sort(key=lambda tb: tb.relative_start_time) - cls.subtask_id_of_first_observation = list(st for st in observation_tbp[0].subtasks.all() - if st.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value)[0].id + self.subtask_id_of_first_observation = list(st for st in observation_tbp[0].subtasks.all() + if st.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value)[0].id - def setUp(self): - # clean all specs/tasks/claims in RADB (cascading delete) - for spec in tmss_test_env.ra_test_environment.radb.getSpecifications(): - tmss_test_env.ra_test_environment.radb.deleteSpecification(spec['id']) # Unschedule subtask, setting it back to 'defined', removing all dataproducts. for tb in self.task_blueprints: for subtask in tb.subtasks.all(): - if subtask.state.value == SubtaskState.Choices.SCHEDULED.value: - unschedule_subtask(subtask) - if subtask.state.value == SubtaskState.Choices.ERROR.value: - subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value) - - for output in subtask.outputs.all(): - # delete all transforms (the producers of the output dataproducts), and the the dataproducts themselves - output.dataproducts.all().select_related('producers').delete() - output.dataproducts.all().delete() - # start_time to now (and no stoptime) subtask.stop_time = None subtask.start_time = datetime.utcnow() subtask.save() - - def _schedule_subtask_with_failure(self, station_reserved): with tmss_test_env.create_tmss_client() as client: with self.assertRaises(Exception) as context: @@ -715,10 +707,10 @@ class TestWithUC1Specifications(unittest.TestCase): for name, times in test_timeschedule.items(): task_blueprint = list(filter(lambda x: x.name == name, self.task_blueprints))[0] for subtask in task_blueprint.subtasks.all(): - subtask.state = models.SubtaskState.objects.get(value="finished") subtask.stop_time = datetime.strptime(times[1], DATETIME_FORMAT) subtask.start_time = datetime.strptime(times[0], DATETIME_FORMAT) subtask.save() + set_subtask_state_following_allowed_transitions(subtask, "finished") # Check times self.assertEqual("2020-11-01 19:20:00", self.scheduling_unit_blueprint.observed_end_time.strftime("%Y-%m-%d %H:%M:%S")) diff --git a/SAS/TMSS/backend/test/t_subtasks.py b/SAS/TMSS/backend/test/t_subtasks.py index 8086f231da7..b6aa1c29a25 100755 --- a/SAS/TMSS/backend/test/t_subtasks.py +++ b/SAS/TMSS/backend/test/t_subtasks.py @@ -42,15 +42,14 @@ from lofar.sas.tmss.tmss.tmssapp.subtasks import * from lofar.sas.tmss.tmss.tmssapp.subtasks import _get_related_target_sap_by_name, _generate_tab_ring_pointings, _filter_subbands, _add_pointings -def create_subtask_object_for_testing(subtask_type_value, subtask_state_value): +def create_subtask_object_for_testing(subtask_type_value): """ Helper function to create a subtask object for testing with given subtask value and subtask state value as string (no object) """ template_type = models.SubtaskType.objects.get(value=subtask_type_value) subtask_template_obj = create_subtask_template_for_testing(template_type) - subtask_state_obj = models.SubtaskState.objects.get(value=subtask_state_value) - subtask_data = Subtask_test_data(subtask_template=subtask_template_obj, state=subtask_state_obj) + subtask_data = Subtask_test_data(subtask_template=subtask_template_obj) return models.Subtask.objects.create(**subtask_data) @@ -118,24 +117,21 @@ def create_scheduling_relation_task_blueprint_for_testing(first_task_blueprint, class SubTasksCreationFromSubTask(unittest.TestCase): - def test_create_qafile_subtask_from_observation_subtask_failed(self): + def test_create_qafile_subtask_from_pipeline_subtask_failed(self): """ - Test if creation of subtask qafile failed due to wrong state or wrong type of the predecessor subtask - Correct state should be 'defined' and correct type should be 'observation' (for this test of course it is not) + Test if creation of subtask qafile failed due to wrong type of the predecessor subtask + Correct type should be 'observation' (for this test of course it is not) """ - subtasks = [create_subtask_object_for_testing("pipeline", "defined"), - create_subtask_object_for_testing("observation", "defining"), - create_subtask_object_for_testing("observation", "defining") ] - for subtask in subtasks: - with self.assertRaises(ValueError): - create_qafile_subtask_from_observation_subtask(subtask) + pipeline_subtask = create_subtask_object_for_testing("pipeline") + with self.assertRaises(ValueError): + create_qafile_subtask_from_observation_subtask(pipeline_subtask) def test_create_qafile_subtask_from_observation_subtask_succeed(self): """ Test if creation of subtask qafile succeed Subtask object is None because QA file conversion is by default not enabled!!!! """ - predecessor_subtask = create_subtask_object_for_testing("observation", "defined") + predecessor_subtask = create_subtask_object_for_testing("observation") subtask = create_qafile_subtask_from_observation_subtask(predecessor_subtask) self.assertEqual(None, subtask) @@ -144,9 +140,9 @@ class SubTasksCreationFromSubTask(unittest.TestCase): Test if creation of subtask qaplots failed due to wrong state or wrong type of the predecessor subtask Correct type should be 'qa_files' (for this test of course it is not) """ - subtasks = [create_subtask_object_for_testing("pipeline", "defined"), - create_subtask_object_for_testing("observation", "defining"), - create_subtask_object_for_testing("observation", "defining") ] + subtasks = [create_subtask_object_for_testing("pipeline"), + create_subtask_object_for_testing("observation"), + create_subtask_object_for_testing("observation") ] for subtask in subtasks: with self.assertRaises(ValueError): create_qaplots_subtask_from_qafile_subtask(subtask) @@ -156,7 +152,7 @@ class SubTasksCreationFromSubTask(unittest.TestCase): Test if creation of subtask qaplots succeed Subtask object is None because QA plots is by default not enabled!!!! """ - predecessor_subtask = create_subtask_object_for_testing("qa_files", "defined") + predecessor_subtask = create_subtask_object_for_testing("qa_files") subtask = create_qaplots_subtask_from_qafile_subtask(predecessor_subtask) self.assertEqual(None, subtask) @@ -356,19 +352,19 @@ class SubtaskInputSelectionFilteringTest(unittest.TestCase): # check/test the redirect urls. with tmss_test_env.create_tmss_client() as client: # observation - subtask_observation = create_subtask_object_for_testing("observation", "defined") + subtask_observation = create_subtask_object_for_testing("observation") response = client.session.get(url=client.get_full_url_for_path('/subtask/%s/task_log' % (subtask_observation.id,)), allow_redirects=False) self.assertTrue(response.is_redirect) self.assertIn("proxy.lofar.eu", response.headers['Location']) self.assertIn("rtcp-%s.errors" % subtask_observation.id, response.headers['Location']) # pipeline - subtask_pipeline = create_subtask_object_for_testing("pipeline", "defined") + subtask_pipeline = create_subtask_object_for_testing("pipeline") response = client.session.get(url=client.get_full_url_for_path('/subtask/%s/task_log' % (subtask_pipeline.id,)), allow_redirects=False) self.assertEqual(404, response.status_code) # no log (yet) for unscheduled pipeline # other (qa_plots) - subtask_qa_plots = create_subtask_object_for_testing("qa_plots", "defined") + subtask_qa_plots = create_subtask_object_for_testing("qa_plots") self.assertEqual(404, response.status_code) # no log for other subtasktypes @@ -378,7 +374,7 @@ class SettingTest(unittest.TestCase): setting = Setting.objects.get(name='dynamic_scheduling_enabled') setting.value = False setting.save() - obs_st = create_subtask_object_for_testing('observation', 'defined') + obs_st = create_subtask_object_for_testing('observation') with self.assertRaises(SubtaskSchedulingException): schedule_observation_subtask(obs_st) @@ -491,6 +487,118 @@ class SubTaskCreationFromTaskBlueprintBeamformer(unittest.TestCase): filtered_subbands = _filter_subbands(subbands, subband_selection) self.assertEqual(filtered_subbands, [10,11,12,13]) +class SubtaskAllowedStateTransitionsTest(unittest.TestCase): + def test_successful_path(self): + subtask = models.Subtask.objects.create(**Subtask_test_data()) + for state_value in (models.SubtaskState.Choices.DEFINING.value, + models.SubtaskState.Choices.DEFINED.value, + models.SubtaskState.Choices.SCHEDULING.value, + models.SubtaskState.Choices.SCHEDULED.value, + models.SubtaskState.Choices.QUEUEING.value, + models.SubtaskState.Choices.QUEUED.value, + models.SubtaskState.Choices.STARTING.value, + models.SubtaskState.Choices.STARTED.value, + models.SubtaskState.Choices.FINISHING.value, + models.SubtaskState.Choices.FINISHED.value): + subtask.state = models.SubtaskState.objects.get(value=state_value) + # no SubtaskIllegalStateTransitionException should be raised upon save. If it is raised, then test fails. No need for asserts. + subtask.save() + + def test_helper_method_set_subtask_state_following_allowed_transitions_successful_path(self): + for state_value in (models.SubtaskState.Choices.DEFINING.value, + models.SubtaskState.Choices.DEFINED.value, + models.SubtaskState.Choices.SCHEDULING.value, + models.SubtaskState.Choices.SCHEDULED.value, + models.SubtaskState.Choices.QUEUEING.value, + models.SubtaskState.Choices.QUEUED.value, + models.SubtaskState.Choices.STARTING.value, + models.SubtaskState.Choices.STARTED.value, + models.SubtaskState.Choices.FINISHING.value, + models.SubtaskState.Choices.FINISHED.value): + # start with subtask in defining state each time + subtask = models.Subtask.objects.create(**Subtask_test_data(state=models.SubtaskState.objects.get(value=models.SubtaskState.Choices.DEFINING.value))) + self.assertEqual(models.SubtaskState.Choices.DEFINING.value, subtask.state.value) + + set_subtask_state_following_allowed_transitions(subtask, state_value) + self.assertEqual(state_value, subtask.state.value) + + def test_helper_method_set_subtask_state_following_allowed_transitions_error_path(self): + for state_value in (models.SubtaskState.Choices.DEFINING.value, + models.SubtaskState.Choices.SCHEDULING.value, + models.SubtaskState.Choices.QUEUEING.value, + models.SubtaskState.Choices.STARTING.value, + models.SubtaskState.Choices.STARTED.value, + models.SubtaskState.Choices.FINISHING.value): + # start with subtask in defining state each time + subtask = models.Subtask.objects.create(**Subtask_test_data(state=models.SubtaskState.objects.get(value=models.SubtaskState.Choices.DEFINING.value))) + self.assertEqual(models.SubtaskState.Choices.DEFINING.value, subtask.state.value) + + # then go to the requested intermediate state + set_subtask_state_following_allowed_transitions(subtask, state_value) + self.assertEqual(state_value, subtask.state.value) + + # then go to the error state (should be allowed from any of these intermediate states) + set_subtask_state_following_allowed_transitions(subtask, models.SubtaskState.Choices.ERROR.value) + self.assertEqual(models.SubtaskState.Choices.ERROR.value, subtask.state.value) + + def test_helper_method_set_subtask_state_following_allowed_transitions_cancel_path(self): + for desired_end_state_value in (models.SubtaskState.Choices.CANCELLING.value,models.SubtaskState.Choices.CANCELLED.value): + for state_value in (models.SubtaskState.Choices.DEFINED.value, + models.SubtaskState.Choices.SCHEDULED.value, + models.SubtaskState.Choices.QUEUED.value, + models.SubtaskState.Choices.STARTED.value): + # start with subtask in defining state each time + subtask = models.Subtask.objects.create(**Subtask_test_data(state=models.SubtaskState.objects.get(value=models.SubtaskState.Choices.DEFINING.value))) + self.assertEqual(models.SubtaskState.Choices.DEFINING.value, subtask.state.value) + + # then go to the requested intermediate state + set_subtask_state_following_allowed_transitions(subtask, state_value) + self.assertEqual(state_value, subtask.state.value) + + # then go to the error state (should be allowed from any of these intermediate states) + set_subtask_state_following_allowed_transitions(subtask, desired_end_state_value) + self.assertEqual(desired_end_state_value, subtask.state.value) + + def test_helper_method_set_subtask_state_following_allowed_transitions_unscheduling_path(self): + # start with subtask in defining state + subtask = models.Subtask.objects.create(**Subtask_test_data(state=models.SubtaskState.objects.get(value=models.SubtaskState.Choices.DEFINING.value))) + self.assertEqual(models.SubtaskState.Choices.DEFINING.value, subtask.state.value) + + # use helper method to follow the allowed path to 'unscheduling' + set_subtask_state_following_allowed_transitions(subtask, models.SubtaskState.Choices.UNSCHEDULING.value) + self.assertEqual(models.SubtaskState.Choices.UNSCHEDULING.value, subtask.state.value) + + # check transition path + state_log = SubtaskStateLog.objects.filter(subtask=subtask).order_by('created_at').all() + self.assertEqual(models.SubtaskState.Choices.DEFINING.value, state_log[0].new_state.value) + self.assertEqual(models.SubtaskState.Choices.DEFINED.value, state_log[1].new_state.value) + self.assertEqual(models.SubtaskState.Choices.SCHEDULING.value, state_log[2].new_state.value) + self.assertEqual(models.SubtaskState.Choices.SCHEDULED.value, state_log[3].new_state.value) + self.assertEqual(models.SubtaskState.Choices.UNSCHEDULING.value, state_log[4].new_state.value) + + + def test_illegal_state_transitions(self): + for state_value in [choice.value for choice in models.SubtaskState.Choices]: + # assume helper method set_subtask_state_following_allowed_transitions is working (see other tests above) + # use it to create subtask in desired initial state + subtask = models.Subtask.objects.create(**Subtask_test_data(state=models.SubtaskState.objects.get(value=models.SubtaskState.Choices.DEFINING.value))) + subtask = set_subtask_state_following_allowed_transitions(subtask, state_value) + self.assertEqual(state_value, subtask.state.value) + + # derive the allowed and illegal state transitions states + allowed_new_states = SubtaskAllowedStateTransitions.allowed_new_states(subtask.state) + illegal_new_states = SubtaskAllowedStateTransitions.illegal_new_states(subtask.state) + logger.info("test_illigal_state_transitions: old_state='%s' allowed_new_states=%s illegal_new_states=%s", state_value, [s.value for s in allowed_new_states], [s.value for s in illegal_new_states]) + + for illegal_new_state in illegal_new_states: + subtask.state = illegal_new_state + # check that the SubtaskIllegalStateTransitionException is raise for this illegal new state + with self.assertRaises(SubtaskIllegalStateTransitionException): + subtask.save() + + # state in database should still be the original + subtask.refresh_from_db() + self.assertEqual(state_value, subtask.state.value) if __name__ == "__main__": os.environ['TZ'] = 'UTC' diff --git a/SAS/TMSS/backend/test/t_tasks.py b/SAS/TMSS/backend/test/t_tasks.py index 88e4791390c..b2bc1768321 100755 --- a/SAS/TMSS/backend/test/t_tasks.py +++ b/SAS/TMSS/backend/test/t_tasks.py @@ -43,6 +43,7 @@ from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator rest_data_creator = TMSSRESTTestDataCreator(tmss_test_env.django_server.url, (tmss_test_env.ldap_server.dbcreds.user, tmss_test_env.ldap_server.dbcreds.password)) from lofar.sas.tmss.tmss.tmssapp.tasks import * +from lofar.sas.tmss.tmss.tmssapp.subtasks import set_subtask_state_following_allowed_transitions from lofar.sas.tmss.tmss.exceptions import SchemaValidationException @@ -240,41 +241,47 @@ class TaskBlueprintStateTest(unittest.TestCase): def test_states_with_one_subtask(self): """ - Test the taskblueprint state when only one subtasks is instantiated, an pipeline - See next table where every row represents: + Test the taskblueprint state when only one subtasks is instantiated, a pipeline + See next tables where every row represents: Substate(Pipeline), Expected TaskBlueprint State """ - test_table = [ - ("defining", "defined"), + # Loop over multiple test_tables which follow the allowed state subtask state transitions up to the three allowed end states: finished, error and cancelled. + test_tables = [[ ("defining", "defined"), ("defined", "schedulable"), ("scheduling", "schedulable"), ("scheduled", "scheduled"), - ("starting", "started"), - ("started", "started"), ("queueing", "started"), ("queued", "started"), + ("starting", "started"), + ("started", "started"), ("finishing", "started"), - ("finished", "finished"), + ("finished", "finished") + ], [ + ("defining", "defined"), + ("error", "error") + ], [ + ("defining", "defined"), + ("defined", "schedulable"), ("cancelling", "cancelled"), - ("cancelled", "cancelled"), - ("error", "error") - ] - # Create taskblueprint - task_blueprint_data = TaskBlueprint_test_data(name="Task Blueprint With One Subtask") - task_blueprint = models.TaskBlueprint.objects.create(**task_blueprint_data) - # Create pipeline subtask related to taskblueprint - subtask_data = Subtask_test_data(task_blueprint, state=models.SubtaskState.objects.get(value="defined"), - subtask_template=models.SubtaskTemplate.objects.get(name='pipeline control')) - subtask_pipe = models.Subtask.objects.create(**subtask_data) - - # Do the actual test - for test_item in test_table: - state_pipe, expected_task_state = test_item - logger.info("Expected test result of substate pipeline='%s' should be '%s'" % (state_pipe, expected_task_state)) - subtask_pipe.state = models.SubtaskState.objects.get(value=state_pipe) - subtask_pipe.save() - self.assertEqual(expected_task_state, task_blueprint.status) + ("cancelled", "cancelled") + ]] + + for test_table in test_tables: + # Create taskblueprint + task_blueprint_data = TaskBlueprint_test_data(name="Task Blueprint With One Subtask") + task_blueprint = models.TaskBlueprint.objects.create(**task_blueprint_data) + # Create pipeline subtask related to taskblueprint + subtask_data = Subtask_test_data(task_blueprint, subtask_template=models.SubtaskTemplate.objects.get(name='pipeline control')) + subtask_pipe = models.Subtask.objects.create(**subtask_data) + + # Do the actual test + for test_item in test_table: + state_pipe, expected_task_state = test_item + logger.info("Expected test result of substate pipeline='%s' should be '%s'" % (state_pipe, expected_task_state)) + subtask_pipe.state = models.SubtaskState.objects.get(value=state_pipe) + subtask_pipe.save() + self.assertEqual(expected_task_state, task_blueprint.status) def test_states_with_observation_and_qa_subtask(self): """ @@ -282,73 +289,90 @@ class TaskBlueprintStateTest(unittest.TestCase): See next table where every row represents: Substate(Obs), Substate(QA), Expected TaskBlueprint State """ - test_table = [ + test_tables = [[ ("defining", "defining", "defined"), ("defining", "defined", "defined"), ("defined", "defined", "schedulable"), ("scheduling", "defined", "schedulable"), ("scheduled", "defined", "scheduled"), - ("starting", "defined", "started"), - ("started", "defined", "started"), ("queueing", "defined", "started"), ("queued", "defined", "started"), + ("starting", "defined", "started"), + ("started", "defined", "started"), ("finishing", "defined", "observed"), ("finished", "defined", "observed"), - ("finished", "finished", "finished"), + ("finished", "finished", "finished") + ], [ ("cancelling", "defined", "cancelled"), - ("cancelled", "defined", "cancelled"), - ("error", "defined", "error"), + ("cancelled", "defined", "cancelled") + ] , [ + ("error", "defined", "error") + ], [ # qa finishing/finished should be not observed ("defined", "finishing", "started"), - ("defined", "finished", "started"), + ("defined", "finished", "started") + ], [ ("scheduled", "finishing", "started"), - ("scheduled", "finished", "started"), + ("scheduled", "finished", "started") + ], [ # error and cancelled/ing - ("scheduled", "error", "error"), + ("scheduled", "error", "error") + ], [ ("scheduled", "cancelling", "cancelled"), - ("scheduled", "cancelled", "cancelled"), - ("started", "error", "error"), + ("scheduled", "cancelled", "cancelled") + ], [ + ("started", "error", "error") + ], [ ("started", "cancelling", "cancelled"), - ("started", "cancelled", "cancelled"), - ("finished", "error", "error"), + ("started", "cancelled", "cancelled") + ], [ + ("finished", "error", "error") + ], [ ("finished", "cancelling", "cancelled"), - ("finished", "cancelled", "cancelled"), + ("finished", "cancelled", "cancelled") + ], [ # cancelled over error ("cancelling", "error", "cancelled"), - ("cancelled", "error", "cancelled"), - ("error", "cancelling", "cancelled"), + ("cancelled", "error", "cancelled") + ], [ ("error", "cancelling", "cancelled"), + ("error", "cancelling", "cancelled") + ], [ # qa scheduled - ("starting", "scheduled", "started"), - ("started", "scheduled", "started"), ("queueing", "scheduled", "started"), ("queued", "scheduled", "started"), + ("starting", "scheduled", "started"), + ("started", "scheduled", "started"), ("finishing", "scheduled", "observed"), - ("finished", "scheduled", "observed"), - ("cancelling", "scheduled", "cancelled"), - ("cancelled", "scheduled", "cancelled"), - ("error", "scheduled", "error"), - ] - # Create taskblueprint - task_blueprint_data = TaskBlueprint_test_data(name="Task Blueprint With Subtasks") - task_blueprint = models.TaskBlueprint.objects.create(**task_blueprint_data) - # Create observation and qa subtask related to taskblueprint - subtask_data = Subtask_test_data(task_blueprint, state=models.SubtaskState.objects.get(value="defined"), - subtask_template=models.SubtaskTemplate.objects.get(name='observation control')) - subtask_obs = models.Subtask.objects.create(**subtask_data) - subtask_data = Subtask_test_data(task_blueprint, state=models.SubtaskState.objects.get(value="defined"), - subtask_template=models.SubtaskTemplate.objects.get(name='QA file conversion')) - subtask_qa = models.Subtask.objects.create(**subtask_data) - - # Do the actual test - for test_item in test_table: - state_obs, state_qa, expected_task_state = test_item - logger.info("Expected test result of substates observation='%s' and qa='%s' should be '%s'" % (state_obs, state_qa, expected_task_state)) - subtask_obs.state = models.SubtaskState.objects.get(value=state_obs) - subtask_obs.save() - subtask_qa.state = models.SubtaskState.objects.get(value=state_qa) - subtask_qa.save() - self.assertEqual(expected_task_state, task_blueprint.status) + ("finished", "scheduled", "observed") + ], [ + ("cancelling", "scheduled", "cancelled"), + ("cancelled", "scheduled", "cancelled") + ], [ + ("error", "scheduled", "error"), + ] ] + + for test_table in test_tables: + # Create taskblueprint + task_blueprint_data = TaskBlueprint_test_data(name="Task Blueprint With Subtasks") + task_blueprint = models.TaskBlueprint.objects.create(**task_blueprint_data) + # Create observation and qa subtask related to taskblueprint + subtask_data = Subtask_test_data(task_blueprint, subtask_template=models.SubtaskTemplate.objects.get(name='observation control')) + subtask_obs = models.Subtask.objects.create(**subtask_data) + subtask_data = Subtask_test_data(task_blueprint, subtask_template=models.SubtaskTemplate.objects.get(name='QA file conversion')) + subtask_qa = models.Subtask.objects.create(**subtask_data) + + # Do the actual test + for test_item in test_table: + state_obs, state_qa, expected_task_state = test_item + logger.info("Expected test result of substates observation='%s' and qa='%s' should be '%s'" % (state_obs, state_qa, expected_task_state)) + set_subtask_state_following_allowed_transitions(subtask_obs, state_obs) + set_subtask_state_following_allowed_transitions(subtask_qa, state_qa) + + self.assertEqual(state_obs, subtask_obs.state.value) + self.assertEqual(state_qa, subtask_qa.state.value) + + self.assertEqual(expected_task_state, task_blueprint.status) def test_states_with_two_observation_and_two_qa_subtasks(self): """ @@ -356,47 +380,50 @@ class TaskBlueprintStateTest(unittest.TestCase): See next table where every row represents: Substate(Obs1), Substate(Obs2), Substate(QA1), Substate(QA2), Expected TaskBlueprint State """ - test_table = [ - ("finishing", "defined", "defined", "defined", "started"), - ("finished", "defined", "defined", "defined", "started"), + # Loop over multiple test_tables which follow the allowed state subtask state transitions up to the three allowed end states: finished, error and cancelled. + test_tables = [[ + ("defined", "defined", "defined", "defined", "schedulable"), + ("started", "defined", "defined", "defined", "started"), + #("finishing", "defined", "defined", "defined", "started"), TODO: check this cornercase ("finishing", "started", "defined", "defined", "started"), - ("finished", "started", "defined", "defined", "started"), ("finishing", "finishing", "defined", "defined", "observed"), ("finished", "finished", "defined", "defined", "observed"), ("finished", "finished", "scheduled", "defined", "observed"), ("finished", "finished", "finished", "scheduled", "observed"), - ("finished", "finished", "finished", "finished", "finished"), + ("finished", "finished", "finished", "finished", "finished") + ], [ ("finished", "finished", "finished", "cancelled", "cancelled"), + ], [ ("finished", "finished", "finished", "error", "error"), + ], [ ("error", "finished", "finished", "cancelled", "cancelled"), - ] - # Create taskblueprint - task_blueprint_data = TaskBlueprint_test_data(name="Task Blueprint With Subtasks") - task_blueprint = models.TaskBlueprint.objects.create(**task_blueprint_data) - # Create observation and qa subtasks related to taskblueprint - subtask_data = Subtask_test_data(task_blueprint, state=models.SubtaskState.objects.get(value="defined"), - subtask_template=models.SubtaskTemplate.objects.get(name='observation control')) - subtask_obs1 = models.Subtask.objects.create(**subtask_data) - subtask_obs2 = models.Subtask.objects.create(**subtask_data) - subtask_data = Subtask_test_data(task_blueprint, state=models.SubtaskState.objects.get(value="defined"), - subtask_template=models.SubtaskTemplate.objects.get(name='QA file conversion')) - subtask_qa1 = models.Subtask.objects.create(**subtask_data) - subtask_qa2 = models.Subtask.objects.create(**subtask_data) - - # Do the actual test - for test_item in test_table: - state_obs1, state_obs2, state_qa1, state_qa2, expected_task_state = test_item - logger.info("Expected test result of substates observation='%s','%s' and qa='%s','%s' should be '%s'" % - (state_obs1, state_obs1, state_qa1, state_qa2, expected_task_state)) - subtask_obs1.state = models.SubtaskState.objects.get(value=state_obs1) - subtask_obs1.save() - subtask_obs2.state = models.SubtaskState.objects.get(value=state_obs2) - subtask_obs2.save() - subtask_qa1.state = models.SubtaskState.objects.get(value=state_qa1) - subtask_qa1.save() - subtask_qa2.state = models.SubtaskState.objects.get(value=state_qa2) - subtask_qa2.save() - self.assertEqual(expected_task_state, task_blueprint.status) + ]] + + for test_table in test_tables: + # Create taskblueprint + task_blueprint_data = TaskBlueprint_test_data(name="Task Blueprint With Subtasks") + task_blueprint = models.TaskBlueprint.objects.create(**task_blueprint_data) + # Create observation and qa subtasks related to taskblueprint + subtask_data = Subtask_test_data(task_blueprint, subtask_template=models.SubtaskTemplate.objects.get(name='observation control')) + subtask_obs1 = models.Subtask.objects.create(**subtask_data) + subtask_obs2 = models.Subtask.objects.create(**subtask_data) + subtask_data = Subtask_test_data(task_blueprint, subtask_template=models.SubtaskTemplate.objects.get(name='QA file conversion')) + subtask_qa1 = models.Subtask.objects.create(**subtask_data) + subtask_qa2 = models.Subtask.objects.create(**subtask_data) + + # Do the actual test + for test_item in test_table: + state_obs1, state_obs2, state_qa1, state_qa2, expected_task_state = test_item + logger.info("Expected test result of substates observation='%s','%s' and qa='%s','%s' should be '%s'" % + (state_obs1, state_obs1, state_qa1, state_qa2, expected_task_state)) + + # Set each subtask to its desired stated, always following allowed transitions only + set_subtask_state_following_allowed_transitions(subtask_obs1, state_obs1) + set_subtask_state_following_allowed_transitions(subtask_obs2, state_obs2) + set_subtask_state_following_allowed_transitions(subtask_qa1, state_qa1) + set_subtask_state_following_allowed_transitions(subtask_qa2, state_qa2) + + self.assertEqual(expected_task_state, task_blueprint.status) if __name__ == "__main__": -- GitLab