From bd63e5def16b834cbdd82fe499ed5890a2abb7ba Mon Sep 17 00:00:00 2001 From: Jan David Mol <mol@astron.nl> Date: Thu, 15 Apr 2021 00:57:42 +0200 Subject: [PATCH] TMSS-64: Parset keys for dataproduct lists are now generated by a function, removing a lot of code duplication. --- .../src/tmss/tmssapp/adapters/parset.py | 79 ++++++++----------- 1 file changed, 35 insertions(+), 44 deletions(-) diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py index 96bcf6a0df2..09c14f6b785 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py @@ -60,6 +60,17 @@ def _stokes_settings_parset_subkeys(stokes_spec: dict) -> dict: return parset +def _dataproduct_parset_subkeys(subtask: models.Subtask, dataproducts: list) -> dict: + """ Return a subset of parset keys and values to list dataproducts. """ + + parset = {} + parset["enabled"] = len(dataproducts) > 0 + parset["filenames"] = [dp.filename for dp in dataproducts] + parset["skip"] = [0] * len(dataproducts) + parset["locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in dataproducts] + + return parset + def _sap_index(saps: dict, sap_name: str) -> int: """ Return the SAP index in the observation given a certain SAP name. """ @@ -80,17 +91,14 @@ def _convert_correlator_settings_to_parset_dict(subtask: models.Subtask, spec: d digi_beams = spec['stations']['digital_pointings'] parset = {} - parset["Observation.DataProducts.Output_Correlated.enabled"] = correlator_enabled - parset["Observation.DataProducts.Output_Correlated.filenames"] = [] - parset["Observation.DataProducts.Output_Correlated.locations"] = [] - parset["Observation.DataProducts.Output_Correlated.storageClusterName"] = subtask.cluster.name # TODO: This must be the storage cluster, not the processing cluster - parset["Observation.DataProducts.Output_Correlated.storageClusterPartition"] = "/data/test-projects" # ResourceEstimator always wants these keys parset["Cobalt.Correlator.nrChannelsPerSubband"] = spec['COBALT']['correlator']['channels_per_subband'] if correlator_enabled else 16 parset["Cobalt.Correlator.nrBlocksPerIntegration"] = spec['COBALT']['correlator']['blocks_per_integration'] if correlator_enabled else 1 parset["Cobalt.Correlator.nrIntegrationsPerBlock"] = spec['COBALT']['correlator']['integrations_per_block'] if correlator_enabled else 1 + correlator_dataproducts = [] + if correlator_enabled: if cobalt_version >= 2 and 'phase_centers' in spec['COBALT']['correlator']: for beam_nr, digi_beam in enumerate(digi_beams): @@ -110,7 +118,6 @@ def _convert_correlator_settings_to_parset_dict(subtask: models.Subtask, spec: d dataproducts = list(subtask.output_dataproducts.filter(dataformat__value=Dataformat.Choices.MEASUREMENTSET.value).filter(datatype__value=Datatype.Choices.VISIBILITIES.value).order_by('filename')) # marshall dataproducts, but only if they're supplied. in some use cases, we want a parset before the subtask is scheduled. - correlator_dataproducts = [] for digi_beam in digi_beams: for subband in digi_beam["subbands"]: dataproduct = [dp for dp in dataproducts @@ -119,10 +126,12 @@ def _convert_correlator_settings_to_parset_dict(subtask: models.Subtask, spec: d correlator_dataproducts.append(dataproduct[0] if dataproduct else null_dataproduct) - parset["Observation.DataProducts.Output_Correlated.filenames"] = [dp.filename for dp in correlator_dataproducts] - parset["Observation.DataProducts.Output_Correlated.locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in correlator_dataproducts] - # mimic MoM placeholder thingy (the resource estimator parses this) - parset["Observation.DataProducts.Output_Correlated.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, sap_nr) for sap_nr in range(len(digi_beams))] + parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, correlator_dataproducts), "Observation.DataProducts.Output_Correlated.")) + parset["Observation.DataProducts.Output_Correlated.storageClusterName"] = subtask.cluster.name # TODO: This must be the storage cluster, not the processing cluster + parset["Observation.DataProducts.Output_Correlated.storageClusterPartition"] = "/data/test-projects" + + # mimic MoM placeholder thingy (the resource estimator parses this) + parset["Observation.DataProducts.Output_Correlated.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, sap_nr) for sap_nr in range(len(digi_beams))] return parset @@ -266,15 +275,11 @@ def _convert_beamformer_settings_to_parset_dict(subtask: models.Subtask, spec: d # 2) TAB # 3) Stokes # 4) Part - parset["Observation.DataProducts.Output_CoherentStokes.enabled"] = len(coherent_dataproducts) > 0 - parset["Observation.DataProducts.Output_CoherentStokes.filenames"] = [dp.filename for dp in coherent_dataproducts] - parset["Observation.DataProducts.Output_CoherentStokes.locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in coherent_dataproducts] + parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, coherent_dataproducts), "Observation.DataProducts.Output_CoherentStokes.")) parset["Observation.DataProducts.Output_CoherentStokes.storageClusterName"] = subtask.cluster.name # TODO: This must be the storage cluster, not the processing cluster parset["Observation.DataProducts.Output_CoherentStokes.storageClusterPartition"] = "/data/test-projects" - parset["Observation.DataProducts.Output_IncoherentStokes.enabled"] = len(incoherent_dataproducts) > 0 - parset["Observation.DataProducts.Output_IncoherentStokes.filenames"] = [dp.filename for dp in incoherent_dataproducts] - parset["Observation.DataProducts.Output_IncoherentStokes.locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in incoherent_dataproducts] + parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, incoherent_dataproducts), "Observation.DataProducts.Output_IncoherentStokes.")) parset["Observation.DataProducts.Output_IncoherentStokes.storageClusterName"] = subtask.cluster.name # TODO: This must be the storage cluster, not the processing cluster parset["Observation.DataProducts.Output_IncoherentStokes.storageClusterPartition"] = "/data/test-projects" @@ -544,22 +549,17 @@ def _convert_to_parset_dict_for_preprocessing_pipeline_schema(subtask: models.Su parset["Observation.ObservationControl.PythonControl.DPPP.msout.storagemanager.name"] = spec["storagemanager"] # Dataproducts - parset["Observation.DataProducts.Input_Correlated.enabled"] = "true" + subtask_inputs = list(subtask.inputs.all()) + in_dataproducts = sum([list(subtask_input.dataproducts.all()) for subtask_input in subtask_inputs],[]) - in_dataproducts = [] - for input_nr, subtask_input in enumerate(subtask.inputs.all()): - in_dataproducts = subtask_input.dataproducts.all() - parset["Observation.DataProducts.Input_Correlated.filenames"] = [dp.filename for dp in in_dataproducts] - parset["Observation.DataProducts.Input_Correlated.locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in in_dataproducts] - # mimic MoM placeholder thingy (the resource assigner parses this) - # should be expanded with SAPS and datatypes - parset["Observation.DataProducts.Input_Correlated.identifications"] = "[TMSS_subtask_%s.SAP%03d]" % (subtask_input.producer.subtask.id, input_nr) + parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, in_dataproducts), "Observation.DataProducts.Input_Correlated.")) - parset["Observation.DataProducts.Input_Correlated.skip"] = ['0']*len(in_dataproducts) + # mimic MoM placeholder thingy (the resource assigner parses this) + # should be expanded with SAPS and datatypes + parset["Observation.DataProducts.Input_Correlated.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask_input.producer.subtask.id, input_nr) for input_nr, subtask_input in enumerate(subtask_inputs)] - # TODO: do not use SubtaskOutput.objects.filter but make subtask.subtask_outputs work - subtask_outputs = list(models.SubtaskOutput.objects.filter(subtask_id=subtask.id)) - unsorted_out_dataproducts = sum([list(models.Dataproduct.objects.filter(producer_id=subtask_output.id)) for subtask_output in subtask_outputs],[]) + subtask_outputs = list(subtask.outputs.all()) + unsorted_out_dataproducts = sum([list(subtask_output.dataproducts.all()) for subtask_output in subtask_outputs],[]) def find_dataproduct(dataproducts: list, specification_doc: dict): hits = [dp for dp in dataproducts if dp.specifications_doc['sap'] == specification_doc['sap'] @@ -569,11 +569,8 @@ def _convert_to_parset_dict_for_preprocessing_pipeline_schema(subtask: models.Su # list output dataproducts in the same order as input dataproducts, matched by the identifiers out_dataproducts = [find_dataproduct(unsorted_out_dataproducts, in_dp.specifications_doc) for in_dp in in_dataproducts] - parset["Observation.DataProducts.Output_Correlated.enabled"] = "true" - parset["Observation.DataProducts.Output_Correlated.filenames"] = [dp.filename for dp in out_dataproducts] - parset["Observation.DataProducts.Output_Correlated.locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in out_dataproducts] - parset["Observation.DataProducts.Output_Correlated.skip"] = ['0']*len(out_dataproducts) - parset["Observation.DataProducts.Output_Correlated.identifications"] = "[TMSS_subtask_%s.SAP%03d]" % (subtask.id, 0) + parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, out_dataproducts), "Observation.DataProducts.Output_Correlated.")) + parset["Observation.DataProducts.Output_Correlated.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, 0)] parset["Observation.DataProducts.Output_Correlated.storageClusterName"] = subtask.cluster.name # Other @@ -654,23 +651,17 @@ def _convert_to_parset_dict_for_pulsarpipeline_schema(subtask: models.Subtask) - coherent_in_dataproducts = [dp for dp in in_dataproducts if dp.specifications_doc["coherent"]] incoherent_in_dataproducts = [dp for dp in in_dataproducts if not dp.specifications_doc["coherent"]] - parset["Observation.DataProducts.Input_CoherentStokes.enabled"] = len(coherent_in_dataproducts) > 0 - parset["Observation.DataProducts.Input_CoherentStokes.filenames"] = [dp.filename for dp in coherent_in_dataproducts] - parset["Observation.DataProducts.Input_CoherentStokes.locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in coherent_in_dataproducts] + parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, coherent_in_dataproducts), "Observation.DataProducts.Input_CoherentStokes.")) parset["Observation.DataProducts.Input_CoherentStokes.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (input.producer.subtask.id, 0) for input in inputs] # needed by ResourceEstimator - parset["Observation.DataProducts.Input_IncoherentStokes.enabled"] = len(incoherent_in_dataproducts) > 0 - parset["Observation.DataProducts.Input_IncoherentStokes.filenames"] = [dp.filename for dp in incoherent_in_dataproducts] - parset["Observation.DataProducts.Input_IncoherentStokes.locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in incoherent_in_dataproducts] + parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, incoherent_in_dataproducts), "Observation.DataProducts.Input_IncoherentStokes.")) parset["Observation.DataProducts.Input_IncoherentStokes.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (input.producer.subtask.id, 0) for input in inputs] # needed by ResourceEstimator # TODO: do not use SubtaskOutput.objects.filter but make subtask.subtask_outputs work subtask_outputs = list(models.SubtaskOutput.objects.filter(subtask_id=subtask.id)) - out_dataproducts = sum([list(models.Dataproduct.objects.filter(producer_id=subtask_output.id)) for subtask_output in subtask_outputs], []) + out_dataproducts = sum([list(models.Dataproduct.objects.filter(producer_id=subtask_output.id)) for subtask_output in subtask_outputs], []) # todo, order these correctly? - parset["Observation.DataProducts.Output_Pulsar.enabled"] = True - parset["Observation.DataProducts.Output_Pulsar.filenames"] = [dp.filename for dp in out_dataproducts] # todo: order this correctly. - parset["Observation.DataProducts.Output_Pulsar.locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in out_dataproducts] + parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, out_dataproducts), "Observation.DataProducts.Output_Pulsar.")) parset["Observation.DataProducts.Output_Pulsar.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, 0)] parset["Observation.DataProducts.Output_Pulsar.storageClusterName"] = subtask.cluster.name -- GitLab