#!/usr/bin/python3 # Copyright (C) 2020 ASTRON (Netherlands Institute for Radio Astronomy) # P.O. Box 2, 7990 AA Dwingeloo, The Netherlands # # This file is part of the LOFAR software suite. # The LOFAR software suite is free software: you can redistribute it and/or # modify it under the terms of the GNU General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # The LOFAR software suite is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. from lofar.sas.tmss.tmss.tmssapp import models from lofar.parameterset import parameterset from lofar.common.datetimeutils import formatDatetime from lofar.common.json_utils import add_defaults_to_json_object_for_schema from lofar.sas.tmss.tmss.exceptions import * def _convert_to_parset_for_observationcontrol_schema(subtask: models.Subtask) -> parameterset: # make sure the spec is complete (including all non-filled in properties with default) spec = add_defaults_to_json_object_for_schema(subtask.specifications_doc, subtask.specifications_template.schema) parset = dict() # parameterset has no proper assignment operators, so take detour via dict... # mandatory "fixed" keys for MACScheduler and ObservationControl parset["prefix"] = "LOFAR." parset["Observation.claimPeriod"] = 35 parset["Observation.preparePeriod"] = 20 parset["Observation.ObsID"] = subtask.pk parset["Observation.momID"] = -1 # Needed by MACScheduler parset["Observation.otdbID"] = -1 # Needed by MACScheduler; should/can this be the same as subtask.pk? parset["Observation.processType"] = subtask.specifications_template.type.value.capitalize() parset["Observation.processSubtype"] = "Beam Observation" # TODO: where to derive the processSubtype from? parset["Observation.startTime"] = formatDatetime(subtask.start_time) parset["Observation.stopTime"] = formatDatetime(subtask.stop_time) parset["Observation.VirtualInstrument.minimalNrStations"] = 1 # maybe not mandatory? parset["Observation.VirtualInstrument.stationSet"] = "Custom" # maybe not mandatory? parset["Observation.VirtualInstrument.stationList"] = "[%s]" % ','.join(s for s in spec["stations"]["station_list"]) parset["Observation.antennaArray"] = "HBA" if "HBA" in spec["stations"]["antenna_set"] else "LBA" # maybe not mandatory? parset["Observation.antennaSet"] = spec["stations"]["antenna_set"] parset["Observation.bandFilter"] = spec["stations"]["filter"] parset["Observation.sampleClock"] = 200 # why is this not part of the schema? for example as a required setting with a single allowed value. parset["Observation.nrBitsPerSample"] = 8 # why is this not part of the schema? for example as a required setting with a single allowed value. parset["Observation.strategy"] = "default" # maybe not mandatory? digi_beams = spec['stations']['digital_pointings'] parset["Observation.nrBeams"] = len(digi_beams) for beam_nr, digi_beam in enumerate(digi_beams): beam_prefix = "Observation.Beam[%d]." % beam_nr parset[beam_prefix+"directionType"] = digi_beam['pointing']['direction_type'] parset[beam_prefix+"angle1"] = digi_beam['pointing']['angle1'] parset[beam_prefix+"angle2"] = digi_beam['pointing']['angle2'] parset[beam_prefix+"target"] = digi_beam['name'] parset[beam_prefix+"subbandList"] = digi_beam['subbands'] phase_centers = spec['COBALT']['correlator']['phase_centers'] if phase_centers: # for now, cobalt can handle only one phase_center # assume the first is the one phase_center = phase_centers[0] parset[beam_prefix+"Correlator.phaseCenterOverride"] = phase_center['index'] == beam_nr parset[beam_prefix+"Correlator.directionType"] = phase_center['pointing']['direction_type'] parset[beam_prefix+"Correlator.angle1"] = phase_center['pointing']['angle1'] parset[beam_prefix+"Correlator.angle2"] = phase_center['pointing']['angle2'] analog_beam = spec['stations']['analog_pointing'] parset["Observation.nrAnaBeams"] = 1 beam_prefix = "Observation.AnaBeam[0]." parset[beam_prefix+"directionType"] = analog_beam['direction_type'] parset[beam_prefix+"angle1"] = analog_beam['angle1'] parset[beam_prefix+"angle2"] = analog_beam['angle2'] cobalt_prefix = "Observation.ObservationControl.OnlineControl." parset[cobalt_prefix+"Cobalt.realTime"] = True parset[cobalt_prefix+"Cobalt.blockSize"] = spec['COBALT']['blocksize'] parset[cobalt_prefix+"Cobalt.correctBandPass"] = spec['COBALT']['bandpass_correction'] parset[cobalt_prefix+"Cobalt.delayCompensation"] = spec['COBALT']['delay_compensation'] parset[cobalt_prefix+"Cobalt.Correlator.nrChannelsPerSubband"] = spec['COBALT']['correlator']['channels_per_subband'] parset[cobalt_prefix+"Cobalt.Correlator.nrBlocksPerIntegration"] = spec['COBALT']['correlator']['blocks_per_integration'] parset[cobalt_prefix+"Cobalt.Correlator.nrIntegrationsPerBlock"] = spec['COBALT']['correlator']['integrations_per_block'] parset["Observation.DataProducts.Output_Correlated.enabled"] = True parset["Observation.Cluster.ProcessingCluster.clusterName"] = subtask.cluster or "CEP4" parset["Observation.DataProducts.Output_Correlated.filenames"] = [] parset["Observation.DataProducts.Output_Correlated.locations"] = [] # TODO: do not use SubtaskOutput.objects.filter but make subtask.subtask_outputs work subtask_outputs = list(models.SubtaskOutput.objects.filter(subtask_id=subtask.id)) for subtask_output in subtask_outputs: dataproducts = list(models.Dataproduct.objects.filter(producer_id=subtask_output.id)) for dataproduct in dataproducts: parset["Observation.DataProducts.Output_Correlated.filenames"].append(dataproduct.filename) parset["Observation.DataProducts.Output_Correlated.locations"].append(dataproduct.directory) # convert dict to real parameterset, and return it parset = parameterset(parset) return parset # dict to store conversion methods based on subtask.specifications_template.name _convertors = {'observationcontrol schema': _convert_to_parset_for_observationcontrol_schema } def convert_to_parset(subtask: models.Subtask) -> parameterset: ''' Convert the specifications in the subtask to a LOFAR parset for MAC/COBALT :raises ConversionException if no proper conversion is available. ''' try: convertor = _convertors[subtask.specifications_template.name] return convertor(subtask) except KeyError: raise ConversionException("Cannot convert subtask id=%d to parset. No conversion routine available for specifications_template='%s'" % ( subtask.id, subtask.specifications_template.name))