diff --git a/.DS_Store b/.DS_Store deleted file mode 100644 index 772182382fcc07c3b3b57de4515e7fbde30dc0ef..0000000000000000000000000000000000000000 Binary files a/.DS_Store and /dev/null differ diff --git a/SAS/TMSS/src/remakemigrations.py b/SAS/TMSS/src/remakemigrations.py index 6a4ee430ffd683388eb4c0ba5523dfc4d89d4c39..a80266cbb4acbff5cc59bbe34e590a2d4d555474 100755 --- a/SAS/TMSS/src/remakemigrations.py +++ b/SAS/TMSS/src/remakemigrations.py @@ -75,6 +75,16 @@ class Migration(migrations.Migration): # Start SubTask id with 2 000 000 to avoid overlap with 'old' (test/production) OTDB operations = [ migrations.RunSQL('ALTER SEQUENCE tmssapp_SubTask_id_seq RESTART WITH 2000000;'), + migrations.RunSQL("DROP VIEW IF EXISTS tmssapp_taskblueprintsummary; " + "CREATE OR REPLACE VIEW tmssapp_taskblueprintsummary AS " + "SELECT tmssapp_taskblueprint.id AS taskblueprint_id, tmssapp_subtask.id AS subtask_id, tmssapp_subtask.state_id AS substate, tmssapp_subtasktemplate.type_id AS subtask_type" + " FROM tmssapp_subtask LEFT JOIN tmssapp_taskblueprint ON tmssapp_taskblueprint.id = tmssapp_subtask.task_blueprint_id" + " LEFT JOIN tmssapp_subtasktemplate ON tmssapp_subtasktemplate.id = tmssapp_subtask.specifications_template_id;"), + migrations.RunSQL("DROP VIEW IF EXISTS tmssapp_schedulingunitblueprintsummary; " + "CREATE OR REPLACE VIEW tmssapp_schedulingunitblueprintsummary AS " + "SELECT row_number() OVER () AS id, tmssapp_schedulingunitblueprint.id AS sub_id, tmssapp_taskblueprint.id AS taskblueprint_id, tmssapp_tasktemplate.type_id AS task_type, 'unknown' AS derived_task_status" + " FROM tmssapp_taskblueprint LEFT JOIN tmssapp_schedulingunitblueprint ON tmssapp_schedulingunitblueprint.id = tmssapp_taskblueprint.scheduling_unit_blueprint_id" + " LEFT JOIN tmssapp_tasktemplate ON tmssapp_tasktemplate.id = tmssapp_taskblueprint.specifications_template_id;"), migrations.RunPython(populate_choices), migrations.RunPython(populate_settings), migrations.RunPython(populate_misc), diff --git a/SAS/TMSS/src/tmss/tmssapp/migrations/0002_populate.py b/SAS/TMSS/src/tmss/tmssapp/migrations/0002_populate.py index 92baffd4c15a8c025d234eeffed61ae9f443fabf..023594b67ad9d5f700bb0a6976b5151bacd4fd49 100644 --- a/SAS/TMSS/src/tmss/tmssapp/migrations/0002_populate.py +++ b/SAS/TMSS/src/tmss/tmssapp/migrations/0002_populate.py @@ -16,6 +16,16 @@ class Migration(migrations.Migration): # Start SubTask id with 2 000 000 to avoid overlap with 'old' (test/production) OTDB operations = [ migrations.RunSQL('ALTER SEQUENCE tmssapp_SubTask_id_seq RESTART WITH 2000000;'), + migrations.RunSQL("DROP VIEW IF EXISTS tmssapp_taskblueprintsummary; " + "CREATE OR REPLACE VIEW tmssapp_taskblueprintsummary AS " + "SELECT tmssapp_taskblueprint.id AS taskblueprint_id, tmssapp_subtask.id AS subtask_id, tmssapp_subtask.state_id AS substate, tmssapp_subtasktemplate.type_id AS subtask_type" + " FROM tmssapp_subtask LEFT JOIN tmssapp_taskblueprint ON tmssapp_taskblueprint.id = tmssapp_subtask.task_blueprint_id" + " LEFT JOIN tmssapp_subtasktemplate ON tmssapp_subtasktemplate.id = tmssapp_subtask.specifications_template_id;"), + migrations.RunSQL("DROP VIEW IF EXISTS tmssapp_schedulingunitblueprintsummary; " + "CREATE OR REPLACE VIEW tmssapp_schedulingunitblueprintsummary AS " + "SELECT row_number() OVER () AS id, tmssapp_schedulingunitblueprint.id AS sub_id, tmssapp_taskblueprint.id AS taskblueprint_id, tmssapp_tasktemplate.type_id AS task_type, 'unknown' AS derived_task_status" + " FROM tmssapp_taskblueprint LEFT JOIN tmssapp_schedulingunitblueprint ON tmssapp_schedulingunitblueprint.id = tmssapp_taskblueprint.scheduling_unit_blueprint_id" + " LEFT JOIN tmssapp_tasktemplate ON tmssapp_tasktemplate.id = tmssapp_taskblueprint.specifications_template_id;"), migrations.RunPython(populate_choices), migrations.RunPython(populate_settings), migrations.RunPython(populate_misc), diff --git a/SAS/TMSS/src/tmss/tmssapp/models/specification.py b/SAS/TMSS/src/tmss/tmssapp/models/specification.py index 1a807b5fad0e2283fc811343aa49826f2e3cb2e6..f6665e3da24db201901f135fc5708efe6e8f0caa 100644 --- a/SAS/TMSS/src/tmss/tmssapp/models/specification.py +++ b/SAS/TMSS/src/tmss/tmssapp/models/specification.py @@ -18,6 +18,8 @@ import datetime import json import jsonschema from django.urls import reverse as revese_url +from collections import Counter + # # Common @@ -368,11 +370,40 @@ class DefaultTaskTemplate(BasicCommon): class TaskRelationSelectionTemplate(Template): pass + class DefaultTaskRelationSelectionTemplate(BasicCommon): name = CharField(max_length=128, unique=True) template = ForeignKey("TaskRelationSelectionTemplate", on_delete=PROTECT) +# +# DatabaseView objects +# +class TaskBlueprintSummary(Model): + taskblueprint_id = IntegerField() + subtask_id = IntegerField() + substate = CharField(max_length=128) + subtask_type = CharField(max_length=128) + + class Meta: + managed = False + db_table = 'tmssapp_taskblueprintsummary' + + +class SchedulingUnitBlueprintSummary(Model): + # Using in an id and ForeignKey is not common for a view BUT the id is a 'dummy' to be able to use in Django + # https://resources.rescale.com/using-database-views-in-django-orm/ + # otherwise an exception will be thrown + id = IntegerField(primary_key=True) + sub_id = IntegerField() + taskblueprint_id = IntegerField() + task_type = CharField(max_length=128) + derived_task_status = CharField(max_length=128) + + class Meta: + managed = False + db_table = 'tmssapp_schedulingunitblueprintsummary' + # # Instance Objects # @@ -615,6 +646,106 @@ class SchedulingUnitBlueprint(NamedCommon): else: return None + @property + def status(self): + """ + Return the schedulingunit blueprint status which is derived from the taskblueprint status (which is derived + from the subtasks states) + See https://support.astron.nl/confluence/display/TMSS/Specification+Flow#SpecificationFlow-TaskBlueprints + The status is displayed as extra field in rest api of the schedulingunit + """ + logger.debug("Status of SUB with id %d" % self.id) + logger.debug("total_nbr_observation_tasks=%d, total_nbr_processing_tasks=%d, total_nbr_ingest_tasks=%d" + % (self._get_total_nbr_observation_tasks(), self._get_total_nbr_processing_tasks(), self._get_total_nbr_observation_tasks())) + + # Get the the taskblueprint_ids per task_type + taskblueprints_per_type_dict = {"observation": [], "pipeline": [], "ingest": []} + for task_type in taskblueprints_per_type_dict: + queryset = SchedulingUnitBlueprintSummary.objects.filter(sub_id=self.id, task_type=task_type) + taskblueprints_per_type_dict[task_type].extend([item.taskblueprint_id for item in queryset]) + + # Determine status per task_type (unfortunately did not manage with updatable view) + status_overview_counter = Counter() + status_overview_counter_per_type = {"observation": Counter(), "pipeline": Counter(), "ingest": Counter() } + for tb in TaskBlueprint.objects.filter(scheduling_unit_blueprint_id=self.id): + status_overview_counter[(tb.status)]+=1 + for task_type in taskblueprints_per_type_dict: + if tb.id in taskblueprints_per_type_dict[task_type]: + status_overview_counter_per_type[task_type][(tb.status)] += 1 + + # The actual determination of the SchedulingunitBlueprint status + if not self._task_graph_instantiated(): + status = "defined" + elif self._all_task_finished(status_overview_counter): + status = "finished" + elif self._any_task_cancelled(status_overview_counter): + status = "cancelled" + elif self._any_task_error(status_overview_counter): + status = "error" + elif self._any_task_started_observed_finished(status_overview_counter): + if not self._all_observation_task_observed_finished(status_overview_counter_per_type): + status = "observing" + elif not self._any_processing_task_started_or_finished(status_overview_counter_per_type): + status = "observed" + elif not self._all_processing_tasks_and_observation_tasks_finished(status_overview_counter_per_type): + status = "processing" + elif not self._any_ingest_task_started(status_overview_counter_per_type): + status = "processed" + else: + status = "ingesting" + elif self._any_task_scheduled(status_overview_counter): + status = "scheduled" + else: + status = "schedulable" + return status + + def _task_graph_instantiated(self): + return self._get_total_nbr_tasks() > 0 + + def _all_task_finished(self, status_overview_counter): + return status_overview_counter["finished"] == self._get_total_nbr_tasks() + + def _any_task_cancelled(self, status_overview_counter): + return status_overview_counter["cancelled"] > 0 + + def _any_task_error(self, status_overview_counter): + return status_overview_counter["error"] > 0 + + def _any_task_started_observed_finished(self, status_overview_counter): + return (status_overview_counter["started"] + status_overview_counter["observed"] + status_overview_counter["finished"]) > 0 + + def _any_task_scheduled(self, status_overview_counter): + return status_overview_counter["scheduled"] > 0 + + def _all_observation_task_observed_finished(self, status_overview_counter_per_type): + total_nbr_observation_tasks = self._get_total_nbr_observation_tasks() + return (status_overview_counter_per_type["observation"]["observed"] + + status_overview_counter_per_type["observation"]["finished"]) == total_nbr_observation_tasks + + def _any_processing_task_started_or_finished(self, status_overview_counter_per_type): + return status_overview_counter_per_type["pipeline"]["started"] + status_overview_counter_per_type["pipeline"]["finished"] > 0 + + def _all_processing_tasks_and_observation_tasks_finished(self, status_overview_counter_per_type): + total_nbr_observation_tasks = self._get_total_nbr_observation_tasks() + total_nbr_processing_tasks = self._get_total_nbr_processing_tasks() + return (status_overview_counter_per_type["pipeline"]["finished"] == total_nbr_processing_tasks and + status_overview_counter_per_type["observation"]["finished"] == total_nbr_observation_tasks) + + def _any_ingest_task_started(self, status_overview_counter_per_type): + return status_overview_counter_per_type["ingest"]["started"] > 0 + + def _get_total_nbr_tasks(self): + return self.task_blueprints.all().count() + + def _get_total_nbr_observation_tasks(self): + return SchedulingUnitBlueprintSummary.objects.filter(sub_id=self.id, task_type='observation').count() + + def _get_total_nbr_processing_tasks(self): + return SchedulingUnitBlueprintSummary.objects.filter(sub_id=self.id, task_type='pipeline').count() + + def _get_total_nbr_ingest_tasks(self): + return SchedulingUnitBlueprintSummary.objects.filter(sub_id=self.id, task_type='ingest').count() + class TaskDraft(NamedCommon): specifications_doc = JSONField(help_text='Specifications for this task.') @@ -823,6 +954,58 @@ class TaskBlueprint(NamedCommon): else: return None + @property + def status(self): + """ + Return the taskblueprint status which is derived from the subtasks status + See https://support.astron.nl/confluence/display/TMSS/Specification+Flow#SpecificationFlow-TaskBlueprints + The status is displayed as extra field in rest api of the taskblueprint + """ + if self._subtask_graph_not_instantiated(): + status = "defined" + elif self._all_subtask_finished(): + status = "finished" + elif self._any_subtask_cancelled(): + status = "cancelled" + elif self._any_subtask_error(): + status = "error" + elif self._all_observation_subtasks_finishing_finished(): + status = "observed" + elif self._any_subtask_between_started_finished(): + status = "started" + elif self._any_subtask_scheduled(): + status = "scheduled" + else: + status = "schedulable" + return status + + def _subtask_graph_not_instantiated(self): + total_nbr_subtasks = self.subtasks.all().count() + return (total_nbr_subtasks == 0 or + TaskBlueprintSummary.objects.filter(taskblueprint_id=self.id, substate='defining').count() > 0) + + def _all_subtask_finished(self): + total_nbr_subtasks = self.subtasks.all().count() + return (TaskBlueprintSummary.objects.filter(taskblueprint_id=self.id, substate='finished').count() == total_nbr_subtasks) + + def _any_subtask_cancelled(self): + return (TaskBlueprintSummary.objects.filter(taskblueprint_id=self.id, substate__in=('cancelling', 'cancelled')).count() > 0) + + def _any_subtask_error(self): + return (TaskBlueprintSummary.objects.filter(taskblueprint_id=self.id, substate='error').count() > 0) + + def _all_observation_subtasks_finishing_finished(self): + total_nbr_observation_subtasks = TaskBlueprintSummary.objects.filter(taskblueprint_id=self.id, + subtask_type='observation').count() + return (TaskBlueprintSummary.objects.filter(taskblueprint_id=self.id, substate__in=('finishing','finished'), subtask_type='observation').count() + == total_nbr_observation_subtasks and total_nbr_observation_subtasks > 0) + + def _any_subtask_between_started_finished(self): + return (TaskBlueprintSummary.objects.filter(taskblueprint_id=self.id, substate__in=('starting','started','queueing','queued','finishing','finished')).count() > 0) + + def _any_subtask_scheduled(self): + return (TaskBlueprintSummary.objects.filter(taskblueprint_id=self.id, substate='scheduled').count() > 0) + class TaskRelationDraft(BasicCommon): selection_doc = JSONField(help_text='Filter for selecting dataproducts from the output role.') diff --git a/SAS/TMSS/src/tmss/tmssapp/schemas/task_template-calibrator_observation-1.json b/SAS/TMSS/src/tmss/tmssapp/schemas/task_template-calibrator_observation-1.json index 0e32bb1da081fbee61a559f8a07364787282bdb7..cd606bf6794ef166c491a80cff583e0838d8d788 100644 --- a/SAS/TMSS/src/tmss/tmssapp/schemas/task_template-calibrator_observation-1.json +++ b/SAS/TMSS/src/tmss/tmssapp/schemas/task_template-calibrator_observation-1.json @@ -7,6 +7,7 @@ "type": "object", "properties": { "duration": { + "$id": "#duration", "type": "number", "title": "Duration (seconds)", "description": "Duration of this observation", diff --git a/SAS/TMSS/src/tmss/tmssapp/schemas/task_template-target_observation-1.json b/SAS/TMSS/src/tmss/tmssapp/schemas/task_template-target_observation-1.json index e987d0d2fc1d0628be24ef009833f712601cf05f..f6e92bc010d32d7e20cc879dfe7576fd18dfdb71 100644 --- a/SAS/TMSS/src/tmss/tmssapp/schemas/task_template-target_observation-1.json +++ b/SAS/TMSS/src/tmss/tmssapp/schemas/task_template-target_observation-1.json @@ -46,6 +46,7 @@ "default": "" }, "digital_pointing": { + "$id": "#target_pointing", "title": "Digital pointing", "default": {}, "$ref": "http://tmss.lofar.org/api/schemas/commonschematemplate/pointing/1/#/definitions/pointing" @@ -70,6 +71,7 @@ } }, "duration": { + "$id": "#duration", "type": "number", "title": "Duration (seconds)", "description": "Duration of this observation", diff --git a/SAS/TMSS/src/tmss/tmssapp/serializers/specification.py b/SAS/TMSS/src/tmss/tmssapp/serializers/specification.py index 6f31450473362b76bd9ee1cb74258474a3f0a58d..3936550569ec776d7138c793660d446d266223cc 100644 --- a/SAS/TMSS/src/tmss/tmssapp/serializers/specification.py +++ b/SAS/TMSS/src/tmss/tmssapp/serializers/specification.py @@ -313,7 +313,7 @@ class SchedulingUnitBlueprintSerializer(RelationalHyperlinkedModelSerializer): class Meta: model = models.SchedulingUnitBlueprint fields = '__all__' - extra_fields = ['task_blueprints', 'duration', 'start_time', 'stop_time'] + extra_fields = ['task_blueprints', 'duration', 'start_time', 'stop_time', 'status'] class SchedulingUnitBlueprintCopyToSchedulingUnitDraftSerializer(SchedulingUnitBlueprintSerializer): class Meta(SchedulingUnitDraftSerializer.Meta): @@ -346,7 +346,8 @@ class TaskBlueprintSerializer(RelationalHyperlinkedModelSerializer): class Meta: model = models.TaskBlueprint fields = '__all__' - extra_fields = ['subtasks', 'produced_by', 'consumed_by', 'first_to_connect', 'second_to_connect', 'duration', 'start_time', 'stop_time', 'relative_start_time', 'relative_stop_time'] + extra_fields = ['subtasks', 'produced_by', 'consumed_by', 'first_to_connect', 'second_to_connect', 'duration', + 'start_time', 'stop_time', 'relative_start_time', 'relative_stop_time', 'status'] class TaskRelationDraftSerializer(RelationalHyperlinkedModelSerializer): diff --git a/SAS/TMSS/test/t_subtasks.py b/SAS/TMSS/test/t_subtasks.py index 17210063f2e24e31a19a3a1f05edee0375c409d7..ca94d9eb33be0377cbbd5aa1d3a3cb3623be411f 100755 --- a/SAS/TMSS/test/t_subtasks.py +++ b/SAS/TMSS/test/t_subtasks.py @@ -115,6 +115,8 @@ def create_scheduling_relation_task_blueprint_for_testing(first_task_blueprint, placement=models.SchedulingRelationPlacement.objects.get(value='before'), time_offset=60) return task_scheduling_rel_obj + + class SubTasksCreationFromSubTask(unittest.TestCase): def test_create_qafile_subtask_from_observation_subtask_failed(self): diff --git a/SAS/TMSS/test/t_tasks.py b/SAS/TMSS/test/t_tasks.py index ae878f68ad6712aab49ab8d974d4aa8a1416712f..a5a099bbdb7e13c8efd15d40df6941e166c243ee 100755 --- a/SAS/TMSS/test/t_tasks.py +++ b/SAS/TMSS/test/t_tasks.py @@ -255,6 +255,386 @@ class CreationFromTaskDraft(unittest.TestCase): self.assertEqual(0, task_blueprint.subtasks.count()) +class TaskBlueprintStateTest(unittest.TestCase): + """ + Test the Task Blueprint State which is derived from the SubTask states. + The result of each possible combination of these states will be checked + See https://support.astron.nl/confluence/display/TMSS/Specification+Flow#SpecificationFlow-TaskBlueprints + """ + + def test_state_with_no_subtasks(self): + """ + Test the taskblueprint state when subtasks are not instantiated. + the expected state should be 'defined' + """ + task_blueprint_data = TaskBlueprint_test_data(name="Task Blueprint No Subtasks") + task_blueprint = models.TaskBlueprint.objects.create(**task_blueprint_data) + self.assertEqual("defined", task_blueprint.status) + + def test_states_with_one_subtask(self): + """ + Test the taskblueprint state when only one subtasks is instantiated, an pipeline + See next table where every row represents: + Substate(Pipeline), Expected TaskBlueprint State + """ + test_table = [ + ("defining", "defined"), + ("defining", "defined"), + ("defined", "schedulable"), + ("scheduling", "schedulable"), + ("scheduled", "scheduled"), + ("starting", "started"), + ("started", "started"), + ("queueing", "started"), + ("queued", "started"), + ("finishing", "started"), + ("finished", "finished"), + ("cancelling", "cancelled"), + ("cancelled", "cancelled"), + ("error", "error") + ] + # Create taskblueprint + task_blueprint_data = TaskBlueprint_test_data(name="Task Blueprint With One Subtask") + task_blueprint = models.TaskBlueprint.objects.create(**task_blueprint_data) + # Create pipeline subtask related to taskblueprint + subtask_data = Subtask_test_data(task_blueprint, state=models.SubtaskState.objects.get(value="defined"), + subtask_template=models.SubtaskTemplate.objects.get(name='pipeline control')) + subtask_pipe = models.Subtask.objects.create(**subtask_data) + + # Do the actual test + for test_item in test_table: + state_pipe, expected_task_state = test_item + logger.info("Expected test result of substate pipeline='%s' should be '%s'" % (state_pipe, expected_task_state)) + subtask_pipe.state = models.SubtaskState.objects.get(value=state_pipe) + subtask_pipe.save() + self.assertEqual(expected_task_state, task_blueprint.status) + + def test_states_with_observation_and_qa_subtask(self): + """ + Test the taskblueprint state when two subtasks are instantiated, an observation and a QA. + See next table where every row represents: + Substate(Obs), Substate(QA), Expected TaskBlueprint State + """ + test_table = [ + ("defining", "defining", "defined"), + ("defining", "defined", "defined"), + ("defined", "defined", "schedulable"), + ("scheduling", "defined", "schedulable"), + ("scheduled", "defined", "scheduled"), + ("starting", "defined", "started"), + ("started", "defined", "started"), + ("queueing", "defined", "started"), + ("queued", "defined", "started"), + ("finishing", "defined", "observed"), + ("finished", "defined", "observed"), + ("finished", "finished", "finished"), + ("cancelling", "defined", "cancelled"), + ("cancelled", "defined", "cancelled"), + ("error", "defined", "error"), + # qa finishing/finished should be not observed + ("defined", "finishing", "started"), + ("defined", "finished", "started"), + ("scheduled", "finishing", "started"), + ("scheduled", "finished", "started"), + # error and cancelled/ing + ("scheduled", "error", "error"), + ("scheduled", "cancelling", "cancelled"), + ("scheduled", "cancelled", "cancelled"), + ("started", "error", "error"), + ("started", "cancelling", "cancelled"), + ("started", "cancelled", "cancelled"), + ("finished", "error", "error"), + ("finished", "cancelling", "cancelled"), + ("finished", "cancelled", "cancelled"), + # cancelled over error + ("cancelling", "error", "cancelled"), + ("cancelled", "error", "cancelled"), + ("error", "cancelling", "cancelled"), + ("error", "cancelling", "cancelled"), + # qa scheduled + ("starting", "scheduled", "started"), + ("started", "scheduled", "started"), + ("queueing", "scheduled", "started"), + ("queued", "scheduled", "started"), + ("finishing", "scheduled", "observed"), + ("finished", "scheduled", "observed"), + ("cancelling", "scheduled", "cancelled"), + ("cancelled", "scheduled", "cancelled"), + ("error", "scheduled", "error"), + ] + # Create taskblueprint + task_blueprint_data = TaskBlueprint_test_data(name="Task Blueprint With Subtasks") + task_blueprint = models.TaskBlueprint.objects.create(**task_blueprint_data) + # Create observation and qa subtask related to taskblueprint + subtask_data = Subtask_test_data(task_blueprint, state=models.SubtaskState.objects.get(value="defined"), + subtask_template=models.SubtaskTemplate.objects.get(name='observation control')) + subtask_obs = models.Subtask.objects.create(**subtask_data) + subtask_data = Subtask_test_data(task_blueprint, state=models.SubtaskState.objects.get(value="defined"), + subtask_template=models.SubtaskTemplate.objects.get(name='QA file conversion')) + subtask_qa = models.Subtask.objects.create(**subtask_data) + + # Do the actual test + for test_item in test_table: + state_obs, state_qa, expected_task_state = test_item + logger.info("Expected test result of substates observation='%s' and qa='%s' should be '%s'" % (state_obs, state_qa, expected_task_state)) + subtask_obs.state = models.SubtaskState.objects.get(value=state_obs) + subtask_obs.save() + subtask_qa.state = models.SubtaskState.objects.get(value=state_qa) + subtask_qa.save() + self.assertEqual(expected_task_state, task_blueprint.status) + + def test_states_with_two_observation_and_two_qa_subtasks(self): + """ + Test the taskblueprint state when four subtasks are instantiated, two observation and two QA. + See next table where every row represents: + Substate(Obs1), Substate(Obs2), Substate(QA1), Substate(QA2), Expected TaskBlueprint State + """ + test_table = [ + ("finishing", "defined", "defined", "defined", "started"), + ("finished", "defined", "defined", "defined", "started"), + ("finishing", "started", "defined", "defined", "started"), + ("finished", "started", "defined", "defined", "started"), + ("finishing", "finishing", "defined", "defined", "observed"), + ("finished", "finished", "defined", "defined", "observed"), + ("finished", "finished", "scheduled", "defined", "observed"), + ("finished", "finished", "finished", "scheduled", "observed"), + ("finished", "finished", "finished", "finished", "finished"), + ("finished", "finished", "finished", "cancelled", "cancelled"), + ("finished", "finished", "finished", "error", "error"), + ("error", "finished", "finished", "cancelled", "cancelled"), + ] + # Create taskblueprint + task_blueprint_data = TaskBlueprint_test_data(name="Task Blueprint With Subtasks") + task_blueprint = models.TaskBlueprint.objects.create(**task_blueprint_data) + # Create observation and qa subtasks related to taskblueprint + subtask_data = Subtask_test_data(task_blueprint, state=models.SubtaskState.objects.get(value="defined"), + subtask_template=models.SubtaskTemplate.objects.get(name='observation control')) + subtask_obs1 = models.Subtask.objects.create(**subtask_data) + subtask_obs2 = models.Subtask.objects.create(**subtask_data) + subtask_data = Subtask_test_data(task_blueprint, state=models.SubtaskState.objects.get(value="defined"), + subtask_template=models.SubtaskTemplate.objects.get(name='QA file conversion')) + subtask_qa1 = models.Subtask.objects.create(**subtask_data) + subtask_qa2 = models.Subtask.objects.create(**subtask_data) + + # Do the actual test + for test_item in test_table: + state_obs1, state_obs2, state_qa1, state_qa2, expected_task_state = test_item + logger.info("Expected test result of substates observation='%s','%s' and qa='%s','%s' should be '%s'" % + (state_obs1, state_obs1, state_qa1, state_qa2, expected_task_state)) + subtask_obs1.state = models.SubtaskState.objects.get(value=state_obs1) + subtask_obs1.save() + subtask_obs2.state = models.SubtaskState.objects.get(value=state_obs2) + subtask_obs2.save() + subtask_qa1.state = models.SubtaskState.objects.get(value=state_qa1) + subtask_qa1.save() + subtask_qa2.state = models.SubtaskState.objects.get(value=state_qa2) + subtask_qa2.save() + self.assertEqual(expected_task_state, task_blueprint.status) + + +class SchedulingUnitBlueprintStateTest(unittest.TestCase): + """ + Test the Scheduling Blueprint State which is derived from the TaskBlueprint states. + The result of each possible combination of these states will be checked + See https://support.astron.nl/confluence/display/TMSS/Specification+Flow#SpecificationFlow-SchedulingBlueprints + """ + + def create_tasks_and_subtasks(self, schedulingunit_blueprint, skip_create_subtask=[]): + """ + Create three taskblueprint related to the schedulingunit_blueprint. + These task are an observation, a pipeline and a ingest task. + Also per task one subtask is instantiated (so makes three total) which is required to be able to set + the task status which is a read-only property and is derived from the subtask states + :param schedulingunit_blueprint: + :return: dictionary with task and subtask objects + """ + # Create observation task + task_data = TaskBlueprint_test_data(name="Task Observation", scheduling_unit_blueprint=schedulingunit_blueprint) + task_obs = models.TaskBlueprint.objects.create(**task_data) + subtask_data = Subtask_test_data(task_obs, state=models.SubtaskState.objects.get(value="defined"), + subtask_template=models.SubtaskTemplate.objects.get(name='observation control')) + if "observation" in skip_create_subtask: + subtask_obs = None + else: + subtask_obs = models.Subtask.objects.create(**subtask_data) + + # Create pipeline task + task_data = TaskBlueprint_test_data(name="Task Pipeline", scheduling_unit_blueprint=schedulingunit_blueprint) + task_pipe = models.TaskBlueprint.objects.create(**task_data) + # Need to change the default template type (observation) to pipeline + task_pipe.specifications_template = models.TaskTemplate.objects.get(type=models.TaskType.Choices.PIPELINE.value) + task_pipe.save() + subtask_data = Subtask_test_data(task_pipe, + state=models.SubtaskState.objects.get(value="defined"), + subtask_template=models.SubtaskTemplate.objects.get(name='pipeline control')) + if "pipeline" in skip_create_subtask: + subtask_pipe = None + else: + subtask_pipe = models.Subtask.objects.create(**subtask_data) + + # Create ingest task + # Because there is no taskTemplate object for ingest by default I have to create one + test_data = TaskTemplate_test_data(name="task_template_for_ingest", task_type_value="ingest") + my_test_template = models.TaskTemplate.objects.create(**test_data) + task_data = TaskBlueprint_test_data(name="Task Ingest", scheduling_unit_blueprint=schedulingunit_blueprint) + task_ingest = models.TaskBlueprint.objects.create(**task_data) + task_ingest.specifications_template = my_test_template + task_ingest.save() + # There is no template defined for ingest yet ...but I can use pipeline control, only the template type matters + # ....should become other thing in future but for this test does not matter + subtask_data = Subtask_test_data(task_ingest, + state=models.SubtaskState.objects.get(value="defined"), + subtask_template=models.SubtaskTemplate.objects.get(name='pipeline control')) + if "ingest" in skip_create_subtask: + subtask_ingest = None + else: + subtask_ingest = models.Subtask.objects.create(**subtask_data) + + return {"observation": {"task": task_obs, "subtask": subtask_obs}, + "pipeline": {"task": task_pipe, "subtask": subtask_pipe}, + "ingest": {"task": task_ingest, "subtask": subtask_ingest}} + + def set_task_state(self, task_state, task_type, task, subtask): + """ + Set the taskblueprint state for given task_type + State of task can only be set by setting the subtask state + Do not set subtask state if subtask is None + :param task_state: Task state to be set + :param task_type: observation, pipeline or ingest + :param task: TaskBlueprint object + :param subtask: SubTask object + """ + # Translate task state to subtask state, mostly one-o-one but two exceptions + if task_state == "observed": + subtask_state = "finishing" + elif task_state == "schedulable": + subtask_state = "scheduling" + else: + subtask_state = task_state + + if subtask is not None: + subtask.state = models.SubtaskState.objects.get(value=subtask_state) + subtask.save() + # Check task.status as precondition + self.assertEqual(task_state, task.status, + "PRECONDITION DOES NOT MET. Expect %s task to be equal to %s (but is %s)" % ( + task_type, task_state, task.status)) + + def test_state_with_no_tasks(self): + """ + Test the schedulingunitblueprint state when tasks are not instantiated. + the expected state should be 'defined' + """ + schedulingunit_data = SchedulingUnitBlueprint_test_data(name="Scheduling Blueprint No Tasks") + schedulingunit_blueprint = models.SchedulingUnitBlueprint.objects.create(**schedulingunit_data) + self.assertEqual("defined", schedulingunit_blueprint.status) + + def test_states_with_observation_pipeline_ingest_tasks_subtasks(self): + """ + Test the schedulingunitblueprint state when only one task is instantiated, an pipeline + Subtask are also instantiated so minimal task state is schedulable ! + See next table where every row represents: + Taskstate(obs), Taskstate(pipeline), Taskstate(ingest), Expected SchedulingUnitBlueprint Status + """ + test_table = [ + # normal behaviour + ("error", "schedulable", "schedulable", "error"), + ("cancelled", "schedulable", "schedulable", "cancelled"), + ("schedulable", "schedulable", "schedulable", "schedulable"), + ("scheduled", "schedulable", "schedulable", "scheduled"), + ("started", "schedulable", "schedulable", "observing"), + ("observed", "schedulable", "schedulable", "observed"), + ("observed", "scheduled", "schedulable", "observed"), + ("observed", "started", "schedulable", "processing"), + ("observed", "finished", "schedulable", "processing"), + ("observed", "finished", "scheduled", "processing"), + ("observed", "finished", "started", "processing"), + ("observed", "finished", "finished", "processing"), + ("finished", "schedulable", "schedulable", "observed"), + ("finished", "scheduled", "schedulable", "observed"), + ("finished", "started", "schedulable", "processing"), + ("finished", "finished", "schedulable", "processed"), + ("finished", "finished", "scheduled", "processed"), + ("finished", "finished", "started", "ingesting"), + ("finished", "finished", "finished", "finished"), + # any cancelled + ("observed", "cancelled", "schedulable", "cancelled"), + ("observed", "schedulable", "cancelled", "cancelled"), + ("observed", "scheduled", "cancelled", "cancelled"), + ("observed", "started", "cancelled", "cancelled"), + ("observed", "cancelled", "schedulable", "cancelled"), + ("observed", "cancelled", "scheduled", "cancelled"), + ("observed", "cancelled", "started", "cancelled"), + ("observed", "cancelled", "finished", "cancelled"), + ("finished", "cancelled", "schedulable", "cancelled"), + # any error + ("observed", "error", "schedulable", "error"), + ("observed", "schedulable", "error", "error"), + ("observed", "scheduled", "error", "error"), + ("observed", "started", "error", "error"), + ("observed", "error", "schedulable", "error"), + ("observed", "error", "scheduled", "error"), + ("observed", "error", "started", "error"), + ("observed", "error", "finished", "error"), + # cancelled over error + ("error", "error", "cancelled", "cancelled") + ] + # Create schedulingblueprint + schedulingunit_data = SchedulingUnitBlueprint_test_data(name="Task Blueprint With Three Tasks") + schedulingunit_blueprint = models.SchedulingUnitBlueprint.objects.create(**schedulingunit_data) + # Create related task and subtasks + tasks_and_subtasks_dict = self.create_tasks_and_subtasks(schedulingunit_blueprint) + # Do the actual test + task_state_dict = {} + for test_item in test_table: + task_state_dict["observation"], task_state_dict["pipeline"], task_state_dict["ingest"], expected_schedulingunit_status = test_item + info_msg = "Test with with states observation='%s',pipeline='%s',ingest='%s' should result in schedulingunit_blueprint.status '%s'" \ + % (task_state_dict["observation"], task_state_dict["pipeline"], task_state_dict["ingest"], expected_schedulingunit_status) + logger.info(info_msg) + for key in tasks_and_subtasks_dict: + self.set_task_state(task_state_dict[key], key, tasks_and_subtasks_dict[key]["task"], tasks_and_subtasks_dict[key]["subtask"]) + # Check result + self.assertEqual(expected_schedulingunit_status, schedulingunit_blueprint.status, info_msg) + + def test_states_with_observation_pipeline_ingest_tasks_no_ingest_subtask(self): + """ + Test the schedulingunitblueprint state when the tasks, observation, pipeline and ingest are instantiated + Subtask of ingest is missing, which makes implicit the task state defined! + See next table where every row represents: + Taskstate(obs), Taskstate(pipeline), Taskstate(ingest), Expected SchedulingUnitBlueprint Status + """ + test_table = [ + # normal behaviour + ("error", "schedulable", "defined", "error"), + ("cancelled", "schedulable", "defined", "cancelled"), + ("schedulable", "schedulable", "defined", "schedulable"), + ("scheduled", "schedulable", "defined", "scheduled"), + ("started", "schedulable", "defined", "observing"), + ("observed", "schedulable", "defined", "observed"), + ("observed", "scheduled", "defined", "observed"), + ("observed", "started", "defined", "processing"), + ("observed", "finished", "defined", "processing"), + ("finished", "schedulable", "defined", "observed"), + ] + # Create schedulingblueprint + schedulingunit_data = SchedulingUnitBlueprint_test_data(name="Task Blueprint With Three Tasks No Ingest Subtask") + schedulingunit_blueprint = models.SchedulingUnitBlueprint.objects.create(**schedulingunit_data) + # Create related task and subtasks (skip creation of ingest subtask) + tasks_and_subtasks_dict = self.create_tasks_and_subtasks(schedulingunit_blueprint, ["ingest"]) + # Do the actual test + task_state_dict = {} + for test_item in test_table: + task_state_dict["observation"], task_state_dict["pipeline"], task_state_dict["ingest"], expected_schedulingunit_status = test_item + info_msg = "Test with with states observation='%s',pipeline='%s',ingest='%s' should result in schedulingunit_blueprint.status '%s'" \ + % (task_state_dict["observation"], task_state_dict["pipeline"], task_state_dict["ingest"], expected_schedulingunit_status) + logger.info(info_msg) + for key in tasks_and_subtasks_dict: + self.set_task_state(task_state_dict[key], key, tasks_and_subtasks_dict[key]["task"], tasks_and_subtasks_dict[key]["subtask"]) + # Check result + self.assertEqual(expected_schedulingunit_status, schedulingunit_blueprint.status, info_msg) + + + if __name__ == "__main__": os.environ['TZ'] = 'UTC' unittest.main() diff --git a/SAS/TMSS/test/tmss_test_data_django_models.py b/SAS/TMSS/test/tmss_test_data_django_models.py index 4aed6d9eebc53f9d76c5f24f6270d69e518d906f..13fc9c26cb698e3bbb65b2f4152d276c20cb3b55 100644 --- a/SAS/TMSS/test/tmss_test_data_django_models.py +++ b/SAS/TMSS/test/tmss_test_data_django_models.py @@ -84,11 +84,14 @@ def SchedulingUnitObservingStrategyTemplate_test_data(name="my_SchedulingUnitObs "scheduling_unit_template": scheduling_unit_template, "tags": ["TMSS", "TESTING"]} -def TaskTemplate_test_data(name="my TaskTemplate", description:str=None, schema:dict=None) -> dict: +def TaskTemplate_test_data(name="my TaskTemplate", description:str=None, schema:dict=None, task_type_value:str=None) -> dict: if schema is None: schema = minimal_json_schema(properties={"mykey": {}}) - return {"type": models.TaskType.objects.get(value='observation'), + if task_type_value is None: + task_type_value = 'observation' + + return {"type": models.TaskType.objects.get(value=task_type_value), "validation_code_js":"", "name": name, "description": description or "<no description>",