Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
taskprescheduler.py 9.20 KiB
#!/usr/bin/env python3

# Copyright (C) 2017
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.    See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: taskprescheduler.py 1580 2015-09-30 14:18:57Z loose $

"""
Class to take a task on approved and add the information needed to put it on prescheduled.
This means adding/updating some Cobal keys, selecting available stations,
selecting the right timeslot and updating start/end time.
"""

import pprint
from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.sas.otdb.OTDBBusListener import OTDBBusListener, OTDBEventMessageHandler
from lofar.sas.otdb.otdbrpc import OTDBRPC
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.sas.resourceassignment.taskprescheduler.cobaltblocksize import CorrelatorSettings, StokesSettings, BlockConstraints, BlockSize
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC
from lofar.sas.resourceassignment.common.specification import Specification
from lofar.sas.resourceassignment.common.specification import OUTPUT_PREFIX

import logging
logger = logging.getLogger(__name__)

DATAPRODUCTS = "Observation.DataProducts."
COBALT       = "Observation.ObservationControl.OnlineControl.Cobalt."

def calculateCobaltSettings(spec):
    """uses parset keys to calculate Cobalt block size and integration time"""

    parset = spec.internal_dict #TODO cleanup to access values more directly

    if parset[DATAPRODUCTS + "Output_Correlated.enabled"]:
        corr = CorrelatorSettings()
        corr.nrChannelsPerSubband = parset[COBALT + "Correlator.nrChannelsPerSubband"]
        corr.integrationTime      = parset[COBALT + "Correlator.integrationTime"]
    else:
        corr = None

    if parset[DATAPRODUCTS + "Output_CoherentStokes.enabled"]:
        coherent = StokesSettings()
        coherent.nrChannelsPerSubband  = parset[COBALT + "BeamFormer.CoherentStokes.nrChannelsPerSubband"]
        coherent.timeIntegrationFactor = parset[COBALT + "BeamFormer.CoherentStokes.timeIntegrationFactor"]
    else:
        coherent = None

    if parset[DATAPRODUCTS + "Output_IncoherentStokes.enabled"]:
        incoherent = StokesSettings()
        incoherent.nrChannelsPerSubband  = parset[COBALT + "BeamFormer.IncoherentStokes.nrChannelsPerSubband"]
        incoherent.timeIntegrationFactor = parset[COBALT + "BeamFormer.IncoherentStokes.timeIntegrationFactor"]
    else:
        incoherent = None

    clock = parset["Observation.sampleClock"]
    constraints = BlockConstraints(corr, coherent, incoherent, clock)
    calculator = BlockSize(constraints)

    return {'nrSubblocks': calculator.nrSubblocks, 'blockSize': calculator.blockSize,
            'nrBlocks': calculator.nrBlocks, 'integrationTime': calculator.integrationTime}

def cobaltOTDBsettings(cobalt_values):
    otdb_info = {}
    otdb_info[OUTPUT_PREFIX + COBALT + "Correlator.nrBlocksPerIntegration"] = cobalt_values["nrBlocks"]
    otdb_info[OUTPUT_PREFIX + COBALT + "Correlator.nrIntegrationsPerBlock"] = cobalt_values["nrSubblocks"]
    otdb_info[OUTPUT_PREFIX + COBALT + "blockSize"] = cobalt_values["blockSize"]
    otdb_info[OUTPUT_PREFIX + COBALT + "Correlator.integrationTime"] = cobalt_values["integrationTime"]
    return otdb_info

