Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
parset.py 51.37 KiB
#!/usr/bin/python3

# Copyright (C) 2020  ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.

from lofar.sas.tmss.tmss.tmssapp import models
from lofar.sas.tmss.tmss.tmssapp.models.specification import Dataformat, Datatype
from lofar.sas.tmss.tmss.exceptions import ConversionException
from lofar.parameterset import parameterset
from lofar.common.datetimeutils import formatDatetime
from lofar.common.json_utils import add_defaults_to_json_object_for_schema, resolved_remote_refs
from lofar.stationmodel.antennafields import antenna_fields
from lofar.sas.tmss.tmss.exceptions import *
from datetime import datetime
from math import ceil
from lofar.common import isProductionEnvironment

import logging
logger = logging.getLogger(__name__)

# todo: check why this cannot be imported from subtasks.py, and deduplicate.
def _output_filesystem(subtask: models.Subtask) -> models.Filesystem:
    """ Return the filesystem that output dataproducts should be written to."""
    # TODO: how to handle multiple filesystems on one cluster?
    # For CEP4 we return the production or test filessytem, dependent on the environment. Ugly! Should be configured better.
    if subtask.cluster.name == "CEP4":
        return subtask.cluster.filesystem_set.filter(name__icontains=('production' if isProductionEnvironment() else 'test')).first()

    # For other clusters... just return the first. Should be configurable as well.
    return subtask.cluster.filesystem_set.first()

def _add_prefix(parset: dict, prefix: str) -> dict:
    """ Add a prefix to all the keys in the given parset """
    return {prefix+k: v for k,v in parset.items()}

def _stokes_settings_parset_subkeys(stokes_spec: dict) -> dict:
    """ Convert stokes specifications to parset keys. """

    parset = {}
    parset['which'] = stokes_spec['stokes']
    parset['nrChannelsPerSubband'] = stokes_spec['channels_per_subband']
    parset['timeIntegrationFactor'] = stokes_spec['time_integration_factor']
    parset['subbandsPerFile'] = stokes_spec['subbands_per_file']

    quantisation = parset['quantize'] = stokes_spec['quantisation'].get('enabled', False)
    if quantisation:
        parset['quantizeBits'] = stokes_spec['quantisation']['bits']
        parset['quantizeScaleMax'] = stokes_spec['quantisation']['scale_max']
        parset['quantizeScaleMin'] = stokes_spec['quantisation']['scale_min']
        parset['quantizeIpositive'] = (stokes_spec['stokes'] == "I")

    return parset

def _dataproduct_parset_subkeys(subtask: models.Subtask, dataproducts: list) -> dict:
    """ Return a subset of parset keys and values to list dataproducts.
    
        Dataproducts that are None are included in the list to be written to null:,
        and marked as to be skipped as well. """

    parset = {}
    parset["enabled"] = len(dataproducts) > 0
    parset["filenames"] = [dp.filename if dp else "null:" for dp in dataproducts]
    parset["skip"] = [0 if dp else 1 for dp in dataproducts]
    parset["locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory if dp else "") for dp in dataproducts]  # todo: use storage cluster instead of processing cluster? dp.producer.filesystem.cluster.name?

    return parset

def _add_dataproduct(found_dataproducts: list, final_dataproduct_list: list):
    """ Use to append a dataproduct to `final_dataproduct_list` to put in the parset.
        The `found_dataproducts` is a list of candidates. 

        This function checks if:
          * any dataproducts were supplied (found)
          * multiple dataproducts were supplied (found)
          * the dataproduct is not already in the list

        Will append `None` if issues were detected. """

    def dp_to_str(dp):
        return f'{dp.filename=} {dp.specifications_doc=}'

    if len(found_dataproducts) == 0:
        logger.warning('No dataproducts matched the search. Dropping it.')
        final_dataproduct_list.append(None)
        return

    if len(found_dataproducts) > 1:
        logger.warning('Multiple dataproducts matched the same search. Using the first: %s' % [dp_to_str(dp) for dp in found_dataproducts])

    dataproduct = found_dataproducts[0]

    if dataproduct in final_dataproduct_list:
        logger.warning('The following dataproduct was suggested multiple times. Not using it a second time: %s' % dp_to_str(dataproduct))
        final_dataproduct_list.append(None)
        return

    final_dataproduct_list.append(dataproduct)


def _sap_index(saps: dict, sap_name: str) -> int:
    """ Return the SAP index in the observation given a certain SAP name. """

    sap_indices = [idx for idx,sap in enumerate(saps) if sap['name'] == sap_name]

    # needs to be exactly one hit
    if len(sap_indices) != 1:
        raise ConversionException("SAP name %s must appear exactly once in the specification. It appeared %d times. Available names: %s" % (sap_name, len(sap_indices), [sap['name'] for sap in saps]))

    return sap_indices[0]


def _convert_correlator_settings_to_parset_dict(subtask: models.Subtask, spec: dict) -> dict:
    """ Provide the parset keys for the COBALT correlator. """

    correlator_enabled = spec['COBALT']['correlator']['enabled']
    cobalt_version = spec['COBALT']['version']
    digi_beams = spec['stations']['digital_pointings']

    parset = {}

    # ResourceEstimator always wants these keys
    parset["Cobalt.Correlator.nrChannelsPerSubband"] = spec['COBALT']['correlator']['channels_per_subband'] if correlator_enabled else 16
    parset["Cobalt.Correlator.nrBlocksPerIntegration"] = spec['COBALT']['correlator']['blocks_per_integration'] if correlator_enabled else 1
    parset["Cobalt.Correlator.nrIntegrationsPerBlock"] = spec['COBALT']['correlator']['integrations_per_block'] if correlator_enabled else 1

    correlator_dataproducts = []

    if correlator_enabled:
        # Generic settings
        parset["Cobalt.Correlator.dopplerCorrection"] = spec['COBALT']['correlator']['doppler_correction']

        dataproducts = list(subtask.output_dataproducts.filter(dataformat__value=Dataformat.Choices.MEASUREMENTSET.value).filter(datatype__value=Datatype.Choices.VISIBILITIES.value).order_by('filename'))

        # marshall dataproducts, but only if they're supplied. in some use cases, we want a parset before the subtask is scheduled.
        for digi_beam in digi_beams:
            for subband_index, subband in enumerate(digi_beam["subbands"]):
                dataproduct_candidates = [dp for dp in dataproducts
                               if  dp.specifications_doc.get("sap") == digi_beam['name']
                               and dp.specifications_doc.get("subband") == subband
                               # In some cases, a subband can appear twice in the same SAP,
                               # so also match by its index.
                               and dp.specifications_doc.get("identifiers", {}).get("subband_index", subband_index) == subband_index]

                _add_dataproduct(dataproduct_candidates, correlator_dataproducts)

        # sanity check
        unmatched_dps = []
        for dp in dataproducts:
            if dp not in correlator_dataproducts:
                unmatched_dps.append(dp)
        if unmatched_dps:
            logger.warning('The following output dataproducts were not expected as correlator output: %s' % [f'filename={dp.filename} sap={dp.specifications_doc.get("sap")} subband={dp.specifications_doc.get("subband")}' for dp in unmatched_dps])

    parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, correlator_dataproducts), "Observation.DataProducts.Output_Correlated."))
    parset["Observation.DataProducts.Output_Correlated.storageClusterName"] = _output_filesystem(subtask).cluster.name
    parset["Observation.DataProducts.Output_Correlated.storageClusterPartition"] = _output_filesystem(subtask).directory

    # mimic MoM placeholder thingy (the resource estimator parses this)
    parset["Observation.DataProducts.Output_Correlated.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, sap_nr) for sap_nr in range(len(digi_beams))]  # we need one per SAP, and that needs to include the string "SAPxxx" with xxx being the SAP number, just to satisfy the ResourceEstimator.

    return parset

def _order_beamformer_dataproducts(dataproducts: list, spec: dict) -> (list, list):
    """ Orders the given dataproducts, and returns a tuple of the observation's (coherent_dataproducts, incoherent_dataproducts), in the order COBALT wants them.
    
    Missing dataproducts are replaced by None. """

    # Lists of coherent and incoherent dataproducts that will be produced, in the order COBALT wants them: ordered by (from slowest changing to fastest):
    #     1. sap
    #     2. pipeline
    #     3. tab
    #     4. pointing (see COB-216)
    #     5. stokes
    #     6. part
    #
    # See also the parsing of this list in RTCP/Cobalt/CoInterface/src/Parset.cc around line 1167.

    digi_beams = spec['stations']['digital_pointings']
    coherent_dataproducts = []
    incoherent_dataproducts = []

    for sap_idx, sap in enumerate(digi_beams):
        for pipeline_idx, pipeline in enumerate(spec['COBALT']['beamformer']['tab_pipelines']):
            # lookup this SAP in the pipeline spec
            try:
                pipeline_sap = pipeline['SAPs'][_sap_index(pipeline['SAPs'], sap['name'])]
            except ConversionException:
                # SAP was not configured in this pipeline
                continue

            for tab_idx, tab in enumerate(pipeline_sap['tabs']):
                stokes_settings = pipeline['coherent'] if tab.get('coherent') else pipeline['incoherent']

                nr_subbands = len(pipeline_sap['subbands']) or len(sap['subbands'])
                nr_parts = ceil(1.0 * nr_subbands / stokes_settings['subbands_per_file'])
                nr_stokes = len(stokes_settings['stokes'])

                for stokes_idx in range(nr_stokes):
                    for part_idx in range(nr_parts):
                        dataproduct_candidates = [dp for dp in dataproducts
                                       if  dp.specifications_doc.get("sap") == sap['name']
                                       and "identifiers" in dp.specifications_doc
                                       and dp.specifications_doc["identifiers"]["pipeline_index"] == pipeline_idx
                                       and dp.specifications_doc["identifiers"]["tab_index"] == tab_idx
                                       and dp.specifications_doc["identifiers"]["stokes_index"] == stokes_idx
                                       and dp.specifications_doc["identifiers"]["part_index"] == part_idx
                                       and dp.specifications_doc.get("coherent") == tab.get('coherent')]

                        _add_dataproduct(dataproduct_candidates, coherent_dataproducts if tab.get('coherent') else incoherent_dataproducts)

        # Process fly's eye pipelines, continue the pipeline numbering
        pipeline_idx_offset = len(spec['COBALT']['beamformer']['tab_pipelines'])
        for pipeline_idx, pipeline in enumerate(spec['COBALT']['beamformer']['flyseye_pipelines'], start=pipeline_idx_offset):
            # Generate coherent TABs for each antenna field.
            # NOTE: stations could be taken out of the observation (not appear in spec['stations']['station_list'])
            # during scheduling, but still appear in pipeline['stations']! In that case, we should not create
            # dataproducts for them as COBALT won't generate them.
            from lofar.sas.tmss.tmss.tmssapp.subtasks import used_pipeline_stations
            stations = used_pipeline_stations(pipeline, spec)
            antennaset = spec['stations']['antenna_set']
            fields = sum([list(antenna_fields(station, antennaset)) for station in stations], [])

            for field_idx, field in enumerate(fields):
                stokes_settings = pipeline['coherent']

                nr_subbands = len(sap['subbands'])
                nr_parts = ceil(1.0 * nr_subbands / stokes_settings['subbands_per_file'])
                nr_stokes = len(stokes_settings['stokes'])

                # marshall dataproducts, but only if they're supplied. in some use cases, we want a parset before the subtask is scheduled.
                for stokes_idx in range(nr_stokes):
                    for part_idx in range(nr_parts):
                        dataproduct_candidates = [dp for dp in dataproducts
                                       if  dp.specifications_doc.get("sap") == sap["name"]
                                       and dp.specifications_doc["identifiers"]["pipeline_index"] == pipeline_idx
                                       and dp.specifications_doc["identifiers"]["tab_index"] == field_idx
                                       and dp.specifications_doc["identifiers"]["stokes_index"] == stokes_idx
                                       and dp.specifications_doc["identifiers"]["part_index"] == part_idx
                                       and dp.specifications_doc["coherent"] == True]

                        _add_dataproduct(dataproduct_candidates, coherent_dataproducts)

    # sanity check
    unmatched_dps = []
    for dp in dataproducts:
        if dp not in coherent_dataproducts and dp not in incoherent_dataproducts:
            unmatched_dps.append(dp)
    if unmatched_dps:
        logger.warning('The following dataproducts could not be sorted for COBALT: %s' % [f'filename={dp.filename} sap={dp.specifications_doc.get("sap")} identifiers={dp.specifications_doc.get("identifiers")}' for dp in unmatched_dps])

    return coherent_dataproducts, incoherent_dataproducts

def _convert_beamformer_settings_to_parset_dict(subtask: models.Subtask, spec: dict) -> dict:
    """ Provide the parset keys for the COBALT beamformer. """

    cobalt_version = spec['COBALT']['version']
    digi_beams = spec['stations']['digital_pointings']

    parset = {}

    # General settings
    parset["Cobalt.BeamFormer.inputPPF"] = spec['COBALT']['beamformer']['input_ppf']

    # List of beamformer pipelines, staged to be added to the parset later
    beamformer_pipeline_parsets = []

    # Process beamformer pipelines
    for pipeline_idx, pipeline in enumerate(spec['COBALT']['beamformer']['tab_pipelines']):
        pipeline_parset = {}
        if 'coherent' in pipeline:
            pipeline_parset.update(_add_prefix(_stokes_settings_parset_subkeys(pipeline['coherent']), "CoherentStokes."))

        if 'incoherent' in pipeline:
            pipeline_parset.update(_add_prefix(_stokes_settings_parset_subkeys(pipeline['incoherent']), "IncoherentStokes."))

        pipeline_parset['nrBeams'] = len(pipeline['SAPs'])
        for sap in pipeline['SAPs']:
            sap_idx = _sap_index(digi_beams, sap['name'])

            pipeline_parset['Beam[%s].nrTiedArrayBeams' % sap_idx] = len(sap['tabs'])
            for tab_idx, tab in enumerate(sap['tabs']):
                if tab.get('coherent'):
                    if len(tab['pointings']) != 1:
                        raise ConversionException(f"Coherent TABs must have exactly 1 pointing, but pipeline {pipeline_idx} SAP {sap['name']} TAB {tab_idx} has {len(tab['pointings'])} pointings")

                    pipeline_parset['Beam[%s].TiedArrayBeam[%s].coherent'      % (sap_idx, tab_idx)] = True
                    pipeline_parset['Beam[%s].TiedArrayBeam[%s].directionType' % (sap_idx, tab_idx)] = tab['pointings'][0]['pointing']['direction_type']
                    pipeline_parset['Beam[%s].TiedArrayBeam[%s].angle1'        % (sap_idx, tab_idx)] = tab['pointings'][0]['pointing']['angle1']
                    pipeline_parset['Beam[%s].TiedArrayBeam[%s].angle2'        % (sap_idx, tab_idx)] = tab['pointings'][0]['pointing']['angle2']
                    pipeline_parset['Beam[%s].TiedArrayBeam[%s].target'        % (sap_idx, tab_idx)] = tab['pointings'][0]['pointing']['target']
                else:
                    pipeline_parset['Beam[%s].TiedArrayBeam[%s].coherent'      % (sap_idx, tab_idx)] = False

            if cobalt_version >= 2:
                pipeline_parset['Beam[%s].subbandList' % sap_idx] = sap['subbands']

            if cobalt_version == 1:
                # This won't overwrite anything, since COBALT1 supports only one beamformer pipeline
                parset["Cobalt.BeamFormer.stationList"] = pipeline['stations']
            else:
                pipeline_parset['Beam[%s].stationList' % sap_idx] = pipeline['stations']

        beamformer_pipeline_parsets.append(pipeline_parset)

    # Process fly's eye pipelines
    pipeline_idx_offset = len(beamformer_pipeline_parsets)
    for pipeline_idx, pipeline in enumerate(spec['COBALT']['beamformer']['flyseye_pipelines'], start=pipeline_idx_offset):

        pipeline_parset = {}
        pipeline_parset.update(_add_prefix(_stokes_settings_parset_subkeys(pipeline['coherent']), "CoherentStokes."))
        pipeline_parset['flysEye'] = True

        pipeline_parset['nrBeams'] = len(digi_beams)
        if cobalt_version == 1:
            parset["Cobalt.BeamFormer.stationList"] = pipeline['stations']
        for sap_idx, sap in enumerate(digi_beams):
            if cobalt_version >= 2:
                pipeline_parset['Beam[%s].stationList' % sap_idx] = pipeline['stations']

            pipeline_parset['Beam[%s].nrTiedArrayBeams' % sap_idx] = 0

        beamformer_pipeline_parsets.append(pipeline_parset)

    # global parset also needs flys eye set if any pipeline uses it
    parset['Cobalt.BeamFormer.flysEye'] = (len(spec['COBALT']['beamformer']['flyseye_pipelines']) > 0)

    # COBALT1 supports one beamformer pipeline, with prefix "Cobalt.BeamFormer."
    # COBALT2 supports multiple pipelines, with prefix "Cobalt.BeamFormer.Pipeline[xxx]."
    #
    # If we see one pipeline, we write a COBALT1-compatible parset. This also helps the subsequent pulsar pipeline, which actually will read this parset
    if cobalt_version == 1 and beamformer_pipeline_parsets:
        if len(beamformer_pipeline_parsets) > 1:
            raise ConversionException("COBALT1 only supports one beamformer pipeline. %d were specified." % len(beamformer_pipeline_parsets))

        # Beam keys are merged under Observation
        parset.update(_add_prefix({k:v for k,v in beamformer_pipeline_parsets[0].items() if not k.startswith("Beam")}, "Cobalt.BeamFormer."))
        parset.update(_add_prefix({k:v for k,v in beamformer_pipeline_parsets[0].items() if k.startswith("Beam")}, "Observation."))
    else:
        parset['Cobalt.BeamFormer.nrPipelines'] = len(beamformer_pipeline_parsets)
        for pipeline_idx, pipeline_parset in enumerate(beamformer_pipeline_parsets):
            parset.update(_add_prefix(pipeline_parset, "Cobalt.BeamFormer.Pipeline[%s]." % pipeline_idx))

    # Derive and list the dataproduct list, in order
    dataproducts = list(subtask.output_dataproducts.filter(dataformat__value=Dataformat.Choices.BEAMFORMED.value).filter(datatype__value=Datatype.Choices.TIME_SERIES.value))
    coherent_dataproducts, incoherent_dataproducts = _order_beamformer_dataproducts(dataproducts, spec)

    parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, coherent_dataproducts), "Observation.DataProducts.Output_CoherentStokes."))
    parset["Observation.DataProducts.Output_CoherentStokes.storageClusterName"] = _output_filesystem(subtask).cluster.name
    parset["Observation.DataProducts.Output_CoherentStokes.storageClusterPartition"] = _output_filesystem(subtask).directory

    parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, incoherent_dataproducts), "Observation.DataProducts.Output_IncoherentStokes."))
    parset["Observation.DataProducts.Output_IncoherentStokes.storageClusterName"] = _output_filesystem(subtask).cluster.name
    parset["Observation.DataProducts.Output_IncoherentStokes.storageClusterPartition"] = _output_filesystem(subtask).directory

    # mimic MoM placeholder thingy (the resource estimator parses this)
    parset["Observation.DataProducts.Output_CoherentStokes.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, sap_nr) for sap_nr in range(len(digi_beams))]  # this needs to be unique per SAP only, not dataproduct
    parset["Observation.DataProducts.Output_IncoherentStokes.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, sap_nr) for sap_nr in range(len(digi_beams))]  # this needs to be unique per SAP only, not dataproduct

    return parset

def _convert_to_parset_dict_for_observationcontrol_schema(subtask: models.Subtask) -> dict:
    # make sure the spec is complete (including all non-filled in properties with default)
    spec = add_defaults_to_json_object_for_schema(subtask.specifications_doc, resolved_remote_refs(subtask.specifications_template.schema))

    # -----------------------------------------------------------------------------------------------
    # Historic rationale: in TMSS-183 we made MAC run an actual observation from a TMSS specification.
    # With the help of Auke and Jan-David I could generate the parset as defined below.
    # MAC turned out to be very sensitive for having specific keys with very specific prefixes etc.
    # As a result, the generated parset contains many "duplicate"(nested) keys.
    # We all agree that this is ugly, and we should not want this, but hey... it works.
    # We decided to keep it like this, and maybe do more tuning/pruning later in the TMSS project.
    # Or, we can just get rid of this to-parset-adaper when MAC has been rewritten to the new station API.
    # -----------------------------------------------------------------------------------------------

    # ----------------------------
    #   Generic settings
    # ----------------------------

    parset = dict() # parameterset has no proper assignment operators, so take detour via dict...
    parset["Observation.ObsID"] = subtask.pk
    parset["Observation.momID"] = 0 # Needed by MACScheduler
    parset["Observation.otdbID"] = 0 # Needed by MACScheduler; should/can this be the same as subtask.pk?
    parset["Observation.tmssID"] = subtask.pk
    parset["Observation.processType"] = subtask.specifications_template.type.value.capitalize()
    parset["Observation.processSubtype"] = "Beam Observation"
    parset["Observation.Campaign.name"] = subtask.project.name
    parset["Observation.Campaign.title"] = subtask.project.description
    parset["Observation.Campaign.PI"] = "classified" # pulp needs the PI to be non-empty
    parset["Observation.startTime"] = formatDatetime(subtask.scheduled_start_time) if isinstance(subtask.scheduled_start_time, datetime) else subtask.scheduled_start_time
    parset["Observation.stopTime"] = formatDatetime(subtask.scheduled_stop_time) if isinstance(subtask.scheduled_stop_time, datetime) else subtask.scheduled_stop_time
    # parset["Observation.strategy"] = "default"  # Note: this was an annotation for OTDB and the old Scheduler and is now dropped

    # ----------------------------
    #   Debug settings
    # ----------------------------

    parset["Cobalt.writeToDisk"]                   = spec["COBALT"]["debug"]["write_output"]
    parset["Cobalt.Benchmark.enabled"]             = spec["COBALT"]["debug"]["benchmark_file"] != ""
    parset["Cobalt.Benchmark.file"]                = spec["COBALT"]["debug"]["benchmark_file"]
    parset["Cobalt.FinalMetaDataGatherer.enabled"] = spec["COBALT"]["debug"]["annotate_broken_antennas"]

    # ----------------------------
    #   System resources
    # ----------------------------

    parset["Cobalt.Nodes"]             = spec["COBALT"]["resources"]["processing_nodes"]
    parset["Cobalt.OutputProc.Nodes"]  = spec["COBALT"]["resources"]["writer_nodes"]

    # ----------------------------
    #   Station settings
    # ----------------------------

    parset["Observation.VirtualInstrument.minimalNrStations"] = 1  # maybe not mandatory?
    parset["Observation.VirtualInstrument.stationSet"] = "Custom"  # maybe not mandatory?
    parset["Observation.VirtualInstrument.stationList"] = spec["stations"]["station_list"]
    parset["Observation.antennaArray"] = "HBA" if "HBA" in spec["stations"]["antenna_set"] else "LBA" # maybe not mandatory?
    parset["Observation.antennaSet"] = spec["stations"]["antenna_set"]
    parset["Observation.bandFilter"] = spec["stations"]["filter"]
    parset["Observation.sampleClock"] = 200 # fixed value, no other values are supported
    parset["Observation.nrBitsPerSample"] = 8 # fixed value, no other values are supported.

    # Digital beams

    digi_beams = spec['stations']['digital_pointings']
    parset["Observation.nrBeams"] = len(digi_beams)
    for beam_nr, digi_beam in enumerate(digi_beams):
        beam_prefix = "Observation.Beam[%d]." % beam_nr
        parset[beam_prefix+"directionType"] = digi_beam['pointing']['direction_type']
        parset[beam_prefix+"angle1"] = digi_beam['pointing']['angle1']
        parset[beam_prefix+"angle2"] = digi_beam['pointing']['angle2']
        parset[beam_prefix+"target"] = digi_beam['pointing']['target']
        parset[beam_prefix+"subbandList"] = digi_beam['subbands']
        parset[beam_prefix+"nrTiedArrayBeams"] = 0   # fixed
        parset[beam_prefix+"nrTabRings"] = 0         # fixed

    # Analog beam (=HBA tile beam)

    analog_beam = spec['stations']['analog_pointing']
    parset["Observation.nrAnaBeams"] = 1
    beam_prefix = "Observation.AnaBeam[0]."
    parset[beam_prefix+"directionType"] = analog_beam['direction_type']
    parset[beam_prefix+"angle1"] = analog_beam['angle1']
    parset[beam_prefix+"angle2"] = analog_beam['angle2']
    parset[beam_prefix+"target"] = analog_beam['target']

    # ----------------------------
    #   COBALT settings
    # ----------------------------

    cobalt_version = spec['COBALT']['version']

    parset["Cobalt.realTime"] = True
    parset["Cobalt.blockSize"] = spec['COBALT']['blocksize']
    parset["Cobalt.correctBandPass"] = spec['COBALT']['bandpass_correction']
    parset["Cobalt.delayCompensation"] = spec['COBALT']['delay_compensation']
    parset["Cobalt.correctClocks"] = True  # fixed, but deprecated

    parset["Observation.Cluster.ProcessingCluster.clusterName"] = subtask.cluster.name

    # Correlator settings
    parset.update(_convert_correlator_settings_to_parset_dict(subtask, spec))

    # Beamformer settings
    parset.update(_convert_beamformer_settings_to_parset_dict(subtask, spec))

    # ResourceEstimator wants all Cobalt keys to start with Observation.ObservationControl.OnlineControl.
    parset.update(_add_prefix({k:v for k,v in parset.items() if k.startswith("Cobalt.")}, "Observation.ObservationControl.OnlineControl."))

    # ----------------------------
    #   MAC settings
    # ----------------------------

    # Retrieve the scheduling_unit_blueprint to get piggyback values to set
    sub = subtask.task_blueprint.scheduling_unit_blueprint

    parset["prefix"] = "LOFAR."
    parset["Observation.claimPeriod"] = 35
    parset["Observation.preparePeriod"] = 20
    for prefix in ["", "Observation."]:
        parset[prefix+"ObservationControl.OnlineControl.CorrAppl.CorrProc._executable"] = "CN_Processing"
        parset[prefix+"ObservationControl.OnlineControl.CorrAppl.CorrProc._hostname"] = "cbmmaster"
        parset[prefix+"ObservationControl.OnlineControl.CorrAppl.CorrProc._nodes"] = []
        parset[prefix+"ObservationControl.OnlineControl.CorrAppl.CorrProc._startstopType"] = "bgl"  # note: this is three correlators ago, and hopefully we get rid of it in LOFAR 2.0 ;)
        parset[prefix+"ObservationControl.OnlineControl.CorrAppl.CorrProc.workingdir"] = f"/opt/lofar-versions/{spec['COBALT'].get('release', 'current')}/bin"
        parset[prefix+"ObservationControl.OnlineControl.CorrAppl._hostname"] = "cbmmaster"
        parset[prefix+"ObservationControl.OnlineControl.CorrAppl.extraInfo"] = '["PIC","Cobalt"]'
        parset[prefix+"ObservationControl.OnlineControl.CorrAppl.procesOrder"] = []    # note: typo is correct ;)
        parset[prefix+"ObservationControl.OnlineControl.CorrAppl.processes"] = '["CorrProc"]'
        parset[prefix+"ObservationControl.OnlineControl._hostname"] = 'CCU001'
        parset[prefix+"ObservationControl.OnlineControl.applOrder"] = '["CorrAppl"]'
        parset[prefix+"ObservationControl.OnlineControl.applications"] = '["CorrAppl"]'
        parset[prefix+"ObservationControl.StationControl._hostname"] = parset["Observation.VirtualInstrument.stationList"]
        parset[prefix+"ObservationControl.StationControl.aartfaacPiggybackAllowed"] = sub.piggyback_allowed_aartfaac
        parset[prefix + "ObservationControl.StationControl.tbbPiggybackAllowed"] = sub.piggyback_allowed_tbb

        # inspection plot program to start (only CEP4 is supported)
        inspection_programs = { "msplots": "/data/bin/inspection-plots-observation.sh",
                                "dynspec": "/data/home/lofarsys/dynspec/scripts/inspection-dynspec-observation.sh",
                                "none": "/bin/true" # no-op. MAC needs something to run.
                              }

        parset[prefix+"ObservationControl.OnlineControl.inspectionHost"] = "head.cep4.control.lofar"
        parset[prefix+"ObservationControl.OnlineControl.inspectionProgram"] = inspection_programs[spec["QA"]["inspection_plots"]]

    return parset

def _common_parset_dict_for_pipeline_schemas(subtask: models.Subtask) -> dict:
    """ Return a parset dict with settings common to all pipelines. """

    parset = dict()

    # make sure the spec is complete (including all non-filled in properties with default)
    spec = add_defaults_to_json_object_for_schema(subtask.specifications_doc, resolved_remote_refs(subtask.specifications_template.schema))

    # General
    parset["prefix"] = "LOFAR."
    parset["Observation.ObsID"] = subtask.pk
    parset["Observation.momID"] = 0 # Needed by MACScheduler
    parset["Observation.otdbID"] = subtask.pk # HACK: the pipeline uses otdbID as the sasID. our tmssID>2000000 to prevent clashes. TODO: replace all otdbID's by sasID.
    parset["Observation.tmssID"] = subtask.pk
    parset["Observation.startTime"] = formatDatetime(subtask.scheduled_start_time) if isinstance(subtask.scheduled_start_time, datetime) else subtask.scheduled_start_time
    parset["Observation.stopTime"] = formatDatetime(subtask.scheduled_stop_time) if isinstance(subtask.scheduled_stop_time, datetime) else subtask.scheduled_stop_time

    parset["Observation.processType"] = "Pipeline"

    parset["Observation.Campaign.name"] = subtask.project.name
    parset["Observation.Scheduler.taskName"] = subtask.task_blueprint.short_description   # Scheduler keys are artefacts of an older time. Their content is deprecated, so we don't care whch task we take this from
    parset["Observation.Scheduler.predecessors"] = [pred.id for pred in subtask.predecessors]

    cluster_resources = spec['cluster_resources']

    parset["Observation.Cluster.ProcessingCluster.clusterName"] = subtask.cluster.name # is equal to cluster_resources['where']['cluster']
    parset["Observation.Cluster.ProcessingCluster.clusterPartition"] = cluster_resources['where']['partition']
    parset["Observation.Cluster.ProcessingCluster.numberOfTasks"] = cluster_resources['parallel_tasks']
    parset["Observation.Cluster.ProcessingCluster.numberOfCoresPerTask"] = cluster_resources['cores_per_task']

    return parset


def _convert_to_parset_dict_for_preprocessing_pipeline_schema(subtask: models.Subtask) -> dict:
    # see https://support.astron.nl/confluence/pages/viewpage.action?spaceKey=TMSS&title=UC1+JSON

    # make sure the spec is complete (including all non-filled in properties with default)
    spec = add_defaults_to_json_object_for_schema(subtask.specifications_doc, resolved_remote_refs(subtask.specifications_template.schema))

    # -----------------------------------------------------------------------------------------------
    # Historic rationale: in TMSS-183 we made MAC run an actual observation from a TMSS specification.
    # With the help of Auke and Jan-David I could generate the parset as defined below.
    # MAC turned out to be very sensitive for having specific keys with very specific prefixes etc.
    # As a result, the generated parset contains many "duplicate"(nested) keys.
    # We all agree that this is ugly, and we should not want this, but hey... it works.
    # We decided to keep it like this, and maybe do more tuning/pruning later in the TMSS project.
    # Or, we can just get rid of this to-parset-adaper when MAC has been rewritten to the new station API.
    # -----------------------------------------------------------------------------------------------

    # General
    parset = _common_parset_dict_for_pipeline_schemas(subtask)
    parset["Observation.processSubtype"] = "Averaging Pipeline"
    parset["Observation.ObservationControl.PythonControl.pythonProgram"] = "preprocessing_pipeline.py"
    parset["Observation.ObservationControl.PythonControl.softwareVersion"] = spec.get('software_version', "")

    # DPPP steps
    dppp_steps = []
    if spec["preflagger0"]["enabled"]:
        dppp_steps.append('preflagger[0]')
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].chan"] = spec["preflagger0"]["channels"].split(",")
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].abstime"] = []
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].azimuth"] = []
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].baseline"] = ""
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].blrange"] = []
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].corrtype"] = ""
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].count.path"] = "-"
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].count.save"] = False
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].elevation"] = []
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].expr"] = ""
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].freqrange"] = []
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].lst"] = []
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].reltime"] = []
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].timeofday"] = []
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].timeslot"] = []
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[0].type"] = "preflagger"

    if spec["preflagger1"]["enabled"]:
        dppp_steps.append('preflagger[1]')
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].corrtype"] = spec["preflagger1"]["corrtype"]
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].abstime"] = []
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].azimuth"] = []
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].baseline"] = ""
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].blrange"] = []
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].chan"] = []
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].count.path"] = "-"
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].count.save"] = False
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].elevation"] = []
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].expr"] = ""
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].freqrange"] = []
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].lst"] = []
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].reltime"] = []
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].timeofday"] = []
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].timeslot"] = []
        parset["Observation.ObservationControl.PythonControl.DPPP.preflagger[1].type"] = "preflagger"

    if spec["aoflagger"]["enabled"]:
        dppp_steps.append('aoflagger')
        parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.strategy"] = spec["aoflagger"]["strategy"]
        parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.autocorr"] = False
        parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.count.path"] = "-"
        parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.count.save"] = False
        parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.keepstatistics"] = True
        parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.memorymax"] = 10
        parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.memoryperc"] = 0
        parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.overlapmax"] = 0
        parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.overlapperc"] = 0
        parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.pedantic"] = False
        parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.pulsar"] = False
        parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.timewindow"] = 0
        parset["Observation.ObservationControl.PythonControl.DPPP.aoflagger.type"] = "aoflagger"

    if spec["demixer"]["enabled"]:
        dppp_steps.append('demixer')
        parset["Observation.ObservationControl.PythonControl.DPPP.demixer.baseline"] = spec["demixer"]["baselines"]
        parset["Observation.ObservationControl.PythonControl.DPPP.demixer.demixfreqstep"] = spec["demixer"]["demix_frequency_steps"]
        parset["Observation.ObservationControl.PythonControl.DPPP.demixer.demixtimestep"] = spec["demixer"]["demix_time_steps"]
        parset["Observation.ObservationControl.PythonControl.DPPP.demixer.freqstep"] = spec["demixer"]["frequency_steps"]
        parset["Observation.ObservationControl.PythonControl.DPPP.demixer.timestep"] = spec["demixer"]["time_steps"]
        parset["Observation.ObservationControl.PythonControl.DPPP.demixer.ignoretarget"] = spec["demixer"]["ignore_target"]
        parset["Observation.ObservationControl.PythonControl.PreProcessing.demix_always"] = spec["demixer"]["demix_always"]
        parset["Observation.ObservationControl.PythonControl.PreProcessing.demix_if_needed"] = spec["demixer"]["demix_if_needed"]
        parset["Observation.ObservationControl.PythonControl.DPPP.demixer.blrange"] = []
        parset["Observation.ObservationControl.PythonControl.DPPP.demixer.corrtype"] = "cross"
        parset["Observation.ObservationControl.PythonControl.DPPP.demixer.elevationcutoff"] = "0.0deg"
        parset["Observation.ObservationControl.PythonControl.DPPP.demixer.instrumentmodel"] = "instrument"
        parset["Observation.ObservationControl.PythonControl.DPPP.demixer.modelsources"] = []
        parset["Observation.ObservationControl.PythonControl.DPPP.demixer.ntimechunk"] = 0
        parset["Observation.ObservationControl.PythonControl.DPPP.demixer.othersources"] = []
        parset["Observation.ObservationControl.PythonControl.DPPP.demixer.skymodel"] = "sky"
        parset["Observation.ObservationControl.PythonControl.DPPP.demixer.subtractsources"] = ""
        parset["Observation.ObservationControl.PythonControl.DPPP.demixer.targetsource"] = ""
        parset["Observation.ObservationControl.PythonControl.DPPP.demixer.type"] = "demixer"
    else:
        # ResourceEstimator wants these keys always
        parset["Observation.ObservationControl.PythonControl.DPPP.demixer.freqstep"] = 1
        parset["Observation.ObservationControl.PythonControl.DPPP.demixer.timestep"] = 1

    parset["Observation.ObservationControl.PythonControl.DPPP.steps"] = dppp_steps
    parset["Observation.ObservationControl.PythonControl.DPPP.msout.storagemanager.name"] = spec["storagemanager"]

    # Dataproducts
    subtask_inputs = list(subtask.inputs.order_by('id').all())
    in_dataproducts = sum([list(subtask_input.dataproducts.order_by('id').all()) for subtask_input in subtask_inputs],[])

    parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, in_dataproducts), "Observation.DataProducts.Input_Correlated."))

    # mimic MoM placeholder thingy (the resource assigner parses this)
    # should be expanded with SAPS and datatypes
    parset["Observation.DataProducts.Input_Correlated.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask_input.producer.subtask.id, input_nr) for input_nr, subtask_input in enumerate(subtask_inputs)]

    subtask_outputs = list(subtask.outputs.all())
    unsorted_out_dataproducts = sum([list(subtask_output.dataproducts.all()) for subtask_output in subtask_outputs],[])

    def find_dataproduct(dataproducts: list, specification_doc: dict):
        hits = [dp for dp in dataproducts if dp.specifications_doc['sap'] == specification_doc['sap']
                                         and dp.specifications_doc['subband'] == specification_doc['subband']]
        return hits[0] if hits else None

    # list output dataproducts in the same order as input dataproducts, matched by the identifiers
    out_dataproducts = [find_dataproduct(unsorted_out_dataproducts, in_dp.specifications_doc) for in_dp in in_dataproducts]

    # sanity check
    unmatched_dps = []
    for dp in unsorted_out_dataproducts:
        if dp not in out_dataproducts:
            unmatched_dps.append(dp)
    if unmatched_dps:
        logger.warning('The following output dataproducts were not expected as preprocessing pipeline output: %s' % [f'filename={dp.filename} sap={dp.specifications_doc.get("sap")} subband={dp.specifications_doc.get("subband")}' for dp in unmatched_dps])

    parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, out_dataproducts), "Observation.DataProducts.Output_Correlated."))
    parset["Observation.DataProducts.Output_Correlated.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, 0)]
    parset["Observation.DataProducts.Output_Correlated.storageClusterName"] = _output_filesystem(subtask).cluster.name

    # Other
    parset["Observation.ObservationControl.PythonControl.PreProcessing.SkyModel"] = "Ateam_LBA_CC"
    parset["Observation.ObservationControl.PythonControl.DPPP.checkparset"] = -1

    parset["Observation.ObservationControl.PythonControl.DPPP.msin.autoweight"] = True
    parset["Observation.ObservationControl.PythonControl.DPPP.msin.band"] = -1
    parset["Observation.ObservationControl.PythonControl.DPPP.msin.baseline"] = ""
    parset["Observation.ObservationControl.PythonControl.DPPP.msin.blrange"] = []
    parset["Observation.ObservationControl.PythonControl.DPPP.msin.corrtype"] = ""
    parset["Observation.ObservationControl.PythonControl.DPPP.msin.datacolumn"] = "DATA"
    parset["Observation.ObservationControl.PythonControl.DPPP.msin.forceautoweight"] = False
    parset["Observation.ObservationControl.PythonControl.DPPP.msin.missingdata"] = False
    parset["Observation.ObservationControl.PythonControl.DPPP.msin.nchan"] = "nchan"
    parset["Observation.ObservationControl.PythonControl.DPPP.msin.orderms"] = False
    parset["Observation.ObservationControl.PythonControl.DPPP.msin.sort"] = False
    parset["Observation.ObservationControl.PythonControl.DPPP.msin.startchan"] = 0
    parset["Observation.ObservationControl.PythonControl.DPPP.msin.useflag"] = True
    parset["Observation.ObservationControl.PythonControl.DPPP.msout.overwrite"] = False
    parset["Observation.ObservationControl.PythonControl.DPPP.msout.tilenchan"] = 8
    parset["Observation.ObservationControl.PythonControl.DPPP.msout.tilesize"] = 4096
    parset["Observation.ObservationControl.PythonControl.DPPP.msout.vdsdir"] = "A"
    parset["Observation.ObservationControl.PythonControl.DPPP.msout.writefullresflag"] = True

    parset["Observation.ObservationControl.PythonControl.DPPP.showprogress"] = False
    parset["Observation.ObservationControl.PythonControl.DPPP.showtimings"] = False
    parset["Observation.ObservationControl.PythonControl.DPPP.uselogger"] = True

    # pragmatic solution to deal with the various parset using subsystems...
    # some want the keys as "Observation.<subkey>" and some as "ObsSW.Observation.<subkey>"
    # so, just copy all "Observation.<subkey>" keys and prepend them with ObsSW.
    for key, value in list(parset.items()):
        if key.startswith("Observation."):
            parset["ObsSW."+key] = value

    return parset

def _convert_to_parset_dict_for_pulsarpipeline_schema(subtask: models.Subtask) -> dict:
    # make sure the spec is complete (including all non-filled in properties with default)
    spec = add_defaults_to_json_object_for_schema(subtask.specifications_doc, resolved_remote_refs(subtask.specifications_template.schema))

    # General
    parset = _common_parset_dict_for_pipeline_schemas(subtask)
    parset["Observation.processSubtype"] = "Pulsar Pipeline"
    parset["Observation.ObservationControl.PythonControl.pythonProgram"] = "pulsar_pipeline.py"
    parset["Observation.ObservationControl.PythonControl.softwareVersion"] = spec.get('software_version', "lofar-pulp:tmss")

    # Pulsar pipeline settings
    parset["Observation.ObservationControl.PythonControl.Pulsar.2bf2fits_extra_opts"] = spec["presto"]["2bf2fits_extra_opts"]
    parset["Observation.ObservationControl.PythonControl.Pulsar.8bit_conversion_sigma"] = spec["output"]["8bit_conversion_sigma"]
    parset["Observation.ObservationControl.PythonControl.Pulsar.decode_nblocks"] = spec["presto"]["decode_nblocks"]
    parset["Observation.ObservationControl.PythonControl.Pulsar.decode_sigma"] = spec["presto"]["decode_sigma"]
    parset["Observation.ObservationControl.PythonControl.Pulsar.digifil_extra_opts"] = spec["dspsr"]["digifil_extra_opts"]
    parset["Observation.ObservationControl.PythonControl.Pulsar.dspsr_extra_opts"] = spec["dspsr"]["dspsr_extra_opts"]
    parset["Observation.ObservationControl.PythonControl.Pulsar.dynamic_spectrum_time_average"] = spec["output"]["dynamic_spectrum_time_average"]
    parset["Observation.ObservationControl.PythonControl.Pulsar.nofold"] = spec["presto"]["nofold"]
    parset["Observation.ObservationControl.PythonControl.Pulsar.nopdmp"] = spec["dspsr"]["nopdmp"]
    parset["Observation.ObservationControl.PythonControl.Pulsar.norfi"] = spec["dspsr"]["norfi"]
    parset["Observation.ObservationControl.PythonControl.Pulsar.prepdata_extra_opts"] = spec["presto"]["prepdata_extra_opts"]
    parset["Observation.ObservationControl.PythonControl.Pulsar.prepfold_extra_opts"] = spec["presto"]["prepfold_extra_opts"]
    parset["Observation.ObservationControl.PythonControl.Pulsar.prepsubband_extra_opts"] = spec["presto"]["prepsubband_extra_opts"]
    parset["Observation.ObservationControl.PythonControl.Pulsar.pulsar"] = spec["pulsar"]
    parset["Observation.ObservationControl.PythonControl.Pulsar.raw_to_8bit"] = spec["output"]["raw_to_8bit"]
    parset["Observation.ObservationControl.PythonControl.Pulsar.rfifind_extra_opts"] = spec["presto"]["rfifind_extra_opts"]
    parset["Observation.ObservationControl.PythonControl.Pulsar.rrats"] = spec["presto"]["rrats"]
    parset["Observation.ObservationControl.PythonControl.Pulsar.rrats_dm_range"] = spec["presto"]["rrats_dm_range"]
    parset["Observation.ObservationControl.PythonControl.Pulsar.single_pulse"] = spec["single_pulse"]
    parset["Observation.ObservationControl.PythonControl.Pulsar.skip_dspsr"] = spec["dspsr"]["skip_dspsr"]
    parset["Observation.ObservationControl.PythonControl.Pulsar.skip_dynamic_spectrum"] = spec["output"]["skip_dynamic_spectrum"]
    parset["Observation.ObservationControl.PythonControl.Pulsar.skip_prepfold"] = spec["presto"]["skip_prepfold"]
    parset["Observation.ObservationControl.PythonControl.Pulsar.tsubint"] = spec["dspsr"]["tsubint"]

    # Dataproducts. NOTE: The pulsar pipeline doesn't actually use this information, and reads input/writes output as it pleases.

    inputs = subtask.inputs.order_by('id').all()
    in_dataproducts = sum([list(subtask_input.dataproducts.order_by('id').all()) for subtask_input in inputs], [])
    coherent_in_dataproducts = [dp for dp in in_dataproducts if dp.specifications_doc["coherent"]]
    incoherent_in_dataproducts = [dp for dp in in_dataproducts if not dp.specifications_doc["coherent"]]

    parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, coherent_in_dataproducts), "Observation.DataProducts.Input_CoherentStokes."))
    parset["Observation.DataProducts.Input_CoherentStokes.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (input.producer.subtask.id, 0) for input in inputs] # needed by ResourceEstimator

    parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, incoherent_in_dataproducts), "Observation.DataProducts.Input_IncoherentStokes."))
    parset["Observation.DataProducts.Input_IncoherentStokes.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (input.producer.subtask.id, 0) for input in inputs] # needed by ResourceEstimator

    subtask_outputs = list(subtask.outputs.order_by('id').all())
    out_dataproducts = sorted(sum([list(models.Dataproduct.objects.filter(producer_id=subtask_output.id).order_by('id')) for subtask_output in subtask_outputs], []), key=lambda dp: dp.filename)

    parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, out_dataproducts), "Observation.DataProducts.Output_Pulsar."))
    parset["Observation.DataProducts.Output_Pulsar.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, 0)]   # todo: find correct SAP id (although this is probably fine since the pulsar pipeline does not use this?)
    parset["Observation.DataProducts.Output_Pulsar.storageClusterName"] = _output_filesystem(subtask).cluster.name

    # pragmatic solution to deal with the various parset using subsystems...
    # some want the keys as "Observation.<subkey>" and some as "ObsSW.Observation.<subkey>"
    # so, just copy all "Observation.<subkey>" keys and prepend them with ObsSW.
    for key, value in list(parset.items()):
        if key.startswith("Observation."):
            parset["ObsSW."+key] = value

    return parset

# dict to store conversion methods based on subtask.specifications_template.name
_convertors = {'observation control': _convert_to_parset_dict_for_observationcontrol_schema,
               'preprocessing pipeline': _convert_to_parset_dict_for_preprocessing_pipeline_schema,
               'pulsar pipeline': _convert_to_parset_dict_for_pulsarpipeline_schema}


def convert_to_parset(subtask: models.Subtask) -> parameterset:
    '''
    Convert the specifications in the subtask to a LOFAR parset for MAC/COBALT
    :raises ConversionException if no proper conversion is available.
    '''
    return parameterset(convert_to_parset_dict(subtask))

def convert_to_parset_dict(subtask: models.Subtask) -> dict:
    '''
    Convert the specifications in the subtask to a LOFAR parset dict with typed values for MAC/COBALT
    :raises ConversionException if no proper conversion is available.
    '''
    try:
        convertor = _convertors[subtask.specifications_template.name]
    except KeyError:
        raise ConversionException("Cannot convert subtask id=%d to parset. No conversion routine available for specifications_template='%s'" % (
                                  subtask.id, subtask.specifications_template.name))

    return convertor(subtask)