diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py
index 5d2b985cd9ffe09fd500233e761829d12103ef62..6dc6a3d439f8e37069bf8842bf0aaada67695a6d 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py
@@ -113,11 +113,18 @@ def _convert_correlator_settings_to_parset_dict(subtask: models.Subtask, spec: d
 
     dataproducts = list(models.Dataproduct.objects.filter(producer_id__in=subtask_output_ids).filter(dataformat=Dataformat.Choices.MEASUREMENTSET.value).filter(datatype=Datatype.Choices.VISIBILITIES))
 
-    # put them in the order as expected in the parset
-    dataproducts.sort(key=lambda x: x.specification_doc["subband_index"])
-
-    parset["Observation.DataProducts.Output_Correlated.filenames"] = [dp.filename for dp in dataproducts]
-    parset["Observation.DataProducts.Output_Correlated.locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in dataproducts]
+    # marshall dataproducts, but only if they're supplied. in some use cases, we want a parset before the subtask is scheduled.
+    correlator_dataproducts = []
+    for digi_beam in digi_beams:
+        for subband in digi_beam["subbands"]:
+            dataproduct = [dp for dp in dataproducts
+                           if dp.specifications_doc["sap"] == digi_beam['name']
+                           and dp.specifications_doc["subband"] == subband]
+
+            correlator_dataproducts.append(dataproduct[0] if dataproduct else null_dataproduct)
+
+    parset["Observation.DataProducts.Output_Correlated.filenames"] = [dp.filename for dp in correlator_dataproducts]
+    parset["Observation.DataProducts.Output_Correlated.locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in correlator_dataproducts]
 
     # mimic MoM placeholder thingy (the resource estimator parses this)
     parset["Observation.DataProducts.Output_Correlated.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, sap_nr) for sap_nr in range(len(digi_beams))]
@@ -177,12 +184,12 @@ def _convert_beamformer_settings_to_parset_dict(subtask: models.Subtask, spec: d
             for s in range(nr_stokes):
                 for p in range(nr_parts):
                     dataproduct = [dp for dp in dataproducts
-                                   if dp.specifications_doc["identifiers"]["sap_index"] == sap_idx
+                                   if dp.specifications_doc["sap"] == sap['name']
                                    and dp.specifications_doc["identifiers"]["pipeline_index"] == pipeline_idx
                                    and dp.specifications_doc["identifiers"]["tab_index"] == field_idx
                                    and dp.specifications_doc["identifiers"]["stokes_index"] == s
                                    and dp.specifications_doc["identifiers"]["part_index"] == p
-                                   and dp.specifications_doc["identifiers"]["coherent"] == tab['coherent']]
+                                   and dp.specifications_doc["coherent"] == tab['coherent']]
                     if tab['coherent']:
                         coherent_dataproducts.append(dataproduct[0] if dataproduct else null_dataproduct)
                     else:
@@ -226,12 +233,12 @@ def _convert_beamformer_settings_to_parset_dict(subtask: models.Subtask, spec: d
             for s in range(nr_stokes):
                 for p in range(nr_parts):
                     dataproduct = [dp for dp in dataproducts
-                                   if dp.specifications_doc["identifiers"]["sap_index"] == sap_idx
+                                   if dp.specifications_doc["sap"] == sap["name"]
                                    and dp.specifications_doc["identifiers"]["pipeline_index"] == pipeline_idx
                                    and dp.specifications_doc["identifiers"]["tab_index"] == field_idx
                                    and dp.specifications_doc["identifiers"]["stokes_index"] == s
                                    and dp.specifications_doc["identifiers"]["part_index"] == p
-                                   and dp.specifications_doc["identifiers"]["coherent"] == True]
+                                   and dp.specifications_doc["coherent"] == True]
                     coherent_dataproducts.append(dataproduct[0] if dataproduct else null_dataproduct)
 
     if cobalt_version >= 2:
@@ -536,13 +543,13 @@ def _convert_to_parset_dict_for_pipelinecontrol_schema(subtask: models.Subtask)
     subtask_outputs = list(models.SubtaskOutput.objects.filter(subtask_id=subtask.id))
     unsorted_out_dataproducts = sum([list(models.Dataproduct.objects.filter(producer_id=subtask_output.id)) for subtask_output in subtask_outputs],[])
 
-    def find_dataproduct(dataproducts: list, identifiers: dict):
-        hits = [dp for dp in dataproducts if dp.specifications_doc['identifiers']['sap_index'] == identifiers['sap_index']
-                                          and dp.specifications_doc['identifiers']['subband_index'] == identifiers['subband_index']]
+    def find_dataproduct(dataproducts: list, specification_doc: dict):
+        hits = [dp for dp in dataproducts if dp.specifications_doc['sap'] == specification_doc['sap']
+                                          and dp.specifications_doc['subband'] == specification_doc['subband']]
         return hits[0] if hits else null_dataproduct
 
     # list output dataproducts in the same order as input dataproducts, matched by the identifiers
-    out_dataproducts = [find_dataproduct(unsorted_out_dataproducts, in_dp.specifications_doc['identifiers']) for in_dp in in_dataproducts]
+    out_dataproducts = [find_dataproduct(unsorted_out_dataproducts, in_dp.specifications_doc) for in_dp in in_dataproducts]
 
     parset["Observation.DataProducts.Output_Correlated.enabled"] = "true"
     parset["Observation.DataProducts.Output_Correlated.filenames"] = "[%s]" % ",".join([dp.filename for dp in out_dataproducts])
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_specifications_template-visibilities-1.json b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_specifications_template-visibilities-1.json
index cacd84f4ff4eee5e8b660b8717e45a36dd8f25bc..161f96803940afef59c4ceaf35787ad6012f5e66 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_specifications_template-visibilities-1.json
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_specifications_template-visibilities-1.json
@@ -10,31 +10,13 @@
       "title": "SAP",
       "default": ""
     },
-    "identifiers": {
-      "title": "identifiers",
-      "description": "identification of this dataproduct within the producing subtask.",
-      "default": {},
-      "type": "object",
-      "properties": {
-        "sap_index": {
-          "title": "SAP index",
-          "type": "integer",
-          "default": 0,
-          "minimum": 0
-        },
-        "subband_index": {
-          "title": "Subband index",
-          "description": "Subband offset within the subtask (SAPs do not reset numbering)",
-          "type": "integer",
-          "default": 0,
-          "minimum": 0
-        }
-      },
-      "required": [
-        "sap_index",
-        "subband_index"
-      ]
+    "subband": {
+      "type": "integer",
+      "title": "subband number",
+      "default": 0,
+      "minimum": 0,
+      "maximum": 511
     }
   },
-  "required": [ "identifiers" ]
+  "required": [ "sap", "subband" ]
 }
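
Note on the parset and schema hunks above: correlator dataproducts are no longer sorted by a nested subband_index; each parset slot is now looked up by its (SAP name, subband number) pair, and null_dataproduct fills the slot when no dataproduct exists yet, so a parset can be generated before the subtask is scheduled. A minimal sketch of that matching rule, using plain dicts in place of Django Dataproduct objects; the names make_correlator_list and NULL_DP and the example values are illustrative only, not TMSS identifiers:

    # Plain-Python sketch of the (SAP, subband) matching used in _convert_correlator_settings_to_parset_dict.
    NULL_DP = {"filename": "null:", "directory": ""}   # stand-in for the real null_dataproduct

    def make_correlator_list(digi_beams, dataproducts):
        """Return one entry per (SAP, subband) in parset order; fall back to NULL_DP
           when the dataproduct has not been created yet (unscheduled subtask)."""
        result = []
        for beam in digi_beams:
            for subband in beam["subbands"]:
                hits = [dp for dp in dataproducts
                        if dp["specifications_doc"]["sap"] == beam["name"]
                        and dp["specifications_doc"]["subband"] == subband]
                result.append(hits[0] if hits else NULL_DP)
        return result

    # example: one beam with two subbands, only subband 100 has a dataproduct so far
    beams = [{"name": "target0", "subbands": [100, 101]}]
    dps = [{"specifications_doc": {"sap": "target0", "subband": 100},
            "filename": "L1_SAP000_SB000_uv.MS", "directory": "/data/uv"}]
    assert [dp["filename"] for dp in make_correlator_list(beams, dps)] == ["L1_SAP000_SB000_uv.MS", "null:"]
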
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py
index db086aba7adfff3b6c55aa83d731ebff48dcfaa6..d3809c4537659d8ad90d477f7ba6eb54ca9318bd 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py
@@ -1151,13 +1151,13 @@ def schedule_observation_subtask(observation_subtask: Subtask):
     sb_nr_offset = 0 # subband numbers run from 0 to (nr_subbands-1), increasing across SAPs
 
     for sap_nr, pointing in enumerate(specifications_doc['stations']['digital_pointings']):
-        for sb_nr, _ in enumerate(pointing['subbands'], start=sb_nr_offset):
+        for sb_nr, subband in enumerate(pointing['subbands'], start=sb_nr_offset):
             dataproducts.append(Dataproduct(filename="L%d_SAP%03d_SB%03d_uv.MS" % (observation_subtask.id, sap_nr, sb_nr),
                                             directory=directory+"/uv",
                                             dataformat=Dataformat.objects.get(value="MeasurementSet"),
                                             datatype=Datatype.objects.get(value="visibilities"),
                                             producer=subtask_output,
-                                            specifications_doc={"sap": pointing["name"], "identifiers": {"sap_index": sap_nr, "subband_index": sb_nr} },
+                                            specifications_doc={"sap": pointing["name"], "subband": subband},
                                             specifications_template=dataproduct_specifications_template_visibilities,
                                             feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema),
                                             feedback_template=dataproduct_feedback_template,
@@ -1194,7 +1194,7 @@ def schedule_observation_subtask(observation_subtask: Subtask):
                                                     dataformat=Dataformat.objects.get(value="Beamformed"),
                                                     datatype=Datatype.objects.get(value="time series"),
                                                     producer=subtask_output,
-                                                    specifications_doc={"sap": specifications_doc['stations']['digital_pointings'][sap_nr]["name"], "identifiers": {"sap_index": sap_nr, "pipeline_index": pipeline_nr, "tab_index": tab_nr, "stokes_index": stokes_nr, "part_index": part_nr, "coherent": coherent}},
+                                                    specifications_doc={"sap": specifications_doc['stations']['digital_pointings'][sap_nr]["name"], "coherent": coherent, "identifiers": {"pipeline_index": pipeline_nr, "tab_index": tab_nr, "stokes_index": stokes_nr, "part_index": part_nr}},
                                                     specifications_template=dataproduct_specifications_template_timeseries,
                                                     feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema),
                                                     feedback_template=dataproduct_feedback_template,
@@ -1304,9 +1304,6 @@ def schedule_pipeline_subtask(pipeline_subtask: Subtask):
             else:
                 filename = "L%s_%s" % (pipeline_subtask.pk, input_dp.filename)
 
-            if "identifiers" not in input_dp.specifications_doc:
-                raise SubtaskSchedulingSpecificationException("Input dataproduct must have identifiers, specification_doc is %s, template is %s" % (input_dp.specifications_doc, input_dp.specifications_template))
-
             output_dp = Dataproduct(filename=filename,
                                     directory=input_dp.directory.replace(str(pipeline_subtask_input.producer.subtask.pk), str(pipeline_subtask.pk)),
                                     dataformat=dataformat,
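
Note on the scheduler hunks above: each visibilities dataproduct now gets a flat specifications_doc carrying the SAP name and the actual subband number (rather than the running sb_nr index), and beamformed time series carry "sap" and "coherent" at the top level with the remaining indices under "identifiers". A sketch of the resulting document shapes and of validation against a trimmed-down copy of the new visibilities schema; the example values and the local schema dict are assumptions for illustration, and the jsonschema package is assumed to be available:

    import jsonschema   # assumption: jsonschema is installed in the environment

    # illustrative shapes of the specifications_doc values written by schedule_observation_subtask
    visibilities_spec = {"sap": "target0", "subband": 100}    # actual subband number, not the running sb_nr
    timeseries_spec = {"sap": "target0",
                       "coherent": True,
                       "identifiers": {"pipeline_index": 0, "tab_index": 0,
                                       "stokes_index": 0, "part_index": 0}}

    # trimmed-down copy of the new dataproduct_specifications_template-visibilities-1.json from the hunk above
    visibilities_schema = {"type": "object",
                           "properties": {"sap": {"type": "string"},
                                          "subband": {"type": "integer", "minimum": 0, "maximum": 511}},
                           "required": ["sap", "subband"]}
    jsonschema.validate(visibilities_spec, visibilities_schema)   # passes; a doc with only "identifiers" would not
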
diff --git a/SAS/TMSS/backend/test/t_scheduling.py b/SAS/TMSS/backend/test/t_scheduling.py
index 5e1f624500b62f3efde4647a24c871cfc24f705a..ef715b44ce336bd8540419240b8757c9e5ddb883 100755
--- a/SAS/TMSS/backend/test/t_scheduling.py
+++ b/SAS/TMSS/backend/test/t_scheduling.py
@@ -260,7 +260,7 @@ class SchedulingTest(unittest.TestCase):
             obs_subtask = test_data_creator.post_data_and_get_response_as_json_object(obs_subtask_data, '/subtask/')
             obs_subtask_output_url = test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=obs_subtask['url']), '/subtask_output/')
             test_data_creator.post_data_and_get_url(test_data_creator.Dataproduct(filename="L%s_SB000.MS"%obs_subtask['id'],
-                                                                                  specifications_doc={"identifiers": { "sap_index": 0, "subband_index": 0 }},
+                                                                                  specifications_doc={"sap": "target0", "subband": 0 },
                                                                                   subtask_output_url=obs_subtask_output_url), '/dataproduct/')
 
             # now create the pipeline...
@@ -305,7 +305,7 @@ class SchedulingTest(unittest.TestCase):
             obs_subtask = test_data_creator.post_data_and_get_response_as_json_object(obs_subtask_data, '/subtask/')
             obs_subtask_output_url = test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=obs_subtask['url']), '/subtask_output/')
             test_data_creator.post_data_and_get_url(test_data_creator.Dataproduct(filename="L%s_SB000.MS"%obs_subtask['id'],
-                                                                                  specifications_doc={"identifiers": { "sap_index": 0, "subband_index": 0 }},
+                                                                                  specifications_doc={"sap": "target0", "subband": 0},
                                                                                   subtask_output_url=obs_subtask_output_url), '/dataproduct/')
 
             # now create the ingest...
@@ -447,12 +447,12 @@ class SubtaskInputOutputTest(unittest.TestCase):
         pipe_in2 = models.SubtaskInput.objects.create(**SubtaskInput_test_data(subtask=pipe_st, producer=obs_out2, selection_doc={'sap': ['target1']}))
 
         # create obs output dataproducts with specs we can filter on
-        dp1_1 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out1, specifications_doc={'sap': ['target0'], 'identifiers': {'sap_index': 0, 'subband_index': 0}}))
-        dp1_2 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out1, specifications_doc={'sap': ['target1'], 'identifiers': {'sap_index': 1, 'subband_index': 0}}))
-        dp1_3 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out1, specifications_doc={'sap': ['target0'], 'identifiers': {'sap_index': 0, 'subband_index': 1}}))
+        dp1_1 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out1, specifications_doc={'sap': 'target0', 'subband': 0}))
+        dp1_2 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out1, specifications_doc={'sap': 'target1', 'subband': 0}))
+        dp1_3 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out1, specifications_doc={'sap': 'target0', 'subband': 1}))
 
-        dp2_1 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out2, specifications_doc={'sap': ['target0'], 'identifiers': {'sap_index': 0, 'subband_index': 0}}))
-        dp2_2 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out2, specifications_doc={'sap': ['target1'], 'identifiers': {'sap_index': 1, 'subband_index': 0}}))
+        dp2_1 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out2, specifications_doc={'sap': 'target0', 'subband': 0}))
+        dp2_2 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out2, specifications_doc={'sap': 'target1', 'subband': 0}))
 
         # trigger:
         # schedule pipeline, which should attach the correct subset of dataproducts to the pipeline inputs
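
Note on the test hunks above: the SubtaskInputOutputTest dataproducts now carry a plain SAP name string, which is what the pipeline input selection_doc ({'sap': ['target0']} and {'sap': ['target1']}) filters on when the pipeline is scheduled. A rough illustration of that selection on plain dicts; select_by_sap and the sample data are assumptions for illustration, not part of the TMSS API:

    def select_by_sap(dataproducts, selection_doc):
        # keep dataproducts whose SAP name is listed in the selection
        return [dp for dp in dataproducts if dp["specifications_doc"]["sap"] in selection_doc["sap"]]

    dps = [{"specifications_doc": {"sap": "target0", "subband": 0}},
           {"specifications_doc": {"sap": "target1", "subband": 0}},
           {"specifications_doc": {"sap": "target0", "subband": 1}}]
    assert len(select_by_sap(dps, {"sap": ["target0"]})) == 2   # analogous to dp1_1 and dp1_3
    assert len(select_by_sap(dps, {"sap": ["target1"]})) == 1   # analogous to dp1_2
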