Skip to content
Snippets Groups Projects
Commit 27da12f3 authored by Adriaan Renting's avatar Adriaan Renting
Browse files

Task #10557: added cobaltblocksize calculations to TaskPrescheduler

parent 0eb4e1fd
Branches
Tags
No related merge requests found
......@@ -123,7 +123,7 @@ def resourceIndicatorsFromParset( parsetDict ):
add("Observation.DataProducts.Output_Correlated.identifications", strvector)
add("Observation.ObservationControl.OnlineControl.Cobalt.Correlator.integrationTime")
add("Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrChannelsPerSubband")
# TODO: We need a service that computes these 3 values
# These 3 values should have been computed in the Alwin Scheduler or TaskPrescheduler
add("Cobalt.Correlator.nrBlocksPerIntegration")
add("Cobalt.Correlator.nrIntegrationsPerBlock")
add("Cobalt.blockSize")
......@@ -143,7 +143,7 @@ def resourceIndicatorsFromParset( parsetDict ):
add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.subbandsPerFile")
add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.timeIntegrationFactor")
add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.which")
#add("Observation.ObservationControl.OnlineControl.Cobalt.IncoherentStokes.nrChannelsPerSubband") # only needed to determine Cobalt.blockSize
#add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.nrChannelsPerSubband") # only needed to determine Cobalt.blockSize
add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.subbandsPerFile")
add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.timeIntegrationFactor")
add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.which")
......
......@@ -20,6 +20,6 @@ install(FILES
raestimatorservice.ini
DESTINATION etc/supervisord.d)
add_subdirectory(lib)
#add_subdirectory(lib)
add_subdirectory(test)
add_subdirectory(resource_estimators)
......@@ -19,3 +19,5 @@ install(FILES
taskprescheduler.ini
DESTINATION etc/supervisord.d)
#add_subdirectory(lib)
add_subdirectory(test)
\ No newline at end of file
......@@ -3,5 +3,5 @@
python_install(
__init__.py
cobaltblocksize.py
DESTINATION lofar/sas/resourceassignment/resourceassignmentestimator)
DESTINATION lofar/sas/resourceassignment/taskprescheduler)
......@@ -2,9 +2,10 @@
# $Id$
'''
TODO: add doc
Class to take a task on approved and add the information needed to put it on prescheduled.
This means adding/updating some Cobal keys, selecting available stations,
selecting the right timeslot and updating start/end time.
'''
import logging
from datetime import datetime, timedelta
from lofar.sas.otdb.OTDBBusListener import OTDBBusListener
from lofar.sas.otdb.otdbrpc import OTDBRPC
......@@ -12,9 +13,103 @@ from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SER
from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME, DEFAULT_OTDB_NOTIFICATION_SUBJECT
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
from lofar.sas.resourceassignment.taskprescheduler.cobaltblocksize import CorrelatorSettings, StokesSettings, BlockConstraints, BlockSize
import logging
logger = logging.getLogger(__name__)
""" Prefix that is common to all parset keys, when we get a parset from OTDBRPC. """
INPUT_PREFIX = "ObsSW."
""" Prefix that is common to all parset keys, when we need to write a parset to OTDB """
OUTPUT_PREFIX = "LOFAR.ObsSW."
DATAPRODUCTS = "Observation.DataProducts."
COBALT = "Observation.ObservationControl.OnlineControl.Cobalt."
def calculateCobaltSettings(parset):
"""uses parset keys to calculate Cobalt block size and integration time"""
if parset(DATAPRODUCTS + "Output_Correlated.enabled"):
corr = CorrelatorSettings()
corr.nrChannelsPerSubband = parset(COBALT + "Correlator.nrChannelsPerSubband")
corr.integrationTime = parset(COBALT + "Correlator.integrationTime")
else:
corr = None
if parset(DATAPRODUCTS + "Output_CoherentStokes.enabled"):
coherent = StokesSettings()
coherent.nrChannelsPerSubband = parset(COBALT + "BeamFormer.CoherentStokes.nrChannelsPerSubband")
coherent.timeIntegrationFactor = parset(COBALT + "BeamFormer.CoherentStokes.timeIntegrationFactor")
else:
coherent = None
if parset(DATAPRODUCTS + "Output_IncoherentStokes.enabled"):
incoherent = StokesSettings()
incoherent.nrChannelsPerSubband = parset(COBALT + "BeamFormer.IncoherentStokes.nrChannelsPerSubband")
incoherent.timeIntegrationFactor = parset(COBALT + "BeamFormer.IncoherentStokes.timeIntegrationFactor")
else:
incoherent = None
clock = parset("Observation.sampleClock")
constraints = BlockConstraints(corr, coherent, incoherent, clock)
calculator = BlockSize(constraints)
return {'nrSubblocks': calculator.nrSubblocks, 'blockSize': calculator.blockSize,
'nrBlocks': calculator.nrBlocks, 'integrationTime': calculator.integrationTime}
def resourceIndicatorsFromParset( parsetDict ):
""" Extract the parset keys that are required for determining Cobalt blocksize. """
#TODO There is some overlap with the RATaskSpecified code here, might need refactoring
subset = {}
def get(key, default=None):
""" Return the value of parset key `key', or `default' if the key
is not defined. """
return parsetDict.get(INPUT_PREFIX + key, default)
def add(key, conversion=lambda x: x):
""" Add the given key to our subset selection, using an optional
conversion. """
value = get(key)
if value is not None:
subset[key] = conversion(value)
""" Some conversion functions for common parameter-value types."""
#def strvector(value):
# return PyParameterValue(str(value), True).getStringVector()
#def intvector(value):
# return PyParameterValue(str(value), True)._expand().getIntVector()
def bool(value):
return PyParameterValue(str(value), True).getBool()
# =====================================
# Parset meta info
# =====================================
subset["Version.number"] = parsetDict.get("Version.number")
# =====================================
# Observation settings
# =====================================
add("Observation.momID")
add("Observation.sampleClock")
# =====================================
# Correlator settings
# =====================================
add("Observation.DataProducts.Output_Correlated.enabled", bool)
add("Observation.ObservationControl.OnlineControl.Cobalt.Correlator.integrationTime")
add("Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrChannelsPerSubband")
# This service should compute these 3 values, do we need to read them?
add("Cobalt.Correlator.nrBlocksPerIntegration")
add("Cobalt.Correlator.nrIntegrationsPerBlock")
add("Cobalt.blockSize")
add("Observation.DataProducts.Output_IncoherentStokes.enabled", bool)
add("Observation.DataProducts.Output_CoherentStokes.enabled", bool)
add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.nrChannelsPerSubband")
add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.timeIntegrationFactor")
add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.nrChannelsPerSubband")
add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.timeIntegrationFactor")
return subset
class TaskPrescheduler(OTDBBusListener):
def __init__(self,
otdb_notification_busname=DEFAULT_OTDB_NOTIFICATION_BUSNAME,
......@@ -60,10 +155,17 @@ class TaskPrescheduler(OTDBBusListener):
logger.info('Found a task mom_id=%s with a trigger_id=%s', mom_id, response['trigger_id'])
#TODO, check for stations and other resources, start/endtime, target position, then update specification
#self.otdb.taskSetSpecification(otdb_id, otdb_info)
parset = self.otdbrpc.taskGetSpecification( otdb_id=otdb_id )['specification']
specification = resourceIndicatorsFromParset(parset)
cobalt_values = calculateCobaltSettings(specification)
logger.info('prescheduling otdb_id=%s because it was generated by trigger_id=%s', otdb_id, response['trigger_id'])
self.otdb.taskSetStatus(otdb_id, 'prescheduled')
otdb_info = {}
otdb_info[OUTPUT_PREFIX + "Cobalt.Correlator.nrBlocksPerIntegration"] = cobalt_values["nrBlocks"]
otdb_info[OUTPUT_PREFIX + "Cobalt.Correlator.nrIntegrationsPerBlock"] = cobalt_values["nrSubblocks"]
otdb_info[OUTPUT_PREFIX + "Cobalt.blockSize"] = cobalt_values["blockSize"]
otdb_info[OUTPUT_PREFIX + "Cobalt.Correlator.integrationTime"] = cobalt_values["integrationTime"]
self.setOTDBinfo(otdb_id, otdb_info, 'prescheduled')
else:
logger.info('Did not find a trigger for task mom_id=%s, because %s', mom_id, response['errors'])
else:
......@@ -71,6 +173,21 @@ class TaskPrescheduler(OTDBBusListener):
except Exception as e:
logger.error(e)
def setOTDBinfo(self, otdb_id, otdb_info, otdb_status):
"""This function sets the values in otdb_info in OTDB, almost a copy from the RAtoOTDBPropagator"""
try:
logger.info('Setting specification for otdb_id %s:\n' % (otdb_id,))
logger.info(pprint.pformat(otdb_info))
self.otdbrpc.taskSetSpecification(otdb_id, otdb_info)
#We probably will need this as well
#self.otdbrpc.taskPrepareForScheduling(otdb_id, otdb_info["LOFAR.ObsSW.Observation.startTime"],
# otdb_info["LOFAR.ObsSW.Observation.stopTime"])
logger.info('Setting status (%s) for otdb_id %s' % (otdb_status, otdb_id))
self.otdbrpc.taskSetStatus(otdb_id, otdb_status)
except Exception as e:
logger.error(e)
self.doTaskError(otdb_id) #FIXME should be to the RADB also or instead?
def main():
from optparse import OptionParser
from lofar.messaging import setQpidLogLevel
......
......@@ -15,6 +15,7 @@ lofar_package(RAServices
SystemStatusDatabase
SystemStatusService
TriggerServices
TaskPrescheduler
DataManagement
RAScripts)
......
[group:RA_Services]
programs=raewebservice,radbservice,radbpglistener,resourceassigner,RATaskSpecified,raestimatorservice,rotspservice,ortspropagator
programs=raewebservice,radbservice,radbpglistener,resourceassigner,RATaskSpecified,raestimatorservice,rotspservice,ortspropagator,taskprescheduler
priority=200
[group:Specification_Services]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment