-
Auke Klazema authoredAuke Klazema authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
taskprescheduler.py 9.20 KiB
#!/usr/bin/env python3
# Copyright (C) 2017
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: taskprescheduler.py 1580 2015-09-30 14:18:57Z loose $
"""
Class to take a task on approved and add the information needed to put it on prescheduled.
This means adding/updating some Cobal keys, selecting available stations,
selecting the right timeslot and updating start/end time.
"""
import pprint
from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.sas.otdb.OTDBBusListener import OTDBBusListener, OTDBEventMessageHandler
from lofar.sas.otdb.otdbrpc import OTDBRPC
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.sas.resourceassignment.taskprescheduler.cobaltblocksize import CorrelatorSettings, StokesSettings, BlockConstraints, BlockSize
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC
from lofar.sas.resourceassignment.common.specification import Specification
from lofar.sas.resourceassignment.common.specification import OUTPUT_PREFIX
import logging
logger = logging.getLogger(__name__)
DATAPRODUCTS = "Observation.DataProducts."
COBALT = "Observation.ObservationControl.OnlineControl.Cobalt."
def calculateCobaltSettings(spec):
"""uses parset keys to calculate Cobalt block size and integration time"""
parset = spec.internal_dict #TODO cleanup to access values more directly
if parset[DATAPRODUCTS + "Output_Correlated.enabled"]:
corr = CorrelatorSettings()
corr.nrChannelsPerSubband = parset[COBALT + "Correlator.nrChannelsPerSubband"]
corr.integrationTime = parset[COBALT + "Correlator.integrationTime"]
else:
corr = None
if parset[DATAPRODUCTS + "Output_CoherentStokes.enabled"]:
coherent = StokesSettings()
coherent.nrChannelsPerSubband = parset[COBALT + "BeamFormer.CoherentStokes.nrChannelsPerSubband"]
coherent.timeIntegrationFactor = parset[COBALT + "BeamFormer.CoherentStokes.timeIntegrationFactor"]
else:
coherent = None
if parset[DATAPRODUCTS + "Output_IncoherentStokes.enabled"]:
incoherent = StokesSettings()
incoherent.nrChannelsPerSubband = parset[COBALT + "BeamFormer.IncoherentStokes.nrChannelsPerSubband"]
incoherent.timeIntegrationFactor = parset[COBALT + "BeamFormer.IncoherentStokes.timeIntegrationFactor"]
else:
incoherent = None
clock = parset["Observation.sampleClock"]
constraints = BlockConstraints(corr, coherent, incoherent, clock)
calculator = BlockSize(constraints)
return {'nrSubblocks': calculator.nrSubblocks, 'blockSize': calculator.blockSize,
'nrBlocks': calculator.nrBlocks, 'integrationTime': calculator.integrationTime}
def cobaltOTDBsettings(cobalt_values):
otdb_info = {}
otdb_info[OUTPUT_PREFIX + COBALT + "Correlator.nrBlocksPerIntegration"] = cobalt_values["nrBlocks"]
otdb_info[OUTPUT_PREFIX + COBALT + "Correlator.nrIntegrationsPerBlock"] = cobalt_values["nrSubblocks"]
otdb_info[OUTPUT_PREFIX + COBALT + "blockSize"] = cobalt_values["blockSize"]
otdb_info[OUTPUT_PREFIX + COBALT + "Correlator.integrationTime"] = cobalt_values["integrationTime"]
return otdb_info
class TaskPrescheduler(OTDBEventMessageHandler):
def __init__(self, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
super().__init__()
self.otdbrpc = OTDBRPC.create(exchange=exchange, broker=broker)
self.momquery = MoMQueryRPC.create(exchange=exchange, broker=broker)
self.radbrpc = RADBRPC.create(exchange=exchange, broker=broker)
def start_handling(self):
self.otdbrpc.open()
self.momquery.open()
self.radbrpc.open()
super().start_handling()
def stop_handling(self):
self.otdbrpc.close()
self.momquery.close()
self.radbrpc.close()
super().stop_handling()
def onObservationApproved(self, treeId, modificationTime):
""" Updates task specification and puts task on prescheduled if it was generated by a trigger
"""
# TODO might work for all tasks in the future
# TODO: only process observations and pipelines
# TODO we probably need to implement a lot of the checks from
# std::pair<unscheduled_reasons, QString> Controller::doPreScheduleChecks(Task *task)
# in Controller.cpp line 2986-3371 from the Alwin Scheduler
# We might look at the XMLgenerator code as it already does a lot of similar checks.
# For now we assume the specification is entirely correct.
# Maybe these checks need to go into the RATaskSpecified instead.
# NOTE: The MoM predecessor Ids to OTDB predecessor Ids conversion is done in RATaskSpecified on the fly
# otdb_id = treeId
#
# Note: Race condition when asking MoM as the mom-otdb-adapter might not have heard that the
# task is on approved and might still be on approved pending in MoM.
# so don't ask the MomQuery: mom_ids = self.momquery.getMoMIdsForOTDBIds([otdb_id])
# We get the mom_id from the parset
#
# We get the parset for all tasks we receive instead of just for the ones with
# a trigger.
status = "approved"
spec = Specification(self.otdbrpc, self.momquery, self.radbrpc)
spec.set_status(status)
spec.read_from_OTDB_with_predecessors(treeId, "otdb", {}) #Now checks predecessors, which theoretically could cause race contitions
spec.read_from_mom()
if spec.status == "error":
return
spec.update_start_end_times()
spec.insert_into_radb()
# if spec.validate()?
if spec.status != status:
return
if not spec.mom_id:
return
if spec.isTriggered():
logger.info('prescheduling otdb_id=%s because it was generated by trigger_id=%s', spec.otdb_id, spec.trigger_id)
otdb_info = {}
if spec.isObservation():
cobalt_values = calculateCobaltSettings(spec)
otdb_info.update(cobaltOTDBsettings(cobalt_values))
self.setOTDBinfo(spec.otdb_id, otdb_info, 'prescheduled')
else:
logger.info('Did not find a trigger for task mom_id=%s', spec.mom_id)
def setOTDBinfo(self, otdb_id, otdb_info, otdb_status):
"""This function sets the values in otdb_info in OTDB, almost a copy from the RAtoOTDBPropagator"""
try:
if otdb_info:
logger.info('Setting specification for otdb_id %s:\n', otdb_id)
logger.info(pprint.pformat(otdb_info))
self.otdbrpc.taskSetSpecification(otdb_id, otdb_info)
#We probably will need this as well
#self.otdbrpc.taskPrepareForScheduling(otdb_id, otdb_info["LOFAR.ObsSW.Observation.startTime"],
# otdb_info["LOFAR.ObsSW.Observation.stopTime"])
logger.info('Setting status (%s) for otdb_id %s', otdb_status, otdb_id)
self.otdbrpc.taskSetStatus(otdb_id, otdb_status)
except Exception as e:
logger.exception(e)
logger.error("Problem setting specification or status in OTDB for otdb_id=%s", otdb_id)
self.radbrpc.updateTaskStatusForOtdbId(otdb_id, 'error') # We don't catch an exception if this fails.
def main():
from optparse import OptionParser
from lofar.common.util import waitForInterrupt
# make sure we run in UTC timezone
import os
os.environ['TZ'] = 'UTC'
# Check the invocation arguments
parser = OptionParser("%prog [options]", description='runs the task prescheduler service')
parser.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER,
help='Address of the qpid broker, default: localhost')
parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
parser.add_option('-e', '--exchange', dest='exchange', type='string',
default=DEFAULT_BUSNAME,
help='exchange for communication. [default: %default]')
(options, args) = parser.parse_args()
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
level=logging.DEBUG if options.verbose else logging.INFO)
with OTDBBusListener(handler_type=TaskPrescheduler,
exchange=options.exchange,
broker=options.broker,
num_threads=1):
waitForInterrupt()
if __name__ == '__main__':
main()