diff --git a/SAS/ResourceAssignment/TaskPrescheduler/prescheduler.py b/SAS/ResourceAssignment/TaskPrescheduler/prescheduler.py index b7ccbbb9921d548ca2b4e1da87cb947fce7e9203..8d4e261f396849d9b51d66ec08e70c7fef82c989 100644 --- a/SAS/ResourceAssignment/TaskPrescheduler/prescheduler.py +++ b/SAS/ResourceAssignment/TaskPrescheduler/prescheduler.py @@ -113,6 +113,30 @@ def resourceIndicatorsFromParset( parsetDict ): return subset +def isObservation(parsetDict): + #TODO copied from RATaskSpecified, needs to be refactored into a common RA lib. + # Translation table for task types: OTDB (type,subtype) -> RADB (type,subtype) + OTDB_to_RADB_type_translation_table = { + ("Observation", "Beam Observation"): ("observation", "bfmeasurement"), + ("Observation", "Interferometer"): ("observation", "interferometer"), + ("Observation", "TBB (piggyback)"): ("observation", "tbbmeasurement"), + ("Observation", "TBB (standalone)"): ("observation", "tbbmeasurement"), + ("Pipeline", "Averaging Pipeline"): ("pipeline", "averaging pipeline"), + ("Pipeline", "Calibration Pipeline"): ("pipeline", "calibration pipeline"), + ("Pipeline", "Imaging Pipeline"): ("pipeline", "imaging pipeline"), + ("Pipeline", "Imaging Pipeline MSSS"): ("pipeline", "imaging pipeline msss"), + ("Pipeline", "Long Baseline Pipeline"): ("pipeline", "long baseline pipeline"), + ("Pipeline", "Pulsar Pipeline"): ("pipeline", "pulsar pipeline"), + ("MAINTENANCE", ""): ("reservation", "maintenance"), + ("RESERVATION", ""): ("reservation", "project"), + } + + # parse & marshall task type information + otdb_task_type = parsetDict[INPUT_PREFIX + "Observation.processType"] + otdb_task_subtype = parsetDict[INPUT_PREFIX + "Observation.processSubtype"] + task_type, task_subtype = OTDB_to_RADB_type_translation_table[(otdb_task_type, otdb_task_subtype)] + return (task_type == "observation") + class TaskPrescheduler(OTDBBusListener): def __init__(self, otdb_notification_busname=DEFAULT_OTDB_NOTIFICATION_BUSNAME, @@ -172,15 +196,15 @@ class TaskPrescheduler(OTDBBusListener): logger.info('Found a task mom_id=%s with a trigger_id=%s', mom_id, response['trigger_id']) #TODO, check for stations and other resources, start/endtime, target position, then update specification - # TODO: set cobalt values only for observations - cobalt_values = calculateCobaltSettings(specification) logger.info('prescheduling otdb_id=%s because it was generated by trigger_id=%s', otdb_id, response['trigger_id']) otdb_info = {} - otdb_info[OUTPUT_PREFIX + COBALT + "Correlator.nrBlocksPerIntegration"] = cobalt_values["nrBlocks"] - otdb_info[OUTPUT_PREFIX + COBALT + "Correlator.nrIntegrationsPerBlock"] = cobalt_values["nrSubblocks"] - otdb_info[OUTPUT_PREFIX + COBALT + "blockSize"] = cobalt_values["blockSize"] - otdb_info[OUTPUT_PREFIX + COBALT + "Correlator.integrationTime"] = cobalt_values["integrationTime"] + if isObservation(parset): + cobalt_values = calculateCobaltSettings(specification) + otdb_info[OUTPUT_PREFIX + COBALT + "Correlator.nrBlocksPerIntegration"] = cobalt_values["nrBlocks"] + otdb_info[OUTPUT_PREFIX + COBALT + "Correlator.nrIntegrationsPerBlock"] = cobalt_values["nrSubblocks"] + otdb_info[OUTPUT_PREFIX + COBALT + "blockSize"] = cobalt_values["blockSize"] + otdb_info[OUTPUT_PREFIX + COBALT + "Correlator.integrationTime"] = cobalt_values["integrationTime"] self.setOTDBinfo(otdb_id, otdb_info, 'prescheduled') else: logger.info('Did not find a trigger for task mom_id=%s, because %s', mom_id, response['errors']) @@ -192,12 +216,13 @@ class TaskPrescheduler(OTDBBusListener): def setOTDBinfo(self, otdb_id, otdb_info, otdb_status): """This function sets the values in otdb_info in OTDB, almost a copy from the RAtoOTDBPropagator""" try: - logger.info('Setting specification for otdb_id %s:\n', otdb_id) - logger.info(pprint.pformat(otdb_info)) - self.otdbrpc.taskSetSpecification(otdb_id, otdb_info) - #We probably will need this as well - #self.otdbrpc.taskPrepareForScheduling(otdb_id, otdb_info["LOFAR.ObsSW.Observation.startTime"], - # otdb_info["LOFAR.ObsSW.Observation.stopTime"]) + if otdb_info: + logger.info('Setting specification for otdb_id %s:\n', otdb_id) + logger.info(pprint.pformat(otdb_info)) + self.otdbrpc.taskSetSpecification(otdb_id, otdb_info) + #We probably will need this as well + #self.otdbrpc.taskPrepareForScheduling(otdb_id, otdb_info["LOFAR.ObsSW.Observation.startTime"], + # otdb_info["LOFAR.ObsSW.Observation.stopTime"]) logger.info('Setting status (%s) for otdb_id %s', otdb_status, otdb_id) self.otdbrpc.taskSetStatus(otdb_id, otdb_status) except Exception as e: diff --git a/SAS/ResourceAssignment/TaskPrescheduler/test/test_taskprescheduler.py b/SAS/ResourceAssignment/TaskPrescheduler/test/test_taskprescheduler.py index 6b81b015e209ebee2ba51a53405389839517066d..0f8f002340f621e5591b76bbadf413166ec50655 100755 --- a/SAS/ResourceAssignment/TaskPrescheduler/test/test_taskprescheduler.py +++ b/SAS/ResourceAssignment/TaskPrescheduler/test/test_taskprescheduler.py @@ -43,79 +43,34 @@ class PreschedulerTest(unittest.TestCase): # No __init__ because that confuses unittest.main() def reset_specification_tree(self, otdb_id, mom_id, future_start_time, future_stop_time): - #pipeline not used yet, but will be needed so I left it in. self.pipeline_specification_tree = { - u'task_subtype': u'averaging pipeline', - u'specification': { - u'ObsSW.Observation.DataProducts.Output_InstrumentModel.enabled': False, - u'ObsSW.Observation.stopTime': future_stop_time, - u'ObsSW.Observation.VirtualInstrument.stationList': [], - u'ObsSW.Observation.DataProducts.Input_CoherentStokes.enabled': False, - u'ObsSW.Observation.DataProducts.Output_CoherentStokes.enabled': False, - u'ObsSW.Observation.DataProducts.Output_SkyImage.enabled': False, - u'ObsSW.Observation.DataProducts.Input_Correlated.skip': [0, 0, 0, 0], - u'ObsSW.Observation.antennaSet': u'LBA_INNER', - u'ObsSW.Observation.nrBitsPerSample': u'16', - u'ObsSW.Observation.ObservationControl.PythonControl.LongBaseline.subbandgroups_per_ms': u'1', - u'ObsSW.Observation.DataProducts.Output_IncoherentStokes.enabled': False, - u'ObsSW.Observation.DataProducts.Input_IncoherentStokes.enabled': False, - u'ObsSW.Observation.DataProducts.Input_Correlated.enabled': True, - u'ObsSW.Observation.DataProducts.Output_Pulsar.enabled': False, - u'ObsSW.Observation.DataProducts.Input_CoherentStokes.skip': [], - u'ObsSW.Observation.ObservationControl.PythonControl.DPPP.demixer.demixtimestep': u'10', - u'Version.number': u'33774', - u'ObsSW.Observation.momID': mom_id, - u'ObsSW.Observation.startTime': future_start_time, - u'ObsSW.Observation.ObservationControl.PythonControl.LongBaseline.subbands_per_subbandgroup': u'1', - u'ObsSW.Observation.nrBeams': u'0', - u'ObsSW.Observation.DataProducts.Input_IncoherentStokes.skip': [], - u'ObsSW.Observation.ObservationControl.PythonControl.DPPP.demixer.demixfreqstep': u'64', - u'ObsSW.Observation.DataProducts.Output_Correlated.enabled': True, - u'ObsSW.Observation.sampleClock': u'200' - }, - u'task_type': u'pipeline', - u'otdb_id': otdb_id, - u'predecessors': [{ - u'task_subtype': u'bfmeasurement', - u'specification': { - u'ObsSW.Observation.DataProducts.Output_InstrumentModel.enabled': False, - u'ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.timeIntegrationFactor': u'1', - u'ObsSW.Observation.stopTime': future_stop_time, - u'ObsSW.Observation.VirtualInstrument.stationList': [u'RS205', u'RS503', u'CS013', u'RS508', u'RS106'], - u'ObsSW.Observation.DataProducts.Input_CoherentStokes.enabled': False, - u'ObsSW.Observation.DataProducts.Output_CoherentStokes.enabled': False, - u'ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrChannelsPerSubband': u'64', - u'ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.which': u'I', - u'ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.which': u'I', - u'ObsSW.Observation.Beam[0].subbandList': [100, 101, 102, 103], - u'ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.subbandsPerFile': u'512', - u'ObsSW.Observation.DataProducts.Input_Correlated.skip': [], - u'ObsSW.Observation.antennaSet': u'HBA_DUAL', - u'ObsSW.Observation.nrBitsPerSample': u'8', - u'ObsSW.Observation.Beam[0].nrTabRings': u'0', - u'ObsSW.Observation.Beam[0].nrTiedArrayBeams': u'0', - u'ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.flysEye': False, - u'ObsSW.Observation.nrBeams': u'1', - u'ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.Correlator.integrationTime': u'1.0', - u'ObsSW.Observation.DataProducts.Output_IncoherentStokes.enabled': False, - u'ObsSW.Observation.DataProducts.Input_IncoherentStokes.enabled': False, - u'ObsSW.Observation.DataProducts.Input_Correlated.enabled': False, - u'ObsSW.Observation.DataProducts.Output_Pulsar.enabled': False, - u'ObsSW.Observation.DataProducts.Input_CoherentStokes.skip': [], - u'ObsSW.Observation.DataProducts.Output_SkyImage.enabled': False, - u'Version.number': u'33774', - u'ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.timeIntegrationFactor': u'1', - u'ObsSW.Observation.momID': u'351539', - u'ObsSW.Observation.startTime': future_start_time, - u'ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.subbandsPerFile': u'512', - u'ObsSW.Observation.DataProducts.Input_IncoherentStokes.skip': [], - u'ObsSW.Observation.DataProducts.Output_Correlated.enabled': True, - u'ObsSW.Observation.sampleClock': u'200' - }, - u'task_type': u'Observation', - u'otdb_id': 1290476, - u'predecessors': [] - }] + u'ObsSW.Observation.DataProducts.Output_InstrumentModel.enabled': False, + u'ObsSW.Observation.stopTime': future_stop_time, + u'ObsSW.Observation.VirtualInstrument.stationList': [], + u'ObsSW.Observation.DataProducts.Input_CoherentStokes.enabled': False, + u'ObsSW.Observation.DataProducts.Output_CoherentStokes.enabled': False, + u'ObsSW.Observation.DataProducts.Output_SkyImage.enabled': False, + u'ObsSW.Observation.DataProducts.Input_Correlated.skip': [0, 0, 0, 0], + u'ObsSW.Observation.antennaSet': u'LBA_INNER', + u'ObsSW.Observation.nrBitsPerSample': u'16', + u'ObsSW.Observation.ObservationControl.PythonControl.LongBaseline.subbandgroups_per_ms': u'1', + u'ObsSW.Observation.DataProducts.Output_IncoherentStokes.enabled': False, + u'ObsSW.Observation.DataProducts.Input_IncoherentStokes.enabled': False, + u'ObsSW.Observation.DataProducts.Input_Correlated.enabled': True, + u'ObsSW.Observation.DataProducts.Output_Pulsar.enabled': False, + u'ObsSW.Observation.DataProducts.Input_CoherentStokes.skip': [], + u'ObsSW.Observation.ObservationControl.PythonControl.DPPP.demixer.demixtimestep': u'10', + u'Version.number': u'33774', + u'ObsSW.Observation.momID': mom_id, + u'ObsSW.Observation.startTime': future_start_time, + u'ObsSW.Observation.ObservationControl.PythonControl.LongBaseline.subbands_per_subbandgroup': u'1', + u'ObsSW.Observation.nrBeams': u'0', + u'ObsSW.Observation.DataProducts.Input_IncoherentStokes.skip': [], + u'ObsSW.Observation.ObservationControl.PythonControl.DPPP.demixer.demixfreqstep': u'64', + u'ObsSW.Observation.DataProducts.Output_Correlated.enabled': True, + u'ObsSW.Observation.sampleClock': u'200', + u'ObsSW.Observation.processType': u'Pipeline', + u'ObsSW.Observation.processSubtype': u'Averaging Pipeline', } self.observation_specification_tree = { @@ -153,7 +108,9 @@ class PreschedulerTest(unittest.TestCase): u'ObsSW.Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.subbandsPerFile': u'512', u'ObsSW.Observation.DataProducts.Input_IncoherentStokes.skip': [], u'ObsSW.Observation.DataProducts.Output_Correlated.enabled': True, - u'ObsSW.Observation.sampleClock': u'200' + u'ObsSW.Observation.sampleClock': u'200', + u'ObsSW.Observation.processType': u'Observation', + u'ObsSW.Observation.processSubtype': u'Beam Observation', } self.test_specification = { @@ -273,10 +230,22 @@ class PreschedulerTest(unittest.TestCase): self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time) self.otdbrpc_mock.taskSetSpecification.assert_any_call(self.otdb_id, self.otdb_info) + def test_onObservationApproved_pipeline_SetSpecification(self): + self.otdbrpc_mock.taskGetSpecification.return_value = {'otdb_id': self.otdb_id, + 'specification': self.pipeline_specification_tree} + self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time) + self.otdbrpc_mock.taskSetSpecification.assert_not_called() + def test_onObservationApproved_taskSetStatus(self): self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time) self.otdbrpc_mock.taskSetStatus.assert_any_call(self.otdb_id, 'prescheduled') + def test_onObservationApproved_pipeline_taskSetStatus(self): + self.otdbrpc_mock.taskGetSpecification.return_value = {'otdb_id': self.otdb_id, + 'specification': self.pipeline_specification_tree} + self.taskprescheduler.onObservationApproved(self.otdb_id, self.modification_time) + self.otdbrpc_mock.taskSetStatus.assert_any_call(self.otdb_id, 'prescheduled') + def test_calculateCobaltSettings(self): cobalt_settings = calculateCobaltSettings(self.test_specification) assert(cobalt_settings == self.test_cobalt_settings)