diff --git a/SAS/ResourceAssignment/TaskPrescheduler/prescheduler.py b/SAS/ResourceAssignment/TaskPrescheduler/prescheduler.py index f335726b99bc3d6d36cfd92b3b0f2677c7344a1d..8b235b024dab5a5f180a2a94a33e6f05899865d9 100644 --- a/SAS/ResourceAssignment/TaskPrescheduler/prescheduler.py +++ b/SAS/ResourceAssignment/TaskPrescheduler/prescheduler.py @@ -7,6 +7,7 @@ This means adding/updating some Cobal keys, selecting available stations, selecting the right timeslot and updating start/end time. ''' from datetime import datetime, timedelta +import pprint from lofar.parameterset import PyParameterValue from lofar.sas.otdb.OTDBBusListener import OTDBBusListener from lofar.sas.otdb.otdbrpc import OTDBRPC @@ -28,27 +29,35 @@ COBALT = "Observation.ObservationControl.OnlineControl.Cobalt." def calculateCobaltSettings(parset): """uses parset keys to calculate Cobalt block size and integration time""" - if parset(DATAPRODUCTS + "Output_Correlated.enabled"): + + def bool(value): + return PyParameterValue(str(value), True).getBool() + + if bool(parset[DATAPRODUCTS + "Output_Correlated.enabled"]): corr = CorrelatorSettings() - corr.nrChannelsPerSubband = parset(COBALT + "Correlator.nrChannelsPerSubband") - corr.integrationTime = parset(COBALT + "Correlator.integrationTime") + corr.nrChannelsPerSubband = int(parset[COBALT + "Correlator.nrChannelsPerSubband"]) + corr.integrationTime = float(parset[COBALT + "Correlator.integrationTime"]) else: corr = None - if parset(DATAPRODUCTS + "Output_CoherentStokes.enabled"): + + if bool(parset[DATAPRODUCTS + "Output_CoherentStokes.enabled"]): coherent = StokesSettings() - coherent.nrChannelsPerSubband = parset(COBALT + "BeamFormer.CoherentStokes.nrChannelsPerSubband") - coherent.timeIntegrationFactor = parset(COBALT + "BeamFormer.CoherentStokes.timeIntegrationFactor") + coherent.nrChannelsPerSubband = int(parset[COBALT + "BeamFormer.CoherentStokes.nrChannelsPerSubband"]) + coherent.timeIntegrationFactor = int(parset[COBALT + "BeamFormer.CoherentStokes.timeIntegrationFactor"]) else: coherent = None - if parset(DATAPRODUCTS + "Output_IncoherentStokes.enabled"): + + if bool(parset[DATAPRODUCTS + "Output_IncoherentStokes.enabled"]): incoherent = StokesSettings() - incoherent.nrChannelsPerSubband = parset(COBALT + "BeamFormer.IncoherentStokes.nrChannelsPerSubband") - incoherent.timeIntegrationFactor = parset(COBALT + "BeamFormer.IncoherentStokes.timeIntegrationFactor") + incoherent.nrChannelsPerSubband = int(parset[COBALT + "BeamFormer.IncoherentStokes.nrChannelsPerSubband"]) + incoherent.timeIntegrationFactor = int(parset[COBALT + "BeamFormer.IncoherentStokes.timeIntegrationFactor"]) else: incoherent = None - clock = parset("Observation.sampleClock") + + clock = int(parset["Observation.sampleClock"]) constraints = BlockConstraints(corr, coherent, incoherent, clock) calculator = BlockSize(constraints) + return {'nrSubblocks': calculator.nrSubblocks, 'blockSize': calculator.blockSize, 'nrBlocks': calculator.nrBlocks, 'integrationTime': calculator.integrationTime} @@ -77,9 +86,6 @@ def resourceIndicatorsFromParset( parsetDict ): #def intvector(value): # return PyParameterValue(str(value), True)._expand().getIntVector() - def bool(value): - return PyParameterValue(str(value), True).getBool() - # ===================================== # Parset meta info # ===================================== @@ -97,10 +103,6 @@ def resourceIndicatorsFromParset( parsetDict ): add("Observation.DataProducts.Output_Correlated.enabled", bool) add("Observation.ObservationControl.OnlineControl.Cobalt.Correlator.integrationTime") add("Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrChannelsPerSubband") - # This service should compute these 3 values, do we need to read them? - add("Cobalt.Correlator.nrBlocksPerIntegration") - add("Cobalt.Correlator.nrIntegrationsPerBlock") - add("Cobalt.blockSize") add("Observation.DataProducts.Output_IncoherentStokes.enabled", bool) add("Observation.DataProducts.Output_CoherentStokes.enabled", bool) @@ -141,6 +143,7 @@ class TaskPrescheduler(OTDBBusListener): def onObservationApproved(self, treeId, modificationTime): """ Updates task specification and puts task on prescheduled if it was generated by a trigger """ #TODO might work for all tasks in the future + # TODO: only process observations and pipelines try: otdb_id = treeId #Race condition when asking MoM as the mom-otdb-adapter might not have heard that the @@ -157,18 +160,20 @@ class TaskPrescheduler(OTDBBusListener): mom_id = None if mom_id: response = self.momquery.get_trigger_id(mom_id) + response = {"status": "OK", "trigger_id": 1} # TODO: mom does not fill in misc field if response['status'] == 'OK': logger.info('Found a task mom_id=%s with a trigger_id=%s', mom_id, response['trigger_id']) #TODO, check for stations and other resources, start/endtime, target position, then update specification + # TODO: set cobalt values only for observations cobalt_values = calculateCobaltSettings(specification) logger.info('prescheduling otdb_id=%s because it was generated by trigger_id=%s', otdb_id, response['trigger_id']) otdb_info = {} - otdb_info[OUTPUT_PREFIX + "Cobalt.Correlator.nrBlocksPerIntegration"] = cobalt_values["nrBlocks"] - otdb_info[OUTPUT_PREFIX + "Cobalt.Correlator.nrIntegrationsPerBlock"] = cobalt_values["nrSubblocks"] - otdb_info[OUTPUT_PREFIX + "Cobalt.blockSize"] = cobalt_values["blockSize"] - otdb_info[OUTPUT_PREFIX + "Cobalt.Correlator.integrationTime"] = cobalt_values["integrationTime"] + otdb_info[OUTPUT_PREFIX + COBALT + "Correlator.nrBlocksPerIntegration"] = cobalt_values["nrBlocks"] + otdb_info[OUTPUT_PREFIX + COBALT + "Correlator.nrIntegrationsPerBlock"] = cobalt_values["nrSubblocks"] + otdb_info[OUTPUT_PREFIX + COBALT + "blockSize"] = cobalt_values["blockSize"] + otdb_info[OUTPUT_PREFIX + COBALT + "Correlator.integrationTime"] = cobalt_values["integrationTime"] self.setOTDBinfo(otdb_id, otdb_info, 'prescheduled') else: logger.info('Did not find a trigger for task mom_id=%s, because %s', mom_id, response['errors']) @@ -190,7 +195,7 @@ class TaskPrescheduler(OTDBBusListener): self.otdbrpc.taskSetStatus(otdb_id, otdb_status) except Exception as e: logger.error(e) - self.doTaskError(otdb_id) #FIXME should be to the RADB also or instead? + #self.doTaskError(otdb_id) #FIXME should be to the RADB also or instead? def main(): from optparse import OptionParser