Skip to content
Snippets Groups Projects
Commit 51a298fc authored by Jan David Mol's avatar Jan David Mol
Browse files

Task #10557: Fixed parset parsing errors and work-around for missing mom misc field

parent 7444ce69
No related branches found
No related tags found
No related merge requests found
......@@ -7,6 +7,7 @@ This means adding/updating some Cobal keys, selecting available stations,
selecting the right timeslot and updating start/end time.
'''
from datetime import datetime, timedelta
import pprint
from lofar.parameterset import PyParameterValue
from lofar.sas.otdb.OTDBBusListener import OTDBBusListener
from lofar.sas.otdb.otdbrpc import OTDBRPC
......@@ -28,27 +29,35 @@ COBALT = "Observation.ObservationControl.OnlineControl.Cobalt."
def calculateCobaltSettings(parset):
"""uses parset keys to calculate Cobalt block size and integration time"""
if parset(DATAPRODUCTS + "Output_Correlated.enabled"):
def bool(value):
return PyParameterValue(str(value), True).getBool()
if bool(parset[DATAPRODUCTS + "Output_Correlated.enabled"]):
corr = CorrelatorSettings()
corr.nrChannelsPerSubband = parset(COBALT + "Correlator.nrChannelsPerSubband")
corr.integrationTime = parset(COBALT + "Correlator.integrationTime")
corr.nrChannelsPerSubband = int(parset[COBALT + "Correlator.nrChannelsPerSubband"])
corr.integrationTime = float(parset[COBALT + "Correlator.integrationTime"])
else:
corr = None
if parset(DATAPRODUCTS + "Output_CoherentStokes.enabled"):
if bool(parset[DATAPRODUCTS + "Output_CoherentStokes.enabled"]):
coherent = StokesSettings()
coherent.nrChannelsPerSubband = parset(COBALT + "BeamFormer.CoherentStokes.nrChannelsPerSubband")
coherent.timeIntegrationFactor = parset(COBALT + "BeamFormer.CoherentStokes.timeIntegrationFactor")
coherent.nrChannelsPerSubband = int(parset[COBALT + "BeamFormer.CoherentStokes.nrChannelsPerSubband"])
coherent.timeIntegrationFactor = int(parset[COBALT + "BeamFormer.CoherentStokes.timeIntegrationFactor"])
else:
coherent = None
if parset(DATAPRODUCTS + "Output_IncoherentStokes.enabled"):
if bool(parset[DATAPRODUCTS + "Output_IncoherentStokes.enabled"]):
incoherent = StokesSettings()
incoherent.nrChannelsPerSubband = parset(COBALT + "BeamFormer.IncoherentStokes.nrChannelsPerSubband")
incoherent.timeIntegrationFactor = parset(COBALT + "BeamFormer.IncoherentStokes.timeIntegrationFactor")
incoherent.nrChannelsPerSubband = int(parset[COBALT + "BeamFormer.IncoherentStokes.nrChannelsPerSubband"])
incoherent.timeIntegrationFactor = int(parset[COBALT + "BeamFormer.IncoherentStokes.timeIntegrationFactor"])
else:
incoherent = None
clock = parset("Observation.sampleClock")
clock = int(parset["Observation.sampleClock"])
constraints = BlockConstraints(corr, coherent, incoherent, clock)
calculator = BlockSize(constraints)
return {'nrSubblocks': calculator.nrSubblocks, 'blockSize': calculator.blockSize,
'nrBlocks': calculator.nrBlocks, 'integrationTime': calculator.integrationTime}
......@@ -77,9 +86,6 @@ def resourceIndicatorsFromParset( parsetDict ):
#def intvector(value):
# return PyParameterValue(str(value), True)._expand().getIntVector()
def bool(value):
return PyParameterValue(str(value), True).getBool()
# =====================================
# Parset meta info
# =====================================
......@@ -97,10 +103,6 @@ def resourceIndicatorsFromParset( parsetDict ):
add("Observation.DataProducts.Output_Correlated.enabled", bool)
add("Observation.ObservationControl.OnlineControl.Cobalt.Correlator.integrationTime")
add("Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrChannelsPerSubband")
# This service should compute these 3 values, do we need to read them?
add("Cobalt.Correlator.nrBlocksPerIntegration")
add("Cobalt.Correlator.nrIntegrationsPerBlock")
add("Cobalt.blockSize")
add("Observation.DataProducts.Output_IncoherentStokes.enabled", bool)
add("Observation.DataProducts.Output_CoherentStokes.enabled", bool)
......@@ -141,6 +143,7 @@ class TaskPrescheduler(OTDBBusListener):
def onObservationApproved(self, treeId, modificationTime):
""" Updates task specification and puts task on prescheduled if it was generated by a trigger
""" #TODO might work for all tasks in the future
# TODO: only process observations and pipelines
try:
otdb_id = treeId
#Race condition when asking MoM as the mom-otdb-adapter might not have heard that the
......@@ -157,18 +160,20 @@ class TaskPrescheduler(OTDBBusListener):
mom_id = None
if mom_id:
response = self.momquery.get_trigger_id(mom_id)
response = {"status": "OK", "trigger_id": 1} # TODO: mom does not fill in misc field
if response['status'] == 'OK':
logger.info('Found a task mom_id=%s with a trigger_id=%s', mom_id, response['trigger_id'])
#TODO, check for stations and other resources, start/endtime, target position, then update specification
# TODO: set cobalt values only for observations
cobalt_values = calculateCobaltSettings(specification)
logger.info('prescheduling otdb_id=%s because it was generated by trigger_id=%s', otdb_id, response['trigger_id'])
otdb_info = {}
otdb_info[OUTPUT_PREFIX + "Cobalt.Correlator.nrBlocksPerIntegration"] = cobalt_values["nrBlocks"]
otdb_info[OUTPUT_PREFIX + "Cobalt.Correlator.nrIntegrationsPerBlock"] = cobalt_values["nrSubblocks"]
otdb_info[OUTPUT_PREFIX + "Cobalt.blockSize"] = cobalt_values["blockSize"]
otdb_info[OUTPUT_PREFIX + "Cobalt.Correlator.integrationTime"] = cobalt_values["integrationTime"]
otdb_info[OUTPUT_PREFIX + COBALT + "Correlator.nrBlocksPerIntegration"] = cobalt_values["nrBlocks"]
otdb_info[OUTPUT_PREFIX + COBALT + "Correlator.nrIntegrationsPerBlock"] = cobalt_values["nrSubblocks"]
otdb_info[OUTPUT_PREFIX + COBALT + "blockSize"] = cobalt_values["blockSize"]
otdb_info[OUTPUT_PREFIX + COBALT + "Correlator.integrationTime"] = cobalt_values["integrationTime"]
self.setOTDBinfo(otdb_id, otdb_info, 'prescheduled')
else:
logger.info('Did not find a trigger for task mom_id=%s, because %s', mom_id, response['errors'])
......@@ -190,7 +195,7 @@ class TaskPrescheduler(OTDBBusListener):
self.otdbrpc.taskSetStatus(otdb_id, otdb_status)
except Exception as e:
logger.error(e)
self.doTaskError(otdb_id) #FIXME should be to the RADB also or instead?
#self.doTaskError(otdb_id) #FIXME should be to the RADB also or instead?
def main():
from optparse import OptionParser
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment