diff --git a/SAS/ResourceAssignment/RATaskSpecifiedService/lib/RATaskSpecified.py b/SAS/ResourceAssignment/RATaskSpecifiedService/lib/RATaskSpecified.py index 23383a9e410f2bb65d408e243b26401cbb5d6835..367c08541533ee36f1a5663a7e438c4e37c7a897 100755 --- a/SAS/ResourceAssignment/RATaskSpecifiedService/lib/RATaskSpecified.py +++ b/SAS/ResourceAssignment/RATaskSpecifiedService/lib/RATaskSpecified.py @@ -123,7 +123,7 @@ def resourceIndicatorsFromParset( parsetDict ): add("Observation.DataProducts.Output_Correlated.identifications", strvector) add("Observation.ObservationControl.OnlineControl.Cobalt.Correlator.integrationTime") add("Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrChannelsPerSubband") - # TODO: We need a service that computes these 3 values + # These 3 values should have been computed in the Alwin Scheduler or TaskPrescheduler add("Cobalt.Correlator.nrBlocksPerIntegration") add("Cobalt.Correlator.nrIntegrationsPerBlock") add("Cobalt.blockSize") @@ -143,7 +143,7 @@ def resourceIndicatorsFromParset( parsetDict ): add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.subbandsPerFile") add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.timeIntegrationFactor") add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.which") - #add("Observation.ObservationControl.OnlineControl.Cobalt.IncoherentStokes.nrChannelsPerSubband") # only needed to determine Cobalt.blockSize + #add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.nrChannelsPerSubband") # only needed to determine Cobalt.blockSize add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.subbandsPerFile") add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.timeIntegrationFactor") add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.which") diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/CMakeLists.txt b/SAS/ResourceAssignment/ResourceAssignmentEstimator/CMakeLists.txt index ea07c75bb19b5ec6d6f9a0969cceee692fa081b4..23f93c7ea566232d896192a656cdb95495bd0b2a 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/CMakeLists.txt +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/CMakeLists.txt @@ -20,6 +20,6 @@ install(FILES raestimatorservice.ini DESTINATION etc/supervisord.d) -add_subdirectory(lib) +#add_subdirectory(lib) add_subdirectory(test) add_subdirectory(resource_estimators) diff --git a/SAS/ResourceAssignment/TaskPrescheduler/CMakeLists.txt b/SAS/ResourceAssignment/TaskPrescheduler/CMakeLists.txt index 67263176c28472b62e252371cdfa41796d26a4cf..8c484584da84f191c86114e054c57b5d179dd391 100644 --- a/SAS/ResourceAssignment/TaskPrescheduler/CMakeLists.txt +++ b/SAS/ResourceAssignment/TaskPrescheduler/CMakeLists.txt @@ -19,3 +19,5 @@ install(FILES taskprescheduler.ini DESTINATION etc/supervisord.d) +#add_subdirectory(lib) +add_subdirectory(test) \ No newline at end of file diff --git a/SAS/ResourceAssignment/TaskPrescheduler/lib/CMakeLists.txt b/SAS/ResourceAssignment/TaskPrescheduler/lib/CMakeLists.txt index d2cac297f54d39c22b1601fba27f27ef03bf9c98..cbd3592991f8d395c54b704155782d47d533d933 100644 --- a/SAS/ResourceAssignment/TaskPrescheduler/lib/CMakeLists.txt +++ b/SAS/ResourceAssignment/TaskPrescheduler/lib/CMakeLists.txt @@ -3,5 +3,5 @@ python_install( __init__.py cobaltblocksize.py - DESTINATION lofar/sas/resourceassignment/resourceassignmentestimator) + DESTINATION lofar/sas/resourceassignment/taskprescheduler) diff --git a/SAS/ResourceAssignment/TaskPrescheduler/prescheduler.py b/SAS/ResourceAssignment/TaskPrescheduler/prescheduler.py index 0ea6548e84b0b0f4aa3f7e25c6fa3bb1763be0ba..09446d584b98c59e19f35f07f872e946a6a84738 100644 --- a/SAS/ResourceAssignment/TaskPrescheduler/prescheduler.py +++ b/SAS/ResourceAssignment/TaskPrescheduler/prescheduler.py @@ -2,9 +2,10 @@ # $Id$ ''' -TODO: add doc +Class to take a task on approved and add the information needed to put it on prescheduled. +This means adding/updating some Cobal keys, selecting available stations, +selecting the right timeslot and updating start/end time. ''' -import logging from datetime import datetime, timedelta from lofar.sas.otdb.OTDBBusListener import OTDBBusListener from lofar.sas.otdb.otdbrpc import OTDBRPC @@ -12,9 +13,103 @@ from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SER from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME, DEFAULT_OTDB_NOTIFICATION_SUBJECT from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME +from lofar.sas.resourceassignment.taskprescheduler.cobaltblocksize import CorrelatorSettings, StokesSettings, BlockConstraints, BlockSize +import logging logger = logging.getLogger(__name__) +""" Prefix that is common to all parset keys, when we get a parset from OTDBRPC. """ +INPUT_PREFIX = "ObsSW." +""" Prefix that is common to all parset keys, when we need to write a parset to OTDB """ +OUTPUT_PREFIX = "LOFAR.ObsSW." +DATAPRODUCTS = "Observation.DataProducts." +COBALT = "Observation.ObservationControl.OnlineControl.Cobalt." + +def calculateCobaltSettings(parset): + """uses parset keys to calculate Cobalt block size and integration time""" + if parset(DATAPRODUCTS + "Output_Correlated.enabled"): + corr = CorrelatorSettings() + corr.nrChannelsPerSubband = parset(COBALT + "Correlator.nrChannelsPerSubband") + corr.integrationTime = parset(COBALT + "Correlator.integrationTime") + else: + corr = None + if parset(DATAPRODUCTS + "Output_CoherentStokes.enabled"): + coherent = StokesSettings() + coherent.nrChannelsPerSubband = parset(COBALT + "BeamFormer.CoherentStokes.nrChannelsPerSubband") + coherent.timeIntegrationFactor = parset(COBALT + "BeamFormer.CoherentStokes.timeIntegrationFactor") + else: + coherent = None + if parset(DATAPRODUCTS + "Output_IncoherentStokes.enabled"): + incoherent = StokesSettings() + incoherent.nrChannelsPerSubband = parset(COBALT + "BeamFormer.IncoherentStokes.nrChannelsPerSubband") + incoherent.timeIntegrationFactor = parset(COBALT + "BeamFormer.IncoherentStokes.timeIntegrationFactor") + else: + incoherent = None + clock = parset("Observation.sampleClock") + constraints = BlockConstraints(corr, coherent, incoherent, clock) + calculator = BlockSize(constraints) + return {'nrSubblocks': calculator.nrSubblocks, 'blockSize': calculator.blockSize, + 'nrBlocks': calculator.nrBlocks, 'integrationTime': calculator.integrationTime} + +def resourceIndicatorsFromParset( parsetDict ): + """ Extract the parset keys that are required for determining Cobalt blocksize. """ + + #TODO There is some overlap with the RATaskSpecified code here, might need refactoring + subset = {} + + def get(key, default=None): + """ Return the value of parset key `key', or `default' if the key + is not defined. """ + return parsetDict.get(INPUT_PREFIX + key, default) + + def add(key, conversion=lambda x: x): + """ Add the given key to our subset selection, using an optional + conversion. """ + value = get(key) + if value is not None: + subset[key] = conversion(value) + + """ Some conversion functions for common parameter-value types.""" + #def strvector(value): + # return PyParameterValue(str(value), True).getStringVector() + + #def intvector(value): + # return PyParameterValue(str(value), True)._expand().getIntVector() + + def bool(value): + return PyParameterValue(str(value), True).getBool() + + # ===================================== + # Parset meta info + # ===================================== + subset["Version.number"] = parsetDict.get("Version.number") + + # ===================================== + # Observation settings + # ===================================== + add("Observation.momID") + add("Observation.sampleClock") + + # ===================================== + # Correlator settings + # ===================================== + add("Observation.DataProducts.Output_Correlated.enabled", bool) + add("Observation.ObservationControl.OnlineControl.Cobalt.Correlator.integrationTime") + add("Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrChannelsPerSubband") + # This service should compute these 3 values, do we need to read them? + add("Cobalt.Correlator.nrBlocksPerIntegration") + add("Cobalt.Correlator.nrIntegrationsPerBlock") + add("Cobalt.blockSize") + + add("Observation.DataProducts.Output_IncoherentStokes.enabled", bool) + add("Observation.DataProducts.Output_CoherentStokes.enabled", bool) + add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.nrChannelsPerSubband") + add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.timeIntegrationFactor") + add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.nrChannelsPerSubband") + add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.timeIntegrationFactor") + + return subset + class TaskPrescheduler(OTDBBusListener): def __init__(self, otdb_notification_busname=DEFAULT_OTDB_NOTIFICATION_BUSNAME, @@ -60,10 +155,17 @@ class TaskPrescheduler(OTDBBusListener): logger.info('Found a task mom_id=%s with a trigger_id=%s', mom_id, response['trigger_id']) #TODO, check for stations and other resources, start/endtime, target position, then update specification - #self.otdb.taskSetSpecification(otdb_id, otdb_info) - + parset = self.otdbrpc.taskGetSpecification( otdb_id=otdb_id )['specification'] + specification = resourceIndicatorsFromParset(parset) + cobalt_values = calculateCobaltSettings(specification) + logger.info('prescheduling otdb_id=%s because it was generated by trigger_id=%s', otdb_id, response['trigger_id']) - self.otdb.taskSetStatus(otdb_id, 'prescheduled') + otdb_info = {} + otdb_info[OUTPUT_PREFIX + "Cobalt.Correlator.nrBlocksPerIntegration"] = cobalt_values["nrBlocks"] + otdb_info[OUTPUT_PREFIX + "Cobalt.Correlator.nrIntegrationsPerBlock"] = cobalt_values["nrSubblocks"] + otdb_info[OUTPUT_PREFIX + "Cobalt.blockSize"] = cobalt_values["blockSize"] + otdb_info[OUTPUT_PREFIX + "Cobalt.Correlator.integrationTime"] = cobalt_values["integrationTime"] + self.setOTDBinfo(otdb_id, otdb_info, 'prescheduled') else: logger.info('Did not find a trigger for task mom_id=%s, because %s', mom_id, response['errors']) else: @@ -71,6 +173,21 @@ class TaskPrescheduler(OTDBBusListener): except Exception as e: logger.error(e) + def setOTDBinfo(self, otdb_id, otdb_info, otdb_status): + """This function sets the values in otdb_info in OTDB, almost a copy from the RAtoOTDBPropagator""" + try: + logger.info('Setting specification for otdb_id %s:\n' % (otdb_id,)) + logger.info(pprint.pformat(otdb_info)) + self.otdbrpc.taskSetSpecification(otdb_id, otdb_info) + #We probably will need this as well + #self.otdbrpc.taskPrepareForScheduling(otdb_id, otdb_info["LOFAR.ObsSW.Observation.startTime"], + # otdb_info["LOFAR.ObsSW.Observation.stopTime"]) + logger.info('Setting status (%s) for otdb_id %s' % (otdb_status, otdb_id)) + self.otdbrpc.taskSetStatus(otdb_id, otdb_status) + except Exception as e: + logger.error(e) + self.doTaskError(otdb_id) #FIXME should be to the RADB also or instead? + def main(): from optparse import OptionParser from lofar.messaging import setQpidLogLevel diff --git a/SubSystems/RAServices/CMakeLists.txt b/SubSystems/RAServices/CMakeLists.txt index aa347d8eec5650d6e94a7b19f28c8999229b91f9..aafa4261ffe6ba44bd43906ba96e46a0f9b5d16f 100644 --- a/SubSystems/RAServices/CMakeLists.txt +++ b/SubSystems/RAServices/CMakeLists.txt @@ -15,6 +15,7 @@ lofar_package(RAServices SystemStatusDatabase SystemStatusService TriggerServices + TaskPrescheduler DataManagement RAScripts) diff --git a/SubSystems/RAServices/RAServices.ini b/SubSystems/RAServices/RAServices.ini index bb6bdd589ff0b66bd202e7b8a2e8ce137b41f1fc..ac4e46e72101182e657a007d3651ff0f6428a555 100644 --- a/SubSystems/RAServices/RAServices.ini +++ b/SubSystems/RAServices/RAServices.ini @@ -1,5 +1,5 @@ [group:RA_Services] -programs=raewebservice,radbservice,radbpglistener,resourceassigner,RATaskSpecified,raestimatorservice,rotspservice,ortspropagator +programs=raewebservice,radbservice,radbpglistener,resourceassigner,RATaskSpecified,raestimatorservice,rotspservice,ortspropagator,taskprescheduler priority=200 [group:Specification_Services]