class TaskPrescheduler(OTDBEventMessageHandler):
    def __init__(self, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
        super().__init__()
        self.otdbrpc  = OTDBRPC.create(exchange=exchange, broker=broker)
        self.momquery = MoMQueryRPC.create(exchange=exchange, broker=broker)
        self.radbrpc = RADBRPC.create(exchange=exchange, broker=broker)

    def start_handling(self):
        self.otdbrpc.open()
        self.momquery.open()
        self.radbrpc.open()
        super().start_handling()

    def stop_handling(self):
        self.otdbrpc.close()
        self.momquery.close()
        self.radbrpc.close()
        super().stop_handling()

    def onObservationApproved(self, treeId, modificationTime):
        """ Updates task specification and puts task on prescheduled if it was generated by a trigger
        """
        # TODO might work for all tasks in the future
        # TODO: only process observations and pipelines
        # TODO we probably need to implement a lot of the checks from
        # std::pair<unscheduled_reasons, QString> Controller::doPreScheduleChecks(Task *task)
        # in Controller.cpp line 2986-3371 from the Alwin Scheduler
        # We might look at the XMLgenerator code as it already does a lot of similar checks.
        # For now we assume the specification is entirely correct.
        # Maybe these checks need to go into the RATaskSpecified instead.
        # NOTE: The MoM predecessor Ids to OTDB predecessor Ids conversion is done in RATaskSpecified on the fly

        # otdb_id = treeId
        #
        # Note: Race condition when asking MoM as the mom-otdb-adapter might not have heard that the
        # task is on approved and might still be on approved pending in MoM.
        # so don't ask the MomQuery: mom_ids = self.momquery.getMoMIdsForOTDBIds([otdb_id])
        # We get the mom_id from the parset
        #
        # We get the parset for all tasks we receive instead of just for the ones with
        # a trigger.
        status = "approved"
        spec = Specification(self.otdbrpc, self.momquery, self.radbrpc)
        spec.set_status(status)
        spec.read_from_OTDB_with_predecessors(treeId, "otdb", {}) #Now checks predecessors, which theoretically could cause race contitions
        spec.read_from_mom()

        if spec.status == "error":
            return

        spec.update_start_end_times()
        spec.insert_into_radb()
        # if spec.validate()?
        if spec.status != status:
            return
        if not spec.mom_id:
            return
        if spec.isTriggered():
            logger.info('prescheduling otdb_id=%s because it was generated by trigger_id=%s', spec.otdb_id, spec.trigger_id)
            otdb_info = {}
            if spec.isObservation():
                cobalt_values = calculateCobaltSettings(spec)
                otdb_info.update(cobaltOTDBsettings(cobalt_values))
            self.setOTDBinfo(spec.otdb_id, otdb_info, 'prescheduled')
        else:
            logger.info('Did not find a trigger for task mom_id=%s', spec.mom_id)

    def setOTDBinfo(self, otdb_id, otdb_info, otdb_status):
        """This function sets the values in otdb_info in OTDB, almost a copy from the RAtoOTDBPropagator"""
        try:
            if otdb_info:
                logger.info('Setting specification for otdb_id %s:\n', otdb_id)
                logger.info(pprint.pformat(otdb_info))
                self.otdbrpc.taskSetSpecification(otdb_id, otdb_info)
                #We probably will need this as well
                #self.otdbrpc.taskPrepareForScheduling(otdb_id, otdb_info["LOFAR.ObsSW.Observation.startTime"],
                #                                               otdb_info["LOFAR.ObsSW.Observation.stopTime"])
            logger.info('Setting status (%s) for otdb_id %s', otdb_status, otdb_id)
            self.otdbrpc.taskSetStatus(otdb_id, otdb_status)
        except Exception as e:
            logger.exception(e)
            logger.error("Problem setting specification or status in OTDB for otdb_id=%s", otdb_id)
            self.radbrpc.updateTaskStatusForOtdbId(otdb_id, 'error') # We don't catch an exception if this fails.

def main():
    from optparse import OptionParser
    from lofar.common.util import waitForInterrupt

    # make sure we run in UTC timezone
    import os
    os.environ['TZ'] = 'UTC'

    # Check the invocation arguments
    parser = OptionParser("%prog [options]", description='runs the task prescheduler service')
    parser.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER,
                      help='Address of the qpid broker, default: localhost')
    parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
    parser.add_option('-e', '--exchange', dest='exchange', type='string',
                      default=DEFAULT_BUSNAME,
                      help='exchange for communication. [default: %default]')
    (options, args) = parser.parse_args()

    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=logging.DEBUG if options.verbose else logging.INFO)

    with OTDBBusListener(handler_type=TaskPrescheduler,
                         exchange=options.exchange,
                         broker=options.broker,
                         num_threads=1):
        waitForInterrupt()

if __name__ == '__main__':
    main()