Skip to content
Snippets Groups Projects
Commit bd63e5de authored by Jan David Mol's avatar Jan David Mol
Browse files

TMSS-64: Parset keys for dataproduct lists are now generated by a function,...

TMSS-64: Parset keys for dataproduct lists are now generated by a function, removing a lot of code duplication.
parent e536454b
No related branches found
No related tags found
2 merge requests!422Resolve SW-961 "Fix pipelinecontrol",!400Resolve TMSS-64 "Pulsar pipeline"
......@@ -60,6 +60,17 @@ def _stokes_settings_parset_subkeys(stokes_spec: dict) -> dict:
return parset
def _dataproduct_parset_subkeys(subtask: models.Subtask, dataproducts: list) -> dict:
    """ Return a subset of parset keys and values to list dataproducts. """

    filenames = [dp.filename for dp in dataproducts]
    locations = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in dataproducts]

    # "enabled" reflects whether there is anything to list at all;
    # "skip" carries one not-skipped flag per dataproduct.
    return {
        "enabled": len(dataproducts) > 0,
        "filenames": filenames,
        "skip": [0] * len(dataproducts),
        "locations": locations,
    }
def _sap_index(saps: dict, sap_name: str) -> int:
""" Return the SAP index in the observation given a certain SAP name. """
......@@ -80,17 +91,14 @@ def _convert_correlator_settings_to_parset_dict(subtask: models.Subtask, spec: d
digi_beams = spec['stations']['digital_pointings']
parset = {}
parset["Observation.DataProducts.Output_Correlated.enabled"] = correlator_enabled
parset["Observation.DataProducts.Output_Correlated.filenames"] = []
parset["Observation.DataProducts.Output_Correlated.locations"] = []
parset["Observation.DataProducts.Output_Correlated.storageClusterName"] = subtask.cluster.name # TODO: This must be the storage cluster, not the processing cluster
parset["Observation.DataProducts.Output_Correlated.storageClusterPartition"] = "/data/test-projects"
# ResourceEstimator always wants these keys
parset["Cobalt.Correlator.nrChannelsPerSubband"] = spec['COBALT']['correlator']['channels_per_subband'] if correlator_enabled else 16
parset["Cobalt.Correlator.nrBlocksPerIntegration"] = spec['COBALT']['correlator']['blocks_per_integration'] if correlator_enabled else 1
parset["Cobalt.Correlator.nrIntegrationsPerBlock"] = spec['COBALT']['correlator']['integrations_per_block'] if correlator_enabled else 1
correlator_dataproducts = []
if correlator_enabled:
if cobalt_version >= 2 and 'phase_centers' in spec['COBALT']['correlator']:
for beam_nr, digi_beam in enumerate(digi_beams):
......@@ -110,7 +118,6 @@ def _convert_correlator_settings_to_parset_dict(subtask: models.Subtask, spec: d
dataproducts = list(subtask.output_dataproducts.filter(dataformat__value=Dataformat.Choices.MEASUREMENTSET.value).filter(datatype__value=Datatype.Choices.VISIBILITIES.value).order_by('filename'))
# marshall dataproducts, but only if they're supplied. in some use cases, we want a parset before the subtask is scheduled.
correlator_dataproducts = []
for digi_beam in digi_beams:
for subband in digi_beam["subbands"]:
dataproduct = [dp for dp in dataproducts
......@@ -119,10 +126,12 @@ def _convert_correlator_settings_to_parset_dict(subtask: models.Subtask, spec: d
correlator_dataproducts.append(dataproduct[0] if dataproduct else null_dataproduct)
parset["Observation.DataProducts.Output_Correlated.filenames"] = [dp.filename for dp in correlator_dataproducts]
parset["Observation.DataProducts.Output_Correlated.locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in correlator_dataproducts]
# mimic MoM placeholder thingy (the resource estimator parses this)
parset["Observation.DataProducts.Output_Correlated.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, sap_nr) for sap_nr in range(len(digi_beams))]
parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, correlator_dataproducts), "Observation.DataProducts.Output_Correlated."))
parset["Observation.DataProducts.Output_Correlated.storageClusterName"] = subtask.cluster.name # TODO: This must be the storage cluster, not the processing cluster
parset["Observation.DataProducts.Output_Correlated.storageClusterPartition"] = "/data/test-projects"
# mimic MoM placeholder thingy (the resource estimator parses this)
parset["Observation.DataProducts.Output_Correlated.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, sap_nr) for sap_nr in range(len(digi_beams))]
return parset
......@@ -266,15 +275,11 @@ def _convert_beamformer_settings_to_parset_dict(subtask: models.Subtask, spec: d
# 2) TAB
# 3) Stokes
# 4) Part
parset["Observation.DataProducts.Output_CoherentStokes.enabled"] = len(coherent_dataproducts) > 0
parset["Observation.DataProducts.Output_CoherentStokes.filenames"] = [dp.filename for dp in coherent_dataproducts]
parset["Observation.DataProducts.Output_CoherentStokes.locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in coherent_dataproducts]
parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, coherent_dataproducts), "Observation.DataProducts.Output_CoherentStokes."))
parset["Observation.DataProducts.Output_CoherentStokes.storageClusterName"] = subtask.cluster.name # TODO: This must be the storage cluster, not the processing cluster
parset["Observation.DataProducts.Output_CoherentStokes.storageClusterPartition"] = "/data/test-projects"
parset["Observation.DataProducts.Output_IncoherentStokes.enabled"] = len(incoherent_dataproducts) > 0
parset["Observation.DataProducts.Output_IncoherentStokes.filenames"] = [dp.filename for dp in incoherent_dataproducts]
parset["Observation.DataProducts.Output_IncoherentStokes.locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in incoherent_dataproducts]
parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, incoherent_dataproducts), "Observation.DataProducts.Output_IncoherentStokes."))
parset["Observation.DataProducts.Output_IncoherentStokes.storageClusterName"] = subtask.cluster.name # TODO: This must be the storage cluster, not the processing cluster
parset["Observation.DataProducts.Output_IncoherentStokes.storageClusterPartition"] = "/data/test-projects"
......@@ -544,22 +549,17 @@ def _convert_to_parset_dict_for_preprocessing_pipeline_schema(subtask: models.Su
parset["Observation.ObservationControl.PythonControl.DPPP.msout.storagemanager.name"] = spec["storagemanager"]
# Dataproducts
parset["Observation.DataProducts.Input_Correlated.enabled"] = "true"
subtask_inputs = list(subtask.inputs.all())
in_dataproducts = sum([list(subtask_input.dataproducts.all()) for subtask_input in subtask_inputs],[])
in_dataproducts = []
for input_nr, subtask_input in enumerate(subtask.inputs.all()):
in_dataproducts = subtask_input.dataproducts.all()
parset["Observation.DataProducts.Input_Correlated.filenames"] = [dp.filename for dp in in_dataproducts]
parset["Observation.DataProducts.Input_Correlated.locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in in_dataproducts]
# mimic MoM placeholder thingy (the resource assigner parses this)
# should be expanded with SAPS and datatypes
parset["Observation.DataProducts.Input_Correlated.identifications"] = "[TMSS_subtask_%s.SAP%03d]" % (subtask_input.producer.subtask.id, input_nr)
parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, in_dataproducts), "Observation.DataProducts.Input_Correlated."))
parset["Observation.DataProducts.Input_Correlated.skip"] = ['0']*len(in_dataproducts)
# mimic MoM placeholder thingy (the resource assigner parses this)
# should be expanded with SAPS and datatypes
parset["Observation.DataProducts.Input_Correlated.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask_input.producer.subtask.id, input_nr) for input_nr, subtask_input in enumerate(subtask_inputs)]
# TODO: do not use SubtaskOutput.objects.filter but make subtask.subtask_outputs work
subtask_outputs = list(models.SubtaskOutput.objects.filter(subtask_id=subtask.id))
unsorted_out_dataproducts = sum([list(models.Dataproduct.objects.filter(producer_id=subtask_output.id)) for subtask_output in subtask_outputs],[])
subtask_outputs = list(subtask.outputs.all())
unsorted_out_dataproducts = sum([list(subtask_output.dataproducts.all()) for subtask_output in subtask_outputs],[])
def find_dataproduct(dataproducts: list, specification_doc: dict):
hits = [dp for dp in dataproducts if dp.specifications_doc['sap'] == specification_doc['sap']
......@@ -569,11 +569,8 @@ def _convert_to_parset_dict_for_preprocessing_pipeline_schema(subtask: models.Su
# list output dataproducts in the same order as input dataproducts, matched by the identifiers
out_dataproducts = [find_dataproduct(unsorted_out_dataproducts, in_dp.specifications_doc) for in_dp in in_dataproducts]
parset["Observation.DataProducts.Output_Correlated.enabled"] = "true"
parset["Observation.DataProducts.Output_Correlated.filenames"] = [dp.filename for dp in out_dataproducts]
parset["Observation.DataProducts.Output_Correlated.locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in out_dataproducts]
parset["Observation.DataProducts.Output_Correlated.skip"] = ['0']*len(out_dataproducts)
parset["Observation.DataProducts.Output_Correlated.identifications"] = "[TMSS_subtask_%s.SAP%03d]" % (subtask.id, 0)
parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, out_dataproducts), "Observation.DataProducts.Output_Correlated."))
parset["Observation.DataProducts.Output_Correlated.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, 0)]
parset["Observation.DataProducts.Output_Correlated.storageClusterName"] = subtask.cluster.name
# Other
......@@ -654,23 +651,17 @@ def _convert_to_parset_dict_for_pulsarpipeline_schema(subtask: models.Subtask) -
coherent_in_dataproducts = [dp for dp in in_dataproducts if dp.specifications_doc["coherent"]]
incoherent_in_dataproducts = [dp for dp in in_dataproducts if not dp.specifications_doc["coherent"]]
parset["Observation.DataProducts.Input_CoherentStokes.enabled"] = len(coherent_in_dataproducts) > 0
parset["Observation.DataProducts.Input_CoherentStokes.filenames"] = [dp.filename for dp in coherent_in_dataproducts]
parset["Observation.DataProducts.Input_CoherentStokes.locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in coherent_in_dataproducts]
parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, coherent_in_dataproducts), "Observation.DataProducts.Input_CoherentStokes."))
parset["Observation.DataProducts.Input_CoherentStokes.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (input.producer.subtask.id, 0) for input in inputs] # needed by ResourceEstimator
parset["Observation.DataProducts.Input_IncoherentStokes.enabled"] = len(incoherent_in_dataproducts) > 0
parset["Observation.DataProducts.Input_IncoherentStokes.filenames"] = [dp.filename for dp in incoherent_in_dataproducts]
parset["Observation.DataProducts.Input_IncoherentStokes.locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in incoherent_in_dataproducts]
parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, incoherent_in_dataproducts), "Observation.DataProducts.Input_IncoherentStokes."))
parset["Observation.DataProducts.Input_IncoherentStokes.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (input.producer.subtask.id, 0) for input in inputs] # needed by ResourceEstimator
# TODO: do not use SubtaskOutput.objects.filter but make subtask.subtask_outputs work
subtask_outputs = list(models.SubtaskOutput.objects.filter(subtask_id=subtask.id))
out_dataproducts = sum([list(models.Dataproduct.objects.filter(producer_id=subtask_output.id)) for subtask_output in subtask_outputs], [])
out_dataproducts = sum([list(models.Dataproduct.objects.filter(producer_id=subtask_output.id)) for subtask_output in subtask_outputs], []) # todo, order these correctly?
parset["Observation.DataProducts.Output_Pulsar.enabled"] = True
parset["Observation.DataProducts.Output_Pulsar.filenames"] = [dp.filename for dp in out_dataproducts] # todo: order this correctly.
parset["Observation.DataProducts.Output_Pulsar.locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in out_dataproducts]
parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, out_dataproducts), "Observation.DataProducts.Output_Pulsar."))
parset["Observation.DataProducts.Output_Pulsar.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, 0)]
parset["Observation.DataProducts.Output_Pulsar.storageClusterName"] = subtask.cluster.name
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment