diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py index 49dfeb026d3084b06810aed0f716de5511e8a81c..e58e67d3c10e824c6f97bf97978b3f11d29c51e8 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py @@ -643,7 +643,9 @@ def create_observation_control_subtask_from_task_blueprint(task_blueprint: TaskB # step 2: create and link subtask input/output # an observation has no input, it just produces output data - subtask_output = SubtaskOutput.objects.create(subtask=subtask) + # create a subtask_output per task_connector_type of the task + for task_connector_type in task_blueprint.specifications_template.connector_types.filter(iotype__value=IOType.Choices.OUTPUT.value).all(): + subtask_output = SubtaskOutput.objects.create(subtask=subtask, output_role=task_connector_type) # step 3: set state to DEFINED subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value) @@ -705,10 +707,12 @@ def create_qafile_subtask_from_observation_subtask(observation_subtask: Subtask) for obs_out in observation_subtask.outputs.all(): qafile_subtask_input = SubtaskInput.objects.create(subtask=qafile_subtask, producer=obs_out, # TODO: determine proper producer based on spec in task_relation_blueprint + input_role=None, # TODO: determine proper role based on spec in task_relation_blueprint selection_doc=selection_doc, selection_template=selection_template) - qafile_subtask_output = SubtaskOutput.objects.create(subtask=qafile_subtask) + # create an internal SubtaskOutput (output_role=None), because we do not expose qa data yet at task level + qafile_subtask_output = SubtaskOutput.objects.create(subtask=qafile_subtask, output_role=None) # step 3: set state to DEFINED qafile_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value) @@ -781,10 +785,12 @@ def create_qaplots_subtask_from_qafile_subtask(qafile_subtask: Subtask) -> Subta selection_doc = selection_template.get_default_json_document_for_schema() qaplots_subtask_input = SubtaskInput.objects.create(subtask=qaplots_subtask, producer=qafile_subtask.outputs.first(), + input_role=None, selection_doc=selection_doc, selection_template=selection_template) - qaplots_subtask_output = SubtaskOutput.objects.create(subtask=qaplots_subtask) + # create an internal SubtaskOutput (output_role=None), because we do not expose qa data yet at task level + qaplots_subtask_output = SubtaskOutput.objects.create(subtask=qaplots_subtask, output_role=None) # step 3: set state to DEFINED qaplots_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value) @@ -794,6 +800,28 @@ def create_qaplots_subtask_from_qafile_subtask(qafile_subtask: Subtask) -> Subta return qaplots_subtask +def _create_subtask_inputs(subtask: Subtask): + # loop over all incoming task_relations... + for task_relation_blueprint in subtask.task_blueprint.produced_by.all(): + # ... and check which task_relation matches with this task's connector_types + for input_connector_type in subtask.task_blueprint.specifications_template.connector_types.filter(role=task_relation_blueprint.output_role.role, + datatype=task_relation_blueprint.output_role.datatype, + dataformat=task_relation_blueprint.output_role.dataformat, + iotype__value=IOType.Choices.INPUT.value).all(): + # find the producing_subtask_output for this relation + for producing_subtask_output in SubtaskOutput.objects.filter(subtask__in=task_relation_blueprint.producer.subtasks.all()).filter(output_role__role=task_relation_blueprint.output_role.role, + output_role__datatype=task_relation_blueprint.output_role.datatype, + output_role__dataformat=task_relation_blueprint.output_role.dataformat, + output_role__iotype=task_relation_blueprint.output_role.iotype).all(): + # create the SubtaskInput + # this is a "socket" with a 'input_connector_type'-form into which dataproducts of the correct type can be fed upon scheduling + subtask_input = SubtaskInput.objects.create(subtask=subtask, + producer=producing_subtask_output, + input_role=input_connector_type, + selection_doc=task_relation_blueprint.selection_doc, + selection_template=task_relation_blueprint.selection_template) + + def create_pipeline_subtask_from_task_blueprint(task_blueprint: TaskBlueprint, subtask_template_name: str, generate_subtask_specs_from_task_spec_func) -> Subtask: ''' Create a subtask to for the preprocessing pipeline. This method implements "Instantiate subtasks" step from the "Specification Flow" @@ -826,21 +854,12 @@ def create_pipeline_subtask_from_task_blueprint(task_blueprint: TaskBlueprint, s "cluster": Cluster.objects.get(name=cluster_name) } subtask = Subtask.objects.create(**subtask_data) - # step 2: create and link subtask input/output - for task_relation_blueprint in task_blueprint.produced_by.all(): - producing_task_blueprint = task_relation_blueprint.producer - - # create inputs for all predecessor observation subtask outputs that belong to the producer task of this task relation - # TODO: more filtering needed? - predecessor_observation_subtasks = [st for st in producing_task_blueprint.subtasks.order_by('id').all() if st.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value] - for predecessor_obs_subtask in predecessor_observation_subtasks: - for predecessor_subtask_output in predecessor_obs_subtask.outputs.all(): - subtask_input = SubtaskInput.objects.create(subtask=subtask, - producer=predecessor_subtask_output, - selection_doc=task_relation_blueprint.selection_doc, - selection_template=task_relation_blueprint.selection_template) + # step 2: create and link subtask input + _create_subtask_inputs(subtask) - subtask_output = SubtaskOutput.objects.create(subtask=subtask) + # create a subtask_output per task_connector_type of the task + for task_connector_type in task_blueprint.specifications_template.connector_types.filter(iotype__value=IOType.Choices.OUTPUT.value).all(): + subtask_output = SubtaskOutput.objects.create(subtask=subtask, output_role=task_connector_type) # step 3: set state to DEFINED subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value) @@ -882,16 +901,7 @@ def create_ingest_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> subtask = Subtask.objects.create(**subtask_data) # step 2: create and link subtask input - for task_relation_blueprint in task_blueprint.produced_by.all(): - producing_task_blueprint = task_relation_blueprint.producer - - predecessor_subtasks = [st for st in producing_task_blueprint.subtasks.filter(specifications_template__type__value__in=(SubtaskType.Choices.OBSERVATION.value, SubtaskType.Choices.PIPELINE.value)).order_by('id').all()] - for predecessor_subtask in predecessor_subtasks: - for predecessor_subtask_output in predecessor_subtask.outputs.all(): - SubtaskInput.objects.create(subtask=subtask, - producer=predecessor_subtask_output, - selection_doc=task_relation_blueprint.selection_doc, - selection_template=task_relation_blueprint.selection_template) + _create_subtask_inputs(subtask) # step 3: set state to DEFINED subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value) @@ -931,16 +941,7 @@ def create_cleanup_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> # step 2: create and link subtask input # for this cleanup subtask an 'input' seems a bit weird, but it actually makes sense! # this cleanup subtask will cleanup the output data of all linked input predecessors. - for task_relation_blueprint in task_blueprint.produced_by.all(): - producing_task_blueprint = task_relation_blueprint.producer - - predecessor_subtasks = [st for st in producing_task_blueprint.subtasks.filter(specifications_template__type__value__in=(SubtaskType.Choices.OBSERVATION.value, SubtaskType.Choices.PIPELINE.value)).order_by('id').all()] - for predecessor_subtask in predecessor_subtasks: - for predecessor_subtask_output in predecessor_subtask.outputs.all(): - SubtaskInput.objects.create(subtask=subtask, - producer=predecessor_subtask_output, - selection_doc=task_relation_blueprint.selection_doc, - selection_template=task_relation_blueprint.selection_template) + _create_subtask_inputs(subtask) # step 3: set state to DEFINED subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value) @@ -1697,9 +1698,6 @@ def _create_preprocessing_output_dataproducts_and_transforms(pipeline_subtask: S return output_dataproducts def _create_pulsar_pipeline_output_dataproducts_and_transforms(pipeline_subtask: Subtask, input_dataproducts: list): - # select subtask output the new dataproducts will be linked to - pipeline_subtask_output = pipeline_subtask.outputs.first() # TODO: if we have several, how to map input to output? - dataproduct_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty") directory = os.path.join(_output_root_directory(pipeline_subtask), "pulp") @@ -1707,7 +1705,7 @@ def _create_pulsar_pipeline_output_dataproducts_and_transforms(pipeline_subtask: # we process per data type, as later on we produce summaries per data type # different datatypes need different treatment - data_types = ["cs", "is", "cv"] + bf_data_types = ["cs", "is", "cv"] # sort input dataproducts by data type input_dataproducts = { @@ -1735,22 +1733,25 @@ def _create_pulsar_pipeline_output_dataproducts_and_transforms(pipeline_subtask: "cv": "CV" } - for data_type in data_types: + for bf_data_type in bf_data_types: # do not generate output for a data type that is not in the input - if not input_dataproducts[data_type]: + if not input_dataproducts[bf_data_type]: continue dataformat = Dataformat.objects.get(value="pulp analysis") datatype = Datatype.objects.get(value="pulsar profile") dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="time series") + # select subtask output the new dataproducts will be linked to + pipeline_subtask_output = pipeline_subtask.outputs.filter(output_role__dataformat=dataformat, output_role__datatype=datatype).first() # TODO: if we have several, how to map input to output? + # how the output is constructed from the input. we do this based on filenames: input dataproducts that result in the same filename # are combined to produce the output with that filename. # # format: { output_dp_filename: {"output_dp": output_dp, "input_dps": [input_dps]} } transformation_map = {} - for input_dp in input_dataproducts[data_type]: + for input_dp in input_dataproducts[bf_data_type]: # the filename doesn't contain the stokes number if there is only 1 stokes for this data type or when recording cv: # # type example filename input:output mapping @@ -1775,7 +1776,7 @@ def _create_pulsar_pipeline_output_dataproducts_and_transforms(pipeline_subtask: else: # a new output to model output_dp = Dataproduct(filename=output_filename, - directory=os.path.join(directory, output_subdir[data_type]), + directory=os.path.join(directory, output_subdir[bf_data_type]), dataformat=dataformat, datatype=datatype, producer=pipeline_subtask_output, @@ -1815,17 +1816,20 @@ def _create_pulsar_pipeline_output_dataproducts_and_transforms(pipeline_subtask: datatype = Datatype.objects.get(value="quality") dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="pulp summary") + # select subtask output the new dataproducts will be linked to + pipeline_subtask_output = pipeline_subtask.outputs.filter(output_role__dataformat=dataformat, output_role__datatype=datatype).first() # TODO: if we have several, how to map input to output? + # 1. create dataproducts # type example filename # CV pulp/cs/L808292_summaryCV.tar # IS pulp/is/L808290_summaryIS.tar # CS pulp/cs/L808288_summaryCS.tar - summary_dataproduct = Dataproduct(filename="L%s_summary%s.tar" % (pipeline_subtask.id, summary_filename_suffix[data_type]), - directory=os.path.join(directory, output_subdir[data_type]), + summary_dataproduct = Dataproduct(filename="L%s_summary%s.tar" % (pipeline_subtask.id, summary_filename_suffix[bf_data_type]), + directory=os.path.join(directory, output_subdir[bf_data_type]), dataformat=dataformat, datatype=datatype, producer=pipeline_subtask_output, - specifications_doc={ "coherent": data_type != "is", "identifiers": { "data_type": data_type } }, + specifications_doc={ "coherent": bf_data_type != "is", "identifiers": { "data_type": bf_data_type } }, specifications_template=dataproduct_specifications_template, feedback_doc=dataproduct_feedback_template.get_default_json_document_for_schema(), feedback_template=dataproduct_feedback_template, @@ -1838,7 +1842,7 @@ def _create_pulsar_pipeline_output_dataproducts_and_transforms(pipeline_subtask: # 2. create transforms from the input # populate the transform, each input_dp of this datatype is input for this summary - transforms = [DataproductTransform(input=input_dp, output=summary_dataproduct, identity=False) for input_dp in input_dataproducts[data_type]] + transforms = [DataproductTransform(input=input_dp, output=summary_dataproduct, identity=False) for input_dp in input_dataproducts[bf_data_type]] DataproductTransform.objects.bulk_create(transforms) return None @@ -1953,21 +1957,22 @@ def schedule_ingest_subtask(ingest_subtask: Subtask): raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s because it has no input dataproduct(s)" % (ingest_subtask.pk, ingest_subtask.specifications_template.type)) - # iterate over all inputs - for ingest_subtask_input in ingest_subtask.inputs.all(): + # Create one internal SubtaskOutput (output_role=None), all dataproducts end up in here. + # We do not expose ingested data as a task output connector. The LTA is an endpoint. + ingest_subtask_output = SubtaskOutput.objects.create(subtask=ingest_subtask, output_role=None) + # gather all dataproducts from all inputs + for ingest_subtask_input in ingest_subtask.inputs.all(): # select and set input dataproducts that meet the filter defined in selection_doc input_dataproducts = [dataproduct for dataproduct in ingest_subtask_input.producer.dataproducts.all() if specifications_doc_meets_selection_doc(dataproduct.specifications_doc, ingest_subtask_input.selection_doc)] ingest_subtask_input.dataproducts.set(input_dataproducts) - # define output and create output dataproducts. - ingest_subtask_output = SubtaskOutput.objects.create(subtask=ingest_subtask) - # prepare identifiers in bulk for each output_dataproduct dp_gids = [SIPidentifier(source="TMSS") for _ in input_dataproducts] SIPidentifier.objects.bulk_create(dp_gids) + # define output and create output dataproducts. output_dataproducts = [Dataproduct(filename=input_dp.filename, # overwritten later by ingest 'feedback'. Is determined at transfer time by the LTA. directory="LTA", # filled in later by ingest 'feedback'. Is determined at transfer time by the LTA. dataformat=input_dp.dataformat,