From 5b6d599cf77ed3f3ac388f11a38753f2af5595a9 Mon Sep 17 00:00:00 2001 From: Adriaan Renting <renting@astron.nl> Date: Thu, 7 Apr 2016 13:30:07 +0000 Subject: [PATCH] Task #8887: changed RATaskSpecified to handle non-otdb predecesors --- .../lib/RATaskSpecified.py | 434 ++++++++---------- .../lib/otdbrpc.py | 13 +- 2 files changed, 215 insertions(+), 232 deletions(-) diff --git a/SAS/ResourceAssignment/RATaskSpecifiedService/lib/RATaskSpecified.py b/SAS/ResourceAssignment/RATaskSpecifiedService/lib/RATaskSpecified.py index fd410facf85..c57371cca40 100755 --- a/SAS/ResourceAssignment/RATaskSpecifiedService/lib/RATaskSpecified.py +++ b/SAS/ResourceAssignment/RATaskSpecifiedService/lib/RATaskSpecified.py @@ -25,10 +25,11 @@ Daemon that listens to OTDB status changes to PRESCHEDULED and SCHEDULED, reques the parset of such jobs (+ their predecessors), and posts them on the bus. """ -from lofar.messaging import FromBus, ToBus, RPC, EventMessage +from lofar.messaging import FromBus, ToBus, EventMessage # RPC, from lofar.parameterset import PyParameterValue from lofar.sas.otdb.OTDBBusListener import OTDBBusListener from lofar.common.util import waitForInterrupt +from lofar.sas.resourceassignment.ratootdbtaskspecificationpropagator.otdbrpc import OTDBRPC from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME, DEFAULT_OTDB_NOTIFICATION_SUBJECT from lofar.sas.resourceassignment.rataskspecified.config import DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_BUSNAME from lofar.sas.resourceassignment.rataskspecified.config import DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT @@ -40,17 +41,6 @@ logger = logging.getLogger(__name__) """ Prefix that is common to all parset keys, depending on the exact source. """ PARSET_PREFIX="ObsSW." -def predecessors( parset ): - """ Extract the list of predecessor obs IDs from the given parset. """ - - key = PARSET_PREFIX + "Observation.Scheduler.predecessors" - strlist = PyParameterValue(str(parset[key]), True).getStringVector() - - # Key contains "Lxxxxx" values, we want to have "xxxxx" only - result = [int(filter(str.isdigit,x)) for x in strlist] - - return result - def convertSchedulerProcessSubtype(processSubType): '''convert scheduler processSubType as defined in SAS/Scheduler/src/OTDBTree.h to RA (type, subtype) tuple''' if processSubType == "Averaging Pipeline": @@ -73,230 +63,212 @@ def convertSchedulerProcessSubtype(processSubType): return "observation", "tbbmeasurement" elif processSubType == "TBB (standalone)": return "observation", "tbbmeasurement" - + ##TODO Maintenance and Reservation return "unknown", "unknown" def resourceIndicatorsFromParset( parsetDict ): - """ Extract the parset keys that are required for resource assignment. """ - - subset = {} - - def get(key, default=None): - """ Return the value of parset key `key', or `default' if the key - is not defined. """ - return parsetDict.get(PARSET_PREFIX + key, default) - - def add(key, conversion=lambda x: x): - """ Add the given key to our subset selection, using an optional - conversion. """ - value = get(key) - if value is not None: - subset[key] = conversion(value) - - """ Some conversion functions for common parameter-value types.""" - def strvector(value): - return PyParameterValue(str(value), True).getStringVector() - - def intvector(value): - return PyParameterValue(str(value), True)._expand().getIntVector() - - def bool(value): - return PyParameterValue(str(value), True).getBool() - - # ===================================== - # Parset meta info - # ===================================== - subset["Version.number"] = parsetDict.get("Version.number") - - # ===================================== - # Observation settings - # ===================================== - add("Observation.momID") - add("Observation.sampleClock") - add("Observation.nrBitsPerSample") - add("Observation.antennaSet") - add("Observation.VirtualInstrument.stationList", strvector) - add("Observation.startTime") - add("Observation.stopTime") - add("Observation.nrBeams") - - nrSAPs = int(get("Observation.nrBeams", 0)) - for sap in xrange(0, nrSAPs): - add("Observation.Beam[%d].subbandList" % (sap,), intvector) - - # ===================================== - # Correlator settings - # ===================================== - add("Observation.DataProducts.Output_Correlated.enabled", bool) - add("Observation.ObservationControl.OnlineControl.Cobalt.Correlator.integrationTime") - add("Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrChannelsPerSubband") - # TODO: We need a service that computes these 3 values - add("Cobalt.Correlator.nrBlocksPerIntegration") - add("Cobalt.Correlator.nrIntegrationsPerBlock") - add("Cobalt.blockSize") - - - # ===================================== - # Beamformer settings - # ===================================== - add("Observation.DataProducts.Output_IncoherentStokes.enabled", bool) - add("Observation.DataProducts.Output_CoherentStokes.enabled", bool) - add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.flysEye", bool) - #add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.nrChannelsPerSubband") # only needed to determine Cobalt.blockSize - add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.subbandsPerFile") - add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.timeIntegrationFactor") - add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.which") - #add("Observation.ObservationControl.OnlineControl.Cobalt.IncoherentStokes.nrChannelsPerSubband") # only needed to determine Cobalt.blockSize - add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.subbandsPerFile") - add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.timeIntegrationFactor") - add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.which") - for sap in xrange(0, nrSAPs): - add("Observation.Beam[%d].nrTabRings" % (sap,)) - - add("Observation.Beam[%d].nrTiedArrayBeams" % (sap,)) - nrTABs = int(get("Observation.Beam[%d].nrTiedArrayBeams" % (sap,), 0)) - for tab in xrange(0, nrTABs): - add("Observation.Beam[%d].TiedArrayBeam[%d].coherent" % (sap,tab), bool) - - # ===================================== - # Pipeline settings - # ===================================== - # Calibrator / Averaging pipelines - add("Observation.DataProducts.Output_Correlated.enabled", bool) - add("Observation.DataProducts.Output_InstrumentModel.enabled", bool) - add("Observation.DataProducts.Input_Correlated.enabled", bool) - add("Observation.DataProducts.Input_Correlated.skip", intvector) - add("Observation.ObservationControl.PythonControl.DPPP.demixer.demixfreqstep") - add("Observation.ObservationControl.PythonControl.DPPP.demixer.demixtimestep") - - # Imaging pipeline - add("Observation.DataProducts.Output_SkyImage.enabled", bool) - add("Observation.ObservationControl.PythonControl.Imaging.slices_per_image") - add("Observation.ObservationControl.PythonControl.Imaging.subbands_per_image") - - # Long-baseline pipeline - add("Observation.ObservationControl.PythonControl.LongBaseline.subbandgroups_per_ms") - add("Observation.ObservationControl.PythonControl.LongBaseline.subbands_per_subbandgroup") - - # Pulsar pipeline - add("Observation.DataProducts.Output_Pulsar.enabled", bool) - add("Observation.DataProducts.Input_CoherentStokes.enabled", bool) - add("Observation.DataProducts.Input_CoherentStokes.skip", intvector) - add("Observation.DataProducts.Input_IncoherentStokes.enabled", bool) - add("Observation.DataProducts.Input_IncoherentStokes.skip", intvector) - - return subset + """ Extract the parset keys that are required for resource assignment. """ + + subset = {} + + def get(key, default=None): + """ Return the value of parset key `key', or `default' if the key + is not defined. """ + return parsetDict.get(PARSET_PREFIX + key, default) + + def add(key, conversion=lambda x: x): + """ Add the given key to our subset selection, using an optional + conversion. """ + value = get(key) + if value is not None: + subset[key] = conversion(value) + + """ Some conversion functions for common parameter-value types.""" + def strvector(value): + return PyParameterValue(str(value), True).getStringVector() + + def intvector(value): + return PyParameterValue(str(value), True)._expand().getIntVector() + + def bool(value): + return PyParameterValue(str(value), True).getBool() + + # ===================================== + # Parset meta info + # ===================================== + subset["Version.number"] = parsetDict.get("Version.number") + + # ===================================== + # Observation settings + # ===================================== + add("Observation.momID") + add("Observation.sampleClock") + add("Observation.nrBitsPerSample") + add("Observation.antennaSet") + add("Observation.VirtualInstrument.stationList", strvector) + add("Observation.startTime") + add("Observation.stopTime") + add("Observation.nrBeams") + + nrSAPs = int(get("Observation.nrBeams", 0)) + for sap in xrange(0, nrSAPs): + add("Observation.Beam[%d].subbandList" % (sap,), intvector) + + # ===================================== + # Correlator settings + # ===================================== + add("Observation.DataProducts.Output_Correlated.enabled", bool) + add("Observation.ObservationControl.OnlineControl.Cobalt.Correlator.integrationTime") + add("Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrChannelsPerSubband") + # TODO: We need a service that computes these 3 values + add("Cobalt.Correlator.nrBlocksPerIntegration") + add("Cobalt.Correlator.nrIntegrationsPerBlock") + add("Cobalt.blockSize") + + + # ===================================== + # Beamformer settings + # ===================================== + add("Observation.DataProducts.Output_IncoherentStokes.enabled", bool) + add("Observation.DataProducts.Output_CoherentStokes.enabled", bool) + add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.flysEye", bool) + #add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.nrChannelsPerSubband") # only needed to determine Cobalt.blockSize + add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.subbandsPerFile") + add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.timeIntegrationFactor") + add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.which") + #add("Observation.ObservationControl.OnlineControl.Cobalt.IncoherentStokes.nrChannelsPerSubband") # only needed to determine Cobalt.blockSize + add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.subbandsPerFile") + add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.timeIntegrationFactor") + add("Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.which") + for sap in xrange(0, nrSAPs): + add("Observation.Beam[%d].nrTabRings" % (sap,)) + + add("Observation.Beam[%d].nrTiedArrayBeams" % (sap,)) + nrTABs = int(get("Observation.Beam[%d].nrTiedArrayBeams" % (sap,), 0)) + for tab in xrange(0, nrTABs): + add("Observation.Beam[%d].TiedArrayBeam[%d].coherent" % (sap,tab), bool) + + # ===================================== + # Pipeline settings + # ===================================== + # Calibrator / Averaging pipelines + add("Observation.DataProducts.Output_Correlated.enabled", bool) + add("Observation.DataProducts.Output_InstrumentModel.enabled", bool) + add("Observation.DataProducts.Input_Correlated.enabled", bool) + add("Observation.DataProducts.Input_Correlated.skip", intvector) + add("Observation.ObservationControl.PythonControl.DPPP.demixer.demixfreqstep") + add("Observation.ObservationControl.PythonControl.DPPP.demixer.demixtimestep") + + # Imaging pipeline + add("Observation.DataProducts.Output_SkyImage.enabled", bool) + add("Observation.ObservationControl.PythonControl.Imaging.slices_per_image") + add("Observation.ObservationControl.PythonControl.Imaging.subbands_per_image") + + # Long-baseline pipeline + add("Observation.ObservationControl.PythonControl.LongBaseline.subbandgroups_per_ms") + add("Observation.ObservationControl.PythonControl.LongBaseline.subbands_per_subbandgroup") + + # Pulsar pipeline + add("Observation.DataProducts.Output_Pulsar.enabled", bool) + add("Observation.DataProducts.Input_CoherentStokes.enabled", bool) + add("Observation.DataProducts.Input_CoherentStokes.skip", intvector) + add("Observation.DataProducts.Input_IncoherentStokes.enabled", bool) + add("Observation.DataProducts.Input_IncoherentStokes.skip", intvector) + + return subset class RATaskSpecified(OTDBBusListener): - def __init__(self, - otdb_notification_busname=DEFAULT_OTDB_NOTIFICATION_BUSNAME, - otdb_notification_subject=DEFAULT_OTDB_NOTIFICATION_SUBJECT, - otdb_service_busname=DEFAULT_OTDB_SERVICE_BUSNAME, - otdb_service_subject=DEFAULT_OTDB_SERVICENAME + '.TaskGetSpecification', - notification_busname=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_BUSNAME, - notification_subject=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT, - broker=None, **kwargs): - super(RATaskSpecified, self).__init__(busname=otdb_notification_busname, subject=otdb_notification_subject, **kwargs) - - self.parset_rpc = RPC(service=otdb_service_subject, busname=otdb_service_busname) - self.send_bus = ToBus("%s/%s" % (notification_busname, notification_subject)) - - def start_listening(self, **kwargs): - self.parset_rpc.open() - self.send_bus.open() - - super(RATaskSpecified, self).start_listening(**kwargs) - - def stop_listening(self, **kwargs): - super(RATaskSpecified, self).stop_listening(**kwargs) - - self.send_bus.close() - self.parset_rpc.close() - - def onObservationPrescheduled(self, treeId, modificationTime): - logger.info("Processing obs ID %s", treeId) - - # Request the parset - main_obsID = treeId - main_parset,_ = self.parset_rpc( OtdbID=main_obsID ) - main_parset = main_parset['TaskSpecification'] - logger.info("main_parset [%s]: %s" % (main_obsID, main_parset)) - - # Construct a dict of all the parsets we retrieved - parsets = {} - parsets[main_obsID] = main_parset - - logger.info("Processing predecessors") - - # Collect the initial set of predecessors - request_obsIDs = set(predecessors(main_parset)) - - logger.info("Processing %s", request_obsIDs) - - obsId2predId = {int(main_obsID):list(request_obsIDs)} - - # Iterate recursively over all known predecessor obsIDs, and request their parsets - while request_obsIDs: - obsID = request_obsIDs.pop() - - if obsID in parsets: - # Predecessor lists can overlap -- we already have this one - continue - - logger.info("Fetching predecessor %s", obsID) - - # Request predecessor parset - parsets[obsID],_ = self.parset_rpc( OtdbID=obsID ) - parsets[obsID] = parsets[obsID]['TaskSpecification'] - #logger.info("predecessor parset [%s]: %s" % (obsID, parsets[obsID])) - - # Add the list of predecessors - predecessor_ids = predecessors(parsets[obsID]) - request_obsIDs = request_obsIDs.union(predecessor_ids) - obsId2predId[obsID] = predecessor_ids - logger.info("obsID %s: preds: %s" % (obsID, predecessor_ids)) - logger.info("obsId2predId %s" % (obsId2predId)) - - # Convert parsets to resource indicators - logger.info("Extracting resource indicators") - specifications = dict([(obsID, resourceIndicatorsFromParset(parset)) for (obsID,parset) in parsets.iteritems()]) - - # recursive method to build the tree of obs and its predecessors - def appendChildNodes(treeNode): - node_otdb_id = treeNode['otdb_id'] - node_pred_otdb_ids = obsId2predId[node_otdb_id] - node_pred_specifications = {pred_id:specifications[pred_id] for pred_id in node_pred_otdb_ids} - - for pred_id, pred_specification in node_pred_specifications.items(): - childNode = { - "otdb_id": pred_id, - "specification": pred_specification, - "predecessors": [] - } - childNode['task_type'], childNode['task_subtype'] = convertSchedulerProcessSubtype(parsets[pred_id].get(PARSET_PREFIX+"Observation.processSubtype", "")) - - appendChildNodes(childNode) - treeNode["predecessors"].append(childNode) - - # Construct root node of tree - resultTree = { - "otdb_id": int(main_obsID), - "state": "prescheduled", - "specification": specifications[main_obsID], - "predecessors": [] - } - resultTree['task_type'], resultTree['task_subtype'] = convertSchedulerProcessSubtype(main_parset.get(PARSET_PREFIX+"Observation.processSubtype", "")) - - #recursively append predecessors as child nodes - appendChildNodes(resultTree) - - logger.info("Sending result: %s" % resultTree) - - # Put result on bus - msg = EventMessage(content=resultTree) - self.send_bus.send(msg) - - logger.info("Result sent") + def __init__(self, + otdb_notification_busname=DEFAULT_OTDB_NOTIFICATION_BUSNAME, + otdb_notification_subject=DEFAULT_OTDB_NOTIFICATION_SUBJECT, + otdb_service_busname=DEFAULT_OTDB_SERVICE_BUSNAME, + otdb_service_subject=DEFAULT_OTDB_SERVICENAME, + notification_busname=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_BUSNAME, + notification_subject=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT, + broker=None, **kwargs): + super(RATaskSpecified, self).__init__(busname=otdb_notification_busname, subject=otdb_notification_subject, **kwargs) + self.otdbrpc = OTDBRPC(busname=otdb_service_busname, servicename=otdb_service_subject, broker=broker) ## , ForwardExceptions=True hardcoded in RPCWrapper right now + self.send_bus = ToBus("%s/%s" % (notification_busname, notification_subject)) + + def start_listening(self, **kwargs): + self.otdbrpc.open() + self.send_bus.open() + super(RATaskSpecified, self).start_listening(**kwargs) + + def stop_listening(self, **kwargs): + super(RATaskSpecified, self).stop_listening(**kwargs) + self.send_bus.close() + self.otdbrpc.close() + + def get_predecessors(self, parset): + """ Extract the list of predecessor obs IDs from the given parset. """ + + key = PARSET_PREFIX + "Observation.Scheduler.predecessors" + strlist = PyParameterValue(str(parset[key]), True).getStringVector() + + # Key contains values starting with 'S' = Scheduler, 'L'/'T' = OTDB, 'M' = MoM + # 'S' we can probably ignore? Might be only internal in the Scheduler? + result = set() + for s in stringlist: + try: # Made the source a string for readability, but it's not efficient + if s.startswith('M'): + result.add({'source': 'mom', 'id': int(s[1:])}) + elif s.startswith('L') or s.startswith('T'): + result.add({'source': 'otdb', 'id': int(s[1:])}) + else: # 'S' + logger.info("found a predecessor ID I can't handle: %s" % s) + result.add({'source': 'other', 'id': int(s[1:])}) + except ValueError: + logger.warning("found a predecessor ID that I can't parse %s" % s) + return result + + def get_specification_with_predecessors(self, id, id_source, state, found_parsets): + logger.info("Processing ID %s from %s", (id, id_source)) + if id_source == "other": + return None + elif id_source == "mom": + otdb_id = self.otdbrpc.taskGetIDs( mom_id=id )['otdb_id'] + elif id_source == "otdb": + otdb_id = id + else: + logger.warning("Error in understanding id %s", id) + + logger.info("Processing OTDB ID %s", otdb_id) + result = {"otdb_id": otdb_id, "predecessors": []} + if state: + result["state"] = state # TODO should be status not state + else: + pass #otdbrpc.taskGetStatus not implemented and maybe not needed? + + if otdb_id in found_parsets: + parset = found_parsets[otdb_id] + else: + parset = self.otdbrpc.taskGetSpecification( OtdbID=otdb_id )['specification'] + found_parsets[otdb_id] = parset + + logger.info("parset [%s]: %s" % (otdb_id, parset)) + result['specification'] = resourceIndicatorsFromParset(parset) + + key = PARSET_PREFIX + "Observation.processSubtype" + result['task_type'], result['task_subtype'] = convertSchedulerProcessSubtype(parset.get(key, "")) + + logger.info("Processing predecessors") + predecessor_ids = self.get_predecessors(parset) + for id in predecessor_ids: + predecessor_result = self.get_specification_with_predecessors(id['id'], id['source'], "", found_parsets) + if predecessor_result: + result["predecessors"].append(predecessor_result) + return result + + def onObservationPrescheduled(self, main_id, modificationTime): + # Construct root node of tree + resultTree = get_specification_with_predecessors(main_id, "otdb", "prescheduled", {}) + logger.info("Sending result: %s" % resultTree) + + # Put result on bus + msg = EventMessage(content=resultTree) + self.send_bus.send(msg) + logger.info("Result sent") def main(): import logging diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/otdbrpc.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/otdbrpc.py index 091e2f42a15..02ba5e00960 100644 --- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/otdbrpc.py +++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/otdbrpc.py @@ -3,7 +3,6 @@ import logging import datetime from lofar.messaging.RPC import RPC, RPCException, RPCWrapper -#from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME, DEFAULT_SERVICENAME from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME ''' Simple RPC client for Service lofarbus.*Z @@ -24,6 +23,18 @@ class OTDBRPC(RPCWrapper): broker=None): super(OTDBRPC, self).__init__(busname, servicename, broker) + def taskGetIDs(self, otdb_id=None, mom_id=None): + if otdb_id: + answer = self.rpc('TaskGetIDs', OtdbID=otdb_id, return_tuple=False) + elif mom_id: + answer = self.rpc('TaskGetIDs', MomID=mom_id, return_tuple=False) + else: + raise OTDBPRCException("TaskGetIDs was called without OTDB or Mom ID") + if not answer: + raise OTDBPRCException("TaskGetIDs returned an empty dict") + return {"tree_type": answer[0], "otdb_id": answer[1], "mom_id": answer[2]} + + def taskGetSpecification(self, otdb_id=None, mom_id=None): if otdb_id: answer = self.rpc('TaskGetSpecification', OtdbID=otdb_id) -- GitLab