diff --git a/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py b/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py index 5c8c6209c4f9dab8d4b06e9ab2c05d16ee9c5e4d..897d364ed8adac7f7b621b3bdace5b272c186735 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py +++ b/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py @@ -1,93 +1,575 @@ #!/usr/bin/env python import unittest -import sys +import mock import datetime -import logging -import inspect -from lofar.messaging.RPC import RPC - -logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) -logger = logging.getLogger(__name__) - -try: - from mock import MagicMock - from mock import patch -except ImportError: - print 'Cannot run test without python MagicMock' - print 'Please install MagicMock: pip install mock' - exit(3) - -# the system under test is the ResourceAssigner, not the RARPC -# so, patch (mock) the RARPC class during these tests. -# when the ResourceAssigner instantiates an RARPC it will get the mocked class. -with patch('lofar.sas.resourceassignment.resourceassignmentservice.rpc.RARPC', autospec=True) as MockRARPC, \ - patch.object(RPC, 'execute') as mockRPC_execute, \ - patch.object(RPC, 'open'), \ - patch.object(RPC, 'close'): - mockRARPC = MockRARPC.return_value - - # modify the return values of the various RARPC methods with pre-cooked answers - mockRARPC.getTask.return_value = {"endtime": datetime.datetime(2016, 3, 25, 22, 47, 31), "id": 2299, "mom_id": 351543, "name": "IS HBA_DUAL", "otdb_id": 1290472, "predecessor_ids": [], "project_mom_id": 2, "project_name": "test-lofar", "specification_id": 2323, "starttime": datetime.datetime(2016, 3, 25, 21, 47, 31), "status": "prescheduled", "status_id": 350, "successor_ids": [], "type": "observation", "type_id": 0} - mockRARPC.getResourceTypes.return_value = [{"id": 0, "name": "rsp", "unit": "rsp_channel_bit", "unit_id": 0}, {"id": 1, "name": "tbb", "unit": "bytes", "unit_id": 1}, {"id": 2, "name": "rcu", "unit": "rcu_board", "unit_id": 2}, {"id": 3, "name": "bandwidth", "unit": "bytes/second", "unit_id": 3}, {"id": 4, "name": "processor", "unit": "cores", "unit_id": 4}, {"id": 5, "name": "storage", "unit": "bytes", "unit_id": 1}] - mockRARPC.getResources.return_value = [{"id": 116, "name": "cep4bandwidth", "type": "bandwidth", "type_id": 3, "unit": "bytes/second"},{"id": 117, "name": "cep4storage", "type": "storage", "type_id": 5, "unit": "bytes"}] - mockRARPC.getResourceClaimPropertyTypes.return_value = [{"id": 0, "name": "nr_of_is_files"}, {"id": 1, "name": "nr_of_cs_files"}, {"id": 2, "name": "nr_of_uv_files"}, {"id": 3, "name": "nr_of_im_files"}, {"id": 4, "name": "nr_of_cores"}, {"id": 5, "name": "nr_of_beamlets"}, {"id": 6, "name": "nr_of_bits"}, {"id": 7, "name": "is_file_size"}] - - def mockRARPC_insertResourceClaims(*arg, **kwarg): - logger.info("insertResourceClaims: %s" % ', '.join(str(x) for x in arg)) - return {'ids':range(len(arg[1]))} - mockRARPC.insertResourceClaims.side_effect = mockRARPC_insertResourceClaims - - #mock the RPC execute method - def mockRPCExecute(*arg, **kwarg): - #trick to get the servicename via the callstack from within this mock method - servicename = inspect.stack()[3][0].f_locals['self'].ServiceName - logger.info("mockRPCExecute servicename=%s" % servicename) - - #give pre-cooked answer depending on called service - if servicename == 'ResourceEstimation': - return {'1290472': {'observation': {'bandwidth': {'total_size': 9372800}, 'storage': {'total_size': 140592000, 'output_files': {'is': {'is_nr_stokes': 1, 'is_file_size': 36864000, 'nr_of_is_files': 1}, 'uv': {'nr_of_uv_files': 50, 'uv_file_size': 2074560}, 'saps': [{'sap_nr': 0, 'properties': {'nr_of_uv_files': 50, 'nr_of_is_files': 1}}]}}}}}, "OK" - - return None, None - - mockRPC_execute.side_effect = mockRPCExecute - - # import ResourceAssigner now, so it will use the mocked classes and methods - from lofar.sas.resourceassignment.resourceassigner.assignment import ResourceAssigner - - try: - with ResourceAssigner() as assigner: - pass - except Exception as e: - if 'NotFound: no such queue' in e.message: - print e.message - print 'qpid environment not set up correctly for this test' - exit(3) - - #define the test class - class ResourceAssignerTest(unittest.TestCase): - '''Test the logic in the ResourceAssigner''' - - #def test_doAssignment(self): - #with ResourceAssigner() as assigner: - ##define inputs - #specification_tree={u'task_type': u'pipeline', u'specification': {u'Observation.DataProducts.Output_InstrumentModel.enabled': False, u'Observation.stopTime': u'2016-03-25 14:15:37', u'Observation.VirtualInstrument.stationList': [], u'Observation.DataProducts.Input_CoherentStokes.enabled': False, u'Observation.DataProducts.Output_CoherentStokes.enabled': False, u'Observation.DataProducts.Input_Correlated.skip': [0, 0, 0, 0], u'Observation.antennaSet': u'LBA_INNER', u'Observation.nrBitsPerSample': u'16', u'Observation.ObservationControl.PythonControl.LongBaseline.subbandgroups_per_ms': u'2', u'Observation.DataProducts.Output_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_Correlated.enabled': True, u'Observation.DataProducts.Output_Pulsar.enabled': False, u'Observation.DataProducts.Input_CoherentStokes.skip': [], u'Observation.DataProducts.Output_SkyImage.enabled': False, u'Version.number': u'33774', u'Observation.momID': u'351557', u'Observation.startTime': u'2016-03-25 14:14:57', u'Observation.ObservationControl.PythonControl.LongBaseline.subbands_per_subbandgroup': u'2', u'Observation.nrBeams': u'0', u'Observation.DataProducts.Input_IncoherentStokes.skip': [], u'Observation.DataProducts.Output_Correlated.enabled': True, u'Observation.sampleClock': u'200'}, u'task_subtype': u'long baseline pipeline', u'state': u'prescheduled', u'otdb_id': 1290494, u'predecessors': [{u'task_subtype': u'averaging pipeline', u'specification': {u'Observation.DataProducts.Output_InstrumentModel.enabled': False, u'Observation.stopTime': u'2016-03-25 13:51:05', u'Observation.VirtualInstrument.stationList': [], u'Observation.DataProducts.Input_CoherentStokes.enabled': False, u'Observation.DataProducts.Output_CoherentStokes.enabled': False, u'Observation.DataProducts.Output_SkyImage.enabled': False, u'Observation.DataProducts.Input_Correlated.skip': [0, 0, 0, 0], u'Observation.antennaSet': u'LBA_INNER', u'Observation.nrBitsPerSample': u'16', u'Observation.ObservationControl.PythonControl.LongBaseline.subbandgroups_per_ms': u'1', u'Observation.DataProducts.Output_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_Correlated.enabled': True, u'Observation.DataProducts.Output_Pulsar.enabled': False, u'Observation.DataProducts.Input_CoherentStokes.skip': [], u'Observation.ObservationControl.PythonControl.DPPP.demixer.demixtimestep': u'10', u'Version.number': u'33774', u'Observation.momID': u'351556', u'Observation.startTime': u'2016-03-25 13:49:55', u'Observation.ObservationControl.PythonControl.LongBaseline.subbands_per_subbandgroup': u'1', u'Observation.nrBeams': u'0', u'Observation.DataProducts.Input_IncoherentStokes.skip': [], u'Observation.ObservationControl.PythonControl.DPPP.demixer.demixfreqstep': u'64', u'Observation.DataProducts.Output_Correlated.enabled': True, u'Observation.sampleClock': u'200'}, u'task_type': u'pipeline', u'otdb_id': 1290496, u'predecessors': [{u'task_subtype': u'bfmeasurement', u'specification': {u'Observation.DataProducts.Output_InstrumentModel.enabled': False, u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.timeIntegrationFactor': u'1', u'Observation.stopTime': u'2016-03-26 00:33:31', u'Observation.VirtualInstrument.stationList': [u'RS205', u'RS503', u'CS013', u'RS508', u'RS106'], u'Observation.DataProducts.Input_CoherentStokes.enabled': False, u'Observation.DataProducts.Output_CoherentStokes.enabled': False, u'Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrChannelsPerSubband': u'64', u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.which': u'I', u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.which': u'I', u'Observation.Beam[0].subbandList': [100, 101, 102, 103], u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.subbandsPerFile': u'512', u'Observation.DataProducts.Input_Correlated.skip': [], u'Observation.antennaSet': u'HBA_DUAL', u'Observation.nrBitsPerSample': u'8', u'Observation.Beam[0].nrTabRings': u'0', u'Observation.Beam[0].nrTiedArrayBeams': u'0', u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.flysEye': False, u'Observation.nrBeams': u'1', u'Observation.ObservationControl.OnlineControl.Cobalt.Correlator.integrationTime': u'1.0', u'Observation.DataProducts.Output_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_Correlated.enabled': False, u'Observation.DataProducts.Output_Pulsar.enabled': False, u'Observation.DataProducts.Input_CoherentStokes.skip': [], u'Observation.DataProducts.Output_SkyImage.enabled': False, u'Version.number': u'33774', u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.timeIntegrationFactor': u'1', u'Observation.momID': u'351539', u'Observation.startTime': u'2016-03-26 00:31:31', u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.subbandsPerFile': u'512', u'Observation.DataProducts.Input_IncoherentStokes.skip': [], u'Observation.DataProducts.Output_Correlated.enabled': True, u'Observation.sampleClock': u'200'}, u'task_type': u'observation', u'otdb_id': 1290476, u'predecessors': []}]}]} - - ##test the main assignment method - #assigner.doAssignment(specification_tree) - - ##TODO: added test asserts etc - - def test_claimResources(self): - with ResourceAssigner() as assigner: - #define inputs - estimator_output,_=assigner.rerpc() - needed_resources=estimator_output['1290472'] - task = assigner.radbrpc.getTask(1290472) - - #test claimResources method - assigner.claimResources(needed_resources, task) - - #TODO: added test asserts etc +from copy import deepcopy +from lofar.sas.resourceassignment.resourceassigner.assignment import ResourceAssigner +from lofar.parameterset import parameterset + + +class TestingResourceAssigner(ResourceAssigner): + def __init__(self, rarpc, rerpc, otdbrpc, momrpc, curpc, ra_notification_bus): + # super gets not done to be able to insert mocks as early as possible otherwise the RPC block unittesting + self.radbrpc = rarpc + self.rerpc = rerpc + self.otdbrpc = otdbrpc + self.momrpc = momrpc + self.curpc = curpc + self.ra_notification_bus = ra_notification_bus + + +class ResourceAssignerTest(unittest.TestCase): + non_approved_or_prescheduled_status = u'opened' + non_approved_or_prescheduled_otdb_id = 1 + + future_start_time = (datetime.datetime.utcnow() + datetime.timedelta(hours=1)).strftime('%Y-%m-%d %H:%M:%S') + future_stop_time = (datetime.datetime.utcnow() + datetime.timedelta(hours=2)).strftime('%Y-%m-%d %H:%M:%S') + + non_approved_or_prescheduled_specification_tree = { + u'otdb_id': non_approved_or_prescheduled_otdb_id, + u'task_type': u'pipeline', + u'state': non_approved_or_prescheduled_status, + u'specification': { + u'Observation.startTime': future_start_time, + u'Observation.stopTime': future_stop_time + } + } + + cep2_specification_tree = { + u'otdb_id': non_approved_or_prescheduled_otdb_id, + u'task_type': u'pipeline', + u'state': non_approved_or_prescheduled_status, + u'specification': { + u'Observation.startTime': future_start_time, + u'Observation.stopTime': future_stop_time, + u'Observation.DataProducts.Output_Pulsar.enabled': True, + u'Observation.DataProducts.Output_Pulsar.storageClusterName': u'CEP2' + } + } + + mom_bug_processing_cluster_name = 'CEP2' + mom_bug_otbd_id = 1234 + mom_bug_specification_tree = { + u'otdb_id': mom_bug_otbd_id, + u'task_type': u'pipeline', + u'state': u'prescheduled', + u'specification': { + u'Observation.startTime': future_start_time, + u'Observation.stopTime': future_stop_time, + u'Observation.DataProducts.Output_Pulsar.enabled': True, + u'Observation.DataProducts.Output_Pulsar.storageClusterName': u'CEP4', + u'Observation.Cluster.ProcessingCluster.clusterName': mom_bug_processing_cluster_name + } + } + + task_mom_id = 351543 + task_otdb_id = 1290472 + task_id = 2299 + task = { + "mom_id": task_mom_id, + "otdb_id": task_otdb_id, + "id": task_id, + "endtime": datetime.datetime(2016, 3, 25, 22, 47, 31), + "name": "IS HBA_DUAL", + "predecessor_ids": [], + "project_mom_id": 2, + "project_name": "test-lofar", + "specification_id": 2323, + "starttime": datetime.datetime(2016, 3, 25, 21, 47, 31), + "status": "prescheduled", + "status_id": 350, + "successor_ids": [], + "type": "observation", + "type_id": 0 + } + + predecessor_task_mom_id = 1 + predecessor_task_otdb_id = 2 + predecessor_task_id = 3 + predecessor_task = { + "mom_id": predecessor_task_mom_id, + "otdb_id": predecessor_task_otdb_id, + "id": predecessor_task_id, + "endtime": datetime.datetime(2016, 3, 25, 22, 47, 31), + "name": "IS HBA_DUAL", + "predecessor_ids": [], + "project_mom_id": 2, + "project_name": "test-lofar", + "specification_id": 2323, + "starttime": datetime.datetime(2016, 3, 25, 21, 47, 31), + "status": "prescheduled", + "status_id": 350, + "successor_ids": [], + "type": "observation", + "type_id": 0 + } + + successor_task_mom_id = 4 + successor_task_otdb_id = 5 + successor_task_id = 6 + successor_task = { + "mom_id": successor_task_mom_id, + "otdb_id": successor_task_otdb_id, + "id": successor_task_id, + "endtime": datetime.datetime(2016, 3, 25, 22, 47, 31), + "name": "IS HBA_DUAL", + "predecessor_ids": [], + "project_mom_id": 2, + "project_name": "test-lofar", + "specification_id": 2323, + "starttime": datetime.datetime(2016, 3, 25, 21, 47, 31), + "status": "prescheduled", + "status_id": 350, + "successor_ids": [], + "type": "observation", + "type_id": 0 + } + + mom_id = 351557 + otdb_id = 1290494 + state = u'prescheduled' + task_type = u'pipeline' + + specification_tree = { + u'otdb_id': otdb_id, + u'task_type': task_type, + u'state': state, + u'specification': { + u'Observation.momID': str(mom_id), + u'Observation.startTime': future_start_time, + u'Observation.stopTime': future_stop_time, + u'Observation.DataProducts.Output_InstrumentModel.enabled': False, + u'Observation.VirtualInstrument.stationList': [], + u'Observation.DataProducts.Input_CoherentStokes.enabled': False, + u'Observation.DataProducts.Output_CoherentStokes.enabled': False, + u'Observation.DataProducts.Input_Correlated.skip': [0, 0, 0, 0], + u'Observation.antennaSet': u'LBA_INNER', + u'Observation.nrBitsPerSample': u'16', + u'Observation.ObservationControl.PythonControl.LongBaseline.subbandgroups_per_ms': u'2', + u'Observation.DataProducts.Output_IncoherentStokes.enabled': False, + u'Observation.DataProducts.Input_IncoherentStokes.enabled': False, + u'Observation.DataProducts.Input_Correlated.enabled': True, + u'Observation.DataProducts.Output_Pulsar.enabled': False, + u'Observation.DataProducts.Input_CoherentStokes.skip': [], + u'Observation.DataProducts.Output_SkyImage.enabled': False, + u'Version.number': u'33774', + u'Observation.ObservationControl.PythonControl.LongBaseline.subbands_per_subbandgroup': u'2', + u'Observation.nrBeams': u'0', + u'Observation.DataProducts.Input_IncoherentStokes.skip': [], + u'Observation.DataProducts.Output_Correlated.enabled': True, + u'Observation.DataProducts.Output_Correlated.storageClusterName': 'CEP4', + u'Observation.sampleClock': u'200' + }, + u'task_subtype': u'long baseline pipeline', + u'predecessors': [{ + u'task_subtype': u'averaging pipeline', + u'specification': { + u'Observation.DataProducts.Output_InstrumentModel.enabled': False, + u'Observation.stopTime': u'2016-03-25 13:51:05', + u'Observation.VirtualInstrument.stationList': [], + u'Observation.DataProducts.Input_CoherentStokes.enabled': False, + u'Observation.DataProducts.Output_CoherentStokes.enabled': False, + u'Observation.DataProducts.Output_SkyImage.enabled': False, + u'Observation.DataProducts.Input_Correlated.skip': [0, 0, 0, 0], + u'Observation.antennaSet': u'LBA_INNER', + u'Observation.nrBitsPerSample': u'16', + u'Observation.ObservationControl.PythonControl.LongBaseline.subbandgroups_per_ms': u'1', + u'Observation.DataProducts.Output_IncoherentStokes.enabled': False, + u'Observation.DataProducts.Input_IncoherentStokes.enabled': False, + u'Observation.DataProducts.Input_Correlated.enabled': True, + u'Observation.DataProducts.Output_Pulsar.enabled': False, + u'Observation.DataProducts.Input_CoherentStokes.skip': [], + u'Observation.ObservationControl.PythonControl.DPPP.demixer.demixtimestep': u'10', + u'Version.number': u'33774', + u'Observation.momID': u'351556', + u'Observation.startTime': u'2016-03-25 13:49:55', + u'Observation.ObservationControl.PythonControl.LongBaseline.subbands_per_subbandgroup': u'1', + u'Observation.nrBeams': u'0', + u'Observation.DataProducts.Input_IncoherentStokes.skip': [], + u'Observation.ObservationControl.PythonControl.DPPP.demixer.demixfreqstep': u'64', + u'Observation.DataProducts.Output_Correlated.enabled': True, + u'Observation.sampleClock': u'200' + }, + u'task_type': u'pipeline', + u'otdb_id': 1290496, + u'predecessors': [{ + u'task_subtype': u'bfmeasurement', + u'specification': { + u'Observation.DataProducts.Output_InstrumentModel.enabled': False, + u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.timeIntegrationFactor': u'1', + u'Observation.stopTime': u'2016-03-26 00:33:31', + u'Observation.VirtualInstrument.stationList': [u'RS205', u'RS503', u'CS013', u'RS508', u'RS106'], + u'Observation.DataProducts.Input_CoherentStokes.enabled': False, + u'Observation.DataProducts.Output_CoherentStokes.enabled': False, + u'Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrChannelsPerSubband': u'64', + u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.which': u'I', + u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.which': u'I', + u'Observation.Beam[0].subbandList': [100, 101, 102, 103], + u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.subbandsPerFile': u'512', + u'Observation.DataProducts.Input_Correlated.skip': [], + u'Observation.antennaSet': u'HBA_DUAL', + u'Observation.nrBitsPerSample': u'8', + u'Observation.Beam[0].nrTabRings': u'0', + u'Observation.Beam[0].nrTiedArrayBeams': u'0', + u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.flysEye': False, + u'Observation.nrBeams': u'1', + u'Observation.ObservationControl.OnlineControl.Cobalt.Correlator.integrationTime': u'1.0', + u'Observation.DataProducts.Output_IncoherentStokes.enabled': False, + u'Observation.DataProducts.Input_IncoherentStokes.enabled': False, + u'Observation.DataProducts.Input_Correlated.enabled': False, + u'Observation.DataProducts.Output_Pulsar.enabled': False, + u'Observation.DataProducts.Input_CoherentStokes.skip': [], + u'Observation.DataProducts.Output_SkyImage.enabled': False, + u'Version.number': u'33774', + u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.timeIntegrationFactor': u'1', + u'Observation.momID': u'351539', + u'Observation.startTime': u'2016-03-26 00:31:31', + u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.subbandsPerFile': u'512', + u'Observation.DataProducts.Input_IncoherentStokes.skip': [], + u'Observation.DataProducts.Output_Correlated.enabled': True, + u'Observation.sampleClock': u'200' + }, + u'task_type': u'observation', + u'otdb_id': 1290476, + u'predecessors': [] + }] + }] + } + + def setUp(self): + def get_task_side_effect(*args, **kwargs): + if 'mom_id' in kwargs: + if kwargs['mom_id'] == self.successor_task_mom_id: + return self.successor_task + elif kwargs['mom_id'] == self.predecessor_task_mom_id: + return self.predecessor_task + else: + return self.task + else: + return self.task + + self.successor_task_mom_ids = [self.successor_task_mom_id] + self.predecessor_task_mom_ids = [self.predecessor_task_mom_id] + + rarpc_patcher = mock.patch('lofar.sas.resourceassignment.resourceassignmentservice.rpc.RARPC') + self.addCleanup(rarpc_patcher.stop) + self.rarpc_mock = rarpc_patcher.start() + self.rarpc_mock.getTask.side_effect = get_task_side_effect + + rerpc_patcher = mock.patch('lofar.messaging.RPC') + self.addCleanup(rerpc_patcher.stop) + self.rerpc_mock = rerpc_patcher.start() + + otdbrpc_patcher = mock.patch('lofar.sas.otdb.otdbrpc') + self.addCleanup(otdbrpc_patcher.stop) + self.otdbrpc_mock = otdbrpc_patcher.start() + + momrpc_patcher = mock.patch('lofar.mom.momqueryservice.momqueryrpc') + self.addCleanup(momrpc_patcher.stop) + self.momrpc_mock = momrpc_patcher.start() + self.momrpc_mock.getPredecessorIds.return_value = {str(self.task_mom_id): self.predecessor_task_mom_ids} + self.momrpc_mock.getSuccessorIds.return_value = {str(self.task_mom_id): self.successor_task_mom_ids} + + curpc_patcher = mock.patch('lofar.sas.datamanagement.cleanup.rpc') + self.addCleanup(curpc_patcher.stop) + self.curpc_mock = curpc_patcher.start() + + ra_notification_bus_patcher = mock.patch('lofar.messaging.messagebus') + self.addCleanup(ra_notification_bus_patcher.stop) + self.ra_notification_bus_mock = ra_notification_bus_patcher.start() + + logger_patcher = mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.logger') + self.addCleanup(logger_patcher.stop) + self.logger_mock = logger_patcher.start() + + move_pipeline_after_its_predecessors_patcher = mock.patch( + 'lofar.sas.resourceassignment.resourceassigner.assignment.movePipelineAfterItsPredecessors') + self.addCleanup(move_pipeline_after_its_predecessors_patcher.stop) + self.movePipelineAfterItsPredecessors_mock = move_pipeline_after_its_predecessors_patcher.start() + + self.resourceAssigner = TestingResourceAssigner(self.rarpc_mock, self.rerpc_mock, + self.otdbrpc_mock, self.momrpc_mock, + self.curpc_mock, self.ra_notification_bus_mock) + + def assert_all_services_opened(self): + self.assertTrue(self.rarpc_mock.open.called, "RARPC.open was not called") + self.assertTrue(self.rerpc_mock.open.called, "RPC.open was not called") + self.assertTrue(self.otdbrpc_mock.open.called, "OTDBRPC.open was not called") + self.assertTrue(self.momrpc_mock.open.called, "MOMRPC.open was not called") + self.assertTrue(self.curpc_mock.open.called, "CURPC.open was not called") + self.assertTrue(self.ra_notification_bus_mock.open.called, "ra_notification_bus.open was not called") + + def assert_all_services_closed(self): + self.assertTrue(self.rarpc_mock.close.called, "RARPC.close was not called") + self.assertTrue(self.rerpc_mock.close.called, "RPC.close was not called") + self.assertTrue(self.otdbrpc_mock.close.called, "OTDBRPC.close was not called") + self.assertTrue(self.momrpc_mock.close.called, "MOMRPC.close was not called") + self.assertTrue(self.curpc_mock.close.called, "CURPC.close was not called") + self.assertTrue(self.ra_notification_bus_mock.close.called, "ra_notification_bus.close was not called") + + def test_open_opens_all_services(self): + self.resourceAssigner.open() + + self.assert_all_services_opened() + + def test_close_closes_all_services(self): + self.resourceAssigner.close() + + self.assert_all_services_closed() + + def test_contextManager_opens_and_closes_all_services(self): + with TestingResourceAssigner(self.rarpc_mock, self.rerpc_mock, + self.otdbrpc_mock, self.momrpc_mock, + self.curpc_mock, self.ra_notification_bus_mock): + self.assert_all_services_opened() + + self.assert_all_services_closed() + + def test_do_assignment_logs_specification(self): + self.resourceAssigner.doAssignment(self.specification_tree) + + self.logger_mock.info.assert_any_call('doAssignment: specification_tree=%s' % self.specification_tree) + + def test_do_assignment_log_non_approved_or_prescheduled_states(self): + self.resourceAssigner.doAssignment(self.non_approved_or_prescheduled_specification_tree) + + self.logger_mock.info.assert_any_call( + 'skipping specification for otdb_id=%s because status=%s', + (self.non_approved_or_prescheduled_otdb_id, self.non_approved_or_prescheduled_status)) + + @unittest.skip("Bug: Non approved and preschedules specification should be skipped") + def test_do_assignment_non_approved_or_prescheduled_states_should_be_skipped(self): + self.resourceAssigner.doAssignment(self.non_approved_or_prescheduled_specification_tree) + + self.assertEqual(len(self.otdbrpc_mock.method_calls), 0, + "OTDBRPC was called for non approved or scheduled specification tree") + self.assertEqual(len(self.rarpc_mock.method_calls), 0, + "RARPC was called for non approved or scheduled specification tree") + self.assertEqual(len(self.momrpc_mock.method_calls), 0, + "MOMRPC was called for non approved or scheduled specification tree") + self.assertEqual(len(self.rerpc_mock.method_calls), 0, + "RERPC was called for non approved or scheduled specification tree") + self.assertEqual(len(self.curpc_mock.method_calls), 0, + "CURPC was called for non approved or scheduled specification tree") + self.assertEqual(len(self.ra_notification_bus_mock.method_calls), 0, + "RA notification bus was called for non approved or scheduled specification tree") + + def test_do_assignment_inserts_specification_and_task_in_radb(self): + self.resourceAssigner.doAssignment(self.specification_tree) + + start_time = datetime.datetime.strptime(self.future_start_time, '%Y-%m-%d %H:%M:%S') + stop_time = datetime.datetime.strptime(self.future_stop_time, '%Y-%m-%d %H:%M:%S') + parset = parameterset(self.specification_tree['specification']) + + self.rarpc_mock.insertSpecificationAndTask.assert_any_call(self.mom_id, self.otdb_id, self.state, + self.task_type, start_time, stop_time, str(parset), + "CEP4") + + def test_do_assignment_logs_when_no_predecessors_found(self): + self.momrpc_mock.getPredecessorIds.return_value = {str(self.task_mom_id): []} + + self.resourceAssigner.doAssignment(self.specification_tree) + + self.logger_mock.info.assert_any_call('no predecessors for otdb_id=%s', self.task_otdb_id) + + def test_do_assignment_logs_when_predecessors_are_found(self): + self.resourceAssigner.doAssignment(self.specification_tree) + + self.logger_mock.info.assert_any_call('proccessing predecessor mom_ids=%s for mom_id=%s otdb_id=%s', + self.predecessor_task_mom_ids, self.task_mom_id, self.task_otdb_id) + + @unittest.skip("Bug: logger gets called with an unfound predecessor task") + def test_do_assignment_logs_when_predecessors_are_found_but_its_task_is_missing_in_radb(self): + self.resourceAssigner.doAssignment(self.specification_tree) + + self.logger_mock.warning.assert_any_call( + 'could not find predecessor task with otdb_id=%s in radb for task otdb_id=%s', -1, self.task_otdb_id) + + def test_do_assignment_logs_when_predecessors_are_found_that_need_to_be_linked_to_task(self): + self.resourceAssigner.doAssignment(self.specification_tree) + + self.logger_mock.info.assert_any_call( + 'connecting predecessor task with mom_id=%s otdb_id=%s to it\'s successor with mom_id=%s otdb_id=%s', + self.predecessor_task_mom_id, + self.predecessor_task_otdb_id, + self.task_mom_id, + self.task_otdb_id) + + def test_do_assignment_insert_predecessor_into_task_when_not_linked_to_task(self): + self.resourceAssigner.doAssignment(self.specification_tree) + + self.rarpc_mock.insertTaskPredecessor.assert_any_call(self.task_id, self.predecessor_task_id) + + @unittest.skip("Bug: logger gets called with an unfound predecessor task") + def test_do_assignment_logs_when_predecessors_are_found_but_its_task_is_missing_in_radb(self): + self.resourceAssigner.doAssignment(self.specification_tree) + + self.logger_mock.warning.assert_any_call( + 'could not find predecessor task with otdb_id=%s in radb for task otdb_id=%s', -1, self.task_otdb_id) + + def test_do_assignment_logs_when_no_successors_found(self): + self.momrpc_mock.getSuccessorIds.return_value = {str(self.task_mom_id): []} + + self.resourceAssigner.doAssignment(self.specification_tree) + + self.logger_mock.info.assert_any_call('no successors for otdb_id=%s', self.task_otdb_id) + + def test_do_assignment_logs_when_successors_are_found(self): + self.resourceAssigner.doAssignment(self.specification_tree) + + self.logger_mock.info.assert_any_call('proccessing successor mom_ids=%s for mom_id=%s otdb_id=%s', + self.successor_task_mom_ids, self.task_mom_id, self.task_otdb_id) + + @unittest.skip("Bug: logger gets called with an unfound successor task," + " and text also mentions predecessor instead of successor") + def test_do_assignment_logs_when_successors_are_found_but_its_task_is_missing_in_radb(self): + self.resourceAssigner.doAssignment(self.specification_tree) + + self.logger_mock.warning.assert_any_call( + 'could not find successor task with otdb_id=%s in radb for task otdb_id=%s', + -1, self.task_otdb_id) + + def test_do_assignment_logs_when_successors_are_found_that_need_to_be_linked_to_task(self): + self.resourceAssigner.doAssignment(self.specification_tree) + + self.logger_mock.info.assert_any_call( + 'connecting successor task with mom_id=%s otdb_id=%s to it\'s predecessor with mom_id=%s otdb_id=%s', + self.successor_task_mom_id, + self.successor_task_otdb_id, + self.task_mom_id, + self.task_otdb_id) + + def test_do_assignment_insert_successor_into_task_when_not_linked_to_task(self): + self.resourceAssigner.doAssignment(self.specification_tree) + + self.rarpc_mock.insertTaskPredecessor.assert_any_call(self.successor_task_id, self.task_id) + + def test_do_assignment_moves_pipeline_of_successor_after_predecessor(self): + self.resourceAssigner.doAssignment(self.specification_tree) + + self.assertTrue(self.movePipelineAfterItsPredecessors_mock.called) + + def test_do_assignment_logs_mom_bug(self): + self.resourceAssigner.doAssignment(self.mom_bug_specification_tree) + + self.logger_mock.info.assert_any_call( + 'overwriting and uploading processingClusterName to otdb from %s to %s for otdb_id=%s', + self.mom_bug_processing_cluster_name, 'CEP4', self.mom_bug_otbd_id) + + def test_do_assignment_resets_ProcessingCluster_clusterName_on_mom_bug(self): + self.resourceAssigner.doAssignment(self.mom_bug_specification_tree) + + self.otdbrpc_mock.taskSetSpecification.assert_any_call( + self.mom_bug_otbd_id, + {'LOFAR.ObsSW.Observation.Cluster.ProcessingCluster.clusterName': 'CEP4'}) + + @mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.datetime') + def test_do_assignment_should_reset_observation_period_when_in_past_without_predecessor_and_duration( + self, datetime_mock): + now = datetime.datetime.utcnow() + datetime.timedelta(days=1) + datetime_mock.utcnow.return_value = now + + new_starttime = now + datetime.timedelta(minutes=1) + new_endtime = new_starttime + datetime.timedelta(hours=1) + + self.resourceAssigner.doAssignment(self.specification_tree) + + self.logger_mock.warning.assert_any_call( + 'Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s', + new_starttime, new_endtime, self.otdb_id) + self.logger_mock.info.assert_any_call( + 'uploading auto-generated start/end time (%s, %s) to otdb for otdb_id=%s', + new_starttime, + new_endtime, + self.otdb_id) + self.otdbrpc_mock.taskSetSpecification.assert_any_call( + self.otdb_id, + { + 'LOFAR.ObsSW.Observation.startTime': new_starttime.strftime('%Y-%m-%d %H:%M:%S'), + 'LOFAR.ObsSW.Observation.stopTime': new_endtime.strftime('%Y-%m-%d %H:%M:%S') + }) + + @mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.datetime') + def test_do_assignment_should_reset_observation_period_when_in_past_with_predecessor_in_future(self, datetime_mock): + now = datetime.datetime.utcnow() + datetime.timedelta(days=1) + now = self.strip_ms(now) + + datetime_mock.utcnow.return_value = now + datetime_mock.strptime.side_effect = \ + lambda date_string, format_string: datetime.datetime.strptime(date_string, format_string) + + future_predecessor_stop_time = now + datetime.timedelta(hours=1) + tree_with_predecessor_observation_stop_time = deepcopy(self.specification_tree) + tree_with_predecessor_observation_stop_time['predecessors'][0]['specification']['Observation.stopTime'] = \ + future_predecessor_stop_time.strftime('%Y-%m-%d %H:%M:%S') + + new_starttime = future_predecessor_stop_time + datetime.timedelta(minutes=1) + new_endtime = new_starttime + datetime.timedelta(hours=1) + + self.resourceAssigner.doAssignment(tree_with_predecessor_observation_stop_time) + + self.logger_mock.warning.assert_any_call( + 'Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s', + new_starttime, new_endtime, self.otdb_id) + self.logger_mock.info.assert_any_call( + 'uploading auto-generated start/end time (%s, %s) to otdb for otdb_id=%s', + new_starttime, + new_endtime, + self.otdb_id) + self.otdbrpc_mock.taskSetSpecification.assert_any_call( + self.otdb_id, + { + 'LOFAR.ObsSW.Observation.startTime': new_starttime.strftime('%Y-%m-%d %H:%M:%S'), + 'LOFAR.ObsSW.Observation.stopTime': new_endtime.strftime('%Y-%m-%d %H:%M:%S') + }) + + def strip_ms(self, now): + return datetime.datetime.strptime(now.strftime('%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S') + + @mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.datetime') + def test_do_assignment_should_reset_observation_period_when_in_past_with_task_duration(self, datetime_mock): + now = datetime.datetime.utcnow() + datetime.timedelta(days=1) + + datetime_mock.utcnow.return_value = now + + tree_with_observation_duraction = deepcopy(self.specification_tree) + duration = 100 + tree_with_observation_duraction['specification']['Observation.Scheduler.taskDuration'] = duration + + new_starttime = now + datetime.timedelta(minutes=1) + new_endtime = new_starttime + datetime.timedelta(seconds=duration) + + self.resourceAssigner.doAssignment(tree_with_observation_duraction) + + self.logger_mock.warning.assert_any_call( + 'Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s', + new_starttime, new_endtime, self.otdb_id) + self.logger_mock.info.assert_any_call( + 'uploading auto-generated start/end time (%s, %s) to otdb for otdb_id=%s', + new_starttime, + new_endtime, + self.otdb_id) + self.otdbrpc_mock.taskSetSpecification.assert_any_call( + self.otdb_id, + { + 'LOFAR.ObsSW.Observation.startTime': new_starttime.strftime('%Y-%m-%d %H:%M:%S'), + 'LOFAR.ObsSW.Observation.stopTime': new_endtime.strftime('%Y-%m-%d %H:%M:%S') + }) + + def test_do_assignment_should_log_on_non_presceduled_cep4_tasks(self): + self.resourceAssigner.doAssignment(self.non_approved_or_prescheduled_specification_tree) + + self.logger_mock.info.assert_any_call( + 'skipping resource assignment for CEP4 task otdb_id=%s because status=%s' % + (self.non_approved_or_prescheduled_otdb_id, + self.non_approved_or_prescheduled_status)) + + def test_do_assignment_should_not_claim_resources_on_non_presceduled_cep4_tasks(self): + self.resourceAssigner.doAssignment(self.non_approved_or_prescheduled_specification_tree) + + self.rarpc_mock.insertResourceClaims.assert_not_called() + +if __name__ == '__main__': unittest.main()