Skip to content
Snippets Groups Projects
Commit 804e8ca5 authored by Jörn Künsemöller's avatar Jörn Künsemöller
Browse files

TMSS-200: Implement input dataproduct selection filtering.

parent c78d4202
No related branches found
No related tags found
1 merge request!167Resolve TMSS-200
......@@ -77,13 +77,13 @@ def _populate_task_draft_example():
# connect them
connector = models.TaskConnector.objects.first() # TODO: get the correct connector instead of the first
task_relation_data = {"tags": [],
"selection_doc": {},
"selection_doc": {'sap': [0]},
"dataformat": models.Dataformat.objects.get(value='MeasurementSet'),
"producer": pipeline_task_draft,
"consumer": obs_task_draft,
"input": connector,
"output": connector,
"selection_template": models.TaskRelationSelectionTemplate.objects.get(name="All")}
"selection_template": models.TaskRelationSelectionTemplate.objects.get(name="SAP")}
models.TaskRelationDraft.objects.create(**task_relation_data)
except ImportError:
......@@ -891,15 +891,17 @@ def _populate_taskrelation_selection_templates():
"additionalProperties": false,
"definitions": {},
"properties": {
"SAP": {
"type": "integer",
"title": "SAP",
"default": 0,
"minimum": 0,
"maximum": 1,
"description": "Sub-array pointing selector"
}
},
"sap": {
"type": "array",
"title": "sap list",
"additionalItems": false,
"default": [],
"items": {
"type": "integer",
"title": "sap",
"minimum": 0,
"maximum": 1
}}},
"type": "object"
}'''),
"tags": []}
......@@ -921,6 +923,32 @@ def _populate_dataproduct_specifications_templates():
DataproductSpecificationsTemplate.objects.create(**template_data)
# SAP
template_data = {"name": "SAP",
"description": 'Select by SAP.',
"version": '1',
"schema": json.loads('''{
"$id": "http://example.com/example.json",
"$schema": "http://json-schema.org/draft-06/schema#",
"additionalProperties": false,
"definitions": {},
"properties": {
"sap": {
"type": "array",
"title": "sap list",
"additionalItems": false,
"default": [],
"items": {
"type": "integer",
"title": "sap",
"minimum": 0,
"maximum": 1
}}},
"type": "object"
}'''),
"tags": []}
DataproductSpecificationsTemplate.objects.create(**template_data)
def _populate_dataproduct_feedback_templates():
template_data = { "name": "Empty",
......
......@@ -22,7 +22,7 @@ def check_prerequities_for_subtask_creation(task_blueprint: TaskBlueprint) -> bo
return True
def create_subtasks_from_task_blueprint(task_blueprint: TaskBlueprint) -> [Subtask]:
'''Generic create-method for subtasks. Calls the appropiate create method based on the task_blueprint specifications_template name.'''
'''Generic create-method for subtasks. Calls the appropriate create method based on the task_blueprint specifications_template name.'''
check_prerequities_for_subtask_creation(task_blueprint)
# fixed mapping from template name to generator functions which create the list of subtask(s) for this task_blueprint
......@@ -251,12 +251,13 @@ def create_preprocessing_subtask_from_task_blueprint(task_blueprint: TaskBluepri
subtask = Subtask.objects.create(**subtask_data)
# step 2: create and link subtask input/output
observation_predecessor_task = observation_predecessor_tasks[0] #TODO: make proper selection instead of first.
observation_predecessor_subtask = [st for st in observation_predecessor_task.subtasks.all() if st.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value][0]
subtask_input = SubtaskInput.objects.create(subtask=subtask,
producer=observation_predecessor_subtask.outputs.first(),
selection_doc="{}",
selection_template=TaskRelationSelectionTemplate.objects.get(name="All"))
for task_relation_blueprint in task_blueprint.consumed_by.all(): # todo: rename related name 'consumed_by' to 'consuming_task_relation_blueprints' or sth.
for predecessor_subtask in task_relation_blueprint.producer.subtasks.all():
for predecessor_subtask_output in predecessor_subtask.outputs.all():
subtask_input = SubtaskInput.objects.create(subtask=subtask,
producer=predecessor_subtask_output,
selection_doc=task_relation_blueprint.selection_doc,
selection_template=task_relation_blueprint.selection_template)
subtask_output = SubtaskOutput.objects.create(subtask=subtask)
# step 3: set state to DEFINED
......@@ -439,15 +440,15 @@ def schedule_observation_subtask(observation_subtask: Subtask):
# step 4: create output dataproducts, and link these to the output
specifications_doc = observation_subtask.specifications_doc
dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="Empty")
dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="SAP") # todo: should this be derived from the task relation specification template?
dataproduct_feedback_template = DataproductFeedbackTemplate.objects.get(name="Empty")
subtask_output = observation_subtask.outputs.first() # TODO: make proper selection, not default first()
for sb_nr in specifications_doc['stations']['digital_pointings'][0]['subbands']:
Dataproduct.objects.create(filename="L%d_SB%03d_uv.MS" % (observation_subtask.id, sb_nr),
directory="CEP4:/data/test-projects/TMSS_test/L%d/uv/" % (observation_subtask.id,),
directory="CEP4:/data/test-projects/TMSS_test/L%d/uv/" % (observation_subtask.id,), # todo: set correct path
dataformat=Dataformat.objects.get(value="MeasurementSet"),
producer=subtask_output,
specifications_doc={},
specifications_doc={"sap": [0]}, # todo: set correct value. How is that determined?
specifications_template=dataproduct_specifications_template,
feedback_doc="",
feedback_template=dataproduct_feedback_template)
......@@ -478,48 +479,51 @@ def schedule_pipeline_subtask(pipeline_subtask: Subtask):
pipeline_subtask.save()
# step 2: link input dataproducts
# for now, use all output dataproducts of the already linked inputs
# TODO: make proper dataproduct input selection
# TODO: use existing and reasonable selection and specification templates when we have those, for now, use "All" and "Empty"
selection_template = TaskRelationSelectionTemplate.objects.get(name="All")
# TODO: use existing and reasonable selection and specification templates for output when we have those, for now, use "Empty"
dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="Empty")
dataproduct_feedback_template = DataproductFeedbackTemplate.objects.get(name="Empty")
# TODO: make proper selection, not first()
pipeline_subtask_input = pipeline_subtask.inputs.first()
pipeline_subtask_output = pipeline_subtask.outputs.first()
pipeline_subtask_input.dataproducts.set(pipeline_subtask_input.producer.dataproducts.all())
# step 3: resource assigner
# TODO: implement. Can be skipped for now.
# step 4: create output dataproducts, and link these to the output
# TODO: create them from the spec, instead of "copying" the input filename
output_dps = []
for input_dp in pipeline_subtask_input.dataproducts.all():
if '_' in input_dp.filename and input_dp.filename.startswith('L'):
filename = "L%s_%s" % (pipeline_subtask.pk, input_dp.filename.split('_', 1)[1])
else:
filename = "L%s_%s" % (pipeline_subtask.pk, input_dp.filename)
output_dp = Dataproduct.objects.create(filename=filename,
directory=input_dp.directory.replace(str(pipeline_subtask_input.subtask.pk), str(pipeline_subtask.pk)),
dataformat=Dataformat.objects.get(value="MeasurementSet"),
producer=pipeline_subtask_output,
specifications_doc={},
specifications_template=dataproduct_specifications_template,
feedback_doc="",
feedback_template=dataproduct_feedback_template)
DataproductTransform.objects.create(input=input_dp, output=output_dp, identity=False)
output_dps.append(output_dp)
pipeline_subtask_output.dataproducts.set(output_dps)
# step 5: set state to SCHEDULED (resulting in the qaservice to pick this subtask up and run it)
pipeline_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
pipeline_subtask.save()
return pipeline_subtask
# iterate over all inputs
for pipeline_subtask_input in pipeline_subtask.inputs.all():
# select and set input dataproducts that meet the filter defined in selection_doc
dataproducts = [dataproduct for dataproduct in pipeline_subtask_input.producer.dataproducts.all()
if specifications_doc_meets_selection_doc(dataproduct.specifications_doc, pipeline_subtask_input.selection_doc)]
pipeline_subtask_input.dataproducts.set(dataproducts)
# select subtask output the new dataproducts will be linked to
pipeline_subtask_output = pipeline_subtask.outputs.first() # TODO: if we have several, how to map input to output?
# step 3: resource assigner
# TODO: implement. Can be skipped for now.
# step 4: create output dataproducts, and link these to the output
# TODO: create them from the spec, instead of "copying" the input filename
output_dps = []
for input_dp in pipeline_subtask_input.dataproducts.all():
if '_' in input_dp.filename and input_dp.filename.startswith('L'):
filename = "L%s_%s" % (pipeline_subtask.pk, input_dp.filename.split('_', 1)[1])
else:
filename = "L%s_%s" % (pipeline_subtask.pk, input_dp.filename)
output_dp = Dataproduct.objects.create(filename=filename,
directory=input_dp.directory.replace(str(pipeline_subtask_input.subtask.pk), str(pipeline_subtask.pk)),
dataformat=Dataformat.objects.get(value="MeasurementSet"),
producer=pipeline_subtask_output,
specifications_doc={},
specifications_template=dataproduct_specifications_template,
feedback_doc="",
feedback_template=dataproduct_feedback_template)
DataproductTransform.objects.create(input=input_dp, output=output_dp, identity=False)
output_dps.append(output_dp)
pipeline_subtask_output.dataproducts.set(output_dps)
# step 5: set state to SCHEDULED (resulting in the qaservice to pick this subtask up and run it)
pipeline_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
pipeline_subtask.save()
return pipeline_subtask
# === Misc ===
......@@ -598,3 +602,10 @@ def _generate_subtask_specs_from_preprocessing_task_specs(preprocessing_task_spe
return subtask_specs
def specifications_doc_meets_selection_doc(specifications_doc, selection_doc):
# todo: do something cleverer here. Not sure what are the plans though.
# Should specifications {'sap': [0]} match a selection {'sap': [0,1]}?
meets_criteria = specifications_doc.items() <= selection_doc.items()
print(specifications_doc, selection_doc, meets_criteria)
return meets_criteria
......@@ -56,7 +56,7 @@ def create_task_blueprint_from_task_draft(task_draft: models.TaskDraft) -> model
# now that we have a task_blueprint, its time to refresh the task_draft so we get the non-cached fields
task_draft.refresh_from_db()
# loop over consumers/producers, and 'copy'' the TaskRelationBlueprint form the TaskRelationDraft
# loop over consumers/producers, and 'copy'' the TaskRelationBlueprint from the TaskRelationDraft
# this is only possible if both 'ends' of the task_relation are converted to a TaskBlueprint
# so, when converting two TaskDrafts (for example an observation and a pipeline), then for the conversion
# of the first TaskDraft->TaskBlueprint no relation is setup,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment