Skip to content
Snippets Groups Projects
parset.py 8.98 KiB
Newer Older
#!/usr/bin/python3

# Copyright (C) 2020  ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.

from lofar.sas.tmss.tmss.tmssapp import models
from lofar.parameterset import parameterset
from lofar.common.datetimeutils import formatDatetime
from lofar.common.json_utils import add_defaults_to_json_object_for_schema
from lofar.sas.tmss.tmss.exceptions import *
def _convert_to_parset_for_observationcontrol_schema(subtask: models.Subtask) -> parameterset:
    # make sure the spec is complete (including all non-filled in properties with default)
    spec = add_defaults_to_json_object_for_schema(subtask.specifications_doc, subtask.specifications_template.schema)

    parset = dict() # parameterset has no proper assignment operators, so take detour via dict...
    parset["Observation.ObsID"] = subtask.pk
Jorrit Schaap's avatar
Jorrit Schaap committed
    parset["Observation.momID"] = 0 # Needed by MACScheduler
    parset["Observation.otdbID"] = 0 # Needed by MACScheduler; should/can this be the same as subtask.pk?
    parset["Observation.processType"] = subtask.specifications_template.type.value.capitalize()
    parset["Observation.processSubtype"] = "Beam Observation" # TODO: where to derive the processSubtype from?
Jorrit Schaap's avatar
Jorrit Schaap committed
    parset["Observation.Campaign.name"] = "TMSS_test" #toDo: replace by project name
    parset["Observation.startTime"] = formatDatetime(subtask.start_time)
    parset["Observation.stopTime"] = formatDatetime(subtask.stop_time)
    parset["Observation.VirtualInstrument.minimalNrStations"] = 1  # maybe not mandatory?
    parset["Observation.VirtualInstrument.stationSet"] = "Custom"  # maybe not mandatory?
    parset["Observation.VirtualInstrument.stationList"] = "[%s]" % ','.join(s for s in spec["stations"]["station_list"])
    parset["Observation.antennaArray"] = "HBA" if "HBA" in spec["stations"]["antenna_set"] else "LBA" # maybe not mandatory?
    parset["Observation.antennaSet"] = spec["stations"]["antenna_set"]
    parset["Observation.bandFilter"] = spec["stations"]["filter"]
    parset["Observation.sampleClock"] = 200 # why is this not part of the schema? for example as a required setting with a single allowed value.
    parset["Observation.nrBitsPerSample"] = 8 # why is this not part of the schema? for example as a required setting with a single allowed value.
    parset["Observation.strategy"] = "default"  # maybe not mandatory?

    digi_beams = spec['stations']['digital_pointings']
    parset["Observation.nrBeams"] = len(digi_beams)
    for beam_nr, digi_beam in enumerate(digi_beams):
        beam_prefix = "Observation.Beam[%d]." % beam_nr
        parset[beam_prefix+"directionType"] = digi_beam['pointing']['direction_type']
        parset[beam_prefix+"angle1"] = digi_beam['pointing']['angle1']
        parset[beam_prefix+"angle2"] = digi_beam['pointing']['angle2']
        parset[beam_prefix+"target"] = digi_beam['name']
        parset[beam_prefix+"subbandList"] = digi_beam['subbands']

        phase_centers = spec['COBALT']['correlator']['phase_centers']
        if phase_centers:
            # for now, cobalt can handle only one phase_center
            # assume the first is the one
            phase_center = phase_centers[0]
            parset[beam_prefix+"Correlator.phaseCenterOverride"] = phase_center['index'] == beam_nr
            parset[beam_prefix+"Correlator.directionType"] = phase_center['pointing']['direction_type']
            parset[beam_prefix+"Correlator.angle1"] = phase_center['pointing']['angle1']
            parset[beam_prefix+"Correlator.angle2"] = phase_center['pointing']['angle2']

    analog_beam = spec['stations']['analog_pointing']
    parset["Observation.nrAnaBeams"] = 1
    beam_prefix = "Observation.AnaBeam[0]."
    parset[beam_prefix+"directionType"] = analog_beam['direction_type']
    parset[beam_prefix+"angle1"] = analog_beam['angle1']
    parset[beam_prefix+"angle2"] = analog_beam['angle2']

    for prefix in ["", "Observation.ObservationControl.OnlineControl."]:
        parset[prefix+"Cobalt.realTime"] = True
        parset[prefix+"Cobalt.blockSize"] = spec['COBALT']['blocksize']
        parset[prefix+"Cobalt.correctBandPass"] = spec['COBALT']['bandpass_correction']
        parset[prefix+"Cobalt.delayCompensation"] = spec['COBALT']['delay_compensation']
        parset[prefix+"Cobalt.Correlator.nrChannelsPerSubband"] = spec['COBALT']['correlator']['channels_per_subband']
        parset[prefix+"Cobalt.Correlator.nrBlocksPerIntegration"] = spec['COBALT']['correlator']['blocks_per_integration']
        parset[prefix+"Cobalt.Correlator.nrIntegrationsPerBlock"] = spec['COBALT']['correlator']['integrations_per_block']
Jorrit Schaap's avatar
Jorrit Schaap committed
    parset["Observation.Cluster.ProcessingCluster.clusterName"] = subtask.cluster.name
    
    parset["Observation.DataProducts.Output_Correlated.enabled"] = True
Jorrit Schaap's avatar
Jorrit Schaap committed
    parset["Observation.DataProducts.Output_Correlated.storageClusterName"] = subtask.cluster.name
    parset["Observation.DataProducts.Output_Correlated.storageClusterPartition"] = "/data/test-projects"
    parset["Observation.DataProducts.Output_Correlated.filenames"] = []
    parset["Observation.DataProducts.Output_Correlated.locations"] = []
    # TODO: do not use SubtaskOutput.objects.filter but make subtask.subtask_outputs work
    subtask_outputs = list(models.SubtaskOutput.objects.filter(subtask_id=subtask.id))
    for subtask_output in subtask_outputs:
        dataproducts = list(models.Dataproduct.objects.filter(producer_id=subtask_output.id))
        parset["Observation.DataProducts.Output_Correlated.filenames"] = "[%s]" % ','.join(dp.filename for dp in dataproducts)
        parset["Observation.DataProducts.Output_Correlated.locations"] = "[%s]" % ','.join(dp.directory for dp in dataproducts)

    # various additional 'Control' settings which seem to be needed for MAC
    parset["prefix"] = "LOFAR."
    parset["Observation.claimPeriod"] = 35
    parset["Observation.preparePeriod"] = 20
    for prefix in ["", "Observation."]:
        parset[prefix+"ObservationControl.OnlineControl.CorrAppl.CorrProc._executable"] = "CN_Processing"
        parset[prefix+"ObservationControl.OnlineControl.CorrAppl.CorrProc._hostname"] = "cbmmaster"
        parset[prefix+"ObservationControl.OnlineControl.CorrAppl.CorrProc._nodes"] = []
        parset[prefix+"ObservationControl.OnlineControl.CorrAppl.CorrProc._startstopType"] = "bgl"
        parset[prefix+"ObservationControl.OnlineControl.CorrAppl.CorrProc.workingdir"] = "/opt/lofar/bin/"
        parset[prefix+"ObservationControl.OnlineControl.CorrAppl._hostname"] = "cbmmaster"
        parset[prefix+"ObservationControl.OnlineControl.CorrAppl.extraInfo"] = '["PIC","Cobalt"]'
        parset[prefix+"ObservationControl.OnlineControl.CorrAppl.procesOrder"] = []
        parset[prefix+"ObservationControl.OnlineControl.CorrAppl.processes"] = '["CorrProc"]'
        parset[prefix+"ObservationControl.OnlineControl._hostname"] = 'CCU001'
        parset[prefix+"ObservationControl.OnlineControl.applOrder"] = '["CorrAppl"]'
        parset[prefix+"ObservationControl.OnlineControl.applications"] = '["CorrAppl"]'
        parset[prefix+"ObservationControl.OnlineControl.inspectionHost"] = 'head01.cep4.control.lofar'
        parset[prefix+"ObservationControl.OnlineControl.inspectionProgram"] = 'inspection-plots-observation.sh'
        parset[prefix+"ObservationControl.StationControl._hostname"] = parset["Observation.VirtualInstrument.stationList"]
        parset[prefix+"ObservationControl.StationControl.aartfaacPiggybackAllowed"] = False
        parset[prefix+"ObservationControl.StationControl.tbbPiggybackAllowed"] = False

    # convert dict to real parameterset, and return it
    parset = parameterset(parset)
    return parset

# dict to store conversion methods based on subtask.specifications_template.name
_convertors = {'observationcontrol schema': _convert_to_parset_for_observationcontrol_schema }

def convert_to_parset(subtask: models.Subtask) -> parameterset:
    '''
    Convert the specifications in the subtask to a LOFAR parset for MAC/COBALT
    :raises ConversionException if no proper conversion is available.
    '''
    try:
        convertor = _convertors[subtask.specifications_template.name]
        return convertor(subtask)
    except KeyError:
Jorrit Schaap's avatar
Jorrit Schaap committed
        raise ConversionException("Cannot convert subtask id=%d to parset. No conversion routine available for specifications_template='%s'" % (
                                  subtask.id, subtask.specifications_template.name))