diff --git a/SAS/TMSS/src/tmss/tmssapp/schemas/subtask_template-ingest-1.json b/SAS/TMSS/src/tmss/tmssapp/schemas/subtask_template-ingest-1.json new file mode 100644 index 0000000000000000000000000000000000000000..d2727dbeaa138caa4e953d5d2caf11ac8a0554bc --- /dev/null +++ b/SAS/TMSS/src/tmss/tmssapp/schemas/subtask_template-ingest-1.json @@ -0,0 +1,12 @@ +{ + "$id":"http://tmss.lofar.org/api/schemas/subtasktemplate/ingest control/1#", + "$schema": "http://json-schema.org/draft-06/schema#", + "title":"ingest control", + "description":"This schema defines the parameters to setup and control an ingest subtask.", + "version":1, + "type": "object", + "properties": { + }, + "required": [ + ] +} diff --git a/SAS/TMSS/src/tmss/tmssapp/schemas/task_template-ingest-1.json b/SAS/TMSS/src/tmss/tmssapp/schemas/task_template-ingest-1.json new file mode 100644 index 0000000000000000000000000000000000000000..9877e438a728ce036b8619b6d849106863b93a9d --- /dev/null +++ b/SAS/TMSS/src/tmss/tmssapp/schemas/task_template-ingest-1.json @@ -0,0 +1,12 @@ +{ + "$id": "http://tmss.lofar.org/api/schemas/tasktemplate/ingest/1#", + "$schema": "http://json-schema.org/draft-06/schema#", + "title": "ingest", + "description": "This schema defines the parameters to setup an ingest task.", + "version": 1, + "type": "object", + "properties": { + }, + "required": [ + ] +} diff --git a/SAS/TMSS/src/tmss/tmssapp/schemas/templates.json b/SAS/TMSS/src/tmss/tmssapp/schemas/templates.json index b181f8ea1edb710d50264007309a63775962316a..27f52ee1913374218f741e80cd33a3ac96a84e06 100644 --- a/SAS/TMSS/src/tmss/tmssapp/schemas/templates.json +++ b/SAS/TMSS/src/tmss/tmssapp/schemas/templates.json @@ -112,5 +112,15 @@ { "file_name": "sap_template-1.json", "template": "sap_template" + }, + { + "file_name": "subtask_template-ingest-1.json", + "template": "subtask_template", + "type": "copy" + }, + { + "file_name": "task_template-ingest-1.json", + "template": "task_template", + "type": "ingest" } ] \ No newline at end 
def create_ingest_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
    ''' Create the 'ingest control' subtask for the given ingest task_blueprint.
    This method implements "Instantiate subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow

    The subtask is created in state DEFINING, gets one SubtaskInput for every output of every
    subtask of each task that produces data for this task_blueprint, and is finally set to DEFINED.

    :param task_blueprint: the ingest task blueprint for which the subtask is created.
    :return: the created and saved ingest subtask, in state DEFINED.
    '''
    # step 0: check pre-requisites
    check_prerequities_for_subtask_creation(task_blueprint)

    # step 1: create subtask in defining state, with filled-in subtask_template
    ingest_template = SubtaskTemplate.objects.get(name='ingest control')
    # todo: translate specs from task to subtask once we have non-empty templates
    ingest_specs = get_default_json_object_for_schema(ingest_template.schema)
    # the cluster falls back to CEP4 when the task specification does not name a storage_cluster
    storage_cluster = task_blueprint.specifications_doc.get("storage_cluster", "CEP4")

    subtask = Subtask.objects.create(
        start_time=None,
        stop_time=None,
        state=SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
        task_blueprint=task_blueprint,
        specifications_template=ingest_template,
        specifications_doc=ingest_specs,
        priority=1,
        schedule_method=ScheduleMethod.objects.get(value=ScheduleMethod.Choices.DYNAMIC.value),
        cluster=Cluster.objects.get(name=storage_cluster))

    # step 2: create and link subtask input
    # every output of every subtask of each producing task becomes an input of the ingest subtask,
    # carrying the selection doc/template of the task relation it came through.
    for relation in task_blueprint.produced_by.all():
        for producer_subtask in relation.producer.subtasks.order_by('id').all():
            for producer_output in producer_subtask.outputs.all():
                SubtaskInput.objects.create(subtask=subtask,
                                            producer=producer_output,
                                            selection_doc=relation.selection_doc,
                                            selection_template=relation.selection_template)

    # step 3: set state to DEFINED
    subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
    subtask.save()

    # done, now return the subtask, and allow the system to wait for the predecessors to be finished before we schedule this ingest
    return subtask
