Skip to content
Snippets Groups Projects
Commit c161f05a authored by Jan David Mol's avatar Jan David Mol
Browse files

TMSS-604: Moved some dataproduct specifications from identifiers to the...

TMSS-604: Moved some dataproduct specifications from identifiers to the properties the identifiers encode, to allow them to be filtered on
parent a11ec00d
No related branches found
No related tags found
1 merge request: !383 "Resolve TMSS-604 'Dataproduct specs'"
......@@ -113,11 +113,18 @@ def _convert_correlator_settings_to_parset_dict(subtask: models.Subtask, spec: d
dataproducts = list(models.Dataproduct.objects.filter(producer_id__in=subtask_output_ids).filter(dataformat=Dataformat.Choices.MEASUREMENTSET.value).filter(datatype=Datatype.Choices.VISIBILITIES))
# put them in the order as expected in the parset
dataproducts.sort(key=lambda x: x.specification_doc["subband_index"])
parset["Observation.DataProducts.Output_Correlated.filenames"] = [dp.filename for dp in dataproducts]
parset["Observation.DataProducts.Output_Correlated.locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in dataproducts]
# marshall dataproducts, but only if they're supplied. in some use cases, we want a parset before the subtask is scheduled.
correlator_dataproducts = []
for digi_beam in digi_beams:
for subband in digi_beam["subbands"]:
dataproduct = [dp for dp in dataproducts
if dp.specifications_doc["sap"] == digi_beam['name']
and dp.specifications_doc["subband"] == subband]
correlator_dataproducts.append(dataproduct[0] if dataproduct else null_dataproduct)
parset["Observation.DataProducts.Output_Correlated.filenames"] = [dp.filename for dp in correlator_dataproducts]
parset["Observation.DataProducts.Output_Correlated.locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in correlator_dataproducts]
# mimic MoM placeholder thingy (the resource estimator parses this)
parset["Observation.DataProducts.Output_Correlated.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, sap_nr) for sap_nr in range(len(digi_beams))]
......@@ -177,12 +184,12 @@ def _convert_beamformer_settings_to_parset_dict(subtask: models.Subtask, spec: d
for s in range(nr_stokes):
for p in range(nr_parts):
dataproduct = [dp for dp in dataproducts
if dp.specifications_doc["identifiers"]["sap_index"] == sap_idx
if dp.specifications_doc["sap"] == sap['name']
and dp.specifications_doc["identifiers"]["pipeline_index"] == pipeline_idx
and dp.specifications_doc["identifiers"]["tab_index"] == field_idx
and dp.specifications_doc["identifiers"]["stokes_index"] == s
and dp.specifications_doc["identifiers"]["part_index"] == p
and dp.specifications_doc["identifiers"]["coherent"] == tab['coherent']]
and dp.specifications_doc["coherent"] == tab['coherent']]
if tab['coherent']:
coherent_dataproducts.append(dataproduct[0] if dataproduct else null_dataproduct)
else:
......@@ -226,12 +233,12 @@ def _convert_beamformer_settings_to_parset_dict(subtask: models.Subtask, spec: d
for s in range(nr_stokes):
for p in range(nr_parts):
dataproduct = [dp for dp in dataproducts
if dp.specifications_doc["identifiers"]["sap_index"] == sap_idx
if dp.specifications_doc["sap"] == sap["name"]
and dp.specifications_doc["identifiers"]["pipeline_index"] == pipeline_idx
and dp.specifications_doc["identifiers"]["tab_index"] == field_idx
and dp.specifications_doc["identifiers"]["stokes_index"] == s
and dp.specifications_doc["identifiers"]["part_index"] == p
and dp.specifications_doc["identifiers"]["coherent"] == True]
and dp.specifications_doc["coherent"] == True]
coherent_dataproducts.append(dataproduct[0] if dataproduct else null_dataproduct)
if cobalt_version >= 2:
......@@ -536,13 +543,13 @@ def _convert_to_parset_dict_for_pipelinecontrol_schema(subtask: models.Subtask)
subtask_outputs = list(models.SubtaskOutput.objects.filter(subtask_id=subtask.id))
unsorted_out_dataproducts = sum([list(models.Dataproduct.objects.filter(producer_id=subtask_output.id)) for subtask_output in subtask_outputs],[])
def find_dataproduct(dataproducts: list, identifiers: dict):
hits = [dp for dp in dataproducts if dp.specifications_doc['identifiers']['sap_index'] == identifiers['sap_index']
and dp.specifications_doc['identifiers']['subband_index'] == identifiers['subband_index']]
def find_dataproduct(dataproducts: list, specification_doc: dict):
    """Return the first dataproduct whose 'sap' and 'subband' specification
    properties match those of specification_doc, or null_dataproduct if none do."""
    wanted_sap = specification_doc['sap']
    wanted_subband = specification_doc['subband']
    for candidate in dataproducts:
        spec = candidate.specifications_doc
        if spec['sap'] == wanted_sap and spec['subband'] == wanted_subband:
            return candidate
    # no match found: fall back to the module-level placeholder dataproduct
    return null_dataproduct
# list output dataproducts in the same order as input dataproducts, matched by the identifiers
out_dataproducts = [find_dataproduct(unsorted_out_dataproducts, in_dp.specifications_doc['identifiers']) for in_dp in in_dataproducts]
out_dataproducts = [find_dataproduct(unsorted_out_dataproducts, in_dp.specifications_doc) for in_dp in in_dataproducts]
parset["Observation.DataProducts.Output_Correlated.enabled"] = "true"
parset["Observation.DataProducts.Output_Correlated.filenames"] = "[%s]" % ",".join([dp.filename for dp in out_dataproducts])
......
......@@ -10,31 +10,13 @@
"title": "SAP",
"default": ""
},
"identifiers": {
"title": "identifiers",
"description": "identification of this dataproduct within the producing subtask.",
"default": {},
"type": "object",
"properties": {
"sap_index": {
"title": "SAP index",
"type": "integer",
"default": 0,
"minimum": 0
},
"subband_index": {
"title": "Subband index",
"description": "Subband offset within the subtask (SAPs do not reset numbering)",
"type": "integer",
"default": 0,
"minimum": 0
}
},
"required": [
"sap_index",
"subband_index"
]
"subband": {
"type": "integer",
"title": "subband number",
"default": 0,
"minimum": 0,
"maximum": 511
}
},
"required": [ "identifiers" ]
"required": [ "sap", "subband" ]
}
......@@ -1151,13 +1151,13 @@ def schedule_observation_subtask(observation_subtask: Subtask):
sb_nr_offset = 0 # subband numbers run from 0 to (nr_subbands-1), increasing across SAPs
for sap_nr, pointing in enumerate(specifications_doc['stations']['digital_pointings']):
for sb_nr, _ in enumerate(pointing['subbands'], start=sb_nr_offset):
for sb_nr, subband in enumerate(pointing['subbands'], start=sb_nr_offset):
dataproducts.append(Dataproduct(filename="L%d_SAP%03d_SB%03d_uv.MS" % (observation_subtask.id, sap_nr, sb_nr),
directory=directory+"/uv",
dataformat=Dataformat.objects.get(value="MeasurementSet"),
datatype=Datatype.objects.get(value="visibilities"),
producer=subtask_output,
specifications_doc={"sap": pointing["name"], "identifiers": {"sap_index": sap_nr, "subband_index": sb_nr} },
specifications_doc={"sap": pointing["name"], "subband": subband},
specifications_template=dataproduct_specifications_template_visibilities,
feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema),
feedback_template=dataproduct_feedback_template,
......@@ -1194,7 +1194,7 @@ def schedule_observation_subtask(observation_subtask: Subtask):
dataformat=Dataformat.objects.get(value="Beamformed"),
datatype=Datatype.objects.get(value="time series"),
producer=subtask_output,
specifications_doc={"sap": specifications_doc['stations']['digital_pointings'][sap_nr]["name"], "identifiers": {"sap_index": sap_nr, "pipeline_index": pipeline_nr, "tab_index": tab_nr, "stokes_index": stokes_nr, "part_index": part_nr, "coherent": coherent}},
specifications_doc={"sap": specifications_doc['stations']['digital_pointings'][sap_nr]["name"], "coherent": coherent, "identifiers": {"pipeline_index": pipeline_nr, "tab_index": tab_nr, "stokes_index": stokes_nr, "part_index": part_nr}},
specifications_template=dataproduct_specifications_template_timeseries,
feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema),
feedback_template=dataproduct_feedback_template,
......@@ -1304,9 +1304,6 @@ def schedule_pipeline_subtask(pipeline_subtask: Subtask):
else:
filename = "L%s_%s" % (pipeline_subtask.pk, input_dp.filename)
if "identifiers" not in input_dp.specifications_doc:
raise SubtaskSchedulingSpecificationException("Input dataproduct must have identifiers, specification_doc is %s, template is %s" % (input_dp.specifications_doc, input_dp.specifications_template))
output_dp = Dataproduct(filename=filename,
directory=input_dp.directory.replace(str(pipeline_subtask_input.producer.subtask.pk), str(pipeline_subtask.pk)),
dataformat=dataformat,
......
......@@ -260,7 +260,7 @@ class SchedulingTest(unittest.TestCase):
obs_subtask = test_data_creator.post_data_and_get_response_as_json_object(obs_subtask_data, '/subtask/')
obs_subtask_output_url = test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=obs_subtask['url']), '/subtask_output/')
test_data_creator.post_data_and_get_url(test_data_creator.Dataproduct(filename="L%s_SB000.MS"%obs_subtask['id'],
specifications_doc={"identifiers": { "sap_index": 0, "subband_index": 0 }},
specifications_doc={"sap": "target0", "subband": 0 },
subtask_output_url=obs_subtask_output_url), '/dataproduct/')
# now create the pipeline...
......@@ -305,7 +305,7 @@ class SchedulingTest(unittest.TestCase):
obs_subtask = test_data_creator.post_data_and_get_response_as_json_object(obs_subtask_data, '/subtask/')
obs_subtask_output_url = test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=obs_subtask['url']), '/subtask_output/')
test_data_creator.post_data_and_get_url(test_data_creator.Dataproduct(filename="L%s_SB000.MS"%obs_subtask['id'],
specifications_doc={"identifiers": { "sap_index": 0, "subband_index": 0 }},
specifications_doc={"sap": "target0", "subband": 0},
subtask_output_url=obs_subtask_output_url), '/dataproduct/')
# now create the ingest...
......@@ -447,12 +447,12 @@ class SubtaskInputOutputTest(unittest.TestCase):
pipe_in2 = models.SubtaskInput.objects.create(**SubtaskInput_test_data(subtask=pipe_st, producer=obs_out2, selection_doc={'sap': ['target1']}))
# create obs output dataproducts with specs we can filter on
dp1_1 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out1, specifications_doc={'sap': ['target0'], 'identifiers': {'sap_index': 0, 'subband_index': 0}}))
dp1_2 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out1, specifications_doc={'sap': ['target1'], 'identifiers': {'sap_index': 1, 'subband_index': 0}}))
dp1_3 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out1, specifications_doc={'sap': ['target0'], 'identifiers': {'sap_index': 0, 'subband_index': 1}}))
dp1_1 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out1, specifications_doc={'sap': 'target0', 'subband': 0}))
dp1_2 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out1, specifications_doc={'sap': 'target1', 'subband': 0}))
dp1_3 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out1, specifications_doc={'sap': 'target0', 'subband': 1}))
dp2_1 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out2, specifications_doc={'sap': ['target0'], 'identifiers': {'sap_index': 0, 'subband_index': 0}}))
dp2_2 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out2, specifications_doc={'sap': ['target1'], 'identifiers': {'sap_index': 1, 'subband_index': 0}}))
dp2_1 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out2, specifications_doc={'sap': 'target0', 'subband': 0}))
dp2_2 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out2, specifications_doc={'sap': 'target1', 'subband': 0}))
# trigger:
# schedule pipeline, which should attach the correct subset of dataproducts to the pipeline inputs
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment