Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
RATaskSpecified.py 17.09 KiB
#!/usr/bin/env python
#coding: iso-8859-15
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id$
"""
Daemon that listens to OTDB status changes to APPROVED and PRESCHEDULED, requests
the parset of such jobs (+ their predecessors), and posts them on the bus.
"""

from lofar.messaging import FromBus, ToBus, EventMessage # RPC,
from lofar.parameterset import PyParameterValue
from lofar.sas.otdb.OTDBBusListener import OTDBBusListener
from lofar.common.util import waitForInterrupt
from lofar.sas.otdb.otdbrpc import OTDBRPC
from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME, DEFAULT_OTDB_NOTIFICATION_SUBJECT
from lofar.sas.resourceassignment.rataskspecified.config import DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_BUSNAME
from lofar.sas.resourceassignment.rataskspecified.config import DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT
from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME

import logging
logger = logging.getLogger(__name__)

# Prefix that is common to all parset keys (it depends on the exact source of the parset).
PARSET_PREFIX = "ObsSW."

def convertSchedulerProcessSubtype(processSubType):
    '''Map a Scheduler processSubType to an RA (type, subtype) tuple.

    The recognised processSubType strings are those defined in
    SAS/Scheduler/src/OTDBTree.h. Any other value maps to
    ("unknown", "unknown").
    '''
    known_subtypes = (
        ("Averaging Pipeline",     ("pipeline", "averaging pipeline")),
        ("Calibration Pipeline",   ("pipeline", "calibration pipeline")),
        ("Imaging Pipeline",       ("pipeline", "imaging pipeline")),
        ("Imaging Pipeline MSSS",  ("pipeline", "imaging pipeline msss")),
        ("Long Baseline Pipeline", ("pipeline", "long baseline pipeline")),
        ("Pulsar Pipeline",        ("pipeline", "pulsar pipeline")),
        ("Beam Observation",       ("observation", "bfmeasurement")),
        ("Interferometer",         ("observation", "interferometer")),
        ("TBB (piggyback)",        ("observation", "tbbmeasurement")),
        ("TBB (standalone)",       ("observation", "tbbmeasurement")),
    )
    # Linear scan with == (not a dict lookup) so any input type is accepted.
    for scheduler_name, ra_type_and_subtype in known_subtypes:
        if processSubType == scheduler_name:
            return ra_type_and_subtype

    ##TODO Maintenance and Reservation
    return "unknown", "unknown"