def schedule_copy_subtask(copy_subtask: Subtask) -> Subtask:
    ''' Schedule the given copy_subtask
    This method should typically be called upon the event of a predecessor (pipeline or observation) subtask being finished.
    This method implements "Scheduling subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow

    :param copy_subtask: the subtask of type 'copy' to schedule.
    :return: the same copy_subtask, saved in state SCHEDULED, with the selected input dataproducts linked.
    :raises SubtaskSchedulingException: when the subtask is not of type 'copy', or when it has no inputs.
    '''
    # step 0: check pre-requisites
    check_prerequities_for_scheduling(copy_subtask)

    subtask_type = copy_subtask.specifications_template.type.value
    if subtask_type != SubtaskType.Choices.COPY.value:
        raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s but type should be %s" % (copy_subtask.pk,
                                                                                                          subtask_type,
                                                                                                          SubtaskType.Choices.COPY.value))

    # step 1: set state to SCHEDULING
    copy_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value)
    copy_subtask.save()

    # step 1a: check start/stop times
    # not very relevant for tmss/dynamic scheduling, but the resource assigner demands it.
    # the defaults assigned here are persisted by the final save() in step 5.
    if copy_subtask.start_time is None:
        now = datetime.utcnow()
        logger.info("copy id=%s has no starttime. assigned default: %s", copy_subtask.pk, formatDatetime(now))
        copy_subtask.start_time = now

    if copy_subtask.stop_time is None:
        # default duration of one hour after the start time
        default_stop_time = copy_subtask.start_time + timedelta(hours=1)
        logger.info("copy id=%s has no stop_time. assigned default: %s", copy_subtask.pk, formatDatetime(default_stop_time))
        copy_subtask.stop_time = default_stop_time

    # step 2: link input dataproducts
    # exists() only asks the database whether there is at least one row, instead of counting them all
    if not copy_subtask.inputs.exists():
        raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s because it has no input(s)" % (copy_subtask.pk,
                                                                                                               subtask_type))

    # iterate over all inputs
    for copy_subtask_input in copy_subtask.inputs.all():
        # select and set input dataproducts that meet the filter defined in selection_doc
        selected_dataproducts = [dataproduct for dataproduct in copy_subtask_input.producer.dataproducts.all()
                                 if specifications_doc_meets_selection_doc(dataproduct.specifications_doc, copy_subtask_input.selection_doc)]
        copy_subtask_input.dataproducts.set(selected_dataproducts)

    # step 4: resource assigner (if possible)
    # todo: no resources are assigned here (no call to _assign_resources). It is assumed that there is no
    #       RA involvement for a copy subtask. If there is, how does a copy parset look like?

    # step 5: set state to SCHEDULED
    # NOTE(review): which service picks up SCHEDULED copy subtasks is not visible from here - confirm.
    copy_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
    copy_subtask.save()

    return copy_subtask
input. + obs_subtask_template = client.get_subtask_template("observation control") + obs_spec = get_default_json_object_for_schema(obs_subtask_template['schema']) + obs_spec['stations']['digital_pointings'][0]['subbands'] = [0] + + obs_subtask_data = test_data_creator.Subtask(specifications_template_url=obs_subtask_template['url'], + specifications_doc=obs_spec, + cluster_url=cluster_url, + task_blueprint_url=test_data_creator.post_data_and_get_url(test_data_creator.TaskBlueprint(), '/task_blueprint/')) + obs_subtask = test_data_creator.post_data_and_get_response_as_json_object(obs_subtask_data, '/subtask/') + obs_subtask_output_url = test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=obs_subtask['url']), '/subtask_output/') + test_data_creator.post_data_and_get_url(test_data_creator.Dataproduct(filename="L%s_SB000.MS"%obs_subtask['id'], + subtask_output_url=obs_subtask_output_url), '/dataproduct/') + + # now create the ingest... + ingest_subtask_template = client.get_subtask_template("ingest control") + ingest_spec = get_default_json_object_for_schema(ingest_subtask_template['schema']) + + ingest_subtask_data = test_data_creator.Subtask(specifications_template_url=ingest_subtask_template['url'], + specifications_doc=ingest_spec, + task_blueprint_url=obs_subtask['task_blueprint'], + cluster_url=cluster_url) + ingest_subtask = test_data_creator.post_data_and_get_response_as_json_object(ingest_subtask_data, '/subtask/') + + # ...and connect it to the observation + test_data_creator.post_data_and_get_url(test_data_creator.SubtaskInput(subtask_url=ingest_subtask['url'], subtask_output_url=obs_subtask_output_url), '/subtask_input/') + test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=ingest_subtask['url']), '/subtask_output/') + + for predecessor in client.get_subtask_predecessors(ingest_subtask['id']): + client.set_subtask_status(predecessor['id'], 'finished') + 
class SubTaskCreationFromTaskBlueprintIngest(unittest.TestCase):

    def test_create_subtask_from_task_blueprint_ingest(self):
        """
        Test that an ingest task blueprint can be turned into an 'ingest control' subtask
        of type 'copy' which ends up in state 'defined'.
        """
        # setup
        blueprint = create_task_blueprint_object_for_testing(task_template_name="ingest")

        # trigger
        created_subtask = create_ingest_subtask_from_task_blueprint(blueprint)

        # assert: compare the string representation of each property with its expected value
        template = created_subtask.specifications_template
        expectations = (("defined", created_subtask.state),
                        ("ingest control", template.name),
                        ("copy", template.type))
        for expected, actual in expectations:
            self.assertEqual(expected, str(actual))