From a11ec00d8c605a00ff749ea6dc361a582f3d6b92 Mon Sep 17 00:00:00 2001
From: Jan David Mol <mol@astron.nl>
Date: Thu, 4 Mar 2021 15:49:05 +0100
Subject: [PATCH] TMSS-604: Fix typos, errors, and selection filter

---
 .../src/tmss/tmssapp/adapters/parset.py        | 10 ++++---
 ..._specifications_template-timeseries-1.json  |  7 ++++-
 ...pecifications_template-visibilities-1.json  | 13 ++++++--
 SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py  | 16 ++++++----
 SAS/TMSS/backend/test/t_scheduling.py          | 30 ++++++++++++++-----
 5 files changed, 56 insertions(+), 20 deletions(-)

diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py
index c13d7aac673..5d2b985cd9f 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py
@@ -181,7 +181,8 @@ def _convert_beamformer_settings_to_parset_dict(subtask: models.Subtask, spec: d
                                and dp.specifications_doc["identifiers"]["pipeline_index"] == pipeline_idx
                                and dp.specifications_doc["identifiers"]["tab_index"] == field_idx
                                and dp.specifications_doc["identifiers"]["stokes_index"] == s
-                               and dp.specifications_doc["identifiers"]["part_index"] == p]
+                               and dp.specifications_doc["identifiers"]["part_index"] == p
+                               and dp.specifications_doc["identifiers"]["coherent"] == tab['coherent']]
                 if tab['coherent']:
                     coherent_dataproducts.append(dataproduct[0] if dataproduct else null_dataproduct)
                 else:
@@ -229,7 +230,8 @@ def _convert_beamformer_settings_to_parset_dict(subtask: models.Subtask, spec: d
                                and dp.specifications_doc["identifiers"]["pipeline_index"] == pipeline_idx
                                and dp.specifications_doc["identifiers"]["tab_index"] == field_idx
                                and dp.specifications_doc["identifiers"]["stokes_index"] == s
-                               and dp.specifications_doc["identifiers"]["part_index"] == p]
+                               and dp.specifications_doc["identifiers"]["part_index"] == p
+                               and dp.specifications_doc["identifiers"]["coherent"] == True]
                 coherent_dataproducts.append(dataproduct[0] if dataproduct else null_dataproduct)
 
     if cobalt_version >= 2:
@@ -534,13 +536,13 @@ def _convert_to_parset_dict_for_pipelinecontrol_schema(subtask: models.Subtask)
     subtask_outputs = list(models.SubtaskOutput.objects.filter(subtask_id=subtask.id))
     unsorted_out_dataproducts = sum([list(models.Dataproduct.objects.filter(producer_id=subtask_output.id)) for subtask_output in subtask_outputs],[])
 
-    def find_out_dataproduct(dataproducts: list, identifiers: dict):
+    def find_dataproduct(dataproducts: list, identifiers: dict):
         hits = [dp for dp in dataproducts if dp.specifications_doc['identifiers']['sap_index'] == identifiers['sap_index']
                                          and dp.specifications_doc['identifiers']['subband_index'] == identifiers['subband_index']]
         return hits[0] if hits else null_dataproduct
 
     # list output dataproducts in the same order as input dataproducts, matched by the identifiers
-    out_dataproducts = [find_dataproduct(unsorted_out_dataproducts, in_dp.specification_doc['identifiers'] for in_dp in in_dataproducts]
+    out_dataproducts = [find_dataproduct(unsorted_out_dataproducts, in_dp.specifications_doc['identifiers']) for in_dp in in_dataproducts]
 
     parset["Observation.DataProducts.Output_Correlated.enabled"] = "true"
     parset["Observation.DataProducts.Output_Correlated.filenames"] = "[%s]" % ",".join([dp.filename for dp in out_dataproducts])
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_specifications_template-timeseries-1.json b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_specifications_template-timeseries-1.json
index a78a0351bc4..d11ec11cc08 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_specifications_template-timeseries-1.json
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_specifications_template-timeseries-1.json
@@ -5,9 +5,14 @@
   "type": "object",
   "default": {},
   "properties": {
+    "sap": {
+      "type": "string",
+      "title": "SAP",
+      "default": ""
+    },
     "identifiers": {
       "title": "Identifiers",
-      "description": "Identification of this dataproduct within the subtask.",
+      "description": "Identification of this dataproduct within the producing subtask.",
       "type": "object",
       "default": {},
       "properties": {
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_specifications_template-visibilities-1.json b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_specifications_template-visibilities-1.json
index f92d3a4f9ac..cacd84f4ff4 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_specifications_template-visibilities-1.json
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/dataproduct_specifications_template-visibilities-1.json
@@ -5,21 +5,28 @@
   "type": "object",
   "default": {},
   "properties": {
+    "sap": {
+      "type": "string",
+      "title": "SAP",
+      "default": ""
+    },
     "identifiers": {
       "title": "identifiers",
-      "description": "identification of this dataproduct within the subtask.",
-      "type": "object",
+      "description": "identification of this dataproduct within the producing subtask.",
       "default": {},
+      "type": "object",
       "properties": {
-        "SAP_index": {
+        "sap_index": {
           "title": "SAP index",
           "type": "integer",
+          "default": 0,
           "minimum": 0
         },
         "subband_index": {
           "title": "Subband index",
           "description": "Subband offset within the subtask (SAPs do not reset numbering)",
           "type": "integer",
+          "default": 0,
           "minimum": 0
         }
       },
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py
index 07aa02e2b07..db086aba7ad 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py
@@ -1157,7 +1157,7 @@ def schedule_observation_subtask(observation_subtask: Subtask):
                                          dataformat=Dataformat.objects.get(value="MeasurementSet"),
                                          datatype=Datatype.objects.get(value="visibilities"),
                                          producer=subtask_output,
-                                         specifications_doc={"identifiers": {"sap_index": sap_nr, "subband_index": sb_nr} },
+                                         specifications_doc={"sap": pointing["name"], "identifiers": {"sap_index": sap_nr, "subband_index": sb_nr} },
                                          specifications_template=dataproduct_specifications_template_visibilities,
                                          feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema),
                                          feedback_template=dataproduct_feedback_template,
@@ -1185,7 +1185,7 @@ def schedule_observation_subtask(observation_subtask: Subtask):
         return sap_indices[0]
 
     def add_tab_dataproducts(sap_nr, pipeline_nr, tab_nr, stokes_settings, coherent):
-        nr_subbands = len(sap['subbands']) or len(specifications_doc['stations']['digital_pointings'][sap_idx]['subbands'])
+        nr_subbands = len(sap['subbands']) or len(specifications_doc['stations']['digital_pointings'][sap_nr]['subbands'])
         nr_stokes = len(stokes_settings['stokes'])
         nr_parts = ceil(1.0 * nr_subbands / stokes_settings['subbands_per_file'])
 
@@ -1194,12 +1194,12 @@ def schedule_observation_subtask(observation_subtask: Subtask):
                                          dataformat=Dataformat.objects.get(value="Beamformed"),
                                          datatype=Datatype.objects.get(value="time series"),
                                          producer=subtask_output,
-                                         specifications_doc={"identifiers": {"sap_index": sap_nr, "pipeline_index": pipeline_nr, "tab_index": tab_nr, "stokes_index": stokes_nr, "part_index": part_nr} },
+                                         specifications_doc={"sap": specifications_doc['stations']['digital_pointings'][sap_nr]["name"], "identifiers": {"sap_index": sap_nr, "pipeline_index": pipeline_nr, "tab_index": tab_nr, "stokes_index": stokes_nr, "part_index": part_nr, "coherent": coherent}},
                                          specifications_template=dataproduct_specifications_template_timeseries,
                                          feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema),
                                          feedback_template=dataproduct_feedback_template,
                                          size=0,
-                                         expected_size=1024*1024*1024*sb_nr,
+                                         expected_size=1024*1024*1024*tab_nr,
                                          sap=saps[sap_nr],
                                          global_identifier=None)
                                          for part_nr in range(nr_parts) for stokes_nr in range(nr_stokes)])
@@ -1304,6 +1304,9 @@ def schedule_pipeline_subtask(pipeline_subtask: Subtask):
             else:
                 filename = "L%s_%s" % (pipeline_subtask.pk, input_dp.filename)
 
+            if "identifiers" not in input_dp.specifications_doc:
+                raise SubtaskSchedulingSpecificationException("Input dataproduct must have identifiers, specification_doc is %s, template is %s" % (input_dp.specifications_doc, input_dp.specifications_template))
+
             output_dp = Dataproduct(filename=filename,
                                     directory=input_dp.directory.replace(str(pipeline_subtask_input.producer.subtask.pk), str(pipeline_subtask.pk)),
                                     dataformat=dataformat,
@@ -1569,10 +1572,13 @@ def specifications_doc_meets_selection_doc(specifications_doc, selection_doc):
             meets_criteria = False
         else:
             spec = specifications_doc[k]
-            if isinstance(spec, Iterable) and isinstance(v, Iterable):
+            if isinstance(spec, list) and isinstance(v, list):
                 for spec_v in spec:
                     if spec_v not in v:
                         meets_criteria = False
+            elif isinstance(v, list):
+                if spec not in v:
+                    meets_criteria = False
             else:
                 if spec != v:
                     meets_criteria = False
diff --git a/SAS/TMSS/backend/test/t_scheduling.py b/SAS/TMSS/backend/test/t_scheduling.py
index 6dda9cf61de..5e1f624500b 100755
--- a/SAS/TMSS/backend/test/t_scheduling.py
+++ b/SAS/TMSS/backend/test/t_scheduling.py
@@ -260,6 +260,7 @@ class SchedulingTest(unittest.TestCase):
         obs_subtask = test_data_creator.post_data_and_get_response_as_json_object(obs_subtask_data, '/subtask/')
         obs_subtask_output_url = test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=obs_subtask['url']), '/subtask_output/')
         test_data_creator.post_data_and_get_url(test_data_creator.Dataproduct(filename="L%s_SB000.MS"%obs_subtask['id'],
+                                                                              specifications_doc={"identifiers": { "sap_index": 0, "subband_index": 0 }},
                                                                               subtask_output_url=obs_subtask_output_url), '/dataproduct/')
 
         # now create the pipeline...
@@ -304,6 +305,7 @@ class SchedulingTest(unittest.TestCase):
         obs_subtask = test_data_creator.post_data_and_get_response_as_json_object(obs_subtask_data, '/subtask/')
         obs_subtask_output_url = test_data_creator.post_data_and_get_url(test_data_creator.SubtaskOutput(subtask_url=obs_subtask['url']), '/subtask_output/')
         test_data_creator.post_data_and_get_url(test_data_creator.Dataproduct(filename="L%s_SB000.MS"%obs_subtask['id'],
+                                                                              specifications_doc={"identifiers": { "sap_index": 0, "subband_index": 0 }},
                                                                               subtask_output_url=obs_subtask_output_url), '/dataproduct/')
 
         # now create the ingest...
@@ -416,6 +418,20 @@ class SubtaskInputOutputTest(unittest.TestCase):
         setting.value = True
         setting.save()
 
+
+    def test_specifications_doc_meets_selection_doc(self):
+        # specification is a list? specification must be a subset of the selection
+        self.assertTrue(specifications_doc_meets_selection_doc({'sap': ['target0']}, {'sap': ['target0']}))
+        self.assertFalse(specifications_doc_meets_selection_doc({'sap': ['target0','target1','target2']}, {'sap': ['target0','target1']}))
+
+        # specification is a value? it must appear in the selection
+        self.assertTrue(specifications_doc_meets_selection_doc({'sap': 'target0'}, {'sap': ['target0']}))
+        self.assertTrue(specifications_doc_meets_selection_doc({'sap': 'target0'}, {'sap': ['target0','target1']}))
+        self.assertTrue(specifications_doc_meets_selection_doc({'sap': 'target0'}, {'sap': 'target0'}))
+
+        # specification must contain the selection key
+        self.assertFalse(specifications_doc_meets_selection_doc({'something else': 'target0'}, {'sap': 'target0'}))
+
     @mock.patch("lofar.sas.tmss.tmss.tmssapp.subtasks.assign_or_unassign_resources")
     def test_schedule_pipeline_subtask_filters_predecessor_output_dataproducts_for_input(self, assign_resources_mock):
         # setup:
@@ -431,12 +447,12 @@ class SubtaskInputOutputTest(unittest.TestCase):
         pipe_in2 = models.SubtaskInput.objects.create(**SubtaskInput_test_data(subtask=pipe_st, producer=obs_out2, selection_doc={'sap': ['target1']}))
 
         # create obs output dataproducts with specs we can filter on
-        dp1_1 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out1, specifications_doc={'sap': ['target0']}))
-        dp1_2 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out1, specifications_doc={'sap': ['target1']}))
-        dp1_3 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out1, specifications_doc={'sap': ['target0']}))
+        dp1_1 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out1, specifications_doc={'sap': ['target0'], 'identifiers': {'sap_index': 0, 'subband_index': 0}}))
+        dp1_2 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out1, specifications_doc={'sap': ['target1'], 'identifiers': {'sap_index': 1, 'subband_index': 0}}))
+        dp1_3 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out1, specifications_doc={'sap': ['target0'], 'identifiers': {'sap_index': 0, 'subband_index': 1}}))
 
-        dp2_1 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out2, specifications_doc={'sap': ['target0']}))
-        dp2_2 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out2, specifications_doc={'sap': ['target1']}))
+        dp2_1 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out2, specifications_doc={'sap': ['target0'], 'identifiers': {'sap_index': 0, 'subband_index': 0}}))
+        dp2_2 = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out2, specifications_doc={'sap': ['target1'], 'identifiers': {'sap_index': 1, 'subband_index': 0}}))
 
         # trigger:
         # schedule pipeline, which should attach the correct subset of dataproducts to the pipeline inputs
@@ -505,8 +521,8 @@ class SAPTest(unittest.TestCase):
         pipe_in = models.SubtaskInput.objects.create(**SubtaskInput_test_data(subtask=pipe_st, producer=obs_out))
 
         # create obs output dataproducts
-        dp1_in = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out))
-        dp2_in = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out))
+        dp1_in = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out, specifications_doc={"identifiers": { "sap_index": 0, "subband_index": 0 }}))
+        dp2_in = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_out, specifications_doc={"identifiers": { "sap_index": 0, "subband_index": 1 }}))
 
         # schedule pipeline, which should copy the SAP
         schedule_pipeline_subtask(pipe_st)
-- 
GitLab
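
Reviewer note on the parset.py change: a coherent and an incoherent tied-array beam can carry the same (sap, pipeline, tab, stokes, part) indices, so the dataproduct lookup must also compare the new "coherent" identifier or it can return the wrong file. The sketch below is a standalone illustration with made-up dictionaries and filenames, not TMSS model objects or the actual filter code:

# Hypothetical, simplified data; the real code filters Dataproduct model instances.
dataproducts = [
    {"filename": "coherent_tab.h5",
     "identifiers": {"sap_index": 0, "pipeline_index": 0, "tab_index": 0,
                     "stokes_index": 0, "part_index": 0, "coherent": True}},
    {"filename": "incoherent_tab.h5",
     "identifiers": {"sap_index": 0, "pipeline_index": 0, "tab_index": 0,
                     "stokes_index": 0, "part_index": 0, "coherent": False}},
]

def select_dataproduct(dps, sap_idx, pipeline_idx, tab_idx, stokes_idx, part_idx, coherent):
    # Without the final "coherent" comparison, both entries above would match
    # on the same index tuple.
    return [dp for dp in dps
            if  dp["identifiers"]["sap_index"] == sap_idx
            and dp["identifiers"]["pipeline_index"] == pipeline_idx
            and dp["identifiers"]["tab_index"] == tab_idx
            and dp["identifiers"]["stokes_index"] == stokes_idx
            and dp["identifiers"]["part_index"] == part_idx
            and dp["identifiers"]["coherent"] == coherent]

assert select_dataproduct(dataproducts, 0, 0, 0, 0, 0, coherent=True)[0]["filename"] == "coherent_tab.h5"
assert select_dataproduct(dataproducts, 0, 0, 0, 0, 0, coherent=False)[0]["filename"] == "incoherent_tab.h5"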
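
The selection-filter fix in specifications_doc_meets_selection_doc covers three cases, which the new unit test exercises: a list-valued specification must be a subset of the selection, a scalar specification must be a member of a list-valued selection, and two scalars must be equal; a selection key missing from the specification never matches. A minimal standalone sketch of that logic (names and early returns are illustrative, not the TMSS implementation):

def meets_selection(specifications_doc: dict, selection_doc: dict) -> bool:
    for key, selected in selection_doc.items():
        if key not in specifications_doc:
            return False
        spec = specifications_doc[key]
        if isinstance(spec, list) and isinstance(selected, list):
            # subset check: every specified value must be selected
            if any(value not in selected for value in spec):
                return False
        elif isinstance(selected, list):
            # membership check: a scalar spec must be one of the selected values
            if spec not in selected:
                return False
        elif spec != selected:
            return False
    return True

assert meets_selection({'sap': ['target0']}, {'sap': ['target0']})
assert not meets_selection({'sap': ['target0', 'target1', 'target2']}, {'sap': ['target0', 'target1']})
assert meets_selection({'sap': 'target0'}, {'sap': ['target0', 'target1']})
assert not meets_selection({'something else': 'target0'}, {'sap': 'target0'})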