#!/usr/bin/python3
# Copyright (C) 2020 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
from lofar.sas.tmss.tmss.tmssapp import models
from lofar.sas.tmss.tmss.tmssapp.models.specification import Dataformat, Datatype
from lofar.sas.tmss.tmss.exceptions import ConversionException
from lofar.parameterset import parameterset
from lofar.common.datetimeutils import formatDatetime
from lofar.common.json_utils import add_defaults_to_json_object_for_schema, resolved_remote_refs
from lofar.stationmodel.antennafields import antenna_fields
from lofar.sas.tmss.tmss.exceptions import *
from datetime import datetime
from math import ceil
from lofar.common import isProductionEnvironment
import logging
logger = logging.getLogger(__name__)
# todo: check why this cannot be imported from subtasks.py, and deduplicate.
def _output_filesystem(subtask: models.Subtask) -> models.Filesystem:
""" Return the filesystem that output dataproducts should be written to."""
# TODO: how to handle multiple filesystems on one cluster?
    # For CEP4 we return the production or test filesystem, depending on the environment. Ugly! Should be configured better.
if subtask.cluster.name == "CEP4":
return subtask.cluster.filesystem_set.filter(name__icontains=('production' if isProductionEnvironment() else 'test')).first()
# For other clusters... just return the first. Should be configurable as well.
return subtask.cluster.filesystem_set.first()
def _add_prefix(parset: dict, prefix: str) -> dict:
""" Add a prefix to all the keys in the given parset """
return {prefix+k: v for k,v in parset.items()}
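# For illustration (hypothetical values): _add_prefix({'filenames': ['L123.MS'], 'skip': [0]}, "Observation.DataProducts.Output_Correlated.")
# yields {'Observation.DataProducts.Output_Correlated.filenames': ['L123.MS'],
#         'Observation.DataProducts.Output_Correlated.skip': [0]}.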
def _stokes_settings_parset_subkeys(stokes_spec: dict) -> dict:
""" Convert stokes specifications to parset keys. """
parset = {}
parset['which'] = stokes_spec['stokes']
parset['nrChannelsPerSubband'] = stokes_spec['channels_per_subband']
parset['timeIntegrationFactor'] = stokes_spec['time_integration_factor']
parset['subbandsPerFile'] = stokes_spec['subbands_per_file']
quantisation = parset['quantize'] = stokes_spec['quantisation'].get('enabled', False)
if quantisation:
parset['quantizeBits'] = stokes_spec['quantisation']['bits']
parset['quantizeScaleMax'] = stokes_spec['quantisation']['scale_max']
parset['quantizeScaleMin'] = stokes_spec['quantisation']['scale_min']
parset['quantizeIpositive'] = (stokes_spec['stokes'] == "I")
return parset
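# For illustration, a hypothetical stokes_spec of {'stokes': 'I', 'channels_per_subband': 16, 'time_integration_factor': 1,
# 'subbands_per_file': 488, 'quantisation': {'enabled': False}} maps to the subkeys
# {'which': 'I', 'nrChannelsPerSubband': 16, 'timeIntegrationFactor': 1, 'subbandsPerFile': 488, 'quantize': False}.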
def _dataproduct_parset_subkeys(subtask: models.Subtask, dataproducts: list) -> dict:
""" Return a subset of parset keys and values to list dataproducts.
        Dataproducts that are None are included in the list to be written to 'null:',
        and are marked to be skipped as well. """
parset = {}
parset["enabled"] = len(dataproducts) > 0
parset["filenames"] = [dp.filename if dp else "null:" for dp in dataproducts]
parset["skip"] = [0 if dp else 1 for dp in dataproducts]
parset["locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory if dp else "") for dp in dataproducts] # todo: use storage cluster instead of processing cluster? dp.producer.filesystem.cluster.name?
return parset
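# For illustration (hypothetical filename and directory): one found dataproduct followed by a missing one (None)
# yields {'enabled': True, 'filenames': ['L123_SAP000_SB000.MS', 'null:'], 'skip': [0, 1],
#         'locations': ['CEP4:/data/test-projects/tmss_test', 'CEP4:']}.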
def _add_dataproduct(found_dataproducts: list, final_dataproduct_list: list):
""" Use to append a dataproduct to `final_dataproduct_list` to put in the parset.
The `found_dataproducts` is a list of candidates.
This function checks if:
* any dataproducts were supplied (found)
* multiple dataproducts were supplied (found)
* the dataproduct is not already in the list
Will append `None` if issues were detected. """
def dp_to_str(dp):
return f'{dp.filename=} {dp.specifications_doc=}'
if len(found_dataproducts) == 0:
        logger.warning('No dataproducts matched the search. Writing "null:" instead.')
final_dataproduct_list.append(None)
return
if len(found_dataproducts) > 1:
logger.warning('Multiple dataproducts matched the same search. Using the first: %s' % [dp_to_str(dp) for dp in found_dataproducts])
dataproduct = found_dataproducts[0]
if dataproduct in final_dataproduct_list:
logger.warning('The following dataproduct was suggested multiple times. Not using it a second time: %s' % dp_to_str(dataproduct))
final_dataproduct_list.append(None)
return
final_dataproduct_list.append(dataproduct)
def _sap_index(saps: dict, sap_name: str) -> int:
""" Return the SAP index in the observation given a certain SAP name. """
sap_indices = [idx for idx,sap in enumerate(saps) if sap['name'] == sap_name]
# needs to be exactly one hit
if len(sap_indices) != 1:
raise ConversionException("SAP name %s must appear exactly once in the specification. It appeared %d times. Available names: %s" % (sap_name, len(sap_indices), [sap['name'] for sap in saps]))
return sap_indices[0]
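# For illustration (hypothetical names): with saps = [{'name': 'target1'}, {'name': 'calibrator'}],
# _sap_index(saps, 'calibrator') returns 1; an absent or duplicated name raises a ConversionException.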
def _convert_correlator_settings_to_parset_dict(subtask: models.Subtask, spec: dict) -> dict:
""" Provide the parset keys for the COBALT correlator. """
correlator_enabled = spec['COBALT']['correlator']['enabled']
cobalt_version = spec['COBALT']['version']
digi_beams = spec['stations']['digital_pointings']
parset = {}
# ResourceEstimator always wants these keys
parset["Cobalt.Correlator.nrChannelsPerSubband"] = spec['COBALT']['correlator']['channels_per_subband'] if correlator_enabled else 16
parset["Cobalt.Correlator.nrBlocksPerIntegration"] = spec['COBALT']['correlator']['blocks_per_integration'] if correlator_enabled else 1
parset["Cobalt.Correlator.nrIntegrationsPerBlock"] = spec['COBALT']['correlator']['integrations_per_block'] if correlator_enabled else 1
correlator_dataproducts = []
if correlator_enabled:
# Generic settings
parset["Cobalt.Correlator.dopplerCorrection"] = spec['COBALT']['correlator']['doppler_correction']
dataproducts = list(subtask.output_dataproducts.filter(dataformat__value=Dataformat.Choices.MEASUREMENTSET.value).filter(datatype__value=Datatype.Choices.VISIBILITIES.value).order_by('filename'))
        # Marshal dataproducts, but only if they're supplied. In some use cases, we want a parset before the subtask is scheduled.
for digi_beam in digi_beams:
for subband_index, subband in enumerate(digi_beam["subbands"]):
dataproduct_candidates = [dp for dp in dataproducts
if dp.specifications_doc.get("sap") == digi_beam['name']
and dp.specifications_doc.get("subband") == subband
# In some cases, a subband can appear twice in the same SAP,
# so also match by its index.
and dp.specifications_doc.get("identifiers", {}).get("subband_index", subband_index) == subband_index]
_add_dataproduct(dataproduct_candidates, correlator_dataproducts)
# sanity check
unmatched_dps = []
for dp in dataproducts:
if dp not in correlator_dataproducts:
unmatched_dps.append(dp)
if unmatched_dps:
logger.warning('The following output dataproducts were not expected as correlator output: %s' % [f'filename={dp.filename} sap={dp.specifications_doc.get("sap")} subband={dp.specifications_doc.get("subband")}' for dp in unmatched_dps])
parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, correlator_dataproducts), "Observation.DataProducts.Output_Correlated."))
parset["Observation.DataProducts.Output_Correlated.storageClusterName"] = _output_filesystem(subtask).cluster.name
parset["Observation.DataProducts.Output_Correlated.storageClusterPartition"] = _output_filesystem(subtask).directory
# mimic MoM placeholder thingy (the resource estimator parses this)
parset["Observation.DataProducts.Output_Correlated.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, sap_nr) for sap_nr in range(len(digi_beams))] # we need one per SAP, and that needs to include the string "SAPxxx" with xxx being the SAP number, just to satisfy the ResourceEstimator.
return parset
def _order_beamformer_dataproducts(dataproducts: list, spec: dict) -> (list, list):
""" Orders the given dataproducts, and returns a tuple of the observation's (coherent_dataproducts, incoherent_dataproducts), in the order COBALT wants them.
Missing dataproducts are replaced by None. """
# Lists of coherent and incoherent dataproducts that will be produced, in the order COBALT wants them: ordered by (from slowest changing to fastest):
# 1. sap
# 2. pipeline
# 3. tab
# 4. pointing (see COB-216)
# 5. stokes
# 6. part
#
# See also the parsing of this list in RTCP/Cobalt/CoInterface/src/Parset.cc around line 1167.
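    # For illustration (hypothetical counts): with 1 SAP, 1 pipeline, 2 coherent TABs, 2 stokes and 2 parts each,
    # the coherent list is ordered (tab0,st0,part0), (tab0,st0,part1), (tab0,st1,part0), (tab0,st1,part1), (tab1,st0,part0), ...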
digi_beams = spec['stations']['digital_pointings']
coherent_dataproducts = []
incoherent_dataproducts = []
for sap_idx, sap in enumerate(digi_beams):
for pipeline_idx, pipeline in enumerate(spec['COBALT']['beamformer']['tab_pipelines']):
# lookup this SAP in the pipeline spec
try:
pipeline_sap = pipeline['SAPs'][_sap_index(pipeline['SAPs'], sap['name'])]
except ConversionException:
# SAP was not configured in this pipeline
continue
for tab_idx, tab in enumerate(pipeline_sap['tabs']):
stokes_settings = pipeline['coherent'] if tab.get('coherent') else pipeline['incoherent']
nr_subbands = len(pipeline_sap['subbands']) or len(sap['subbands'])
nr_parts = ceil(1.0 * nr_subbands / stokes_settings['subbands_per_file'])
nr_stokes = len(stokes_settings['stokes'])
for stokes_idx in range(nr_stokes):
for part_idx in range(nr_parts):
dataproduct_candidates = [dp for dp in dataproducts
if dp.specifications_doc.get("sap") == sap['name']
and "identifiers" in dp.specifications_doc
and dp.specifications_doc["identifiers"]["pipeline_index"] == pipeline_idx
and dp.specifications_doc["identifiers"]["tab_index"] == tab_idx
and dp.specifications_doc["identifiers"]["stokes_index"] == stokes_idx
and dp.specifications_doc["identifiers"]["part_index"] == part_idx
and dp.specifications_doc.get("coherent") == tab.get('coherent')]
_add_dataproduct(dataproduct_candidates, coherent_dataproducts if tab.get('coherent') else incoherent_dataproducts)
        # Process fly's eye pipelines, continuing the pipeline numbering
pipeline_idx_offset = len(spec['COBALT']['beamformer']['tab_pipelines'])
for pipeline_idx, pipeline in enumerate(spec['COBALT']['beamformer']['flyseye_pipelines'], start=pipeline_idx_offset):
# Generate coherent TABs for each antenna field.
# NOTE: stations could be taken out of the observation (not appear in spec['stations']['station_list'])
# during scheduling, but still appear in pipeline['stations']! In that case, we should not create
# dataproducts for them as COBALT won't generate them.
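            # local import, presumably to avoid a circular dependency between this module and subtasks.py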
from lofar.sas.tmss.tmss.tmssapp.subtasks import used_pipeline_stations
stations = used_pipeline_stations(pipeline, spec)
antennaset = spec['stations']['antenna_set']
fields = sum([list(antenna_fields(station, antennaset)) for station in stations], [])
for field_idx, field in enumerate(fields):
stokes_settings = pipeline['coherent']
nr_subbands = len(sap['subbands'])
nr_parts = ceil(1.0 * nr_subbands / stokes_settings['subbands_per_file'])
nr_stokes = len(stokes_settings['stokes'])
                # Marshal dataproducts, but only if they're supplied. In some use cases, we want a parset before the subtask is scheduled.
for stokes_idx in range(nr_stokes):
for part_idx in range(nr_parts):
dataproduct_candidates = [dp for dp in dataproducts
if dp.specifications_doc.get("sap") == sap["name"]
and dp.specifications_doc["identifiers"]["pipeline_index"] == pipeline_idx
and dp.specifications_doc["identifiers"]["tab_index"] == field_idx
and dp.specifications_doc["identifiers"]["stokes_index"] == stokes_idx
and dp.specifications_doc["identifiers"]["part_index"] == part_idx
and dp.specifications_doc["coherent"] == True]
_add_dataproduct(dataproduct_candidates, coherent_dataproducts)
# sanity check
unmatched_dps = []
for dp in dataproducts:
if dp not in coherent_dataproducts and dp not in incoherent_dataproducts:
unmatched_dps.append(dp)
if unmatched_dps:
logger.warning('The following dataproducts could not be sorted for COBALT: %s' % [f'filename={dp.filename} sap={dp.specifications_doc.get("sap")} identifiers={dp.specifications_doc.get("identifiers")}' for dp in unmatched_dps])
return coherent_dataproducts, incoherent_dataproducts
def _convert_beamformer_settings_to_parset_dict(subtask: models.Subtask, spec: dict) -> dict:
""" Provide the parset keys for the COBALT beamformer. """
cobalt_version = spec['COBALT']['version']
digi_beams = spec['stations']['digital_pointings']
parset = {}
# General settings
parset["Cobalt.BeamFormer.inputPPF"] = spec['COBALT']['beamformer']['input_ppf']
# List of beamformer pipelines, staged to be added to the parset later
beamformer_pipeline_parsets = []
# Process beamformer pipelines
for pipeline_idx, pipeline in enumerate(spec['COBALT']['beamformer']['tab_pipelines']):
pipeline_parset = {}
if 'coherent' in pipeline:
pipeline_parset.update(_add_prefix(_stokes_settings_parset_subkeys(pipeline['coherent']), "CoherentStokes."))
if 'incoherent' in pipeline:
pipeline_parset.update(_add_prefix(_stokes_settings_parset_subkeys(pipeline['incoherent']), "IncoherentStokes."))
pipeline_parset['nrBeams'] = len(pipeline['SAPs'])
for sap in pipeline['SAPs']:
sap_idx = _sap_index(digi_beams, sap['name'])
pipeline_parset['Beam[%s].nrTiedArrayBeams' % sap_idx] = len(sap['tabs'])
for tab_idx, tab in enumerate(sap['tabs']):
if tab.get('coherent'):
if len(tab['pointings']) != 1:
raise ConversionException(f"Coherent TABs must have exactly 1 pointing, but pipeline {pipeline_idx} SAP {sap['name']} TAB {tab_idx} has {len(tab['pointings'])} pointings")
pipeline_parset['Beam[%s].TiedArrayBeam[%s].coherent' % (sap_idx, tab_idx)] = True
pipeline_parset['Beam[%s].TiedArrayBeam[%s].directionType' % (sap_idx, tab_idx)] = tab['pointings'][0]['pointing']['direction_type']
pipeline_parset['Beam[%s].TiedArrayBeam[%s].angle1' % (sap_idx, tab_idx)] = tab['pointings'][0]['pointing']['angle1']
pipeline_parset['Beam[%s].TiedArrayBeam[%s].angle2' % (sap_idx, tab_idx)] = tab['pointings'][0]['pointing']['angle2']
pipeline_parset['Beam[%s].TiedArrayBeam[%s].target' % (sap_idx, tab_idx)] = tab['pointings'][0]['pointing']['target']
else:
pipeline_parset['Beam[%s].TiedArrayBeam[%s].coherent' % (sap_idx, tab_idx)] = False
if cobalt_version >= 2:
pipeline_parset['Beam[%s].subbandList' % sap_idx] = sap['subbands']
if cobalt_version == 1:
# This won't overwrite anything, since COBALT1 supports only one beamformer pipeline
parset["Cobalt.BeamFormer.stationList"] = pipeline['stations']
else:
pipeline_parset['Beam[%s].stationList' % sap_idx] = pipeline['stations']
beamformer_pipeline_parsets.append(pipeline_parset)
# Process fly's eye pipelines
pipeline_idx_offset = len(beamformer_pipeline_parsets)
for pipeline_idx, pipeline in enumerate(spec['COBALT']['beamformer']['flyseye_pipelines'], start=pipeline_idx_offset):
pipeline_parset = {}
pipeline_parset.update(_add_prefix(_stokes_settings_parset_subkeys(pipeline['coherent']), "CoherentStokes."))
pipeline_parset['flysEye'] = True
pipeline_parset['nrBeams'] = len(digi_beams)
if cobalt_version == 1:
parset["Cobalt.BeamFormer.stationList"] = pipeline['stations']
for sap_idx, sap in enumerate(digi_beams):
if cobalt_version >= 2:
pipeline_parset['Beam[%s].stationList' % sap_idx] = pipeline['stations']
pipeline_parset['Beam[%s].nrTiedArrayBeams' % sap_idx] = 0
beamformer_pipeline_parsets.append(pipeline_parset)
    # The global parset also needs flysEye set if any pipeline uses it
parset['Cobalt.BeamFormer.flysEye'] = (len(spec['COBALT']['beamformer']['flyseye_pipelines']) > 0)
# COBALT1 supports one beamformer pipeline, with prefix "Cobalt.BeamFormer."
# COBALT2 supports multiple pipelines, with prefix "Cobalt.BeamFormer.Pipeline[xxx]."
#
    # If we see one pipeline, we write a COBALT1-compatible parset. This also helps the subsequent pulsar pipeline, which will actually read this parset.
if cobalt_version == 1 and beamformer_pipeline_parsets:
if len(beamformer_pipeline_parsets) > 1:
raise ConversionException("COBALT1 only supports one beamformer pipeline. %d were specified." % len(beamformer_pipeline_parsets))
# Beam keys are merged under Observation
parset.update(_add_prefix({k:v for k,v in beamformer_pipeline_parsets[0].items() if not k.startswith("Beam")}, "Cobalt.BeamFormer."))
parset.update(_add_prefix({k:v for k,v in beamformer_pipeline_parsets[0].items() if k.startswith("Beam")}, "Observation."))
else:
parset['Cobalt.BeamFormer.nrPipelines'] = len(beamformer_pipeline_parsets)
for pipeline_idx, pipeline_parset in enumerate(beamformer_pipeline_parsets):
parset.update(_add_prefix(pipeline_parset, "Cobalt.BeamFormer.Pipeline[%s]." % pipeline_idx))
    # Derive the dataproduct list, in the order COBALT wants it
dataproducts = list(subtask.output_dataproducts.filter(dataformat__value=Dataformat.Choices.BEAMFORMED.value).filter(datatype__value=Datatype.Choices.TIME_SERIES.value))
coherent_dataproducts, incoherent_dataproducts = _order_beamformer_dataproducts(dataproducts, spec)
parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, coherent_dataproducts), "Observation.DataProducts.Output_CoherentStokes."))
parset["Observation.DataProducts.Output_CoherentStokes.storageClusterName"] = _output_filesystem(subtask).cluster.name
parset["Observation.DataProducts.Output_CoherentStokes.storageClusterPartition"] = _output_filesystem(subtask).directory
parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, incoherent_dataproducts), "Observation.DataProducts.Output_IncoherentStokes."))
parset["Observation.DataProducts.Output_IncoherentStokes.storageClusterName"] = _output_filesystem(subtask).cluster.name
parset["Observation.DataProducts.Output_IncoherentStokes.storageClusterPartition"] = _output_filesystem(subtask).directory
# mimic MoM placeholder thingy (the resource estimator parses this)
parset["Observation.DataProducts.Output_CoherentStokes.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, sap_nr) for sap_nr in range(len(digi_beams))] # this needs to be unique per SAP only, not dataproduct
parset["Observation.DataProducts.Output_IncoherentStokes.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, sap_nr) for sap_nr in range(len(digi_beams))] # this needs to be unique per SAP only, not dataproduct
return parset
def _convert_to_parset_dict_for_observationcontrol_schema(subtask: models.Subtask) -> dict:
    # make sure the spec is complete (including all non-filled-in properties with defaults)
spec = add_defaults_to_json_object_for_schema(subtask.specifications_doc, resolved_remote_refs(subtask.specifications_template.schema))
# -----------------------------------------------------------------------------------------------
# Historic rationale: in TMSS-183 we made MAC run an actual observation from a TMSS specification.
# With the help of Auke and Jan-David I could generate the parset as defined below.
    # MAC turned out to be very sensitive to having specific keys with very specific prefixes, etc.
    # As a result, the generated parset contains many "duplicate" (nested) keys.
# We all agree that this is ugly, and we should not want this, but hey... it works.
# We decided to keep it like this, and maybe do more tuning/pruning later in the TMSS project.
    # Or, we can just get rid of this to-parset-adapter when MAC has been rewritten to the new station API.
# -----------------------------------------------------------------------------------------------
# ----------------------------
# Generic settings
# ----------------------------
    parset = dict() # parameterset has no proper assignment operators, so take a detour via a dict...
parset["Observation.ObsID"] = subtask.pk
parset["Observation.momID"] = 0 # Needed by MACScheduler
parset["Observation.otdbID"] = 0 # Needed by MACScheduler; should/can this be the same as subtask.pk?
parset["Observation.tmssID"] = subtask.pk
parset["Observation.processType"] = subtask.specifications_template.type.value.capitalize()
parset["Observation.processSubtype"] = "Beam Observation"
parset["Observation.Campaign.name"] = subtask.project.name
parset["Observation.Campaign.title"] = subtask.project.description
parset["Observation.Campaign.PI"] = "classified" # pulp needs the PI to be non-empty
parset["Observation.startTime"] = formatDatetime(subtask.scheduled_start_time) if isinstance(subtask.scheduled_start_time, datetime) else subtask.scheduled_start_time
parset["Observation.stopTime"] = formatDatetime(subtask.scheduled_stop_time) if isinstance(subtask.scheduled_stop_time, datetime) else subtask.scheduled_stop_time
# parset["Observation.strategy"] = "default" # Note: this was an annotation for OTDB and the old Scheduler and is now dropped
# ----------------------------
# Debug settings
# ----------------------------
parset["Cobalt.writeToDisk"] = spec["COBALT"]["debug"]["write_output"]
parset["Cobalt.Benchmark.enabled"] = spec["COBALT"]["debug"]["benchmark_file"] != ""
parset["Cobalt.Benchmark.file"] = spec["COBALT"]["debug"]["benchmark_file"]
parset["Cobalt.FinalMetaDataGatherer.enabled"] = spec["COBALT"]["debug"]["annotate_broken_antennas"]
# ----------------------------
# System resources
# ----------------------------
parset["Cobalt.Nodes"] = spec["COBALT"]["resources"]["processing_nodes"]
parset["Cobalt.OutputProc.Nodes"] = spec["COBALT"]["resources"]["writer_nodes"]
# ----------------------------
# Station settings
# ----------------------------
parset["Observation.VirtualInstrument.minimalNrStations"] = 1 # maybe not mandatory?
parset["Observation.VirtualInstrument.stationSet"] = "Custom" # maybe not mandatory?
parset["Observation.VirtualInstrument.stationList"] = spec["stations"]["station_list"]
parset["Observation.antennaArray"] = "HBA" if "HBA" in spec["stations"]["antenna_set"] else "LBA" # maybe not mandatory?
parset["Observation.antennaSet"] = spec["stations"]["antenna_set"]
parset["Observation.bandFilter"] = spec["stations"]["filter"]
parset["Observation.sampleClock"] = 200 # fixed value, no other values are supported
parset["Observation.nrBitsPerSample"] = 8 # fixed value, no other values are supported.
# Digital beams
digi_beams = spec['stations']['digital_pointings']
parset["Observation.nrBeams"] = len(digi_beams)
for beam_nr, digi_beam in enumerate(digi_beams):
beam_prefix = "Observation.Beam[%d]." % beam_nr
parset[beam_prefix+"directionType"] = digi_beam['pointing']['direction_type']
parset[beam_prefix+"angle1"] = digi_beam['pointing']['angle1']
parset[beam_prefix+"angle2"] = digi_beam['pointing']['angle2']
parset[beam_prefix+"target"] = digi_beam['pointing']['target']
parset[beam_prefix+"subbandList"] = digi_beam['subbands']
parset[beam_prefix+"nrTiedArrayBeams"] = 0 # fixed
parset[beam_prefix+"nrTabRings"] = 0 # fixed
# Analog beam (=HBA tile beam)
analog_beam = spec['stations']['analog_pointing']
parset["Observation.nrAnaBeams"] = 1
beam_prefix = "Observation.AnaBeam[0]."
parset[beam_prefix+"directionType"] = analog_beam['direction_type']
parset[beam_prefix+"angle1"] = analog_beam['angle1']
parset[beam_prefix+"angle2"] = analog_beam['angle2']
parset[beam_prefix+"target"] = analog_beam['target']
# ----------------------------
# COBALT settings
# ----------------------------
cobalt_version = spec['COBALT']['version']
parset["Cobalt.realTime"] = True
parset["Cobalt.blockSize"] = spec['COBALT']['blocksize']
parset["Cobalt.correctBandPass"] = spec['COBALT']['bandpass_correction']
parset["Cobalt.delayCompensation"] = spec['COBALT']['delay_compensation']
parset["Cobalt.correctClocks"] = True # fixed, but deprecated
parset["Observation.Cluster.ProcessingCluster.clusterName"] = subtask.cluster.name
# Correlator settings
parset.update(_convert_correlator_settings_to_parset_dict(subtask, spec))
# Beamformer settings
parset.update(_convert_beamformer_settings_to_parset_dict(subtask, spec))
# ResourceEstimator wants all Cobalt keys to start with Observation.ObservationControl.OnlineControl.
parset.update(_add_prefix({k:v for k,v in parset.items() if k.startswith("Cobalt.")}, "Observation.ObservationControl.OnlineControl."))
# ----------------------------
# MAC settings
# ----------------------------
# Retrieve the scheduling_unit_blueprint to get piggyback values to set
sub = subtask.task_blueprint.scheduling_unit_blueprint
parset["prefix"] = "LOFAR."
parset["Observation.claimPeriod"] = 35
parset["Observation.preparePeriod"] = 20
for prefix in ["", "Observation."]:
parset[prefix+"ObservationControl.OnlineControl.CorrAppl.CorrProc._executable"] = "CN_Processing"
parset[prefix+"ObservationControl.OnlineControl.CorrAppl.CorrProc._hostname"] = "cbmmaster"
parset[prefix+"ObservationControl.OnlineControl.CorrAppl.CorrProc._nodes"] = []
parset[prefix+"ObservationControl.OnlineControl.CorrAppl.CorrProc._startstopType"] = "bgl" # note: this is three correlators ago, and hopefully we get rid of it in LOFAR 2.0 ;)
parset[prefix+"ObservationControl.OnlineControl.CorrAppl.CorrProc.workingdir"] = f"/opt/lofar-versions/{spec['COBALT'].get('release', 'current')}/bin"
parset[prefix+"ObservationControl.OnlineControl.CorrAppl._hostname"] = "cbmmaster"
parset[prefix+"ObservationControl.OnlineControl.CorrAppl.extraInfo"] = '["PIC","Cobalt"]'
parset[prefix+"ObservationControl.OnlineControl.CorrAppl.procesOrder"] = [] # note: typo is correct ;)
parset[prefix+"ObservationControl.OnlineControl.CorrAppl.processes"] = '["CorrProc"]'
parset[prefix+"ObservationControl.OnlineControl._hostname"] = 'CCU001'
parset[prefix+"ObservationControl.OnlineControl.applOrder"] = '["CorrAppl"]'
parset[prefix+"ObservationControl.OnlineControl.applications"] = '["CorrAppl"]'
parset[prefix+"ObservationControl.StationControl._hostname"] = parset["Observation.VirtualInstrument.stationList"]
parset[prefix+"ObservationControl.StationControl.aartfaacPiggybackAllowed"] = sub.piggyback_allowed_aartfaac
parset[prefix + "ObservationControl.StationControl.tbbPiggybackAllowed"] = sub.piggyback_allowed_tbb
# inspection plot program to start (only CEP4 is supported)
inspection_programs = { "msplots": "/data/bin/inspection-plots-observation.sh",
"dynspec": "/data/home/lofarsys/dynspec/scripts/inspection-dynspec-observation.sh",
"none": "/bin/true" # no-op. MAC needs something to run.
}
parset[prefix+"ObservationControl.OnlineControl.inspectionHost"] = "head.cep4.control.lofar"
parset[prefix+"ObservationControl.OnlineControl.inspectionProgram"] = inspection_programs[spec["QA"]["inspection_plots"]]
return parset
def _common_parset_dict_for_pipeline_schemas(subtask: models.Subtask) -> dict:
""" Return a parset dict with settings common to all pipelines. """
parset = dict()
    # make sure the spec is complete (including all non-filled-in properties with defaults)
spec = add_defaults_to_json_object_for_schema(subtask.specifications_doc, resolved_remote_refs(subtask.specifications_template.schema))
# General
parset["prefix"] = "LOFAR."
parset["Observation.ObsID"] = subtask.pk
parset["Observation.momID"] = 0 # Needed by MACScheduler
parset["Observation.otdbID"] = subtask.pk # HACK: the pipeline uses otdbID as the sasID. our tmssID>2000000 to prevent clashes. TODO: replace all otdbID's by sasID.
parset["Observation.tmssID"] = subtask.pk
parset["Observation.startTime"] = formatDatetime(subtask.scheduled_start_time) if isinstance(subtask.scheduled_start_time, datetime) else subtask.scheduled_start_time
parset["Observation.stopTime"] = formatDatetime(subtask.scheduled_stop_time) if isinstance(subtask.scheduled_stop_time, datetime) else subtask.scheduled_stop_time
parset["Observation.processType"] = "Pipeline"
parset["Observation.Campaign.name"] = subtask.project.name
parset["Observation.Scheduler.taskName"] = subtask.task_blueprint.short_description # Scheduler keys are artefacts of an older time. Their content is deprecated, so we don't care whch task we take this from
parset["Observation.Scheduler.predecessors"] = [pred.id for pred in subtask.predecessors]
cluster_resources = spec['cluster_resources']
parset["Observation.Cluster.ProcessingCluster.clusterName"] = subtask.cluster.name # is equal to cluster_resources['where']['cluster']
parset["Observation.Cluster.ProcessingCluster.clusterPartition"] = cluster_resources['where']['partition']
parset["Observation.Cluster.ProcessingCluster.numberOfTasks"] = cluster_resources['parallel_tasks']
parset["Observation.Cluster.ProcessingCluster.numberOfCoresPerTask"] = cluster_resources['cores_per_task']
return parset
def _convert_to_parset_dict_for_preprocessing_pipeline_schema(subtask: models.Subtask) -> dict:
# see https://support.astron.nl/confluence/pages/viewpage.action?spaceKey=TMSS&title=UC1+JSON
    # make sure the spec is complete (including all non-filled-in properties with defaults)
spec = add_defaults_to_json_object_for_schema(subtask.specifications_doc, resolved_remote_refs(subtask.specifications_template.schema))
# -----------------------------------------------------------------------------------------------
# Historic rationale: in TMSS-183 we made MAC run an actual observation from a TMSS specification.
# With the help of Auke and Jan-David I could generate the parset as defined below.
    # MAC turned out to be very sensitive to having specific keys with very specific prefixes, etc.
    # As a result, the generated parset contains many "duplicate" (nested) keys.
# We all agree that this is ugly, and we should not want this, but hey... it works.
# We decided to keep it like this, and maybe do more tuning/pruning later in the TMSS project.
    # Or, we can just get rid of this to-parset-adapter when MAC has been rewritten to the new station API.
# -----------------------------------------------------------------------------------------------
# General
parset = _common_parset_dict_for_pipeline_schemas(subtask)
parset["Observation.processSubtype"] = "Averaging Pipeline"
parset["Observation.ObservationControl.PythonControl.pythonProgram"] = "preprocessing_pipeline.py"
parset["Observation.ObservationControl.PythonControl.softwareVersion"] = spec.get('software_version', "")
# DPPP steps
dppp_steps = []
if spec["preflagger0"]["enabled"]:
dppp_steps.append('preflagger[0]')
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].chan"] = spec["preflagger0"]["channels"].split(",")
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].abstime"] = []
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].azimuth"] = []
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].baseline"] = ""
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].blrange"] = []
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].corrtype"] = ""
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].count.path"] = "-"
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].count.save"] = False
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].elevation"] = []
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].expr"] = ""
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].freqrange"] = []
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].lst"] = []
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].reltime"] = []
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].timeofday"] = []
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].timeslot"] = []
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].type"] = "preflagger"
if spec["preflagger1"]["enabled"]:
dppp_steps.append('preflagger[1]')
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].corrtype"] = spec["preflagger1"]["corrtype"]
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].abstime"] = []
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].azimuth"] = []
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].baseline"] = ""
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].blrange"] = []
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].chan"] = []
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].count.path"] = "-"
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].count.save"] = False
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].elevation"] = []
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].expr"] = ""
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].freqrange"] = []
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].lst"] = []
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].reltime"] = []
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].timeofday"] = []
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].timeslot"] = []
parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].type"] = "preflagger"
if spec["aoflagger"]["enabled"]:
dppp_steps.append('aoflagger')
parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.strategy"] = spec["aoflagger"]["strategy"]
parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.autocorr"] = False
parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.count.path"] = "-"
parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.count.save"] = False
parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.keepstatistics"] = True
parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.memorymax"] = 10
parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.memoryperc"] = 0
parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.overlapmax"] = 0
parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.overlapperc"] = 0
parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.pedantic"] = False
parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.pulsar"] = False
parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.timewindow"] = 0
parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.type"] = "aoflagger"
if spec["demixer"]["enabled"]:
dppp_steps.append('demixer')
parset["Observation.ObservationControl.PythonControl.DPPP.demixer.baseline"] = spec["demixer"]["baselines"]
parset["Observation.ObservationControl.PythonControl.DPPP.demixer.demixfreqstep"] = spec["demixer"]["demix_frequency_steps"]
parset["Observation.ObservationControl.PythonControl.DPPP.demixer.demixtimestep"] = spec["demixer"]["demix_time_steps"]
parset["Observation.ObservationControl.PythonControl.DPPP.demixer.freqstep"] = spec["demixer"]["frequency_steps"]
parset["Observation.ObservationControl.PythonControl.DPPP.demixer.timestep"] = spec["demixer"]["time_steps"]
parset["Observation.ObservationControl.PythonControl.DPPP.demixer.ignoretarget"] = spec["demixer"]["ignore_target"]
parset["Observation.ObservationControl.PythonControl.PreProcessing.demix_always"] = spec["demixer"]["demix_always"]
parset["Observation.ObservationControl.PythonControl.PreProcessing.demix_if_needed"] = spec["demixer"]["demix_if_needed"]
parset["Observation.ObservationControl.PythonControl.DPPP.demixer.blrange"] = []
parset["Observation.ObservationControl.PythonControl.DPPP.demixer.corrtype"] = "cross"
parset["Observation.ObservationControl.PythonControl.DPPP.demixer.elevationcutoff"] = "0.0deg"
parset["Observation.ObservationControl.PythonControl.DPPP.demixer.instrumentmodel"] = "instrument"
parset["Observation.ObservationControl.PythonControl.DPPP.demixer.modelsources"] = []
parset["Observation.ObservationControl.PythonControl.DPPP.demixer.ntimechunk"] = 0
parset["Observation.ObservationControl.PythonControl.DPPP.demixer.othersources"] = []
parset["Observation.ObservationControl.PythonControl.DPPP.demixer.skymodel"] = "sky"
parset["Observation.ObservationControl.PythonControl.DPPP.demixer.subtractsources"] = ""
parset["Observation.ObservationControl.PythonControl.DPPP.demixer.targetsource"] = ""
parset["Observation.ObservationControl.PythonControl.DPPP.demixer.type"] = "demixer"
else:
# ResourceEstimator wants these keys always
parset["Observation.ObservationControl.PythonControl.DPPP.demixer.freqstep"] = 1
parset["Observation.ObservationControl.PythonControl.DPPP.demixer.timestep"] = 1
parset["Observation.ObservationControl.PythonControl.DPPP.steps"] = dppp_steps
parset["Observation.ObservationControl.PythonControl.DPPP.msout.storagemanager.name"] = spec["storagemanager"]
# Dataproducts
subtask_inputs = list(subtask.inputs.order_by('id').all())
in_dataproducts = sum([list(subtask_input.dataproducts.order_by('id').all()) for subtask_input in subtask_inputs],[])
parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, in_dataproducts), "Observation.DataProducts.Input_Correlated."))
# mimic MoM placeholder thingy (the resource assigner parses this)
    # should be expanded with SAPs and datatypes
parset["Observation.DataProducts.Input_Correlated.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask_input.producer.subtask.id, input_nr) for input_nr, subtask_input in enumerate(subtask_inputs)]
subtask_outputs = list(subtask.outputs.all())
unsorted_out_dataproducts = sum([list(subtask_output.dataproducts.all()) for subtask_output in subtask_outputs],[])
def find_dataproduct(dataproducts: list, specification_doc: dict):
hits = [dp for dp in dataproducts if dp.specifications_doc['sap'] == specification_doc['sap']
and dp.specifications_doc['subband'] == specification_doc['subband']]
return hits[0] if hits else None
# list output dataproducts in the same order as input dataproducts, matched by the identifiers
out_dataproducts = [find_dataproduct(unsorted_out_dataproducts, in_dp.specifications_doc) for in_dp in in_dataproducts]
# sanity check
unmatched_dps = []
for dp in unsorted_out_dataproducts:
if dp not in out_dataproducts:
unmatched_dps.append(dp)
if unmatched_dps:
logger.warning('The following output dataproducts were not expected as preprocessing pipeline output: %s' % [f'filename={dp.filename} sap={dp.specifications_doc.get("sap")} subband={dp.specifications_doc.get("subband")}' for dp in unmatched_dps])
parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, out_dataproducts), "Observation.DataProducts.Output_Correlated."))
parset["Observation.DataProducts.Output_Correlated.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, 0)]
parset["Observation.DataProducts.Output_Correlated.storageClusterName"] = _output_filesystem(subtask).cluster.name
# Other
parset["Observation.ObservationControl.PythonControl.PreProcessing.SkyModel"] = "Ateam_LBA_CC"
parset["Observation.ObservationControl.PythonControl.DPPP.checkparset"] = -1
parset["Observation.ObservationControl.PythonControl.DPPP.msin.autoweight"] = True
parset["Observation.ObservationControl.PythonControl.DPPP.msin.band"] = -1
parset["Observation.ObservationControl.PythonControl.DPPP.msin.baseline"] = ""
parset["Observation.ObservationControl.PythonControl.DPPP.msin.blrange"] = []
parset["Observation.ObservationControl.PythonControl.DPPP.msin.corrtype"] = ""
parset["Observation.ObservationControl.PythonControl.DPPP.msin.datacolumn"] = "DATA"
parset["Observation.ObservationControl.PythonControl.DPPP.msin.forceautoweight"] = False
parset["Observation.ObservationControl.PythonControl.DPPP.msin.missingdata"] = False
parset["Observation.ObservationControl.PythonControl.DPPP.msin.nchan"] = "nchan"
parset["Observation.ObservationControl.PythonControl.DPPP.msin.orderms"] = False
parset["Observation.ObservationControl.PythonControl.DPPP.msin.sort"] = False
parset["Observation.ObservationControl.PythonControl.DPPP.msin.startchan"] = 0
parset["Observation.ObservationControl.PythonControl.DPPP.msin.useflag"] = True
parset["Observation.ObservationControl.PythonControl.DPPP.msout.overwrite"] = False
parset["Observation.ObservationControl.PythonControl.DPPP.msout.tilenchan"] = 8
parset["Observation.ObservationControl.PythonControl.DPPP.msout.tilesize"] = 4096
parset["Observation.ObservationControl.PythonControl.DPPP.msout.vdsdir"] = "A"
parset["Observation.ObservationControl.PythonControl.DPPP.msout.writefullresflag"] = True
parset["Observation.ObservationControl.PythonControl.DPPP.showprogress"] = False
parset["Observation.ObservationControl.PythonControl.DPPP.showtimings"] = False
parset["Observation.ObservationControl.PythonControl.DPPP.uselogger"] = True
    # pragmatic solution to deal with the various parset-using subsystems...
# some want the keys as "Observation.<subkey>" and some as "ObsSW.Observation.<subkey>"
# so, just copy all "Observation.<subkey>" keys and prepend them with ObsSW.
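    # For example, "Observation.ObsID" is also emitted as "ObsSW.Observation.ObsID".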
for key, value in list(parset.items()):
if key.startswith("Observation."):
parset["ObsSW."+key] = value
return parset
def _convert_to_parset_dict_for_pulsarpipeline_schema(subtask: models.Subtask) -> dict:
    # make sure the spec is complete (including all non-filled-in properties with defaults)
spec = add_defaults_to_json_object_for_schema(subtask.specifications_doc, resolved_remote_refs(subtask.specifications_template.schema))
# General
parset = _common_parset_dict_for_pipeline_schemas(subtask)
parset["Observation.processSubtype"] = "Pulsar Pipeline"
parset["Observation.ObservationControl.PythonControl.pythonProgram"] = "pulsar_pipeline.py"
parset["Observation.ObservationControl.PythonControl.softwareVersion"] = spec.get('software_version', "lofar-pulp:tmss")
# Pulsar pipeline settings
parset["Observation.ObservationControl.PythonControl.Pulsar.2bf2fits_extra_opts"] = spec["presto"]["2bf2fits_extra_opts"]
parset["Observation.ObservationControl.PythonControl.Pulsar.8bit_conversion_sigma"] = spec["output"]["8bit_conversion_sigma"]
parset["Observation.ObservationControl.PythonControl.Pulsar.decode_nblocks"] = spec["presto"]["decode_nblocks"]
parset["Observation.ObservationControl.PythonControl.Pulsar.decode_sigma"] = spec["presto"]["decode_sigma"]
parset["Observation.ObservationControl.PythonControl.Pulsar.digifil_extra_opts"] = spec["dspsr"]["digifil_extra_opts"]
parset["Observation.ObservationControl.PythonControl.Pulsar.dspsr_extra_opts"] = spec["dspsr"]["dspsr_extra_opts"]
parset["Observation.ObservationControl.PythonControl.Pulsar.dynamic_spectrum_time_average"] = spec["output"]["dynamic_spectrum_time_average"]
parset["Observation.ObservationControl.PythonControl.Pulsar.nofold"] = spec["presto"]["nofold"]
parset["Observation.ObservationControl.PythonControl.Pulsar.nopdmp"] = spec["dspsr"]["nopdmp"]
parset["Observation.ObservationControl.PythonControl.Pulsar.norfi"] = spec["dspsr"]["norfi"]
parset["Observation.ObservationControl.PythonControl.Pulsar.prepdata_extra_opts"] = spec["presto"]["prepdata_extra_opts"]
parset["Observation.ObservationControl.PythonControl.Pulsar.prepfold_extra_opts"] = spec["presto"]["prepfold_extra_opts"]
parset["Observation.ObservationControl.PythonControl.Pulsar.prepsubband_extra_opts"] = spec["presto"]["prepsubband_extra_opts"]
parset["Observation.ObservationControl.PythonControl.Pulsar.pulsar"] = spec["pulsar"]
parset["Observation.ObservationControl.PythonControl.Pulsar.raw_to_8bit"] = spec["output"]["raw_to_8bit"]
parset["Observation.ObservationControl.PythonControl.Pulsar.rfifind_extra_opts"] = spec["presto"]["rfifind_extra_opts"]
parset["Observation.ObservationControl.PythonControl.Pulsar.rrats"] = spec["presto"]["rrats"]
parset["Observation.ObservationControl.PythonControl.Pulsar.rrats_dm_range"] = spec["presto"]["rrats_dm_range"]
parset["Observation.ObservationControl.PythonControl.Pulsar.single_pulse"] = spec["single_pulse"]
parset["Observation.ObservationControl.PythonControl.Pulsar.skip_dspsr"] = spec["dspsr"]["skip_dspsr"]
parset["Observation.ObservationControl.PythonControl.Pulsar.skip_dynamic_spectrum"] = spec["output"]["skip_dynamic_spectrum"]
parset["Observation.ObservationControl.PythonControl.Pulsar.skip_prepfold"] = spec["presto"]["skip_prepfold"]
parset["Observation.ObservationControl.PythonControl.Pulsar.tsubint"] = spec["dspsr"]["tsubint"]
# Dataproducts. NOTE: The pulsar pipeline doesn't actually use this information, and reads input/writes output as it pleases.
inputs = subtask.inputs.order_by('id').all()
in_dataproducts = sum([list(subtask_input.dataproducts.order_by('id').all()) for subtask_input in inputs], [])
coherent_in_dataproducts = [dp for dp in in_dataproducts if dp.specifications_doc["coherent"]]
incoherent_in_dataproducts = [dp for dp in in_dataproducts if not dp.specifications_doc["coherent"]]
parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, coherent_in_dataproducts), "Observation.DataProducts.Input_CoherentStokes."))
parset["Observation.DataProducts.Input_CoherentStokes.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (input.producer.subtask.id, 0) for input in inputs] # needed by ResourceEstimator
parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, incoherent_in_dataproducts), "Observation.DataProducts.Input_IncoherentStokes."))
parset["Observation.DataProducts.Input_IncoherentStokes.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (input.producer.subtask.id, 0) for input in inputs] # needed by ResourceEstimator
subtask_outputs = list(subtask.outputs.order_by('id').all())
out_dataproducts = sorted(sum([list(models.Dataproduct.objects.filter(producer_id=subtask_output.id).order_by('id')) for subtask_output in subtask_outputs], []), key=lambda dp: dp.filename)
parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, out_dataproducts), "Observation.DataProducts.Output_Pulsar."))
parset["Observation.DataProducts.Output_Pulsar.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, 0)] # todo: find correct SAP id (although this is probably fine since the pulsar pipeline does not use this?)
parset["Observation.DataProducts.Output_Pulsar.storageClusterName"] = _output_filesystem(subtask).cluster.name
    # pragmatic solution to deal with the various parset-using subsystems...
# some want the keys as "Observation.<subkey>" and some as "ObsSW.Observation.<subkey>"
# so, just copy all "Observation.<subkey>" keys and prepend them with ObsSW.
for key, value in list(parset.items()):
if key.startswith("Observation."):
parset["ObsSW."+key] = value
return parset
# dict to store conversion methods based on subtask.specifications_template.name
_convertors = {'observation control': _convert_to_parset_dict_for_observationcontrol_schema,
'preprocessing pipeline': _convert_to_parset_dict_for_preprocessing_pipeline_schema,
'pulsar pipeline': _convert_to_parset_dict_for_pulsarpipeline_schema}
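# Note: to support an additional subtask type, map its specifications_template name to a converter function here.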
def convert_to_parset(subtask: models.Subtask) -> parameterset:
'''
Convert the specifications in the subtask to a LOFAR parset for MAC/COBALT
:raises ConversionException if no proper conversion is available.
'''
return parameterset(convert_to_parset_dict(subtask))
def convert_to_parset_dict(subtask: models.Subtask) -> dict:
'''
Convert the specifications in the subtask to a LOFAR parset dict with typed values for MAC/COBALT
:raises ConversionException if no proper conversion is available.
'''
try:
convertor = _convertors[subtask.specifications_template.name]
except KeyError:
raise ConversionException("Cannot convert subtask id=%d to parset. No conversion routine available for specifications_template='%s'" % (
subtask.id, subtask.specifications_template.name))
return convertor(subtask)
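# Typical usage, as a sketch (the subtask id is hypothetical):
#   subtask = models.Subtask.objects.get(id=2000001)
#   for key, value in sorted(convert_to_parset_dict(subtask).items()):
#       print(f"{key}={value}")
# or use convert_to_parset(subtask) to obtain a lofar.parameterset.parameterset object instead.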