diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py index a4fd63788ffff44af3696a8b2c3e4be9999e4d49..ead648a3dcf9acbad21afdc95a350496db0569de 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py @@ -402,7 +402,8 @@ def _convert_to_parset_dict_for_pipelinecontrol_schema(subtask: models.Subtask) # Or, we can just get rid of this to-parset-adaper when MAC has been rewritten to the new station API. # ----------------------------------------------------------------------------------------------- - parset = dict() # parameterset has no proper assignment operators, so take detour via dict... + + parset = dict() # General parset["prefix"] = "LOFAR." @@ -566,10 +567,96 @@ def _convert_to_parset_dict_for_pipelinecontrol_schema(subtask: models.Subtask) return parset +def _convert_to_parset_dict_for_pulsarpipeline_schema(subtask: models.Subtask) -> dict: + # make sure the spec is complete (including all non-filled in properties with default) + spec = add_defaults_to_json_object_for_schema(subtask.specifications_doc, subtask.specifications_template.schema) + + parset = dict() + + # General + parset["prefix"] = "LOFAR." + parset["Observation.ObsID"] = subtask.pk + parset["Observation.momID"] = 0 # Needed by MACScheduler + parset["Observation.otdbID"] = subtask.pk # HACK: the pipeline uses otdbID as the sasID. our tmssID>2000000 to prevent clashes. TODO: replace all otdbID's by sasID. + parset["Observation.tmssID"] = subtask.pk + parset["Observation.processType"] = "Pipeline" + parset["Observation.processSubtype"] = "Pulsar Pipeline" + parset["Observation.ObservationControl.PythonControl.pythonProgram"] = "pulsar_pipeline.py" + parset["Observation.ObservationControl.PythonControl.softwareVersion"] = "lofar-pulp" + parset["Observation.Campaign.name"] = subtask.task_blueprint.scheduling_unit_blueprint.draft.scheduling_set.project.name + parset["Observation.Scheduler.taskName"] = subtask.task_blueprint.name + parset["Observation.Scheduler.predecessors"] = [] + parset["Observation.Cluster.ProcessingCluster.clusterName"] = subtask.cluster.name + parset["Observation.Cluster.ProcessingCluster.clusterPartition"] = 'cpu' + parset["Observation.Cluster.ProcessingCluster.numberOfTasks"] = 110 # deprecated (fixed value) to be completely removed in parset with 'JDM-patch 'soon + parset["Observation.Cluster.ProcessingCluster.numberOfCoresPerTask"] = 2 # deprecated (fixed value) to be completely removed in parset with 'JDM-patch 'soon + + + # Pulsar pipeline settings + parset["Observation.ObservationControl.PythonControl.Pulsar.2bf2fits_extra_opts"] = task["presto"]["2bf2fits_extra_opts"] + parset["Observation.ObservationControl.PythonControl.Pulsar.8bit_conversion_sigma"] = subtask["output"]["8bit_conversion_sigma"] + parset["Observation.ObservationControl.PythonControl.Pulsar.decode_nblocks"] = subtask["presto"]["decode_nblocks"] + parset["Observation.ObservationControl.PythonControl.Pulsar.decode_sigma"] = subtask["presto"]["decode_sigma"] + parset["Observation.ObservationControl.PythonControl.Pulsar.digifil_extra_opts"] = subtask["dspsr"]["digifil_extra_opts"] + parset["Observation.ObservationControl.PythonControl.Pulsar.dspsr_extra_opts"] = subtask["dspsr"]["dspsr_extra_opts"] + parset["Observation.ObservationControl.PythonControl.Pulsar.dynamic_spectrum_time_average"] = subtask["output"]["dynamic_spectrum_time_average"] + parset["Observation.ObservationControl.PythonControl.Pulsar.nofold"] = subtask["presto"]["nofold"] + parset["Observation.ObservationControl.PythonControl.Pulsar.nopdmp"] = subtask["presto"]["nopdmp"] + parset["Observation.ObservationControl.PythonControl.Pulsar.norfi"] = subtask["presto"]["norfi"] + parset["Observation.ObservationControl.PythonControl.Pulsar.prepdata_extra_opts"] = subtask["presto"]["prepdata_extra_opts"] + parset["Observation.ObservationControl.PythonControl.Pulsar.prepfold_extra_opts"] = subtask["presto"]["prepfold_extra_opts"] + parset["Observation.ObservationControl.PythonControl.Pulsar.prepsubband_extra_opts"] = subtask["oresto"]["prepsubband_extra_opts"] + parset["Observation.ObservationControl.PythonControl.Pulsar.pulsar"] = subtask["pulsar"] + parset["Observation.ObservationControl.PythonControl.Pulsar.raw_to_8bit"] = subtask["output"]["raw_to_8bit"] + parset["Observation.ObservationControl.PythonControl.Pulsar.rfifind_extra_opts"] = subtask["presto"]["rfifind_extra_opts"] + parset["Observation.ObservationControl.PythonControl.Pulsar.rrats"] = subtask["presto"]["rrats"] + parset["Observation.ObservationControl.PythonControl.Pulsar.rrats_dm_range"] = subtask["presto"]["rrats_dm_range"] + parset["Observation.ObservationControl.PythonControl.Pulsar.single_pulse"] = subtask["single_pulse"] + parset["Observation.ObservationControl.PythonControl.Pulsar.skip_dspsr"] = subtask["dspsr"]["skip_dspsr"] + parset["Observation.ObservationControl.PythonControl.Pulsar.skip_dynamic_spectrum"] = subtask["output"]["skip_dynamic_spectrum"] + parset["Observation.ObservationControl.PythonControl.Pulsar.skip_prepfold"] = subtask["presto"]["skip_prepfold"] + parset["Observation.ObservationControl.PythonControl.Pulsar.tsubint"] = subtask["dspsr"]["tsubint"] + + # Dataproducts. NOTE: The pulsar pipeline doesn't actually use this information, and reads input/writes output as it pleases. + + inputs = subtask.inputs.all() + in_dataproducts = sum([subtasK_input.dataproducts.all() for subtask_input in subtask.inputs.all()], []) + coherent_in_dataproducts = [dp for dp in in_dataproducts if dp.specifications_doc["coherent"]] + incoherent_in_dataproducts = [dp for dp in in_dataproducts if not dp.specifications_doc["coherent"]] + + parset["Observation.DataProducts.Input_CoherentStokes.enabled"] = len(coherent_in_dataproducts) > 0 + parset["Observation.DataProducts.Input_CoherentStokes.filenames"] = [dp.filename for dp in coherent_in_dataproducts] + parset["Observation.DataProducts.Input_CoherentStokes.locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in coherent_in_dataproducts] + parset["Observation.DataProducts.Input_CoherentStokes.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask_input.producer.subtask.id, 0)] # needed by ResourceEstimator + + parset["Observation.DataProducts.Input_IncoherentStokes.enabled"] = len(incoherent_in_dataproducts) > 0 + parset["Observation.DataProducts.Input_IncoherentStokes.filenames"] = [dp.filename for dp in incoherent_in_dataproducts] + parset["Observation.DataProducts.Input_IncoherentStokes.locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in incoherent_in_dataproducts] + parset["Observation.DataProducts.Input_IncoherentStokes.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask_input.producer.subtask.id, 0)] # needed by ResourceEstimator + + # TODO: do not use SubtaskOutput.objects.filter but make subtask.subtask_outputs work + subtask_outputs = list(models.SubtaskOutput.objects.filter(subtask_id=subtask.id)) + out_dataproducts = sum([models.Dataproduct.objects.filter(producer_id=subtask_output.id) for subtask_output in subtask_outputs], []) + + parset["Observation.DataProducts.Output_Pulsar.enabled"] = True + parset["Observation.DataProducts.Output_Pulsar.filenames"] = [dp.filename for dp in out_dataproducts] # todo: order this correctly. + parset["Observation.DataProducts.Output_Pulsar.locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory) for dp in out_dataproducts] + parset["Observation.DataProducts.Output_Pulsar.identifications"] = "[TMSS_subtask_%s.SAP%03d]" % (subtask.id, 0) + parset["Observation.DataProducts.Output_Pulsar.storageClusterName"] = subtask.cluster.name + + # pragmatic solution to deal with the various parset using subsystems... + # some want the keys as "Observation.<subkey>" and some as "ObsSW.Observation.<subkey>" + # so, just copy all "Observation.<subkey>" keys and prepend them with ObsSW. + for key, value in list(parset.items()): + if key.startswith("Observation."): + parset["ObsSW."+key] = value + + return parset # dict to store conversion methods based on subtask.specifications_template.name _convertors = {'observation control': _convert_to_parset_dict_for_observationcontrol_schema, - 'pipeline control': _convert_to_parset_dict_for_pipelinecontrol_schema } + 'pipeline control': _convert_to_parset_dict_for_pipelinecontrol_schema, + 'pulsar pipeline': _convert_to_parset_dict_for_pulsarpipeline_schema} def convert_to_parset(subtask: models.Subtask) -> parameterset: