Commit e536454b authored by Jan David Mol

TMSS-64: Extract common pipeline settings into a single function

parent 607f6f77
Part of 2 merge requests: !422 Resolve SW-961 "Fix pipelinecontrol", !400 Resolve TMSS-64 "Pulsar pipeline"
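The change follows a simple extract-helper pattern: parset settings shared by all pipelines move into a single function that returns a dict, and each pipeline-specific converter starts from that dict and only adds its own keys (processSubtype, pythonProgram, softwareVersion). Below is a minimal sketch of the pattern, not the actual TMSS code: the keys are reduced to a handful and a plain integer stands in for models.Subtask.

def _common_settings(subtask_pk: int) -> dict:
    """Settings shared by every pipeline parset (illustration only; the real helper sets many more keys)."""
    return {
        "prefix": "LOFAR.",
        "Observation.ObsID": subtask_pk,
        "Observation.processType": "Pipeline",
    }

def _preprocessing_settings(subtask_pk: int) -> dict:
    # Start from the shared dict, then add the pipeline-specific keys.
    parset = _common_settings(subtask_pk)
    parset["Observation.processSubtype"] = "Averaging Pipeline"
    parset["Observation.ObservationControl.PythonControl.pythonProgram"] = "preprocessing_pipeline.py"
    return parset

def _pulsar_settings(subtask_pk: int) -> dict:
    parset = _common_settings(subtask_pk)
    parset["Observation.processSubtype"] = "Pulsar Pipeline"
    parset["Observation.ObservationControl.PythonControl.pythonProgram"] = "pulsar_pipeline.py"
    return parset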
@@ -403,36 +403,25 @@ def _convert_to_parset_dict_for_observationcontrol_schema(subtask: models.Subtask) -> dict:
return parset
def _common_parset_dict_for_pipeline_schemas(subtask: models.Subtask) -> dict:
""" Return a parset dict with settings common to all pipelines. """
def _convert_to_parset_dict_for_preprocessing_pipeline_schema(subtask: models.Subtask) -> dict:
# see https://support.astron.nl/confluence/pages/viewpage.action?spaceKey=TMSS&title=UC1+JSON
parset = dict()
# make sure the spec is complete (including defaults for all properties that were not filled in)
spec = add_defaults_to_json_object_for_schema(subtask.specifications_doc, subtask.specifications_template.schema)
# -----------------------------------------------------------------------------------------------
# Historic rationale: in TMSS-183 we made MAC run an actual observation from a TMSS specification.
# With the help of Auke and Jan-David I could generate the parset as defined below.
# MAC turned out to be very sensitive to having specific keys with very specific prefixes etc.
# As a result, the generated parset contains many "duplicate" (nested) keys.
# We all agree that this is ugly, and we should not want this, but hey... it works.
# We decided to keep it like this, and maybe do more tuning/pruning later in the TMSS project.
# Or, we can just get rid of this to-parset-adapter when MAC has been rewritten to the new station API.
# -----------------------------------------------------------------------------------------------
parset = dict()
# General
parset["prefix"] = "LOFAR."
parset["Observation.ObsID"] = subtask.pk
parset["Observation.momID"] = 0 # Needed by MACScheduler
parset["Observation.otdbID"] = subtask.pk # HACK: the pipeline uses otdbID as the sasID. our tmssID>2000000 to prevent clashes. TODO: replace all otdbID's by sasID.
parset["Observation.tmssID"] = subtask.pk
parset["Observation.startTime"] = formatDatetime(subtask.start_time) if isinstance(subtask.start_time, datetime) else subtask.start_time
parset["Observation.stopTime"] = formatDatetime(subtask.stop_time) if isinstance(subtask.stop_time, datetime) else subtask.stop_time
parset["Observation.processType"] = "Pipeline"
parset["Observation.processSubtype"] = "Averaging Pipeline"
parset["Observation.ObservationControl.PythonControl.pythonProgram"] = "preprocessing_pipeline.py"
parset["Observation.ObservationControl.PythonControl.softwareVersion"] = ""
project_set = set([tb.scheduling_unit_blueprint.draft.scheduling_set.project.name for tb in subtask.task_blueprints.all()])
if len(project_set) != 1:
raise ConversionException('Subtask pk=%s cannot be converted to parset because it references task blueprints that belong to different projects (names=%s)' % (subtask.pk, project_set))
@@ -444,6 +433,31 @@ def _convert_to_parset_dict_for_preprocessing_pipeline_schema(subtask: models.Subtask) -> dict:
parset["Observation.Cluster.ProcessingCluster.numberOfTasks"] = 110 # deprecated (fixed value) to be completely removed in parset with 'JDM-patch 'soon
parset["Observation.Cluster.ProcessingCluster.numberOfCoresPerTask"] = 2 # deprecated (fixed value) to be completely removed in parset with 'JDM-patch 'soon
return parset
def _convert_to_parset_dict_for_preprocessing_pipeline_schema(subtask: models.Subtask) -> dict:
# see https://support.astron.nl/confluence/pages/viewpage.action?spaceKey=TMSS&title=UC1+JSON
# make sure the spec is complete (including defaults for all properties that were not filled in)
spec = add_defaults_to_json_object_for_schema(subtask.specifications_doc, subtask.specifications_template.schema)
# -----------------------------------------------------------------------------------------------
# Historic rationale: in TMSS-183 we made MAC run an actual observation from a TMSS specification.
# With the help of Auke and Jan-David I could generate the parset as defined below.
# MAC turned out to be very sensitive to having specific keys with very specific prefixes etc.
# As a result, the generated parset contains many "duplicate" (nested) keys.
# We all agree that this is ugly, and we should not want this, but hey... it works.
# We decided to keep it like this, and maybe do more tuning/pruning later in the TMSS project.
# Or, we can just get rid of this to-parset-adapter when MAC has been rewritten to the new station API.
# -----------------------------------------------------------------------------------------------
# General
parset = _common_parset_dict_for_pipeline_schemas(subtask)
parset["Observation.processSubtype"] = "Averaging Pipeline"
parset["Observation.ObservationControl.PythonControl.pythonProgram"] = "preprocessing_pipeline.py"
parset["Observation.ObservationControl.PythonControl.softwareVersion"] = ""
# DPPP steps
dppp_steps = []
if spec["preflagger0"]["enabled"]:
@@ -602,28 +616,11 @@ def _convert_to_parset_dict_for_pulsarpipeline_schema(subtask: models.Subtask) -> dict:
# make sure the spec is complete (including defaults for all properties that were not filled in)
spec = add_defaults_to_json_object_for_schema(subtask.specifications_doc, subtask.specifications_template.schema)
parset = dict()
# General
parset["prefix"] = "LOFAR."
parset["Observation.ObsID"] = subtask.pk
parset["Observation.momID"] = 0 # Needed by MACScheduler
parset["Observation.otdbID"] = subtask.pk # HACK: the pipeline uses otdbID as the sasID. our tmssID>2000000 to prevent clashes. TODO: replace all otdbID's by sasID.
parset["Observation.tmssID"] = subtask.pk
parset["Observation.startTime"] = formatDatetime(subtask.start_time) if isinstance(subtask.start_time, datetime) else subtask.start_time
parset["Observation.stopTime"] = formatDatetime(subtask.stop_time) if isinstance(subtask.stop_time, datetime) else subtask.stop_time
parset["Observation.processType"] = "Pipeline"
parset = _common_parset_dict_for_pipeline_schemas(subtask)
parset["Observation.processSubtype"] = "Pulsar Pipeline"
parset["Observation.ObservationControl.PythonControl.pythonProgram"] = "pulsar_pipeline.py"
parset["Observation.ObservationControl.PythonControl.softwareVersion"] = "lofar-pulp"
parset["Observation.Campaign.name"] = subtask.task_blueprint.scheduling_unit_blueprint.draft.scheduling_set.project.name
parset["Observation.Scheduler.taskName"] = subtask.task_blueprint.name
parset["Observation.Scheduler.predecessors"] = []
parset["Observation.Cluster.ProcessingCluster.clusterName"] = subtask.cluster.name
parset["Observation.Cluster.ProcessingCluster.clusterPartition"] = 'cpu'
parset["Observation.Cluster.ProcessingCluster.numberOfTasks"] = 110 # deprecated (fixed value) to be completely removed in parset with 'JDM-patch 'soon
parset["Observation.Cluster.ProcessingCluster.numberOfCoresPerTask"] = 2 # deprecated (fixed value) to be completely removed in parset with 'JDM-patch 'soon
# Pulsar pipeline settings
parset["Observation.ObservationControl.PythonControl.Pulsar.2bf2fits_extra_opts"] = spec["presto"]["2bf2fits_extra_opts"]
......
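As a possible sanity check after this refactoring (not part of the commit): given a suitably populated models.Subtask, both pipeline-specific converters should reproduce the settings returned by the extracted helper. The key list and the check function below are illustrative assumptions, not code from the repository.

# Illustrative check, not from the commit: both pipeline converters should
# agree with _common_parset_dict_for_pipeline_schemas on the shared keys.
COMMON_KEYS = ["prefix",
               "Observation.ObsID",
               "Observation.tmssID",
               "Observation.processType",
               "Observation.Cluster.ProcessingCluster.numberOfCoresPerTask"]

def assert_common_settings_shared(subtask):
    common = _common_parset_dict_for_pipeline_schemas(subtask)
    for convert in (_convert_to_parset_dict_for_preprocessing_pipeline_schema,
                    _convert_to_parset_dict_for_pulsarpipeline_schema):
        parset = convert(subtask)
        for key in COMMON_KEYS:
            assert parset[key] == common[key], "mismatch for %s" % key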