diff --git a/SAS/TMSS/src/tmss/tmssapp/populate.py b/SAS/TMSS/src/tmss/tmssapp/populate.py index fffc75596da5670c0389450a06b0c93acaae9183..96092786d6a566fcce5203d99134e24eb4657a32 100644 --- a/SAS/TMSS/src/tmss/tmssapp/populate.py +++ b/SAS/TMSS/src/tmss/tmssapp/populate.py @@ -77,13 +77,13 @@ def _populate_task_draft_example(): # connect them connector = models.TaskConnector.objects.first() # TODO: get the correct connector instead of the first task_relation_data = {"tags": [], - "selection_doc": {}, + "selection_doc": {'sap': [0]}, "dataformat": models.Dataformat.objects.get(value='MeasurementSet'), "producer": pipeline_task_draft, "consumer": obs_task_draft, "input": connector, "output": connector, - "selection_template": models.TaskRelationSelectionTemplate.objects.get(name="All")} + "selection_template": models.TaskRelationSelectionTemplate.objects.get(name="SAP")} models.TaskRelationDraft.objects.create(**task_relation_data) except ImportError: @@ -891,15 +891,17 @@ def _populate_taskrelation_selection_templates(): "additionalProperties": false, "definitions": {}, "properties": { - "SAP": { - "type": "integer", - "title": "SAP", - "default": 0, - "minimum": 0, - "maximum": 1, - "description": "Sub-array pointing selector" - } - }, + "sap": { + "type": "array", + "title": "sap list", + "additionalItems": false, + "default": [], + "items": { + "type": "integer", + "title": "sap", + "minimum": 0, + "maximum": 1 + }}}, "type": "object" }'''), "tags": []} @@ -921,6 +923,32 @@ def _populate_dataproduct_specifications_templates(): DataproductSpecificationsTemplate.objects.create(**template_data) + # SAP + template_data = {"name": "SAP", + "description": 'Select by SAP.', + "version": '1', + "schema": json.loads('''{ + "$id": "http://example.com/example.json", + "$schema": "http://json-schema.org/draft-06/schema#", + "additionalProperties": false, + "definitions": {}, + "properties": { + "sap": { + "type": "array", + "title": "sap list", + "additionalItems": false, + "default": [], + "items": { + "type": "integer", + "title": "sap", + "minimum": 0, + "maximum": 1 + }}}, + "type": "object" + }'''), + "tags": []} + DataproductSpecificationsTemplate.objects.create(**template_data) + def _populate_dataproduct_feedback_templates(): template_data = { "name": "Empty", diff --git a/SAS/TMSS/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/src/tmss/tmssapp/subtasks.py index 311f403f6e23556836fa3a2e5aa8fd87ddbc8b3d..674f7ae68138bd60c02ae810536049fe8347e76d 100644 --- a/SAS/TMSS/src/tmss/tmssapp/subtasks.py +++ b/SAS/TMSS/src/tmss/tmssapp/subtasks.py @@ -22,7 +22,7 @@ def check_prerequities_for_subtask_creation(task_blueprint: TaskBlueprint) -> bo return True def create_subtasks_from_task_blueprint(task_blueprint: TaskBlueprint) -> [Subtask]: - '''Generic create-method for subtasks. Calls the appropiate create method based on the task_blueprint specifications_template name.''' + '''Generic create-method for subtasks. Calls the appropriate create method based on the task_blueprint specifications_template name.''' check_prerequities_for_subtask_creation(task_blueprint) # fixed mapping from template name to generator functions which create the list of subtask(s) for this task_blueprint @@ -251,12 +251,13 @@ def create_preprocessing_subtask_from_task_blueprint(task_blueprint: TaskBluepri subtask = Subtask.objects.create(**subtask_data) # step 2: create and link subtask input/output - observation_predecessor_task = observation_predecessor_tasks[0] #TODO: make proper selection instead of first. - observation_predecessor_subtask = [st for st in observation_predecessor_task.subtasks.all() if st.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value][0] - subtask_input = SubtaskInput.objects.create(subtask=subtask, - producer=observation_predecessor_subtask.outputs.first(), - selection_doc="{}", - selection_template=TaskRelationSelectionTemplate.objects.get(name="All")) + for task_relation_blueprint in task_blueprint.consumed_by.all(): # todo: rename related name 'consumed_by' to 'consuming_task_relation_blueprints' or sth. + for predecessor_subtask in task_relation_blueprint.producer.subtasks.all(): + for predecessor_subtask_output in predecessor_subtask.outputs.all(): + subtask_input = SubtaskInput.objects.create(subtask=subtask, + producer=predecessor_subtask_output, + selection_doc=task_relation_blueprint.selection_doc, + selection_template=task_relation_blueprint.selection_template) subtask_output = SubtaskOutput.objects.create(subtask=subtask) # step 3: set state to DEFINED @@ -439,15 +440,15 @@ def schedule_observation_subtask(observation_subtask: Subtask): # step 4: create output dataproducts, and link these to the output specifications_doc = observation_subtask.specifications_doc - dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="Empty") + dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="SAP") # todo: should this be derived from the task relation specification template? dataproduct_feedback_template = DataproductFeedbackTemplate.objects.get(name="Empty") subtask_output = observation_subtask.outputs.first() # TODO: make proper selection, not default first() for sb_nr in specifications_doc['stations']['digital_pointings'][0]['subbands']: Dataproduct.objects.create(filename="L%d_SB%03d_uv.MS" % (observation_subtask.id, sb_nr), - directory="CEP4:/data/test-projects/TMSS_test/L%d/uv/" % (observation_subtask.id,), + directory="CEP4:/data/test-projects/TMSS_test/L%d/uv/" % (observation_subtask.id,), # todo: set correct path dataformat=Dataformat.objects.get(value="MeasurementSet"), producer=subtask_output, - specifications_doc={}, + specifications_doc={"sap": [0]}, # todo: set correct value. How is that determined? specifications_template=dataproduct_specifications_template, feedback_doc="", feedback_template=dataproduct_feedback_template) @@ -478,48 +479,51 @@ def schedule_pipeline_subtask(pipeline_subtask: Subtask): pipeline_subtask.save() # step 2: link input dataproducts - # for now, use all output dataproducts of the already linked inputs - # TODO: make proper dataproduct input selection - # TODO: use existing and reasonable selection and specification templates when we have those, for now, use "All" and "Empty" - selection_template = TaskRelationSelectionTemplate.objects.get(name="All") + + # TODO: use existing and reasonable selection and specification templates for output when we have those, for now, use "Empty" dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="Empty") dataproduct_feedback_template = DataproductFeedbackTemplate.objects.get(name="Empty") - # TODO: make proper selection, not first() - pipeline_subtask_input = pipeline_subtask.inputs.first() - pipeline_subtask_output = pipeline_subtask.outputs.first() - - pipeline_subtask_input.dataproducts.set(pipeline_subtask_input.producer.dataproducts.all()) - - # step 3: resource assigner - # TODO: implement. Can be skipped for now. - - # step 4: create output dataproducts, and link these to the output - # TODO: create them from the spec, instead of "copying" the input filename - output_dps = [] - for input_dp in pipeline_subtask_input.dataproducts.all(): - if '_' in input_dp.filename and input_dp.filename.startswith('L'): - filename = "L%s_%s" % (pipeline_subtask.pk, input_dp.filename.split('_', 1)[1]) - else: - filename = "L%s_%s" % (pipeline_subtask.pk, input_dp.filename) - - output_dp = Dataproduct.objects.create(filename=filename, - directory=input_dp.directory.replace(str(pipeline_subtask_input.subtask.pk), str(pipeline_subtask.pk)), - dataformat=Dataformat.objects.get(value="MeasurementSet"), - producer=pipeline_subtask_output, - specifications_doc={}, - specifications_template=dataproduct_specifications_template, - feedback_doc="", - feedback_template=dataproduct_feedback_template) - DataproductTransform.objects.create(input=input_dp, output=output_dp, identity=False) - output_dps.append(output_dp) - pipeline_subtask_output.dataproducts.set(output_dps) - - # step 5: set state to SCHEDULED (resulting in the qaservice to pick this subtask up and run it) - pipeline_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value) - pipeline_subtask.save() - - return pipeline_subtask + # iterate over all inputs + for pipeline_subtask_input in pipeline_subtask.inputs.all(): + + # select and set input dataproducts that meet the filter defined in selection_doc + dataproducts = [dataproduct for dataproduct in pipeline_subtask_input.producer.dataproducts.all() + if specifications_doc_meets_selection_doc(dataproduct.specifications_doc, pipeline_subtask_input.selection_doc)] + pipeline_subtask_input.dataproducts.set(dataproducts) + + # select subtask output the new dataproducts will be linked to + pipeline_subtask_output = pipeline_subtask.outputs.first() # TODO: if we have several, how to map input to output? + + # step 3: resource assigner + # TODO: implement. Can be skipped for now. + + # step 4: create output dataproducts, and link these to the output + # TODO: create them from the spec, instead of "copying" the input filename + output_dps = [] + for input_dp in pipeline_subtask_input.dataproducts.all(): + if '_' in input_dp.filename and input_dp.filename.startswith('L'): + filename = "L%s_%s" % (pipeline_subtask.pk, input_dp.filename.split('_', 1)[1]) + else: + filename = "L%s_%s" % (pipeline_subtask.pk, input_dp.filename) + + output_dp = Dataproduct.objects.create(filename=filename, + directory=input_dp.directory.replace(str(pipeline_subtask_input.subtask.pk), str(pipeline_subtask.pk)), + dataformat=Dataformat.objects.get(value="MeasurementSet"), + producer=pipeline_subtask_output, + specifications_doc={}, + specifications_template=dataproduct_specifications_template, + feedback_doc="", + feedback_template=dataproduct_feedback_template) + DataproductTransform.objects.create(input=input_dp, output=output_dp, identity=False) + output_dps.append(output_dp) + pipeline_subtask_output.dataproducts.set(output_dps) + + # step 5: set state to SCHEDULED (resulting in the qaservice to pick this subtask up and run it) + pipeline_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value) + pipeline_subtask.save() + + return pipeline_subtask # === Misc === @@ -598,3 +602,10 @@ def _generate_subtask_specs_from_preprocessing_task_specs(preprocessing_task_spe return subtask_specs + +def specifications_doc_meets_selection_doc(specifications_doc, selection_doc): + # todo: do something cleverer here. Not sure what are the plans though. + # Should specifications {'sap': [0]} match a selection {'sap': [0,1]}? + meets_criteria = specifications_doc.items() <= selection_doc.items() + print(specifications_doc, selection_doc, meets_criteria) + return meets_criteria diff --git a/SAS/TMSS/src/tmss/tmssapp/tasks.py b/SAS/TMSS/src/tmss/tmssapp/tasks.py index d905e7774b12a1ac56977c5e72f6412aee455535..b97afa0eb13b3714d55586c27c03b265ab7524d6 100644 --- a/SAS/TMSS/src/tmss/tmssapp/tasks.py +++ b/SAS/TMSS/src/tmss/tmssapp/tasks.py @@ -56,7 +56,7 @@ def create_task_blueprint_from_task_draft(task_draft: models.TaskDraft) -> model # now that we have a task_blueprint, its time to refresh the task_draft so we get the non-cached fields task_draft.refresh_from_db() - # loop over consumers/producers, and 'copy'' the TaskRelationBlueprint form the TaskRelationDraft + # loop over consumers/producers, and 'copy'' the TaskRelationBlueprint from the TaskRelationDraft # this is only possible if both 'ends' of the task_relation are converted to a TaskBlueprint # so, when converting two TaskDrafts (for example an observation and a pipeline), then for the conversion # of the first TaskDraft->TaskBlueprint no relation is setup,