diff --git a/SAS/ResourceAssignment/RATaskSpecifiedService/lib/RATaskSpecified.py b/SAS/ResourceAssignment/RATaskSpecifiedService/lib/RATaskSpecified.py index 6dd1dc1e7c9b557fe7e2ef2d5f86d4fbc7ce82d5..14832f506758131b7cf08822d999ec5fa6e679da 100755 --- a/SAS/ResourceAssignment/RATaskSpecifiedService/lib/RATaskSpecified.py +++ b/SAS/ResourceAssignment/RATaskSpecifiedService/lib/RATaskSpecified.py @@ -21,15 +21,15 @@ # # $Id$ """ -Daemon that listens to specific OTDB status changes, requests the parset of such jobs including their predecessors, and -posts them on the bus. +Daemon that listens to specific OTDB status changes, requests the parset of such jobs including +their predecessors, and posts them on the bus. """ from lofar.messaging import ToBus, EventMessage, DEFAULT_BROKER, DEFAULT_BUSNAME from lofar.sas.otdb.OTDBBusListener import OTDBEventMessageHandler, OTDBBusListener -from lofar.sas.resourceassignment.rataskspecified.config import DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT +from lofar.sas.resourceassignment.rataskspecified.config import \ + DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT from lofar.sas.resourceassignment.common.specification import Specification -from lofar.messaging import DEFAULT_BUSNAME, DEFAULT_BROKER from lofar.sas.otdb.otdbrpc import OTDBRPC from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC @@ -41,44 +41,55 @@ logger = logging.getLogger(__name__) class RATaskSpecifiedOTDBEventMessageHandler(OTDBEventMessageHandler): def __init__(self, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): """ - :param radb_exchange: name of the bus on which the radb service listens (default: lofar.ra.command) - :param radb_servicename: name of the radb service (default: RADBService) + :param exchange: name of the exchange to listen on. + :param broker: name of the broker to connect to. """ super().__init__() - self.otdbrpc = OTDBRPC.create(exchange=exchange, broker=broker) + self.otdbrpc = OTDBRPC.create(exchange=exchange, broker=broker) self.radbrpc = RADBRPC.create(exchange=exchange, broker=broker) self.momrpc = MoMQueryRPC.create(exchange=exchange, broker=broker) self.send_bus = ToBus(exchange=exchange, broker=broker) + def start_handling(self): + self.otdbrpc.open() + self.momrpc.open() + self.radbrpc.open() + self.send_bus.open() + + def stop_handling(self): + self.otdbrpc.close() + self.momrpc.close() + self.radbrpc.close() + self.send_bus.close() + # This is mainly to trigger the propagation of misc field values through read_from_mom # and then sending them to the RA to OTDB Service in the resource assigner. # Might need to be a separate service if we take on more mom-otdb-adapter function. - def onObservationApproved(self, main_id, modificationTime): + def onObservationApproved(self, main_id, modification_time): self.createAndSendSpecifiedTask(main_id, 'approved') - def onObservationPrescheduled(self, main_id, modificationTime): + def onObservationPrescheduled(self, main_id, modification_time): self.createAndSendSpecifiedTask(main_id, 'prescheduled') def createAndSendSpecifiedTask(self, main_id, status): - with self.otdbrpc, self.radbrpc, self.momrpc: - spec = Specification(logger, - exchange=self.send_bus.exchange, - broker=self.send_bus.broker) - spec.status = status - spec.read_from_OTDB_with_predecessors(main_id, "otdb", {}) - spec.read_from_mom() - spec.update_start_end_times() - #spec.insert_into_radb() is still done in resource_assigner for now. - resultTree = spec.as_dict() - if spec.status == status: - logger.info("Sending result: %s" % resultTree) - # Put result on bus - msg = EventMessage(subject=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT, content=resultTree) - with self.send_bus: - self.send_bus.send(msg) - logger.info("Result sent") - else: - logger.warning("Problem retrieving task %i" % main_id) + spec = Specification(self.otdbrpc, self.momrpc, self.radbrpc) + spec.status = status + spec.read_from_OTDB_with_predecessors(main_id, "otdb", {}) + spec.read_from_mom() + spec.update_start_end_times() + # spec.insert_into_radb() is still done in resource_assigner for now. + result_tree = spec.as_dict() + if spec.status == status: + logger.info("Sending result: %s" % result_tree) + # Put result on bus + msg = EventMessage(subject=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT, + content=result_tree) + with self.send_bus: + self.send_bus.send(msg) + logger.info("Result sent") + else: + logger.warning("Problem retrieving task %i" % main_id) + def main(): # make sure we run in UTC timezone @@ -97,7 +108,8 @@ def main(): parser.add_option("-e", "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME, help="Bus or queue to communicate on. [default: %default]") - parser.add_option('-b', '--broker', dest='broker', type='string', default=None, help='Address of the broker, default: localhost') + parser.add_option('-b', '--broker', dest='broker', type='string', default=None, + help='Address of the broker, default: localhost') (options, args) = parser.parse_args() with OTDBBusListener(handler_type=RATaskSpecifiedOTDBEventMessageHandler, diff --git a/SAS/ResourceAssignment/RATaskSpecifiedService/test/tRATaskSpecified.py b/SAS/ResourceAssignment/RATaskSpecifiedService/test/tRATaskSpecified.py index 358cab12abc14bbef4ed31dc5bbe7acae52cd129..2152d0cf57a7de23678ce9f1b2c3c6d3293b4829 100644 --- a/SAS/ResourceAssignment/RATaskSpecifiedService/test/tRATaskSpecified.py +++ b/SAS/ResourceAssignment/RATaskSpecifiedService/test/tRATaskSpecified.py @@ -1,522 +1,31 @@ #!/usr/bin/env python3 """ -This file provides the unit tests for the RATaskSpecified.py module, which is hereafter referred to as Unit Under Test -or simply uut +This file provides the unit tests for the RATaskSpecified.py module, which is hereafter referred to +as Unit Under Test or simply uut """ -import sys, os, unittest, uuid, datetime, types +import unittest from unittest import mock -from os import walk -from pprint import pprint -from lofar.parameterset import parameterset -from lofar.messaging import EventMessage, ToBus, Service, MessageHandlerInterface -from lofar.common.methodtrigger import MethodTrigger -from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICENAME +from lofar.sas.resourceassignment.rataskspecified.RATaskSpecified import \ + RATaskSpecifiedOTDBEventMessageHandler + +from mock import MagicMock import logging -logging.basicConfig(format = '%(asctime)s %(levelname)s %(message)s', level = logging.INFO) +logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) logger = logging.getLogger(__name__) -from lofar.sas.resourceassignment.rataskspecified.RATaskSpecified import * -from lofar.sas.resourceassignment.rataskspecified.RABusListener import RATaskSpecifiedBusListener - -try: - from mock import MagicMock - from mock import patch -except ImportError: - print('Cannot run test without python3 MagicMock') - print('Please install MagicMock: pip3 install mock') - exit(3) - -# TODO move the commented tests elsewere if possible otherwise remove them - -# class TestGetPredecessors(unittest.TestCase): -# """ Class containing tests to verify whether the uut properly obtains the predecessors from a parset """ -# def test_0_predecessors(self): -# """ Verify proper functioning in case of no predecessors """ -# parset = parameterset({ PARSET_PREFIX + "Observation.Scheduler.predecessors": "[]" }) -# self.assertEqual(RATaskSpecified.get_predecessors(parset), []) -# -# def test_1_predecessor(self): -# """ Verify proper functioning in case of a single predecessor """ -# parset = parameterset({ PARSET_PREFIX + "Observation.Scheduler.predecessors": "[L426528]" }) -# self.assertEqual(RATaskSpecified.get_predecessors(parset), [{'id': 426528, 'source': 'otdb'}]) -# -# def test_2_predecessors(self): -# """ Verify proper functioning in case of a two predecessors """ -# parset = parameterset({ PARSET_PREFIX + "Observation.Scheduler.predecessors": "[L426528,L1]" }) -# self.assertEqual(sorted(RATaskSpecified.get_predecessors(parset)), -# [{'id': 1, 'source': 'otdb'},{'id': 426528, 'source': 'otdb'}]) -# -# -# # ---------------------------------------------------------------------------------------------------------------------- -# DO_GENERATE_GOLDEN_OUTPUTS = False -# -# -# class TestRASubset(unittest.TestCase): -# """ -# Class containing tests to verify whether an RA parset subset is properly constructed from an input parset by the -# uut. -# """ -# -# def setUp(self): -# _, filename = os.path.split(__file__) -# self.data_sets_filename_prefix, _ = os.path.splitext(filename) -# self.data_sets_dir = "tRATaskSpecified.in_datasets" -# -# # ------------------------------------------------------------------------------------------------------------------ -# # Actual tests, one for each type of input parset -# -# def test_preprocessing_pipeline(self): -# """ Verify that get_specification properly generates an RA parset subset for a preprocessing pipeline parset """ -# # Arrange -# input_parset_file = os.path.join(self.data_sets_dir, "tRATaskSpecified.in_preprocessing") -# input_parset = parameterset(input_parset_file) -# -# # Act -# result = RATaskSpecified.get_specification(input_parset) -# -# # Assert -# self.assertEqual(result['task_type'], 'pipeline') -# self.assertEqual(result['task_subtype'], 'averaging pipeline') -# -# self.assertEqual(result['specification']['Version.number'], 33385) -# self.assertEqual(result['specification']['Observation.DataProducts.Output_Correlated.enabled'], True) -# self.assertEqual(result['specification']['Observation.DataProducts.Output_Correlated.storageClusterName'], 'CEP4') -# self.assertEqual(result['specification']['Observation.DataProducts.Output_Correlated.identifications'], ['mom.G737227.B1.1.PT2.uv.dps']) -# self.assertEqual(result['specification']['Observation.DataProducts.Output_InstrumentModel.enabled'], False) -# self.assertEqual(result['specification']['Observation.DataProducts.Output_InstrumentModel.storageClusterName'], 'CEP4') -# self.assertEqual(result['specification']['Observation.DataProducts.Output_InstrumentModel.identifications'], []) -# self.assertEqual(result['specification']['Observation.DataProducts.Input_Correlated.enabled'], True) -# self.assertEqual(result['specification']['Observation.DataProducts.Input_Correlated.identifications'], ['mom.G737227.B1.1.T.SAP002.uv.dps']) -# self.assertEqual(result['specification']['Observation.DataProducts.Input_InstrumentModel.enabled'], False) -# self.assertEqual(result['specification']['Observation.DataProducts.Input_InstrumentModel.identifications'], []) -# self.assertEqual(result['specification']['Observation.ObservationControl.PythonControl.DPPP.demixer.freqstep'], 4) -# self.assertEqual(result['specification']['Observation.ObservationControl.PythonControl.DPPP.demixer.timestep'], 1) -# -# def test_beam_observation(self): -# """ Verify that get_specification properly generates an RA parset subset for a beam observation parset """ -# # Arrange -# input_parset_file = os.path.join(self.data_sets_dir, "tRATaskSpecified.in_beam_observation") -# input_parset = parameterset(input_parset_file) -# -# # Act -# result = RATaskSpecified.get_specification(input_parset) -# -# # Assert -# self.assertEqual(result['task_type'], 'observation') -# self.assertEqual(result['task_subtype'], 'bfmeasurement') -# -# self.assertEqual(result['specification']['Version.number'], 33385) -# self.assertEqual(result['specification']['Observation.momID'], '735372') -# self.assertEqual(result['specification']['Observation.sampleClock'], 200) -# self.assertEqual(result['specification']['Observation.nrBitsPerSample'], 8) -# self.assertEqual(result['specification']['Observation.antennaSet'], 'HBA_DUAL') -# self.assertEqual(result['specification']['Observation.VirtualInstrument.stationList'], ['CS004', 'CS005', 'CS003', 'CS002', 'CS007', 'CS006']) -# self.assertEqual(result['specification']['Observation.startTime'], '2016-12-11 17:02:00') -# self.assertEqual(result['specification']['Observation.stopTime'], '2016-12-11 18:02:00') -# self.assertEqual(result['specification']['Observation.nrBeams'], 3) -# self.assertEqual(result['specification']['Observation.Beam[0].subbandList'], range(100,262)) -# self.assertEqual(result['specification']['Observation.Beam[1].subbandList'], range(100,262)) -# self.assertEqual(result['specification']['Observation.Beam[2].subbandList'], range(100,262)) -# -# self.assertEqual(result['specification']['Observation.DataProducts.Output_Correlated.enabled'], False) -# -# self.assertEqual(result['specification']['Observation.DataProducts.Output_IncoherentStokes.enabled'], True) -# self.assertEqual(result['specification']['Observation.DataProducts.Output_IncoherentStokes.storageClusterName'],'CEP4') -# self.assertEqual(result['specification']['Observation.DataProducts.Output_IncoherentStokes.identifications'], ['mom.G735371.LOTAAS-P1296B-SAP0.1296.SAP0.obs.is','mom.G735371.LOTAAS-P1296B-SAP0.1296.SAP0.obs.is','mom.G735371.LOTAAS-P1296B-SAP1.1296.SAP1.obs.is','mom.G735371.LOTAAS-P1296B-SAP1.1296.SAP1.obs.is','mom.G735371.LOTAAS-P1296B-SAP2.1296.SAP2.obs.is','mom.G735371.LOTAAS-P1296B-SAP2.1296.SAP2.obs.is']) -# self.assertEqual(result['specification']['Observation.DataProducts.Output_CoherentStokes.enabled'], True) -# self.assertEqual(result['specification']['Observation.DataProducts.Output_CoherentStokes.storageClusterName'], 'CEP4') -# self.assertEqual(result['specification']['Observation.DataProducts.Output_CoherentStokes.identifications'], ['mom.G735371.LOTAAS-P1296B-SAP0.1296.SAP0.obs.cs','mom.G735371.LOTAAS-P1296B-SAP0.1296.SAP0.obs.cs','mom.G735371.LOTAAS-P1296B-SAP1.1296.SAP1.obs.cs','mom.G735371.LOTAAS-P1296B-SAP1.1296.SAP1.obs.cs','mom.G735371.LOTAAS-P1296B-SAP2.1296.SAP2.obs.cs','mom.G735371.LOTAAS-P1296B-SAP2.1296.SAP2.obs.cs']) -# self.assertEqual(result['specification']['Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.flysEye'], False) -# self.assertEqual(result['specification']['Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.subbandsPerFile'], 512) -# self.assertEqual(result['specification']['Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.timeIntegrationFactor'], 6) -# self.assertEqual(result['specification']['Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.which'], 'I') -# self.assertEqual(result['specification']['Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.subbandsPerFile'], 512) -# self.assertEqual(result['specification']['Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.timeIntegrationFactor'], 6) -# self.assertEqual(result['specification']['Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.which'], 'I') -# -# self.assertEqual(result['specification']['Observation.Beam[0].nrTabRings'], 4) -# self.assertEqual(result['specification']['Observation.Beam[1].nrTabRings'], 4) -# self.assertEqual(result['specification']['Observation.Beam[2].nrTabRings'], 4) -# self.assertEqual(result['specification']['Observation.Beam[0].nrTiedArrayBeams'], 13) -# self.assertEqual(result['specification']['Observation.Beam[1].nrTiedArrayBeams'], 13) -# self.assertEqual(result['specification']['Observation.Beam[2].nrTiedArrayBeams'], 13) -# -# for sap in xrange(0,3): -# for tab in xrange(0,12): -# self.assertEqual(result['specification']['Observation.Beam[%d].TiedArrayBeam[%d].coherent' % (sap,tab)],True if tab < 12 else False) -# -# def test_calibration_pipeline(self): -# """ Verify that get_specification properly generates an RA parset subset for a calibration pipeline parset """ -# # Arrange -# input_parset_file = os.path.join(self.data_sets_dir, "tRATaskSpecified.in_calibration_pipeline") -# input_parset = parameterset(input_parset_file) -# -# # Act -# result = RATaskSpecified.get_specification(input_parset) -# -# # Assert -# self.assertEqual(result['task_type'], 'pipeline') -# self.assertEqual(result['task_subtype'], 'calibration pipeline') -# -# self.assertEqual(result['specification']['Version.number'], 33385) -# self.assertEqual(result['specification']['Observation.DataProducts.Output_Correlated.enabled'], True) -# self.assertEqual(result['specification']['Observation.DataProducts.Output_Correlated.storageClusterName'], 'CEP4') -# self.assertEqual(result['specification']['Observation.DataProducts.Output_Correlated.identifications'], ['mom.G732487.B0.1.CPC.uv.dps']) -# self.assertEqual(result['specification']['Observation.DataProducts.Output_InstrumentModel.enabled'], True) -# self.assertEqual(result['specification']['Observation.DataProducts.Output_InstrumentModel.storageClusterName'], 'CEP4') -# self.assertEqual(result['specification']['Observation.DataProducts.Output_InstrumentModel.identifications'], ['mom.G732487.B0.1.CPC.inst.dps']) -# self.assertEqual(result['specification']['Observation.DataProducts.Input_Correlated.enabled'], True) -# self.assertEqual(result['specification']['Observation.DataProducts.Input_Correlated.identifications'], ['mom.G732487.B0.1.C.SAP000.uv.dps']) -# self.assertEqual(result['specification']['Observation.DataProducts.Input_InstrumentModel.enabled'], False) -# self.assertEqual(result['specification']['Observation.DataProducts.Input_InstrumentModel.identifications'], []) -# self.assertEqual(result['specification']['Observation.ObservationControl.PythonControl.DPPP.demixer.freqstep'], 4) -# self.assertEqual(result['specification']['Observation.ObservationControl.PythonControl.DPPP.demixer.timestep'], 1) -# -# def test_long_baseline_pipeline(self): -# """ Verify that get_specification properly generates an RA parset subset for a long-baseline pipeline parset """ -# # Arrange -# input_parset_file = os.path.join(self.data_sets_dir, "tRATaskSpecified.in_long_baseline_pipeline") -# input_parset = parameterset(input_parset_file) -# -# # Act -# result = RATaskSpecified.get_specification(input_parset) -# -# # Assert -# self.assertEqual(result['task_type'], 'pipeline') -# self.assertEqual(result['task_subtype'], 'long baseline pipeline') -# -# self.assertEqual(result['specification']['Version.number'], 33385) -# self.assertEqual(result['specification']['Observation.DataProducts.Output_Correlated.enabled'], True) -# self.assertEqual(result['specification']['Observation.DataProducts.Output_Correlated.storageClusterName'], 'CEP4') -# self.assertEqual(result['specification']['Observation.DataProducts.Output_Correlated.identifications'], ['mom.G724024.B0.1.LBP27.uv.dps']) -# self.assertEqual(result['specification']['Observation.DataProducts.Input_Correlated.enabled'], True) -# self.assertEqual(result['specification']['Observation.DataProducts.Input_Correlated.identifications'], ['mom.G724024.B0.1.PTLB27.uv.dps']) -# self.assertEqual(result['specification']['Observation.ObservationControl.PythonControl.LongBaseline.subbandgroups_per_ms'], 1) -# self.assertEqual(result['specification']['Observation.ObservationControl.PythonControl.LongBaseline.subbands_per_subbandgroup'], 8) -# -# def _test_pulsar_pipeline(self): -# """ Verify that get_specification properly generates an RA parset subset for a pulsar pipeline parset """ -# # Arrange -# input_parset_file = os.path.join(self.data_sets_dir, "tRATaskSpecified.in_pulsar_pipeline") -# input_parset = parameterset(input_parset_file) -# -# # Act -# result = RATaskSpecified.get_specification(input_parset) -# -# # Assert -# self.assertEqual(result['task_type'], 'pipeline') -# self.assertEqual(result['task_subtype'], 'pulsar pipeline') -# -# self.assertEqual(result['specification']['Version.number'], 33385) -# self.assertEqual(result['specification']['Observation.DataProducts.Output_Pulsar.enabled'], True) -# self.assertEqual(result['specification']['Observation.DataProducts.Output_Pulsar.storageClusterName'], 'CEP4') -# self.assertEqual(result['specification']['Observation.DataProducts.Output_Pulsar.identifications'], ['mom.G735371.LOTAAS-P1296B.1296.pipe.dps']) -# self.assertEqual(result['specification']['Observation.DataProducts.Input_CoherentStokes.enabled'], True) -# self.assertEqual(result['specification']['Observation.DataProducts.Input_CoherentStokes.identifications'], ['mom.G735371.LOTAAS-P1296B-SAP0.1296.SAP0.obs.cs','mom.G735371.LOTAAS-P1296B-SAP1.1296.SAP1.obs.cs','mom.G735371.LOTAAS-P1296B-SAP2.1296.SAP2']) -# self.assertEqual(result['specification']['Observation.DataProducts.Input_IncoherentStokes.enabled'], True) -# self.assertEqual(result['specification']['Observation.DataProducts.Input_IncoherentStokes.identifications'], ['mom.G735371.LOTAAS-P1296B-SAP0.1296.SAP0.obs.is','mom.G735371.LOTAAS-P1296B-SAP1.1296.SAP1.obs.is','mom.G735371.LOTAAS-P1296B-SAP2.1296.SAP2']) -# -# def test_interferometer_observation(self): -# """ Verify that get_specification properly generates an RA parset subset for an interferometer observation parset """ -# # Arrange -# input_parset_file = os.path.join(self.data_sets_dir, "tRATaskSpecified.in_interferometer_observation") -# input_parset = parameterset(input_parset_file) -# -# # Act -# result = RATaskSpecified.get_specification(input_parset) -# -# # Assert -# self.assertEqual(result['task_type'], 'observation') -# self.assertEqual(result['task_subtype'], 'bfmeasurement') -# -# self.assertEqual(result['specification']['Version.number'], 33385) -# self.assertEqual(result['specification']['Observation.momID'], '737228') -# self.assertEqual(result['specification']['Observation.sampleClock'], 200) -# self.assertEqual(result['specification']['Observation.nrBitsPerSample'], 8) -# self.assertEqual(result['specification']['Observation.antennaSet'], 'HBA_DUAL_INNER') -# self.assertEqual(result['specification']['Observation.VirtualInstrument.stationList'], ['CS001', 'CS002', 'CS003', 'CS004', 'CS005', 'CS006', 'CS007', 'CS011', 'CS013', 'CS017', 'CS021', 'CS024', 'CS026', 'CS028', 'CS030', 'CS031', 'CS032', 'CS101', 'CS103', 'CS201', 'CS301', 'CS302', 'CS401', 'CS501', 'DE602', 'DE603', 'DE605', 'DE609', 'FR606', 'PL610', 'PL611', 'PL612', 'RS106', 'RS205', 'RS208', 'RS210', 'RS305', 'RS306', 'RS307', 'RS310', 'RS406', 'RS407', 'RS409', 'RS503', 'RS508', 'RS509', 'SE607', 'UK608']) -# self.assertEqual(result['specification']['Observation.startTime'], '2016-12-08 23:20:25') -# self.assertEqual(result['specification']['Observation.stopTime'], '2016-12-09 07:20:25') -# self.assertEqual(result['specification']['Observation.nrBeams'], 3) -# self.assertEqual(result['specification']['Observation.Beam[0].subbandList'], [256]) -# self.assertEqual(result['specification']['Observation.Beam[1].subbandList'], [104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 182, 183, 184, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 212, 213, 215, 216, 217, 218, 219, 220, 221, 222, 223, 224, 225, 226, 227, 228, 229, 230, 231, 232, 233, 234, 235, 236, 237, 238, 239, 240, 242, 243, 244, 245, 246, 247, 248, 249, 250, 251, 252, 253, 254, 255, 257, 258, 259, 260, 261, 262, 263, 264, 265, 266, 267, 268, 269, 270, 271, 272, 273, 275, 276, 277, 278, 279, 280, 281, 282, 283, 284, 285, 286, 287, 288, 289, 290, 291, 292, 293, 294, 295, 296, 297, 298, 299, 300, 302, 303, 304, 305, 306, 307, 308, 309, 310, 311, 312, 313, 314, 315, 316, 317, 318, 319, 320, 321, 322, 323, 324, 325, 326, 327, 328, 330, 331, 332, 333, 334, 335, 336, 337, 338, 339, 340, 341, 342, 343, 344, 345, 346, 347, 349, 364, 372, 380, 388, 396, 404, 413, 421, 430, 438, 447]) -# self.assertEqual(result['specification']['Observation.Beam[2].subbandList'], [104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 182, 183, 184, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 212, 213, 215, 216, 217, 218, 219, 220, 221, 222, 223, 224, 225, 226, 227, 228, 229, 230, 231, 232, 233, 234, 235, 236, 237, 238, 239, 240, 242, 243, 244, 245, 246, 247, 248, 249, 250, 251, 252, 253, 254, 255, 257, 258, 259, 260, 261, 262, 263, 264, 265, 266, 267, 268, 269, 270, 271, 272, 273, 275, 276, 277, 278, 279, 280, 281, 282, 283, 284, 285, 286, 287, 288, 289, 290, 291, 292, 293, 294, 295, 296, 297, 298, 299, 300, 302, 303, 304, 305, 306, 307, 308, 309, 310, 311, 312, 313, 314, 315, 316, 317, 318, 319, 320, 321, 322, 323, 324, 325, 326, 327, 328, 330, 331, 332, 333, 334, 335, 336, 337, 338, 339, 340, 341, 342, 343, 344, 345, 346, 347, 349, 364, 372, 380, 388, 396, 404, 413, 421, 430, 438, 447]) -# -# self.assertEqual(result['specification']['Observation.DataProducts.Output_Correlated.enabled'], True) -# self.assertEqual(result['specification']['Observation.DataProducts.Output_Correlated.storageClusterName'], 'CEP4') -# self.assertEqual(result['specification']['Observation.DataProducts.Output_Correlated.identifications'], ['mom.G737227.B1.1.T.SAP000.uv.dps','mom.G737227.B1.1.T.SAP001.uv.dps','mom.G737227.B1.1.T.SAP002.uv.dps']) -# self.assertEqual(result['specification']['Observation.ObservationControl.OnlineControl.Cobalt.Correlator.integrationTime'], 1.00139) -# self.assertEqual(result['specification']['Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrChannelsPerSubband'], 64) -# -# self.assertEqual(result['specification']['Observation.DataProducts.Output_IncoherentStokes.enabled'], False) -# self.assertEqual(result['specification']['Observation.DataProducts.Output_CoherentStokes.enabled'], False) -# -# def test_maintenance(self): -# """ Verify that get_specification properly generates an RA parset subset for a maintenance task """ -# # Arrange -# input_parset_file = os.path.join(self.data_sets_dir, "tRATaskSpecified.in_maintenance") -# input_parset = parameterset(input_parset_file) -# -# # Act -# result = RATaskSpecified.get_specification(input_parset) -# -# # Assert -# self.assertEqual(result['task_type'], 'reservation') -# self.assertEqual(result['task_subtype'], 'maintenance') -# -# self.assertEqual(result['specification']['Version.number'], 33385) -# self.assertEqual(result['specification']['Observation.VirtualInstrument.stationList'], ['CS003']) -# self.assertEqual(result['specification']['Observation.startTime'], '2017-04-19 06:31:00') -# self.assertEqual(result['specification']['Observation.stopTime'], '2017-04-19 12:00:00') -# -# def test_reservation(self): -# """ Verify that get_specification properly generates an RA parset subset for a reservation task """ -# # Arrange -# input_parset_file = os.path.join(self.data_sets_dir, "tRATaskSpecified.in_reservation") -# input_parset = parameterset(input_parset_file) -# -# # Act -# result = RATaskSpecified.get_specification(input_parset) -# -# # Assert -# self.assertEqual(result['task_type'], 'reservation') -# self.assertEqual(result['task_subtype'], 'project') -# -# self.assertEqual(result['specification']['Version.number'], 33385) -# self.assertEqual(result['specification']['Observation.VirtualInstrument.stationList'], ['DE601','FR606','SE607','UK608']) -# self.assertEqual(result['specification']['Observation.startTime'], '2017-04-20 08:00:00') -# self.assertEqual(result['specification']['Observation.stopTime'], '2017-04-25 09:00:00') -# -# class TestBusListening(unittest.TestCase): -# """ Tests to verify that the uut is registered to the expected OTDB-bus events """ -# def setUp(self): -# self.handler_prefix = "onObservation" -# self.uut_event_handlers = self.get_class_methods(RATaskSpecified, self.handler_prefix) -# -# def get_class_methods(self, cls, method_prefix=""): -# """ Collects the class' methods that start with a given prefix """ -# return [x for x, y in cls.__dict__.items() if type(y) == types.FunctionType and x.startswith(method_prefix)] -# -# def test_otdb_bus_event_registration(self): -# """ Verifies if the uut is registered to the right OTDB Bus events """ -# # All possible task status changes -# self.task_statuses = ['prepared', -# 'approved', -# 'on_hold', -# 'conflict', -# 'prescheduled', -# 'scheduled', -# 'queued', -# 'active', -# 'completing', -# 'finished', -# 'aborted', -# 'error', -# 'obsolete', -# 'opened', -# 'suspended' -# ] -# # Selection of task status changes the uut should trigger on exclusively -# self.uut_registered_task_statuses = sorted(['prescheduled']) -# -# # Parse the statuses from the OTDB event handlers -# statuses = sorted([status[len(self.handler_prefix):len(status)].lower() for status in self.uut_event_handlers]) -# -# # Perform verification -# self.assertEqual(statuses, self.uut_registered_task_statuses) -# -# # TODO: consider mocking the buses used (take for example the ResourceAssigner/test/t_resourceassigner.py) -# class TestService(unittest.TestCase): -# """ Treat uut as a black box and apply task status changes to verify if it does its job """ -# def setUp(self): -# """ Set up mocks of the necessary interfaces to the uut and set the fixed test-parameters """ -# # Create a random bus (used by all uut's interfaces) -# self.busname = "%s-%s" % (os.path.basename(sys.argv[0]), str(uuid.uuid4())[:8]) -# self.bus = ToBus(self.busname, { "create": "always", "delete": "always", "node": { "type": "topic" } }) -# self.bus.open() -# self.addCleanup(self.bus.close) -# -# # Define the services we use -# self.status_publisher_service = "%s/%s" % (self.busname, DEFAULT_OTDB_NOTIFICATION_SUBJECT) -# -# # Selection of task status changes the uut should trigger on -# self.uut_task_status_event_registrations = ['approved', 'prescheduled'] -# -# # -------------------------------------------------------------------------------------------------------------- -# # Setup mock OTDBHandler -# -# # Nr of parsets requested, to detect multiple requests for the same parset, or of superfluous parsets -# self.requested_parsets = 0 -# def requested_parset(): -# """ Keeps track of how many times the OTDBService has been used """ -# self.requested_parsets += 1 -# -# class OTDBHandler(MessageHandlerInterface): -# """ Emulate the OTDBService """ -# def __init__(self, **kwargs): -# super(OTDBHandler, self).__init__(**kwargs) -# self.service2MethodMap["TaskGetSpecification"] = self.get_specification -# -# @staticmethod -# def get_predecessors_from_testset(): -# """ -# Constructs a small database of predecessors; also for use outside the OTDBHandler class (hence static). -# -# Note: use adjacent number of (unsigned) id's and avoid cyclic dependencies. -# -# :return: predecessor test-database -# """ -# return { -# 1: [2, 4], -# 2: [3], -# 3: [], -# 4: [5], -# 5: [] -# } -# -# def fully_qualified_otdb_id_as_string(self, otdb_id_list=[]): -# """ In parsets the OTDB ids are prefixed with 'L' and this function does that for us""" -# return repr(['L' + str(id) for id in otdb_id_list]) -# -# def get_specification(self, OtdbID): -# """ Get the (limited to what's necessary) parset of the given OTDB id """ -# predecessors = self.get_predecessors_from_testset().get(OtdbID, -1) -# -# if predecessors == -1: -# raise Exception("Invalid OtdbID: %s" % OtdbID) -# -# # Keep track of how many times this function is called -# requested_parset() -# -# return { "TaskSpecification": { -# "Version.number": "1", -# PARSET_PREFIX + "Observation.processType": "Observation", -# PARSET_PREFIX + "Observation.processSubtype": "Beam Observation", -# PARSET_PREFIX + "Observation.ObsID": str(OtdbID), -# PARSET_PREFIX + "Observation.Scheduler.predecessors": self.fully_qualified_otdb_id_as_string(predecessors), -# } } -# -# # Fetch the OTDBHandler test-database for use in the tests -# self.predecessor_testset = OTDBHandler.get_predecessors_from_testset() -# -# # Start our fake OTDBService -# otdb_service = Service(DEFAULT_OTDB_SERVICENAME, OTDBHandler, busname=self.busname, use_service_methods=True) -# otdb_service.start_listening() -# self.addCleanup(otdb_service.stop_listening) -# -# # -------------------------------------------------------------------------------------------------------------- -# # Setup listener to catch result of our service (e.g. ResourceAssignmentService) -# self.listener = RATaskSpecifiedBusListener(busname=self.busname) -# self.listener.start_listening() -# self.addCleanup(self.listener.stop_listening) -# -# def predecessors_as_list_from_testset(self, tree_id): -# """ Obtain predecessor tree as list from test set """ -# predecessor_ids = self.predecessor_testset[tree_id] -# return sorted([predecessor_ids] + self.predecessors_as_list_from_testset(id) for id in predecessor_ids) -# -# def predecessors_as_list_from_parset_tree(self, ra_subset): -# """ Obtain predecessor tree as list from uut's RA parset subset """ -# predecessor_subsets = [s for s in ra_subset['predecessors']] -# predecessor_ids = [s['otdb_id'] for s in predecessor_subsets] -# return sorted([predecessor_ids] + self.predecessors_as_list_from_parset_tree(s) for s in predecessor_subsets) -# -# def trigger_uut(self, tree_ids): -# """ Fire all possible task status changes at the uut and collect data about its behaviour """ -# result = {} -# -# with RATaskSpecified( otdb_notification_busname=self.busname, -# otdb_service_busname=self.busname, -# notification_busname=self.busname) as uut: -# # ---------------------------------------------------------- -# # Mimic the OTDB Publisher and send a status change event -# # --------------------------------------------------------- -# with ToBus(self.status_publisher_service) as tb: -# for tree_id in tree_ids: -# # Create an empty list for appending to later -# result[tree_id] = [] -# -# for task_status in self.uut_task_status_event_registrations: -# # We want to know it when our listener receives a message -# trigger = MethodTrigger(self.listener, "onTaskSpecified") -# -# msg = EventMessage(content={ -# "treeID": tree_id, -# "state": task_status, -# "time_of_change": datetime.datetime(2016, 1, 1) -# }) -# tb.send(msg) -# -# # Wait for message to arrive -# message_received = trigger.wait() -# -# # Collect relevant parameters from trigger output -# result[tree_id].append((task_status, { -# 'message_received': message_received, -# 'task_status': trigger.args[1]['state'] if message_received else "N.A.", -# 'root_id': trigger.args[1]['otdb_id'] if message_received else 0, -# 'predecessor_ids': self.predecessors_as_list_from_parset_tree(trigger.args[1]) if message_received else [] -# })) -# return result -# -# # ------------------------------------------------------------------------------------------------------------------ -# # Actual tests, one for each predecessor tree situation -# -# def test_single_id_no_predecessors(self): -# """ Request the RA parset subset for a simulated OTDB id 3 (which has no predecessors) """ -# tree_ids = [3] -# results = self.trigger_uut(tree_ids) -# -# for tree_id, outcomes in results.items(): -# for task_status, outcome in outcomes: -# # Verify that message was received -# self.assertTrue(outcome['message_received'], -# "Expected receiving an event message for a task status change to %s" % task_status) -# -# # Verify root tree ID -# self.assertEqual(outcome['root_id'], tree_id, -# ("Root id (%d) not as expected (%d)" % (outcome['root_id'], tree_id))) -# -# # Verify task status -# self.assertEqual(outcome['task_status'], task_status) -# -# # Verify predecessors in RA parset subset tree -# expected = self.predecessors_as_list_from_testset(tree_id) -# self.assertEqual(expected, outcome['predecessor_ids']) -# -# # Make sure we requested one parset per task status status -# self.assertEqual(self.requested_parsets, len(self.uut_task_status_event_registrations)) -# -# def test_single_id_with_predecessors(self): -# """ Request the RA parset subset for a simulated OTDB id 1 (which has predecessors) """ -# tree_ids = [1] -# results = self.trigger_uut(tree_ids) -# -# for tree_id, outcomes in results.items(): -# for task_status, outcome in outcomes: -# # Check if message was received -# self.assertTrue(outcome['message_received'], -# "Expected receiving a message for task status change to %s" % task_status) -# -# # Verify root tree ID -# self.assertEqual(outcome['root_id'], tree_id, -# "Root id (%d) not as expected (%d)" % (outcome['root_id'], tree_id)) -# -# # Verify predecessors in RA parset subset tree -# expected = self.predecessors_as_list_from_testset(tree_id) -# self.assertEqual(expected, outcome['predecessor_ids']) - -class TestingRATaskSpecified(RATaskSpecified): + +class TestingRATaskSpecified(RATaskSpecifiedOTDBEventMessageHandler): def __init__(self, otdbrpc, radbrpc, momrpc, send_bus): self.otdbrpc = otdbrpc self.radbrpc = radbrpc self.momrpc = momrpc self.send_bus = send_bus + class TestRATaskSpecified(unittest.TestCase): def setUp(self): @@ -528,7 +37,8 @@ class TestRATaskSpecified(unittest.TestCase): self.mocked_spec = MagicMock() self.mocked_spec.as_dict.return_value = self.spec_result_tree - specification_patcher = mock.patch('lofar.sas.resourceassignment.rataskspecified.RATaskSpecified.Specification') + specification_patcher = mock.patch( + 'lofar.sas.resourceassignment.rataskspecified.RATaskSpecified.Specification') self.addCleanup(specification_patcher.stop) self.specification_mock = specification_patcher.start() self.specification_mock.return_value = self.mocked_spec @@ -545,7 +55,8 @@ class TestRATaskSpecified(unittest.TestCase): self.addCleanup(momrpc_patcher.stop) self.momrpc_mock = momrpc_patcher.start() - logger_patcher = mock.patch('lofar.sas.resourceassignment.rataskspecified.RATaskSpecified.logger') + logger_patcher = mock.patch( + 'lofar.sas.resourceassignment.rataskspecified.RATaskSpecified.logger') self.addCleanup(logger_patcher.stop) self.logger_mock = logger_patcher.start() @@ -553,72 +64,53 @@ class TestRATaskSpecified(unittest.TestCase): self.addCleanup(send_bus_patcher.stop) self.send_bus_mock = send_bus_patcher.start() - self.raTaskSpecified = TestingRATaskSpecified(self.otdbrpc_mock, self.radbrpc_mock, self.momrpc_mock, self.send_bus_mock) + self.raTaskSpecified = TestingRATaskSpecified(self.otdbrpc_mock, self.radbrpc_mock, + self.momrpc_mock, self.send_bus_mock) - # stop listening + # start listening - @mock.patch('lofar.sas.otdb.OTDBBusListener.OTDBBusListener.start_listening') - def test_start_listening_opens_otdbrpc(self, _): - self.raTaskSpecified.start_listening() + def test_start_listening_opens_otdbrpc(self): + self.raTaskSpecified.start_handling() self.otdbrpc_mock.open.assert_called() - @mock.patch('lofar.sas.otdb.OTDBBusListener.OTDBBusListener.start_listening') - def test_start_listening_opens_radbrpc(self, _): - self.raTaskSpecified.start_listening() + def test_start_listening_opens_radbrpc(self): + self.raTaskSpecified.start_handling() self.radbrpc_mock.open.assert_called() - @mock.patch('lofar.sas.otdb.OTDBBusListener.OTDBBusListener.start_listening') - def test_start_listening_opens_momrpc(self, _): - self.raTaskSpecified.start_listening() + def test_start_listening_opens_momrpc(self): + self.raTaskSpecified.start_handling() self.momrpc_mock.open.assert_called() - @mock.patch('lofar.sas.otdb.OTDBBusListener.OTDBBusListener.start_listening') - def test_start_listening_opens_send_bus(self, _): - self.raTaskSpecified.start_listening() + def test_start_listening_opens_send_bus(self): + self.raTaskSpecified.start_handling() self.send_bus_mock.open.assert_called() - @mock.patch('lofar.sas.otdb.OTDBBusListener.OTDBBusListener.start_listening') - def test_start_listening_also_call_start_listening_on_super(self, super_mock): - self.raTaskSpecified.start_listening() - - super_mock.assert_called() - # stop listening - @mock.patch('lofar.sas.otdb.OTDBBusListener.OTDBBusListener.stop_listening') - def test_stop_listening_closes_otdbrpc(self, _): - self.raTaskSpecified.stop_listening() + def test_stop_listening_closes_otdbrpc(self): + self.raTaskSpecified.stop_handling() self.otdbrpc_mock.close.assert_called() - @mock.patch('lofar.sas.otdb.OTDBBusListener.OTDBBusListener.stop_listening') - def test_stop_listening_closes_radbrpc(self, _): - self.raTaskSpecified.stop_listening() + def test_stop_listening_closes_radbrpc(self): + self.raTaskSpecified.stop_handling() self.radbrpc_mock.close.assert_called() - @mock.patch('lofar.sas.otdb.OTDBBusListener.OTDBBusListener.stop_listening') - def test_stop_listening_closes_momrpc(self, _): - self.raTaskSpecified.stop_listening() + def test_stop_listening_closes_momrpc(self): + self.raTaskSpecified.stop_handling() self.momrpc_mock.close.assert_called() - @mock.patch('lofar.sas.otdb.OTDBBusListener.OTDBBusListener.stop_listening') - def test_stop_listening_closes_send_bus(self, _): - self.raTaskSpecified.stop_listening() + def test_stop_listening_closes_send_bus(self): + self.raTaskSpecified.stop_handling() self.send_bus_mock.close.assert_called() - @mock.patch('lofar.sas.otdb.OTDBBusListener.OTDBBusListener.stop_listening') - def test_stop_listening_also_call_stop_listening_on_super(self, super_mock): - self.raTaskSpecified.stop_listening() - - super_mock.assert_called() - # onObservationPrescheduled def test_onObservationPrescheduled_should_set_status_on_specification(self): @@ -629,7 +121,8 @@ class TestRATaskSpecified(unittest.TestCase): def test_onObservationPrescheduled_should_call_read_from_OTDB_with_predecessors(self): self.raTaskSpecified.onObservationPrescheduled(self.task_main_id, self.task_status) - self.mocked_spec.read_from_OTDB_with_predecessors.assert_called_with(self.task_main_id, "otdb", {}) + self.mocked_spec.read_from_OTDB_with_predecessors.assert_called_with(self.task_main_id, + "otdb", {}) def test_onObservationPrescheduled_should_call_read_from_mom(self): self.raTaskSpecified.onObservationPrescheduled(self.task_main_id, self.task_status) @@ -654,7 +147,7 @@ class TestRATaskSpecified(unittest.TestCase): def test_onObservationPrescheduled_should_send_message_if_status_is_prescheduled(self): self.raTaskSpecified.onObservationPrescheduled(self.task_main_id, self.task_status) - self.assertEqual(self.send_bus_mock.send.call_args_list[0].content, self.spec_result_tree) + self.assertEqual(self.send_bus_mock.send.call_args_list[0][0][0].content, self.spec_result_tree) def test_onObservationPrescheduled_should_log_if_status_is_not_prescheduled(self): def set_spec_status_to_error(main_id, id_source, found_specifications): @@ -666,5 +159,6 @@ class TestRATaskSpecified(unittest.TestCase): self.logger_mock.warning.assert_any_call("Problem retrieving task %i" % self.task_main_id) + if __name__ == '__main__': unittest.main() diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py index 660dd43681b53f88361a846d155f5346f0478dd1..c8944e64df6a571bab0b886f357731f94ea57a6f 100644 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py @@ -21,11 +21,10 @@ # import logging -from datetime import datetime, timedelta from time import sleep from threading import Thread -from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC +from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC from lofar.sas.datamanagement.cleanup.rpc import CleanupRPC from lofar.sas.otdb.otdbrpc import OTDBRPC @@ -36,6 +35,7 @@ from lofar.common.datetimeutils import * logger = logging.getLogger(__name__) + def movePipelineAfterItsPredecessors(task, radbrpc, min_start_timestamp=None): try: #only reschedule pipelines which run on cep4 @@ -84,6 +84,7 @@ def movePipelineAfterItsPredecessors(task, radbrpc, min_start_timestamp=None): except Exception as e: logger.error("Error while checking pipeline starttime: %s", e) + class ScheduleChecker(): def __init__(self, busname=DEFAULT_BUSNAME, @@ -92,10 +93,10 @@ class ScheduleChecker(): """ self._thread = None self._running = False - self._radbrpc = RARPC(busname=busname, broker=broker) - self._momrpc = MoMQueryRPC(busname=busname, broker=broker) - self._curpc = CleanupRPC(busname=busname, broker=broker) - self._otdbrpc = OTDBRPC(busname=busname, broker=broker, timeout=180) + self._radbrpc = RADBRPC.create(exchange=busname, broker=broker) + self._momrpc = MoMQueryRPC.create(exchange=busname, broker=broker) + self._curpc = CleanupRPC.create(exchange=busname, broker=broker) + self._otdbrpc = OTDBRPC.create(exchange=busname, broker=broker, timeout=180) def __enter__(self): """Internal use only. (handles scope 'with')"""