-
Jorrit Schaap authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
RATaskSpecified.py 8.85 KiB
#!/usr/bin/env python
#coding: iso-8859-15
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id$
"""
Daemon that listens to OTDB status changes to PRESCHEDULED and SCHEDULED, requests
the parset of such jobs (+ their predecessors), and posts them on the bus.
"""
from lofar.messaging import FromBus, ToBus, RPC, EventMessage
from lofar.parameterset import PyParameterValue
from lofar.sas.otdb.OTDBBusListener import OTDBBusListener
from lofar.common.util import waitForInterrupt
from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME, DEFAULT_OTDB_NOTIFICATION_SUBJECT
import logging
logger = logging.getLogger(__name__)

# Prefix that is common to all parset keys; which prefix applies depends on
# the exact source the parset came from.
PARSET_PREFIX = "ObsSW."
def predecessors( parset ):
    """ Extract the list of predecessor obs IDs from the given parset.

    The value stored under PARSET_PREFIX + "Observation.Scheduler.predecessors"
    is parsed as a string vector. Each element is stripped down to its digit
    characters and converted to an int.

    Returns a list of ints, one per element of the parsed vector.

    The `parset[key]' lookup fails if the key is absent (KeyError for a plain
    dict), and int() raises ValueError for an element without any digits. """
    key = PARSET_PREFIX + "Observation.Scheduler.predecessors"
    strlist = PyParameterValue(str(parset[key]), True).getStringVector()

    # Key contains "Lxxxxx" values, we want to have "xxxxx" only.
    # The digits are joined explicitly rather than via int(filter(...)):
    # filter() only yields a string for Python 2 `str' input; on Python 3 it
    # returns an iterator, which int() rejects.
    return [int("".join(ch for ch in x if ch.isdigit())) for x in strlist]
def resourceIndicatorsFromParset( parset ):
    """ Extract the parset keys that are required for resource assignment.

    `parset' must provide a dict-style get(key, default). All keys except
    "Version.number" are looked up with PARSET_PREFIX prepended, and are
    stored in the result WITHOUT that prefix. Keys that are not defined in
    the parset are omitted from the result; "Version.number" is always
    present (with whatever parset.get() returns for it).

    Returns a dict of key -> value. Values are copied as-is unless a
    conversion (string vector, int vector, bool) is given for the key. """

    subset = {}

    def get(key, default=None):
        """ Return the value of parset key `key', or `default' if the key
            is not defined. """
        return parset.get(PARSET_PREFIX + key, default)

    def add(key, conversion=lambda x: x):
        """ Add the given key to our subset selection, using an optional
            conversion. """
        value = get(key)

        if value is not None:
            subset[key] = conversion(value)

    # Some conversion functions for common parameter-value types. The value
    # is coerced with str() before parsing, as predecessors() does.

    def strvector(value):
        return PyParameterValue(str(value), True).getStringVector()

    def intvector(value):
        return PyParameterValue(str(value), True).getIntVector()

    # Deliberately not called `bool', to avoid shadowing the builtin.
    def boolean(value):
        return PyParameterValue(str(value), True).getBool()

    # =====================================
    # Parset meta info
    # =====================================
    subset["Version.number"] = parset.get("Version.number")

    # =====================================
    # Observation settings
    # =====================================
    add("Observation.sampleClock")
    add("Observation.nrBitsPerSample")
    add("Observation.antennaSet")
    add("Observation.VirtualInstrument.stationList", strvector)
    add("Observation.startTime")
    add("Observation.stopTime")
    add("Observation.nrBeams")

    # range() instead of xrange(): works on both Python 2 and Python 3.
    nrSAPs = int(get("Observation.nrBeams", 0))
    for sap in range(0, nrSAPs):
        add("Observation.Beam[%d].subbandList" % (sap,), intvector)

    # =====================================
    # Correlator settings
    # =====================================
    add("Observation.DataProducts.Output_Correlated.enabled", boolean)
    add("Cobalt.Correlator.integrationTime")
    add("Cobalt.Correlator.nrChannelsPerSubband")
    # TODO: We need a service that computes these 3 values
    add("Cobalt.Correlator.nrBlocksPerIntegration")
    add("Cobalt.Correlator.nrIntegrationsPerBlock")
    add("Cobalt.blockSize")

    # =====================================
    # Beamformer settings
    # =====================================
    add("Observation.DataProducts.Output_IncoherentStokes.enabled", boolean)
    add("Observation.DataProducts.Output_CoherentStokes.enabled", boolean)
    add("Cobalt.BeamFormer.flysEye", boolean)
    #add("Cobalt.BeamFormer.CoherentStokes.nrChannelsPerSubband") # only needed to determine Cobalt.blockSize
    add("Cobalt.BeamFormer.CoherentStokes.subbandsPerFile")
    add("Cobalt.BeamFormer.CoherentStokes.timeIntegrationFactor")
    add("Cobalt.BeamFormer.CoherentStokes.which")
    #add("Cobalt.BeamFormer.IncoherentStokes.nrChannelsPerSubband") # only needed to determine Cobalt.blockSize
    add("Cobalt.BeamFormer.IncoherentStokes.subbandsPerFile")
    add("Cobalt.BeamFormer.IncoherentStokes.timeIntegrationFactor")
    add("Cobalt.BeamFormer.IncoherentStokes.which")

    for sap in range(0, nrSAPs):
        add("Observation.Beam[%d].nrTabRings" % (sap,))

        nrTABs = int(get("Observation.Beam[%d].nrTiedArrayBeams" % (sap,), 0))
        for tab in range(0, nrTABs):
            add("Observation.Beam[%d].TiedArrayBeam[%d].coherent" % (sap,tab), boolean)

    # =====================================
    # Pipeline settings
    # =====================================

    # Calibrator / Averaging pipelines
    # ("Observation.DataProducts.Output_Correlated.enabled" is also used here;
    #  it has already been added under the correlator settings.)
    add("Observation.DataProducts.Output_InstrumentModel.enabled", boolean)
    add("Observation.DataProducts.Input_Correlated.enabled", boolean)
    add("Observation.DataProducts.Input_Correlated.skip", intvector)
    add("Observation.ObservationControl.PythonControl.DPPP.demixer.demixfreqstep")
    add("Observation.ObservationControl.PythonControl.DPPP.demixer.demixtimestep")

    # Imaging pipeline
    add("Observation.DataProducts.Output_SkyImage.enabled", boolean)
    add("Observation.ObservationControl.PythonControl.Imaging.slices_per_image")
    add("Observation.ObservationControl.PythonControl.Imaging.subbands_per_image")

    # Long-baseline pipeline
    add("Observation.ObservationControl.PythonControl.LongBaseline.subbandgroups_per_ms")
    add("Observation.ObservationControl.PythonControl.LongBaseline.subbands_per_subbandgroup")

    # Pulsar pipeline
    add("Observation.DataProducts.Output_Pulsar.enabled", boolean)
    add("Observation.DataProducts.Input_CoherentStokes.enabled", boolean)
    add("Observation.DataProducts.Input_CoherentStokes.skip", intvector)
    add("Observation.DataProducts.Input_IncoherentStokes.enabled", boolean)
    add("Observation.DataProducts.Input_IncoherentStokes.skip", intvector)

    return subset
