Skip to content
Snippets Groups Projects
Commit 5b6d599c authored by Adriaan Renting's avatar Adriaan Renting
Browse files

Task #8887: changed RATaskSpecified to handle non-otdb predecesors

parent c106d597
No related branches found
No related tags found
No related merge requests found
......@@ -25,10 +25,11 @@ Daemon that listens to OTDB status changes to PRESCHEDULED and SCHEDULED, reques
the parset of such jobs (+ their predecessors), and posts them on the bus.
"""
from lofar.messaging import FromBus, ToBus, RPC, EventMessage
from lofar.messaging import FromBus, ToBus, EventMessage # RPC,
from lofar.parameterset import PyParameterValue
from lofar.sas.otdb.OTDBBusListener import OTDBBusListener
from lofar.common.util import waitForInterrupt
from lofar.sas.resourceassignment.ratootdbtaskspecificationpropagator.otdbrpc import OTDBRPC
from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME, DEFAULT_OTDB_NOTIFICATION_SUBJECT
from lofar.sas.resourceassignment.rataskspecified.config import DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_BUSNAME
from lofar.sas.resourceassignment.rataskspecified.config import DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT
......@@ -40,17 +41,6 @@ logger = logging.getLogger(__name__)
""" Prefix that is common to all parset keys, depending on the exact source. """
PARSET_PREFIX="ObsSW."
def predecessors( parset ):
""" Extract the list of predecessor obs IDs from the given parset. """
key = PARSET_PREFIX + "Observation.Scheduler.predecessors"
strlist = PyParameterValue(str(parset[key]), True).getStringVector()
# Key contains "Lxxxxx" values, we want to have "xxxxx" only
result = [int(filter(str.isdigit,x)) for x in strlist]
return result
def convertSchedulerProcessSubtype(processSubType):
'''convert scheduler processSubType as defined in SAS/Scheduler/src/OTDBTree.h to RA (type, subtype) tuple'''
if processSubType == "Averaging Pipeline":
......@@ -73,230 +63,212 @@ def convertSchedulerProcessSubtype(processSubType):
return "observation", "tbbmeasurement"
elif processSubType == "TBB (standalone)":
return "observation", "tbbmeasurement"
##TODO Maintenance and Reservation
return "unknown", "unknown"
def resourceIndicatorsFromParset( parsetDict ):
""" Extract the parset keys that are required for resource assignment. """
subset = {}
def get(key, default=None):
""" Return the value of parset key `key', or `default' if the key
is not defined. """
return parsetDict.get(PARSET_PREFIX + key, default)
def add(key, conversion=lambda x: x):
""" Add the given key to our subset selection, using an optional
conversion. """
value = get(key)
if value is not None:
subset[key] = conversion(value)
""" Some conversion functions for common parameter-value types."""
def strvector(value):
return PyParameterValue(str(value), True).getStringVector()
def intvector(value):
return PyParameterValue(str(value), True)._expand().getIntVector()
def bool(value):
return PyParameterValue(str(value), True).getBool()
# =====================================
# Parset meta info
# =====================================
subset["Version.number"] = parsetDict.get("Version.number")
# =====================================
# Observation settings
# =====================================
add("Observation.momID")
add("Observation.sampleClock")
add("Observation.nrBitsPerSample")
add("Observation.antennaSet")
add("Observation.VirtualInstrument.stationList", strvector)
add("Observation.startTime")
add("Observation.stopTime")
add("Observation.nrBeams")
nrSAPs = int(get("Observation.nrBeams", 0))
for sap in xrange(0, nrSAPs):
add("Observation.Beam[%d].subbandList" % (sap,), intvector)
# =====================================
# Correlator settings
# =====================================
add("Observation.DataProducts.Output_Correlated.enabled", bool)
add("Observation.ObservationControl.OnlineControl.Cobalt.Correlator.integrationTime")
add("Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrChannelsPerSubband")
# TODO: We need a service that computes these 3 values
add("Cobalt.Correlator.nrBlocksPerIntegration")
add("Cobalt.Correlator.nrIntegrationsPerBlock")
add("Cobalt.blockSize")
# =====================================
# Beamformer settings
# =====================================
add("Observation.DataProducts.Output_IncoherentStokes.enabled", bool)
add("Observation.DataProducts.Output_CoherentStokes.enabled", bool)
add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.flysEye", bool)
#add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.nrChannelsPerSubband") # only needed to determine Cobalt.blockSize
add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.subbandsPerFile")
add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.timeIntegrationFactor")
add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.which")
#add("Observation.ObservationControl.OnlineControl.Cobalt.IncoherentStokes.nrChannelsPerSubband") # only needed to determine Cobalt.blockSize
add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.subbandsPerFile")
add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.timeIntegrationFactor")
add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.which")
for sap in xrange(0, nrSAPs):
add("Observation.Beam[%d].nrTabRings" % (sap,))
add("Observation.Beam[%d].nrTiedArrayBeams" % (sap,))
nrTABs = int(get("Observation.Beam[%d].nrTiedArrayBeams" % (sap,), 0))
for tab in xrange(0, nrTABs):
add("Observation.Beam[%d].TiedArrayBeam[%d].coherent" % (sap,tab), bool)
# =====================================
# Pipeline settings
# =====================================
# Calibrator / Averaging pipelines
add("Observation.DataProducts.Output_Correlated.enabled", bool)
add("Observation.DataProducts.Output_InstrumentModel.enabled", bool)
add("Observation.DataProducts.Input_Correlated.enabled", bool)
add("Observation.DataProducts.Input_Correlated.skip", intvector)
add("Observation.ObservationControl.PythonControl.DPPP.demixer.demixfreqstep")
add("Observation.ObservationControl.PythonControl.DPPP.demixer.demixtimestep")
# Imaging pipeline
add("Observation.DataProducts.Output_SkyImage.enabled", bool)
add("Observation.ObservationControl.PythonControl.Imaging.slices_per_image")
add("Observation.ObservationControl.PythonControl.Imaging.subbands_per_image")
# Long-baseline pipeline
add("Observation.ObservationControl.PythonControl.LongBaseline.subbandgroups_per_ms")
add("Observation.ObservationControl.PythonControl.LongBaseline.subbands_per_subbandgroup")
# Pulsar pipeline
add("Observation.DataProducts.Output_Pulsar.enabled", bool)
add("Observation.DataProducts.Input_CoherentStokes.enabled", bool)
add("Observation.DataProducts.Input_CoherentStokes.skip", intvector)
add("Observation.DataProducts.Input_IncoherentStokes.enabled", bool)
add("Observation.DataProducts.Input_IncoherentStokes.skip", intvector)
return subset
""" Extract the parset keys that are required for resource assignment. """
subset = {}
def get(key, default=None):
""" Return the value of parset key `key', or `default' if the key
is not defined. """
return parsetDict.get(PARSET_PREFIX + key, default)
def add(key, conversion=lambda x: x):
""" Add the given key to our subset selection, using an optional
conversion. """
value = get(key)
if value is not None:
subset[key] = conversion(value)
""" Some conversion functions for common parameter-value types."""
def strvector(value):
return PyParameterValue(str(value), True).getStringVector()
def intvector(value):
return PyParameterValue(str(value), True)._expand().getIntVector()
def bool(value):
return PyParameterValue(str(value), True).getBool()
# =====================================
# Parset meta info
# =====================================
subset["Version.number"] = parsetDict.get("Version.number")
# =====================================
# Observation settings
# =====================================
add("Observation.momID")
add("Observation.sampleClock")
add("Observation.nrBitsPerSample")
add("Observation.antennaSet")
add("Observation.VirtualInstrument.stationList", strvector)
add("Observation.startTime")
add("Observation.stopTime")
add("Observation.nrBeams")
nrSAPs = int(get("Observation.nrBeams", 0))
for sap in xrange(0, nrSAPs):
add("Observation.Beam[%d].subbandList" % (sap,), intvector)
# =====================================
# Correlator settings
# =====================================
add("Observation.DataProducts.Output_Correlated.enabled", bool)
add("Observation.ObservationControl.OnlineControl.Cobalt.Correlator.integrationTime")
add("Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrChannelsPerSubband")
# TODO: We need a service that computes these 3 values
add("Cobalt.Correlator.nrBlocksPerIntegration")
add("Cobalt.Correlator.nrIntegrationsPerBlock")
add("Cobalt.blockSize")
# =====================================
# Beamformer settings
# =====================================
add("Observation.DataProducts.Output_IncoherentStokes.enabled", bool)
add("Observation.DataProducts.Output_CoherentStokes.enabled", bool)
add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.flysEye", bool)
#add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.nrChannelsPerSubband") # only needed to determine Cobalt.blockSize
add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.subbandsPerFile")
add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.timeIntegrationFactor")
add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.which")
#add("Observation.ObservationControl.OnlineControl.Cobalt.IncoherentStokes.nrChannelsPerSubband") # only needed to determine Cobalt.blockSize
add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.subbandsPerFile")
add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.timeIntegrationFactor")
add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.which")
for sap in xrange(0, nrSAPs):
add("Observation.Beam[%d].nrTabRings" % (sap,))
add("Observation.Beam[%d].nrTiedArrayBeams" % (sap,))
nrTABs = int(get("Observation.Beam[%d].nrTiedArrayBeams" % (sap,), 0))
for tab in xrange(0, nrTABs):
add("Observation.Beam[%d].TiedArrayBeam[%d].coherent" % (sap,tab), bool)
# =====================================
# Pipeline settings
# =====================================
# Calibrator / Averaging pipelines
add("Observation.DataProducts.Output_Correlated.enabled", bool)
add("Observation.DataProducts.Output_InstrumentModel.enabled", bool)
add("Observation.DataProducts.Input_Correlated.enabled", bool)
add("Observation.DataProducts.Input_Correlated.skip", intvector)
add("Observation.ObservationControl.PythonControl.DPPP.demixer.demixfreqstep")
add("Observation.ObservationControl.PythonControl.DPPP.demixer.demixtimestep")
# Imaging pipeline
add("Observation.DataProducts.Output_SkyImage.enabled", bool)
add("Observation.ObservationControl.PythonControl.Imaging.slices_per_image")
add("Observation.ObservationControl.PythonControl.Imaging.subbands_per_image")
# Long-baseline pipeline
add("Observation.ObservationControl.PythonControl.LongBaseline.subbandgroups_per_ms")
add("Observation.ObservationControl.PythonControl.LongBaseline.subbands_per_subbandgroup")
# Pulsar pipeline
add("Observation.DataProducts.Output_Pulsar.enabled", bool)
add("Observation.DataProducts.Input_CoherentStokes.enabled", bool)
add("Observation.DataProducts.Input_CoherentStokes.skip", intvector)
add("Observation.DataProducts.Input_IncoherentStokes.enabled", bool)
add("Observation.DataProducts.Input_IncoherentStokes.skip", intvector)
return subset
class RATaskSpecified(OTDBBusListener):
def __init__(self,
otdb_notification_busname=DEFAULT_OTDB_NOTIFICATION_BUSNAME,
otdb_notification_subject=DEFAULT_OTDB_NOTIFICATION_SUBJECT,
otdb_service_busname=DEFAULT_OTDB_SERVICE_BUSNAME,
otdb_service_subject=DEFAULT_OTDB_SERVICENAME + '.TaskGetSpecification',
notification_busname=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_BUSNAME,
notification_subject=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT,
broker=None, **kwargs):
super(RATaskSpecified, self).__init__(busname=otdb_notification_busname, subject=otdb_notification_subject, **kwargs)
self.parset_rpc = RPC(service=otdb_service_subject, busname=otdb_service_busname)
self.send_bus = ToBus("%s/%s" % (notification_busname, notification_subject))
def start_listening(self, **kwargs):
self.parset_rpc.open()
self.send_bus.open()
super(RATaskSpecified, self).start_listening(**kwargs)
def stop_listening(self, **kwargs):
super(RATaskSpecified, self).stop_listening(**kwargs)
self.send_bus.close()
self.parset_rpc.close()
def onObservationPrescheduled(self, treeId, modificationTime):
logger.info("Processing obs ID %s", treeId)
# Request the parset
main_obsID = treeId
main_parset,_ = self.parset_rpc( OtdbID=main_obsID )
main_parset = main_parset['TaskSpecification']
logger.info("main_parset [%s]: %s" % (main_obsID, main_parset))
# Construct a dict of all the parsets we retrieved
parsets = {}
parsets[main_obsID] = main_parset
logger.info("Processing predecessors")
# Collect the initial set of predecessors
request_obsIDs = set(predecessors(main_parset))
logger.info("Processing %s", request_obsIDs)
obsId2predId = {int(main_obsID):list(request_obsIDs)}
# Iterate recursively over all known predecessor obsIDs, and request their parsets
while request_obsIDs:
obsID = request_obsIDs.pop()
if obsID in parsets:
# Predecessor lists can overlap -- we already have this one
continue
logger.info("Fetching predecessor %s", obsID)
# Request predecessor parset
parsets[obsID],_ = self.parset_rpc( OtdbID=obsID )
parsets[obsID] = parsets[obsID]['TaskSpecification']
#logger.info("predecessor parset [%s]: %s" % (obsID, parsets[obsID]))
# Add the list of predecessors
predecessor_ids = predecessors(parsets[obsID])
request_obsIDs = request_obsIDs.union(predecessor_ids)
obsId2predId[obsID] = predecessor_ids
logger.info("obsID %s: preds: %s" % (obsID, predecessor_ids))
logger.info("obsId2predId %s" % (obsId2predId))
# Convert parsets to resource indicators
logger.info("Extracting resource indicators")
specifications = dict([(obsID, resourceIndicatorsFromParset(parset)) for (obsID,parset) in parsets.iteritems()])
# recursive method to build the tree of obs and its predecessors
def appendChildNodes(treeNode):
node_otdb_id = treeNode['otdb_id']
node_pred_otdb_ids = obsId2predId[node_otdb_id]
node_pred_specifications = {pred_id:specifications[pred_id] for pred_id in node_pred_otdb_ids}
for pred_id, pred_specification in node_pred_specifications.items():
childNode = {
"otdb_id": pred_id,
"specification": pred_specification,
"predecessors": []
}
childNode['task_type'], childNode['task_subtype'] = convertSchedulerProcessSubtype(parsets[pred_id].get(PARSET_PREFIX+"Observation.processSubtype", ""))
appendChildNodes(childNode)
treeNode["predecessors"].append(childNode)
# Construct root node of tree
resultTree = {
"otdb_id": int(main_obsID),
"state": "prescheduled",
"specification": specifications[main_obsID],
"predecessors": []
}
resultTree['task_type'], resultTree['task_subtype'] = convertSchedulerProcessSubtype(main_parset.get(PARSET_PREFIX+"Observation.processSubtype", ""))
#recursively append predecessors as child nodes
appendChildNodes(resultTree)
logger.info("Sending result: %s" % resultTree)
# Put result on bus
msg = EventMessage(content=resultTree)
self.send_bus.send(msg)
logger.info("Result sent")
def __init__(self,
otdb_notification_busname=DEFAULT_OTDB_NOTIFICATION_BUSNAME,
otdb_notification_subject=DEFAULT_OTDB_NOTIFICATION_SUBJECT,
otdb_service_busname=DEFAULT_OTDB_SERVICE_BUSNAME,
otdb_service_subject=DEFAULT_OTDB_SERVICENAME,
notification_busname=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_BUSNAME,
notification_subject=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT,
broker=None, **kwargs):
super(RATaskSpecified, self).__init__(busname=otdb_notification_busname, subject=otdb_notification_subject, **kwargs)
self.otdbrpc = OTDBRPC(busname=otdb_service_busname, servicename=otdb_service_subject, broker=broker) ## , ForwardExceptions=True hardcoded in RPCWrapper right now
self.send_bus = ToBus("%s/%s" % (notification_busname, notification_subject))
def start_listening(self, **kwargs):
self.otdbrpc.open()
self.send_bus.open()
super(RATaskSpecified, self).start_listening(**kwargs)
def stop_listening(self, **kwargs):
super(RATaskSpecified, self).stop_listening(**kwargs)
self.send_bus.close()
self.otdbrpc.close()
def get_predecessors(self, parset):
""" Extract the list of predecessor obs IDs from the given parset. """
key = PARSET_PREFIX + "Observation.Scheduler.predecessors"
strlist = PyParameterValue(str(parset[key]), True).getStringVector()
# Key contains values starting with 'S' = Scheduler, 'L'/'T' = OTDB, 'M' = MoM
# 'S' we can probably ignore? Might be only internal in the Scheduler?
result = set()
for s in stringlist:
try: # Made the source a string for readability, but it's not efficient
if s.startswith('M'):
result.add({'source': 'mom', 'id': int(s[1:])})
elif s.startswith('L') or s.startswith('T'):
result.add({'source': 'otdb', 'id': int(s[1:])})
else: # 'S'
logger.info("found a predecessor ID I can't handle: %s" % s)
result.add({'source': 'other', 'id': int(s[1:])})
except ValueError:
logger.warning("found a predecessor ID that I can't parse %s" % s)
return result
def get_specification_with_predecessors(self, id, id_source, state, found_parsets):
logger.info("Processing ID %s from %s", (id, id_source))
if id_source == "other":
return None
elif id_source == "mom":
otdb_id = self.otdbrpc.taskGetIDs( mom_id=id )['otdb_id']
elif id_source == "otdb":
otdb_id = id
else:
logger.warning("Error in understanding id %s", id)
logger.info("Processing OTDB ID %s", otdb_id)
result = {"otdb_id": otdb_id, "predecessors": []}
if state:
result["state"] = state # TODO should be status not state
else:
pass #otdbrpc.taskGetStatus not implemented and maybe not needed?
if otdb_id in found_parsets:
parset = found_parsets[otdb_id]
else:
parset = self.otdbrpc.taskGetSpecification( OtdbID=otdb_id )['specification']
found_parsets[otdb_id] = parset
logger.info("parset [%s]: %s" % (otdb_id, parset))
result['specification'] = resourceIndicatorsFromParset(parset)
key = PARSET_PREFIX + "Observation.processSubtype"
result['task_type'], result['task_subtype'] = convertSchedulerProcessSubtype(parset.get(key, ""))
logger.info("Processing predecessors")
predecessor_ids = self.get_predecessors(parset)
for id in predecessor_ids:
predecessor_result = self.get_specification_with_predecessors(id['id'], id['source'], "", found_parsets)
if predecessor_result:
result["predecessors"].append(predecessor_result)
return result
def onObservationPrescheduled(self, main_id, modificationTime):
# Construct root node of tree
resultTree = get_specification_with_predecessors(main_id, "otdb", "prescheduled", {})
logger.info("Sending result: %s" % resultTree)
# Put result on bus
msg = EventMessage(content=resultTree)
self.send_bus.send(msg)
logger.info("Result sent")
def main():
import logging
......
......@@ -3,7 +3,6 @@
import logging
import datetime
from lofar.messaging.RPC import RPC, RPCException, RPCWrapper
#from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME, DEFAULT_SERVICENAME
from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME
''' Simple RPC client for Service lofarbus.*Z
......@@ -24,6 +23,18 @@ class OTDBRPC(RPCWrapper):
broker=None):
super(OTDBRPC, self).__init__(busname, servicename, broker)
def taskGetIDs(self, otdb_id=None, mom_id=None):
if otdb_id:
answer = self.rpc('TaskGetIDs', OtdbID=otdb_id, return_tuple=False)
elif mom_id:
answer = self.rpc('TaskGetIDs', MomID=mom_id, return_tuple=False)
else:
raise OTDBPRCException("TaskGetIDs was called without OTDB or Mom ID")
if not answer:
raise OTDBPRCException("TaskGetIDs returned an empty dict")
return {"tree_type": answer[0], "otdb_id": answer[1], "mom_id": answer[2]}
def taskGetSpecification(self, otdb_id=None, mom_id=None):
if otdb_id:
answer = self.rpc('TaskGetSpecification', OtdbID=otdb_id)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment