From c5b7ecd521009eee1c0ae22a88a91f1cd27c45ef Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Tue, 30 Jan 2024 13:00:18 +0100 Subject: [PATCH] TMSS-2829: implemented scheduling the copy-subtask and bookkeeping the managed-or-not-output --- SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py | 81 ++++++++++++------- 1 file changed, 50 insertions(+), 31 deletions(-) diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py index 0d904aa3d86..9b463b39612 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py @@ -994,10 +994,11 @@ def create_copy_pipeline_subtask_from_task_blueprint(task_blueprint: TaskBluepri # step 0: check pre-requisites check_prerequities_for_subtask_creation(task_blueprint) - # step 1: create subtask in defining state, with filled-in subtask_template + # create subtask in defining state, with filled-in subtask_template copy_subtask_template = SubtaskTemplate.get_version_or_latest(name="copy pipeline") copy_subtask_spec = copy_subtask_template.get_default_json_document_for_schema() - # copy_subtask_spec[''] + copy_subtask_spec['destination'] = task_blueprint.specifications_doc.get('destination', '/tmp') + copy_subtask_spec['managed_output'] = task_blueprint.specifications_doc.get('managed_output', False) copy_subtask_data = { "scheduled_start_time": None, "scheduled_stop_time": None, @@ -1005,25 +1006,14 @@ def create_copy_pipeline_subtask_from_task_blueprint(task_blueprint: TaskBluepri "task_blueprint": task_blueprint, "specifications_template": copy_subtask_template, "specifications_doc": copy_subtask_spec, - "primary": True } #, - #"cluster": observation_subtask.cluster} + "primary": True, + "cluster": task_blueprint.predecessors.first().subtasks.first().cluster } copy_subtask = Subtask.objects.create(**copy_subtask_data) - # step 2: create and link subtask input/output - selection_template = 
TaskRelationSelectionTemplate.get_version_or_latest(name="all") - selection_doc = selection_template.get_default_json_document_for_schema() + # create and link subtask input/output + _create_or_update_subtask_inputs(copy_subtask) - # for obs_out in observation_subtask.outputs.filter(output_role__role__value=Role.Choices.CORRELATOR.value).all(): - # qafile_subtask_input = SubtaskInput.objects.create(subtask=qafile_subtask, - # producer=obs_out, # TODO: determine proper producer based on spec in task_relation_blueprint - # input_role=None, # TODO: determine proper role based on spec in task_relation_blueprint - # selection_doc=selection_doc, - # selection_template=selection_template) - # - # # create an internal SubtaskOutput (output_role=None), because we do not expose qa data yet at task level - # qafile_subtask_output = SubtaskOutput.objects.create(subtask=qafile_subtask, output_role=None, filesystem=_output_filesystem(qafile_subtask)) - - # step 3: set state to DEFINED + # set state to DEFINED copy_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value) copy_subtask.save() @@ -1031,9 +1021,6 @@ def create_copy_pipeline_subtask_from_task_blueprint(task_blueprint: TaskBluepri return copy_subtask - - - # ==== various schedule* methods to schedule a Subtasks (if possible) ==== # LOFAR needs to have a gap in between observations to (re)initialize hardware. 
@@ -2291,17 +2278,49 @@ def schedule_copy_subtask(copy_subtask: Subtask): # iterate over all inputs for copy_subtask_input in copy_subtask.inputs.all(): - # select and set input dataproducts that meet the filter defined in selection_doc - dataproducts = [dataproduct for dataproduct in copy_subtask_input.producer.dataproducts.all() - if specifications_doc_meets_selection_doc(dataproduct.specifications_doc, copy_subtask_input.selection_doc)] - copy_subtask_input.dataproducts.set(dataproducts) - - # skip resource assigner - - # step 5: set state to SCHEDULED (resulting in the qaservice to pick this subtask up and run it) - copy_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value) - copy_subtask.save() + input_dataproducts = [dataproduct for dataproduct in copy_subtask_input.producer.dataproducts.all() + if specifications_doc_meets_selection_doc(dataproduct.specifications_doc, copy_subtask_input.selection_doc)] + if input_dataproducts: + copy_subtask_input.dataproducts.set(input_dataproducts) + + if copy_subtask.specifications_doc.get('managed_output', True): + # create an exact copy of the input type, but then for output + output_connector = copy_subtask.task_blueprint.specifications_template.connector_types.filter(dataformat=copy_subtask_input.input_role.dataformat, + datatype=copy_subtask_input.input_role.datatype, + role=copy_subtask_input.input_role.role, + iotype__value=IOType.Choices.OUTPUT.value).first() + + subtask_output = SubtaskOutput.objects.create(subtask=copy_subtask, + output_role=output_connector, + filesystem=_output_filesystem(copy_subtask)) + + # prepare output_dataproducts, which are just exact copies of the input, except for the directory + dataproduct_feedback_template = DataproductFeedbackTemplate.get_version_or_latest(name="empty") + dataproduct_feedback_doc = dataproduct_feedback_template.get_default_json_document_for_schema() + output_dataproducts = [Dataproduct(filename=input_dp.filename, + 
directory=copy_subtask.specifications_doc['destination'], # the key property of the copytask. Where to copy to? + dataformat=input_dp.dataformat, + datatype=input_dp.datatype, + producer=subtask_output, + specifications_doc=input_dp.specifications_doc, + specifications_template=input_dp.specifications_template, + feedback_doc=dataproduct_feedback_doc, + feedback_template=dataproduct_feedback_template, + sap=input_dp.sap, + global_identifier=None) for input_dp in input_dataproducts] + + # create the dataproducts + output_dataproducts = _bulk_create_dataproducts_with_global_identifiers(output_dataproducts) + subtask_output.dataproducts.set(output_dataproducts) + + # a copy action is an identity transform + transforms = [DataproductTransform(input=input_dp, output=output_dp, identity=True) for input_dp,output_dp in zip(input_dataproducts, output_dataproducts)] + DataproductTransform.objects.bulk_create(transforms) + + # step 5: set state to SCHEDULED (resulting in the copy_service picking this subtask up and running it) + copy_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value) + copy_subtask.save() return copy_subtask -- GitLab