class RATaskSpecified(OTDBBusListener):
    """ Bus listener that, for a prescheduled observation, fetches its parset
        and the parsets of all its (transitive) predecessors, reduces them to
        resource indicators, and posts the result on the notification bus. """

    # NOTE(review): DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_BUSNAME and
    # DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT are not imported in the
    # part of the file visible here -- confirm they are defined/imported.
    def __init__(self,
                 otdb_notification_busname=DEFAULT_OTDB_NOTIFICATION_BUSNAME,
                 otdb_notification_subject=DEFAULT_OTDB_NOTIFICATION_SUBJECT,
                 notification_busname=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_BUSNAME,
                 notification_subject=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT,
                 **kwargs):
        """ Set up the listener, the parset RPC and the outgoing bus.

            otdb_notification_busname/subject: bus and subject passed to
                OTDBBusListener to listen on.
            notification_busname/subject: combined as "busname/subject" into
                the address of the bus the result is sent to.
            kwargs: passed on to OTDBBusListener. """
        # The busname parameter is `otdb_notification_busname'; the former
        # reference to an undefined `otdb_busname' raised a NameError here.
        super(RATaskSpecified, self).__init__(busname=otdb_notification_busname, subject=otdb_notification_subject, **kwargs)

        # NOTE(review): the "TaskSpecification" service is assumed to be
        # reachable through the same bus as the notifications -- confirm.
        self.parset_rpc = RPC(service="TaskSpecification", busname=otdb_notification_busname)
        self.send_bus   = ToBus("%s/%s" % (notification_busname, notification_subject))

    def start_listening(self, **kwargs):
        """ Open the RPC and the outgoing bus, then start listening. """
        self.parset_rpc.open()
        self.send_bus.open()

        super(RATaskSpecified, self).start_listening(**kwargs)

    def stop_listening(self, **kwargs):
        """ Stop listening, then close the outgoing bus and the RPC.

            Both are closed even if stopping the listener (or closing the
            bus) raises; the exception still propagates. """
        try:
            super(RATaskSpecified, self).stop_listening(**kwargs)
        finally:
            try:
                self.send_bus.close()
            finally:
                self.parset_rpc.close()

    def onObservationPrescheduled(self, treeId, modificationTime):
        """ Collect the resource indicators of obs `treeId' and all of its
            predecessors, and send them out as one EventMessage.

            Presumably invoked by OTDBBusListener when an observation becomes
            prescheduled -- the caller is not visible in this file. """
        logger.info("Processing obs ID %s", treeId)

        # Request the parset
        main_obsID = treeId
        main_parset,_ = self.parset_rpc( OtdbID=main_obsID )

        # Construct a dict of all the parsets we retrieved
        parsets = {main_obsID: main_parset}

        logger.info("Processing predecessors")

        # Collect the initial set of predecessors
        request_obsIDs = set(predecessors(main_parset))

        logger.info("Processing %s", request_obsIDs)

        # Walk over all known predecessor obsIDs, and request their parsets.
        # Newly discovered predecessors are added to the work set.
        while request_obsIDs:
            obsID = request_obsIDs.pop()

            if obsID in parsets:
                # Predecessor lists can overlap -- we already have this one
                continue

            logger.info("Fetching predecessor %s", obsID)

            # Request predecessor parset
            parsets[obsID],_ = self.parset_rpc( OtdbID=obsID )

            # Add the list of predecessors
            request_obsIDs.update(predecessors(parsets[obsID]))

        # Convert parsets to resource indicators, keyed by obs ID as string.
        # items() instead of iteritems(): works on both Python 2 and 3.
        logger.info("Extracting resource indicators")
        resourceIndicators = {str(obsID): resourceIndicatorsFromParset(parset)
                              for (obsID, parset) in parsets.items()}

        # Construct and send result message
        logger.info("Sending result")

        result = {
            "sasID": main_obsID,
            "state": "prescheduled",
            "time_of_change": modificationTime,
            "resource_indicators": resourceIndicators,
        }

        # Put result on bus
        msg = EventMessage(content=result)
        self.send_bus.send(msg)

        logger.info("Result sent")