def resourceIndicatorsFromParset( parsetDict ):
    """ Extract the parset keys that are required for resource assignment.

        parsetDict: mapping from full parset key (including PARSET_PREFIX) to
                    value. Values are passed through str() before being parsed
                    by the vector/bool conversions.

        Returns a dict keyed by the parset key *without* PARSET_PREFIX. Keys
        that are absent from parsetDict are omitted, except "Version.number"
        (looked up without prefix), which is always present and is None when
        missing.
    """

    subset = {}

    def get(key, default=None):
        """ Return the value of parset key `key', or `default' if the key
            is not defined. """
        return parsetDict.get(PARSET_PREFIX + key, default)

    def add(key, conversion=lambda x: x):
        """ Add the given key to our subset selection, using an optional
            conversion. Keys that are not in the parset are skipped. """
        value = get(key)
        if value is not None:
            subset[key] = conversion(value)

    # Some conversion functions for common parameter-value types.
    def strvector(value):
        return PyParameterValue(str(value), True).getStringVector()

    def intvector(value):
        # NOTE(review): _expand() is a private PyParameterValue helper;
        # presumably it expands range notation (e.g. "0..3") before the
        # conversion -- confirm against lofar.parameterset.
        return PyParameterValue(str(value), True)._expand().getIntVector()

    def boolean(value):
        # Named `boolean' rather than `bool' to avoid shadowing the builtin.
        return PyParameterValue(str(value), True).getBool()

    # =====================================
    # Parset meta info
    # =====================================
    subset["Version.number"] = parsetDict.get("Version.number")

    # =====================================
    # Observation settings
    # =====================================
    add("Observation.momID")
    add("Observation.sampleClock")
    add("Observation.nrBitsPerSample")
    add("Observation.antennaSet")
    add("Observation.VirtualInstrument.stationList", strvector)
    add("Observation.startTime")
    add("Observation.stopTime")
    add("Observation.Scheduler.taskDuration")
    add("Observation.nrBeams")

    nrSAPs = int(get("Observation.nrBeams", 0))
    for sap in range(nrSAPs):
        add("Observation.Beam[%d].subbandList" % (sap,), intvector)

    # =====================================
    # Correlator settings
    # =====================================
    # Output_Correlated.* is also used by the calibrator/averaging pipelines.
    add("Observation.DataProducts.Output_Correlated.enabled", boolean)
    add("Observation.DataProducts.Output_Correlated.storageClusterName")
    add("Observation.DataProducts.Output_Correlated.identifications", strvector)
    add("Observation.ObservationControl.OnlineControl.Cobalt.Correlator.integrationTime")
    add("Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrChannelsPerSubband")
    # TODO: We need a service that computes these 3 values
    add("Cobalt.Correlator.nrBlocksPerIntegration")
    add("Cobalt.Correlator.nrIntegrationsPerBlock")
    add("Cobalt.blockSize")

    # =====================================
    # Beamformer settings
    # =====================================
    add("Observation.DataProducts.Output_IncoherentStokes.enabled", boolean)
    add("Observation.DataProducts.Output_IncoherentStokes.storageClusterName")
    add("Observation.DataProducts.Output_IncoherentStokes.identifications", strvector)
    add("Observation.DataProducts.Output_CoherentStokes.enabled", boolean)
    add("Observation.DataProducts.Output_CoherentStokes.storageClusterName")
    add("Observation.DataProducts.Output_CoherentStokes.identifications", strvector)
    add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.flysEye", boolean)
    # ...Cobalt.BeamFormer.CoherentStokes.nrChannelsPerSubband is not collected:
    # it is only needed to determine Cobalt.blockSize.
    add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.subbandsPerFile")
    add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.timeIntegrationFactor")
    add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.which")
    # ...Cobalt.BeamFormer.IncoherentStokes.nrChannelsPerSubband is not collected:
    # it is only needed to determine Cobalt.blockSize.
    add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.subbandsPerFile")
    add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.timeIntegrationFactor")
    add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.which")
    for sap in range(nrSAPs):
        add("Observation.Beam[%d].nrTabRings" % (sap,))

        add("Observation.Beam[%d].nrTiedArrayBeams" % (sap,))
        nrTABs = int(get("Observation.Beam[%d].nrTiedArrayBeams" % (sap,), 0))
        for tab in range(nrTABs):
            add("Observation.Beam[%d].TiedArrayBeam[%d].coherent" % (sap, tab), boolean)

    # =====================================
    # Pipeline settings
    # =====================================
    # Calibrator / Averaging pipelines
    # (Output_Correlated.* was already collected in the Correlator section.)
    add("Observation.DataProducts.Output_InstrumentModel.enabled", boolean)
    add("Observation.DataProducts.Output_InstrumentModel.storageClusterName")
    add("Observation.DataProducts.Output_InstrumentModel.identifications", strvector)
    add("Observation.DataProducts.Input_Correlated.enabled", boolean)
    add("Observation.DataProducts.Input_Correlated.identifications", strvector)
    # Input_Correlated.skip is intentionally not collected.
    add("Observation.DataProducts.Input_InstrumentModel.enabled", boolean)
    add("Observation.DataProducts.Input_InstrumentModel.identifications", strvector)
    # Input_InstrumentModel.skip is intentionally not collected.
    add("Observation.ObservationControl.PythonControl.DPPP.demixer.freqstep")
    add("Observation.ObservationControl.PythonControl.DPPP.demixer.timestep")

    # Imaging pipeline
    add("Observation.DataProducts.Output_SkyImage.enabled", boolean)
    add("Observation.DataProducts.Output_SkyImage.storageClusterName")
    add("Observation.DataProducts.Output_SkyImage.identifications", strvector)
    add("Observation.ObservationControl.PythonControl.Imaging.slices_per_image")
    add("Observation.ObservationControl.PythonControl.Imaging.subbands_per_image")

    # Long-baseline pipeline
    add("Observation.ObservationControl.PythonControl.LongBaseline.subbandgroups_per_ms")
    add("Observation.ObservationControl.PythonControl.LongBaseline.subbands_per_subbandgroup")

    # Pulsar pipeline
    add("Observation.DataProducts.Output_Pulsar.enabled", boolean)
    add("Observation.DataProducts.Output_Pulsar.storageClusterName")
    add("Observation.DataProducts.Output_Pulsar.identifications", strvector)
    add("Observation.DataProducts.Input_CoherentStokes.enabled", boolean)
    add("Observation.DataProducts.Input_CoherentStokes.identifications", strvector)
    # Input_CoherentStokes.skip is intentionally not collected.
    add("Observation.DataProducts.Input_IncoherentStokes.enabled", boolean)
    add("Observation.DataProducts.Input_IncoherentStokes.identifications", strvector)
    # Input_IncoherentStokes.skip is intentionally not collected.

    return subset

class RATaskSpecified(OTDBBusListener):
    """ Listens for OTDB task status changes (approved / prescheduled), collects
        the resource-related parset keys of the task and, recursively, of its
        predecessors, and publishes the resulting tree as an EventMessage on the
        RA-task-specified notification bus. """

    def __init__(self,
                 otdb_notification_busname=DEFAULT_OTDB_NOTIFICATION_BUSNAME,
                 otdb_notification_subject=DEFAULT_OTDB_NOTIFICATION_SUBJECT,
                 otdb_service_busname=DEFAULT_OTDB_SERVICE_BUSNAME,
                 otdb_service_subject=DEFAULT_OTDB_SERVICENAME,
                 notification_busname=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_BUSNAME,
                 notification_subject=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT,
                 broker=None, **kwargs):
        """ otdb_notification_busname/subject: where OTDB status changes arrive.
            otdb_service_busname/subject:      OTDB RPC service used to fetch IDs/parsets.
            notification_busname/subject:      where the specification trees are sent.
            broker: messaging broker address, used for the listener, the OTDB RPC
                    and the outgoing bus alike (None leaves the choice to the
                    messaging library). """
        # Pass broker everywhere so --broker is honoured by all three connections.
        super(RATaskSpecified, self).__init__(busname=otdb_notification_busname,
                                              subject=otdb_notification_subject,
                                              broker=broker, **kwargs)
        self.otdbrpc = OTDBRPC(busname=otdb_service_busname, servicename=otdb_service_subject, broker=broker) ## , ForwardExceptions=True hardcoded in RPCWrapper right now
        self.send_bus = ToBus("%s/%s" % (notification_busname, notification_subject), broker=broker)

    def start_listening(self, **kwargs):
        """ Open the OTDB RPC and the outgoing bus, then start listening. """
        self.otdbrpc.open()
        self.send_bus.open()
        super(RATaskSpecified, self).start_listening(**kwargs)

    def stop_listening(self, **kwargs):
        """ Stop listening, then close the outgoing bus and the OTDB RPC.
            The connections are closed even if stopping the listener raises. """
        try:
            super(RATaskSpecified, self).stop_listening(**kwargs)
        finally:
            try:
                self.send_bus.close()
            finally:
                self.otdbrpc.close()

    @staticmethod
    def get_predecessors(parset):
        """ Extract the list of predecessor IDs from the given parset.

            Returns a list of dicts {'source': 'mom'|'otdb'|'other', 'id': int}.
            A parset without the predecessors key yields an empty list; entries
            whose numeric part cannot be parsed are logged and skipped. """

        key = PARSET_PREFIX + "Observation.Scheduler.predecessors"
        if key not in parset:
            logger.info("parset has no %s key, assuming no predecessors", key)
            return []
        stringlist = PyParameterValue(str(parset[key]), True).getStringVector()

        # Key contains values starting with 'S' = Scheduler, 'L'/'T' = OTDB, 'M' = MoM
        # 'S' we can probably ignore? Might be only internal in the Scheduler?
        result = []
        for s in stringlist:
            try: # Made the source a string for readability, but it's not efficient
                if s.startswith('M'):
                    result.append({'source': 'mom', 'id': int(s[1:])})
                elif s.startswith(('L', 'T')):
                    result.append({'source': 'otdb', 'id': int(s[1:])})
                else: # 'S'
                    logger.info("found a predecessor ID I can't handle: %s", s)
                    result.append({'source': 'other', 'id': int(s[1:])})
            except ValueError:
                logger.warning("found a predecessor ID that I can't parse %s", s)
        return result

    def get_specification_with_predecessors(self, id, id_source, state, found_parsets):
        """ Build the specification tree for one task and, recursively, its predecessors.

            id:            task ID, interpreted according to id_source
            id_source:     'otdb', 'mom' or 'other'
            state:         status to record for this task; "" for predecessors
            found_parsets: dict of OTDB ID -> parset, shared across the recursion
                           so each parset is fetched from OTDB at most once

            Returns a dict with keys otdb_id, predecessors, specification,
            task_type, task_subtype (plus state when given), or None when the ID
            cannot be resolved to an OTDB ID ('other' or unrecognised source).
            NOTE(review): there is no cycle detection; a predecessor cycle would
            recurse until RecursionError. """
        logger.info("Processing ID %s from %s", id, id_source)
        if id_source == "other":
            return None
        elif id_source == "mom":
            otdb_id = self.otdbrpc.taskGetIDs( mom_id=id )['otdb_id']
        elif id_source == "otdb":
            otdb_id = id
        else:
            # Without an OTDB ID there is nothing to look up.
            logger.warning("Error in understanding id %s", id)
            return None

        logger.info("Processing OTDB ID %s", otdb_id)
        result = {"otdb_id": otdb_id, "predecessors": []}
        if state:
            result["state"] = state # TODO should be status not state
        # else: otdbrpc.taskGetStatus not implemented and maybe not needed?

        if otdb_id in found_parsets:
            parset = found_parsets[otdb_id]
        else:
            parset = self.otdbrpc.taskGetSpecification( otdb_id=otdb_id )['specification']
            found_parsets[otdb_id] = parset

        logger.info("parset [%s]: %s", otdb_id, parset)
        result['specification'] = resourceIndicatorsFromParset(parset)

        key = PARSET_PREFIX + "Observation.processSubtype"
        result['task_type'], result['task_subtype'] = convertSchedulerProcessSubtype(parset.get(key, ""))

        logger.info("Processing predecessors")
        for predecessor in self.get_predecessors(parset):
            predecessor_result = self.get_specification_with_predecessors(predecessor['id'], predecessor['source'], "", found_parsets)
            if predecessor_result:
                result["predecessors"].append(predecessor_result)
        return result

    def onObservationApproved(self, main_id, modificationTime):
        """ OTDB status changed to approved: publish the task's specification. """
        self.createAndSendSpecifiedTask(main_id, 'approved')

    def onObservationPrescheduled(self, main_id, modificationTime):
        """ OTDB status changed to prescheduled: publish the task's specification. """
        self.createAndSendSpecifiedTask(main_id, 'prescheduled')

    def createAndSendSpecifiedTask(self, main_id, status):
        """ Build the specification tree for OTDB task `main_id' with the given
            status and send it as an EventMessage on the notification bus. """
        # Construct root node of tree
        resultTree = self.get_specification_with_predecessors(main_id, "otdb", status, {})
        logger.info("Sending result: %s", resultTree)

        # Put result on bus
        msg = EventMessage(content=resultTree)
        self.send_bus.send(msg)
        logger.info("Result sent")

def main():
    """ Command-line entry point: parse the options and run the
        RATaskSpecified service until interrupted. """
    # make sure we run in UTC timezone
    import os
    import time
    os.environ['TZ'] = 'UTC'
    # Changing TZ only affects this process's time conversions after tzset()
    # (which exists on Unix only).
    if hasattr(time, 'tzset'):
        time.tzset()

    import logging
    from optparse import OptionParser
    from lofar.common.util import waitForInterrupt

    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)

    # Check the invocation arguments
    parser = OptionParser("%prog [options]",
                          description="run the rataskspecified service")
    parser.add_option("-b", "--notification_bus", dest="notification_bus", type="string",
                      default=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_BUSNAME,
                      help="Bus or queue we publish resource requests on. [default: %default]")
    parser.add_option("-s", "--notification_subject", dest="notification_subject", type="string",
                      default=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT,
                      help="The subject of the event messages which this service publishes. [default: %default]")
    parser.add_option("--otdb_notification_bus", dest="otdb_notification_bus", type="string",
                      default=DEFAULT_OTDB_NOTIFICATION_BUSNAME,
                      help="Bus or queue where the OTDB notifications are published. [default: %default]")
    parser.add_option("--otdb_notification_subject", dest="otdb_notification_subject", type="string",
                      default=DEFAULT_OTDB_NOTIFICATION_SUBJECT,
                      help="Subject of OTDB notifications on otdb_notification_bus. [default: %default]")
    parser.add_option("--otdb_request_bus", dest="otdb_request_bus", type="string",
                      default=DEFAULT_OTDB_SERVICE_BUSNAME,
                      help="Bus or queue where the OTDB requests are handled. [default: %default]")
    parser.add_option("--otdb_service_name", dest="otdb_service_name", type="string",
                      default=DEFAULT_OTDB_SERVICENAME,
                      help="Name of the OTDB service on otdb_request_bus. [default: %default]")
    parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost')
    options, _args = parser.parse_args()

    with RATaskSpecified(otdb_notification_busname=options.otdb_notification_bus,
                         otdb_notification_subject=options.otdb_notification_subject,
                         otdb_service_busname=options.otdb_request_bus,
                         otdb_service_subject=options.otdb_service_name,
                         notification_busname=options.notification_bus,
                         notification_subject=options.notification_subject,
                         broker=options.broker):
        waitForInterrupt()


if __name__ == "__main__":
    main()