#!/usr/bin/env python import unittest import mock import datetime from lofar.sas.resourceassignment.resourceassigner.assignment import ResourceAssigner from lofar.parameterset import parameterset ra_notification_prefix = "ra_notification_prefix" class TestingResourceAssigner(ResourceAssigner): def __init__(self, rarpc, rerpc, otdbrpc, momrpc, curpc, sqrpc, ra_notification_bus): # super gets not done to be able to insert mocks as early as possible otherwise the RPC block unittesting self.radbrpc = rarpc self.rerpc = rerpc self.otdbrpc = otdbrpc self.momrpc = momrpc self.curpc = curpc self.sqrpc = sqrpc self.ra_notification_bus = ra_notification_bus self.ra_notification_prefix = ra_notification_prefix class ResourceAssignerTest(unittest.TestCase): mom_id = 351557 otdb_id = 1290494 specification_id = 2323 state = u'prescheduled' task_type = u'pipeline' specification_tree = {} non_approved_or_prescheduled_status = u'opened' non_approved_or_prescheduled_otdb_id = 1 future_start_time = (datetime.datetime.utcnow() + datetime.timedelta(hours=1)).strftime('%Y-%m-%d %H:%M:%S') future_stop_time = (datetime.datetime.utcnow() + datetime.timedelta(hours=2)).strftime('%Y-%m-%d %H:%M:%S') non_approved_or_prescheduled_specification_tree = { u'otdb_id': non_approved_or_prescheduled_otdb_id, u'task_type': u'pipeline', u'state': non_approved_or_prescheduled_status, u'specification': { u'Observation.startTime': future_start_time, u'Observation.stopTime': future_stop_time } } approved_status = u'approved' approved_otdb_id = 22 approved_specification_tree = { u'otdb_id': approved_otdb_id, u'task_type': u'pipeline', u'state': approved_status, u'specification': { u'Observation.startTime': future_start_time, u'Observation.stopTime': future_stop_time } } cep2_specification_tree = { u'otdb_id': otdb_id, u'task_type': u'pipeline', u'state': u'prescheduled', u'specification': { u'Observation.startTime': future_start_time, u'Observation.stopTime': future_stop_time, u'Observation.DataProducts.Output_Pulsar.enabled': True, u'Observation.DataProducts.Output_Pulsar.storageClusterName': u'CEP2' } } mom_bug_processing_cluster_name = 'CEP2' mom_bug_otbd_id = 1234 mom_bug_specification_tree = { u'otdb_id': mom_bug_otbd_id, u'task_type': u'pipeline', u'state': u'prescheduled', u'specification': { u'Observation.startTime': future_start_time, u'Observation.stopTime': future_stop_time, u'Observation.DataProducts.Output_Pulsar.enabled': True, u'Observation.DataProducts.Output_Pulsar.storageClusterName': u'CEP4', u'Observation.Cluster.ProcessingCluster.clusterName': mom_bug_processing_cluster_name } } task_mom_id = 351543 task_otdb_id = 1290472 task_id = 2299 task_end_time = datetime.datetime(2016, 3, 25, 22, 47, 31) task_start_time = datetime.datetime(2016, 3, 25, 21, 47, 31) task = { "mom_id": task_mom_id, "otdb_id": task_otdb_id, "id": task_id, "endtime": task_end_time, "name": "IS HBA_DUAL", "predecessor_ids": [], "project_mom_id": 2, "project_name": "test-lofar", "specification_id": specification_id, "starttime": task_start_time, "status": "prescheduled", "status_id": 350, "successor_ids": [], "type": "pipeline", "type_id": 0 } non_existing_task_mom_id = -1 predecessor_task_mom_id = 1 predecessor_task_otdb_id = 2 predecessor_task_id = 3 predecessor_task = { "mom_id": predecessor_task_mom_id, "otdb_id": predecessor_task_otdb_id, "id": predecessor_task_id, "endtime": datetime.datetime(2016, 3, 25, 22, 47, 31), "name": "IS HBA_DUAL", "predecessor_ids": [], "project_mom_id": 2, "project_name": "test-lofar", "specification_id": 2323, "starttime": datetime.datetime(2016, 3, 25, 21, 47, 31), "status": "prescheduled", "status_id": 350, "successor_ids": [], "type": "pipeline", "type_id": 0 } successor_task_mom_id = 4 successor_task_otdb_id = 5 successor_task_id = 6 successor_task = { "mom_id": successor_task_mom_id, "otdb_id": successor_task_otdb_id, "id": successor_task_id, "endtime": datetime.datetime(2016, 3, 25, 22, 47, 31), "name": "IS HBA_DUAL", "predecessor_ids": [], "project_mom_id": 2, "project_name": "test-lofar", "specification_id": 2323, "starttime": datetime.datetime(2016, 3, 25, 21, 47, 31), "status": "prescheduled", "status_id": 350, "successor_ids": [], "type": "pipeline", "type_id": 0 } resources_with_errors_otdb_id = 1290496 resource_error1 = "error 1" resource_error2 = "error 2" unknown_resource_type_name = "fuel" unknown_resource_type_otdb_id = 123489 resource_unknown_property = 'unknown_property' rerpc_status = 0 rerpc_needed_claim_for_bandwidth_size = 19021319494 rerpc_needed_claim_for_bandwidth = { 'total_size': rerpc_needed_claim_for_bandwidth_size } rerpc_needed_claim_for_storage_output_files = { 'uv': { 'nr_of_uv_files': 481, 'uv_file_size': 1482951104 }, 'saps': [ { 'sap_nr': 0, 'properties': { 'nr_of_uv_files': 319 } }, { 'sap_nr': 1, 'properties': { 'nr_of_uv_files': 81, resource_unknown_property: -1 } }, { 'sap_nr': 2, 'properties': { 'nr_of_uv_files': 81 } } ] } rerpc_needed_claim_for_storage_size = 713299481024 rerpc_needed_claim_for_storage = { 'total_size': rerpc_needed_claim_for_storage_size, 'output_files': rerpc_needed_claim_for_storage_output_files } rerpc_replymessage = { str(otdb_id): { 'pipeline': { 'bandwidth': rerpc_needed_claim_for_bandwidth, 'storage': rerpc_needed_claim_for_storage } }, str(resources_with_errors_otdb_id): { 'pipeline': { 'bandwidth': { 'total_size': 19021319494 }, 'storage': { 'total_size': 713299481024, 'output_files': { 'uv': { 'nr_of_uv_files': 481, 'uv_file_size': 1482951104 }, 'saps': [{ 'sap_nr': 0, 'properties': { 'nr_of_uv_files': 319 } }, { 'sap_nr': 1, 'properties': { 'nr_of_uv_files': 81 } }, { 'sap_nr': 2, 'properties': { 'nr_of_uv_files': 81 } }] } }, }, 'errors': [resource_error1, resource_error2] }, str(unknown_resource_type_otdb_id): { 'pipeline': { str(unknown_resource_type_name): { } } } } cep4bandwidth_resource_id = 116 cep4storage_resource_id = 117 storage_claim = { 'resource_id': cep4storage_resource_id, 'starttime': task_start_time, 'endtime': task_end_time + datetime.timedelta(days=365), 'status': 'claimed', 'claim_size': rerpc_needed_claim_for_storage_size, 'properties': [ {'type': 2, 'io_type': 'output', 'value': 481}, {'type': 10, 'io_type': 'output', 'value': 1482951104}, {'type': 2, 'io_type': 'output', 'sap_nr': 0, 'value': 319}, {'type': 2, 'io_type': 'output', 'sap_nr': 1, 'value': 81}, {'type': 2, 'io_type': 'output', 'sap_nr': 2, 'value': 81} ] } bandwith_claim = { 'resource_id': cep4bandwidth_resource_id, 'starttime': task_start_time, 'endtime': task_end_time, 'status': 'claimed', 'claim_size': rerpc_needed_claim_for_bandwidth_size } specifaction_claims = [bandwith_claim, storage_claim] def reset_specification_tree(self): self.specification_tree = { u'otdb_id': self.otdb_id, u'task_type': self.task_type, u'state': self.state, u'specification': { u'Observation.momID': str(self.mom_id), u'Observation.startTime': self.future_start_time, u'Observation.stopTime': self.future_stop_time, u'Observation.DataProducts.Output_InstrumentModel.enabled': False, u'Observation.VirtualInstrument.stationList': [], u'Observation.DataProducts.Input_CoherentStokes.enabled': False, u'Observation.DataProducts.Output_CoherentStokes.enabled': False, u'Observation.DataProducts.Input_Correlated.skip': [0, 0, 0, 0], u'Observation.antennaSet': u'LBA_INNER', u'Observation.nrBitsPerSample': u'16', u'Observation.ObservationControl.PythonControl.LongBaseline.subbandgroups_per_ms': u'2', u'Observation.DataProducts.Output_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_Correlated.enabled': True, u'Observation.DataProducts.Output_Pulsar.enabled': False, u'Observation.DataProducts.Input_CoherentStokes.skip': [], u'Observation.DataProducts.Output_SkyImage.enabled': False, u'Version.number': u'33774', u'Observation.ObservationControl.PythonControl.LongBaseline.subbands_per_subbandgroup': u'2', u'Observation.nrBeams': u'0', u'Observation.DataProducts.Input_IncoherentStokes.skip': [], u'Observation.DataProducts.Output_Correlated.enabled': True, u'Observation.DataProducts.Output_Correlated.storageClusterName': 'CEP4', u'Observation.sampleClock': u'200', u'Observation.Cluster.ProcessingCluster.clusterName': 'CEP4' }, u'task_subtype': u'long baseline pipeline', u'predecessors': [{ u'task_subtype': u'averaging pipeline', u'specification': { u'Observation.DataProducts.Output_InstrumentModel.enabled': False, u'Observation.stopTime': u'2016-03-25 13:51:05', u'Observation.VirtualInstrument.stationList': [], u'Observation.DataProducts.Input_CoherentStokes.enabled': False, u'Observation.DataProducts.Output_CoherentStokes.enabled': False, u'Observation.DataProducts.Output_SkyImage.enabled': False, u'Observation.DataProducts.Input_Correlated.skip': [0, 0, 0, 0], u'Observation.antennaSet': u'LBA_INNER', u'Observation.nrBitsPerSample': u'16', u'Observation.ObservationControl.PythonControl.LongBaseline.subbandgroups_per_ms': u'1', u'Observation.DataProducts.Output_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_Correlated.enabled': True, u'Observation.DataProducts.Output_Pulsar.enabled': False, u'Observation.DataProducts.Input_CoherentStokes.skip': [], u'Observation.ObservationControl.PythonControl.DPPP.demixer.demixtimestep': u'10', u'Version.number': u'33774', u'Observation.momID': u'351556', u'Observation.startTime': u'2016-03-25 13:49:55', u'Observation.ObservationControl.PythonControl.LongBaseline.subbands_per_subbandgroup': u'1', u'Observation.nrBeams': u'0', u'Observation.DataProducts.Input_IncoherentStokes.skip': [], u'Observation.ObservationControl.PythonControl.DPPP.demixer.demixfreqstep': u'64', u'Observation.DataProducts.Output_Correlated.enabled': True, u'Observation.sampleClock': u'200' }, u'task_type': u'pipeline', u'otdb_id': 1290496, u'predecessors': [{ u'task_subtype': u'bfmeasurement', u'specification': { u'Observation.DataProducts.Output_InstrumentModel.enabled': False, u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.timeIntegrationFactor': u'1', u'Observation.stopTime': u'2016-03-26 00:33:31', u'Observation.VirtualInstrument.stationList': [u'RS205', u'RS503', u'CS013', u'RS508', u'RS106'], u'Observation.DataProducts.Input_CoherentStokes.enabled': False, u'Observation.DataProducts.Output_CoherentStokes.enabled': False, u'Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrChannelsPerSubband': u'64', u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.which': u'I', u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.which': u'I', u'Observation.Beam[0].subbandList': [100, 101, 102, 103], u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.subbandsPerFile': u'512', u'Observation.DataProducts.Input_Correlated.skip': [], u'Observation.antennaSet': u'HBA_DUAL', u'Observation.nrBitsPerSample': u'8', u'Observation.Beam[0].nrTabRings': u'0', u'Observation.Beam[0].nrTiedArrayBeams': u'0', u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.flysEye': False, u'Observation.nrBeams': u'1', u'Observation.ObservationControl.OnlineControl.Cobalt.Correlator.integrationTime': u'1.0', u'Observation.DataProducts.Output_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_Correlated.enabled': False, u'Observation.DataProducts.Output_Pulsar.enabled': False, u'Observation.DataProducts.Input_CoherentStokes.skip': [], u'Observation.DataProducts.Output_SkyImage.enabled': False, u'Version.number': u'33774', u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.timeIntegrationFactor': u'1', u'Observation.momID': u'351539', u'Observation.startTime': u'2016-03-26 00:31:31', u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.subbandsPerFile': u'512', u'Observation.DataProducts.Input_IncoherentStokes.skip': [], u'Observation.DataProducts.Output_Correlated.enabled': True, u'Observation.sampleClock': u'200' }, u'task_type': u'observation', u'otdb_id': 1290476, u'predecessors': [] }] }] } def setUp(self): def get_task_side_effect(*args, **kwargs): if 'mom_id' in kwargs: if kwargs['mom_id'] == self.successor_task_mom_id: return self.successor_task elif kwargs['mom_id'] == self.predecessor_task_mom_id: return self.predecessor_task elif kwargs['mom_id'] == self.non_existing_task_mom_id: return None else: return self.task else: return self.task self.successor_task_mom_ids = [self.successor_task_mom_id] self.predecessor_task_mom_ids = [self.predecessor_task_mom_id] rarpc_patcher = mock.patch('lofar.sas.resourceassignment.resourceassignmentservice.rpc.RARPC') self.addCleanup(rarpc_patcher.stop) self.rarpc_mock = rarpc_patcher.start() self.rarpc_mock.getTask.side_effect = get_task_side_effect self.rarpc_mock.insertSpecificationAndTask.return_value = { 'inserted': True, 'specification_id': self.specification_id, 'task_id': self.task_id } self.rarpc_mock.getResourceClaimPropertyTypes.return_value = [ {'id': 0, 'name': 'nr_of_is_files'}, {'id': 1, 'name': 'nr_of_cs_files'}, {'id': 2, 'name': 'nr_of_uv_files'}, {'id': 3, 'name': 'nr_of_im_files'}, {'id': 4, 'name': 'nr_of_img_files'}, {'id': 5, 'name': 'nr_of_pulp_files'}, {'id': 6, 'name': 'nr_of_cs_stokes'}, {'id': 7, 'name': 'nr_of_is_stokes'}, {'id': 8, 'name': 'is_file_size'}, {'id': 9, 'name': 'cs_file_size'}, {'id': 10, 'name': 'uv_file_size'}, {'id': 11, 'name': 'im_file_size'}, {'id': 12, 'name': 'img_file_size'}, {'id': 13, 'name': 'nr_of_pulp_files'}, {'id': 14, 'name': 'nr_of_cs_parts'}, {'id': 15, 'name': 'start_sb_nr'}, {'id': 16, 'name': 'uv_otdb_id'}, {'id': 17, 'name': 'cs_otdb_id'}, {'id': 18, 'name': 'is_otdb_id'}, {'id': 19, 'name': 'im_otdb_id'}, {'id': 20, 'name': 'img_otdb_id'}, {'id': 21, 'name': 'pulp_otdb_id'}, {'id': 22, 'name': 'is_tab_nr'}, {'id': 23, 'name': 'start_sbg_nr'}, {'id': 24, 'name': 'pulp_file_size'} ] self.rarpc_mock.getResourceTypes.return_value = [ {'id': 0, 'name': 'rsp', 'unit_id': 0, 'units': 'rsp_channel_bit'}, {'id': 1, 'name': 'tbb', 'unit_id': 1, 'units': 'bytes'}, {'id': 2, 'name': 'rcu', 'unit_id': 2, 'units': 'rcu_board'}, {'id': 3, 'name': 'bandwidth', 'unit_id': 3, 'units': 'bits/second'}, {'id': 4, 'name': 'processor', 'unit_id': 4, 'units': 'cores'}, {'id': 5, 'name': 'storage', 'unit_id': 1, 'units': 'bytes'}, ] self.rarpc_mock.insertResourceClaims.return_value = {'ids': [1, 2]} # incomplete response but good enough for tests self.rarpc_mock.getResources.return_value = [ {'id': self.cep4bandwidth_resource_id, 'name': 'cep4bandwidth', 'type_id': 3, 'type_name': 'bandwidth', 'unit_id': 3, 'unit': 'bits/second'}, {'id': self.cep4storage_resource_id, 'name': 'cep4storage', 'type_id': 5, 'type_name': 'storage', 'unit_id': 1, 'unit': 'bytes'} ] self.rarpc_mock.getResourceClaims.return_value = [] rerpc_patcher = mock.patch('lofar.messaging.RPC') self.addCleanup(rerpc_patcher.stop) self.rerpc_mock = rerpc_patcher.start() self.rerpc_mock.return_value = self.rerpc_replymessage, self.rerpc_status otdbrpc_patcher = mock.patch('lofar.sas.otdb.otdbrpc') self.addCleanup(otdbrpc_patcher.stop) self.otdbrpc_mock = otdbrpc_patcher.start() momrpc_patcher = mock.patch('lofar.mom.momqueryservice.momqueryrpc') self.addCleanup(momrpc_patcher.stop) self.momrpc_mock = momrpc_patcher.start() self.momrpc_mock.getPredecessorIds.return_value = {str(self.task_mom_id): self.predecessor_task_mom_ids} self.momrpc_mock.getSuccessorIds.return_value = {str(self.task_mom_id): self.successor_task_mom_ids} curpc_patcher = mock.patch('lofar.sas.datamanagement.cleanup.rpc') self.addCleanup(curpc_patcher.stop) self.curpc_mock = curpc_patcher.start() sqrpc_patcher = mock.patch('lofar.sas.datamanagement.storagequery.rpc') self.addCleanup(sqrpc_patcher.stop) self.sqrpc_mock = sqrpc_patcher.start() ra_notification_bus_patcher = mock.patch('lofar.messaging.messagebus') self.addCleanup(ra_notification_bus_patcher.stop) self.ra_notification_bus_mock = ra_notification_bus_patcher.start() logger_patcher = mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.logger') self.addCleanup(logger_patcher.stop) self.logger_mock = logger_patcher.start() move_pipeline_after_its_predecessors_patcher = mock.patch( 'lofar.sas.resourceassignment.resourceassigner.assignment.movePipelineAfterItsPredecessors') self.addCleanup(move_pipeline_after_its_predecessors_patcher.stop) self.movePipelineAfterItsPredecessors_mock = move_pipeline_after_its_predecessors_patcher.start() self.resourceAssigner = TestingResourceAssigner(self.rarpc_mock, self.rerpc_mock, self.otdbrpc_mock, self.momrpc_mock, self.curpc_mock, self.sqrpc_mock, self.ra_notification_bus_mock) self.reset_specification_tree() def assert_all_services_opened(self): self.assertTrue(self.rarpc_mock.open.called, "RARPC.open was not called") self.assertTrue(self.rerpc_mock.open.called, "RPC.open was not called") self.assertTrue(self.otdbrpc_mock.open.called, "OTDBRPC.open was not called") self.assertTrue(self.momrpc_mock.open.called, "MOMRPC.open was not called") self.assertTrue(self.curpc_mock.open.called, "CURPC.open was not called") self.assertTrue(self.ra_notification_bus_mock.open.called, "ra_notification_bus.open was not called") def assert_all_services_closed(self): self.assertTrue(self.rarpc_mock.close.called, "RARPC.close was not called") self.assertTrue(self.rerpc_mock.close.called, "RPC.close was not called") self.assertTrue(self.otdbrpc_mock.close.called, "OTDBRPC.close was not called") self.assertTrue(self.momrpc_mock.close.called, "MOMRPC.close was not called") self.assertTrue(self.curpc_mock.close.called, "CURPC.close was not called") self.assertTrue(self.ra_notification_bus_mock.close.called, "ra_notification_bus.close was not called") def test_open_opens_all_services(self): self.resourceAssigner.open() self.assert_all_services_opened() def test_close_closes_all_services(self): self.resourceAssigner.close() self.assert_all_services_closed() def test_contextManager_opens_and_closes_all_services(self): with TestingResourceAssigner(self.rarpc_mock, self.rerpc_mock, self.otdbrpc_mock, self.momrpc_mock, self.curpc_mock, self.sqrpc_mock, self.ra_notification_bus_mock): self.assert_all_services_opened() self.assert_all_services_closed() def test_do_assignment_logs_specification(self): self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.info.assert_any_call('doAssignment: specification_tree=%s' % self.specification_tree) def test_do_assignment_log_non_approved_or_prescheduled_states(self): self.resourceAssigner.doAssignment(self.non_approved_or_prescheduled_specification_tree) self.logger_mock.info.assert_any_call( 'skipping specification for otdb_id=%s because status=%s', (self.non_approved_or_prescheduled_otdb_id, self.non_approved_or_prescheduled_status)) def test_do_assignment_non_approved_or_prescheduled_states_should_be_skipped(self): self.resourceAssigner.doAssignment(self.non_approved_or_prescheduled_specification_tree) self.assertEqual(len(self.otdbrpc_mock.method_calls), 0, "OTDBRPC was called for non approved or scheduled specification tree") self.assertEqual(len(self.rarpc_mock.method_calls), 0, "RARPC was called for non approved or scheduled specification tree") self.assertEqual(len(self.momrpc_mock.method_calls), 0, "MOMRPC was called for non approved or scheduled specification tree") self.assertEqual(len(self.rerpc_mock.method_calls), 0, "RERPC was called for non approved or scheduled specification tree") self.assertEqual(len(self.curpc_mock.method_calls), 0, "CURPC was called for non approved or scheduled specification tree") self.assertEqual(len(self.ra_notification_bus_mock.method_calls), 0, "RA notification bus was called for non approved or scheduled specification tree") def test_do_assignment_inserts_specification_and_task_in_radb(self): self.resourceAssigner.doAssignment(self.specification_tree) start_time = datetime.datetime.strptime(self.future_start_time, '%Y-%m-%d %H:%M:%S') stop_time = datetime.datetime.strptime(self.future_stop_time, '%Y-%m-%d %H:%M:%S') parset = parameterset(self.specification_tree['specification']) self.rarpc_mock.insertSpecificationAndTask.assert_any_call(self.mom_id, self.otdb_id, self.state, self.task_type, start_time, stop_time, str(parset), "CEP4") def test_do_assignment_logs_when_insertion_of_specification_and_task_in_radb_failed(self): self.rarpc_mock.insertSpecificationAndTask.return_value = {'inserted': False} self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.error.assert_any_call('could not insert specification and task') def test_do_assignment_logs_when_no_predecessors_found(self): self.momrpc_mock.getPredecessorIds.return_value = {str(self.task_mom_id): []} self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.info.assert_any_call('no predecessors for otdb_id=%s', self.task_otdb_id) def test_do_assignment_logs_when_predecessors_are_found(self): self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.info.assert_any_call('proccessing predecessor mom_ids=%s for mom_id=%s otdb_id=%s', self.predecessor_task_mom_ids, self.task_mom_id, self.task_otdb_id) def test_do_assignment_logs_when_predecessors_are_found_but_its_task_is_missing_in_radb(self): self.momrpc_mock.getPredecessorIds.return_value = {str(self.task_mom_id): [self.non_existing_task_mom_id]} self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.warning.assert_any_call( 'could not find predecessor task with mom_id=%s in radb for task otdb_id=%s', self.non_existing_task_mom_id, self.task_otdb_id) def test_do_assignment_logs_when_predecessors_are_found_that_need_to_be_linked_to_task(self): self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.info.assert_any_call( 'connecting predecessor task with mom_id=%s otdb_id=%s to its successor with mom_id=%s otdb_id=%s', self.predecessor_task_mom_id, self.predecessor_task_otdb_id, self.task_mom_id, self.task_otdb_id) def test_do_assignment_insert_predecessor_into_task_when_not_linked_to_task(self): self.resourceAssigner.doAssignment(self.specification_tree) self.rarpc_mock.insertTaskPredecessor.assert_any_call(self.task_id, self.predecessor_task_id) def test_do_assignment_logs_when_no_successors_found(self): self.momrpc_mock.getSuccessorIds.return_value = {str(self.task_mom_id): []} self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.info.assert_any_call('no successors for otdb_id=%s', self.task_otdb_id) def test_do_assignment_logs_when_successors_are_found(self): self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.info.assert_any_call('proccessing successor mom_ids=%s for mom_id=%s otdb_id=%s', self.successor_task_mom_ids, self.task_mom_id, self.task_otdb_id) def test_do_assignment_logs_when_successors_are_found_but_its_task_is_missing_in_radb(self): self.momrpc_mock.getSuccessorIds.return_value = {str(self.task_mom_id): [self.non_existing_task_mom_id]} self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.warning.assert_any_call( 'could not find successor task with mom_id=%s in radb for task mom_id=%s otdb_id=%s', self.non_existing_task_mom_id, self.task_mom_id, self.task_otdb_id) def test_do_assignment_logs_when_successors_are_found_that_need_to_be_linked_to_task(self): self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.info.assert_any_call( 'connecting successor task with mom_id=%s otdb_id=%s to its predecessor with mom_id=%s otdb_id=%s', self.successor_task_mom_id, self.successor_task_otdb_id, self.task_mom_id, self.task_otdb_id) def test_do_assignment_insert_successor_into_task_when_not_linked_to_task(self): self.resourceAssigner.doAssignment(self.specification_tree) self.rarpc_mock.insertTaskPredecessor.assert_any_call(self.successor_task_id, self.task_id) def test_do_assignment_moves_pipeline_of_successor_after_predecessor(self): self.resourceAssigner.doAssignment(self.specification_tree) self.assertTrue(self.movePipelineAfterItsPredecessors_mock.called) def test_do_assignment_logs_mom_bug(self): self.resourceAssigner.doAssignment(self.mom_bug_specification_tree) self.logger_mock.info.assert_any_call( 'overwriting and uploading processingClusterName to otdb from %s to %s for otdb_id=%s', self.mom_bug_processing_cluster_name, 'CEP4', self.mom_bug_otbd_id) def test_do_assignment_resets_ProcessingCluster_clusterName_on_mom_bug(self): self.resourceAssigner.doAssignment(self.mom_bug_specification_tree) self.otdbrpc_mock.taskSetSpecification.assert_any_call( self.mom_bug_otbd_id, {'LOFAR.ObsSW.Observation.Cluster.ProcessingCluster.clusterName': 'CEP4'}) @mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.datetime') def test_do_assignment_should_reset_observation_period_when_in_past_without_predecessor_and_duration( self, datetime_mock): now = datetime.datetime.utcnow() + datetime.timedelta(days=1) datetime_mock.utcnow.return_value = now new_starttime = now + datetime.timedelta(minutes=1) new_endtime = new_starttime + datetime.timedelta(hours=1) self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.warning.assert_any_call( 'Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s', new_starttime, new_endtime, self.otdb_id) self.logger_mock.info.assert_any_call( 'uploading auto-generated start/end time (%s, %s) to otdb for otdb_id=%s', new_starttime, new_endtime, self.otdb_id) self.otdbrpc_mock.taskSetSpecification.assert_any_call( self.otdb_id, { 'LOFAR.ObsSW.Observation.startTime': new_starttime.strftime('%Y-%m-%d %H:%M:%S'), 'LOFAR.ObsSW.Observation.stopTime': new_endtime.strftime('%Y-%m-%d %H:%M:%S') }) @mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.datetime') def test_do_assignment_should_reset_observation_period_when_in_past_with_predecessor_in_future(self, datetime_mock): now = self.freeze_time_one_day_in_the_future(datetime_mock) future_predecessor_stop_time = now + datetime.timedelta(hours=1) self.specification_tree['predecessors'][0]['specification']['Observation.stopTime'] = \ future_predecessor_stop_time.strftime('%Y-%m-%d %H:%M:%S') new_starttime = future_predecessor_stop_time + datetime.timedelta(minutes=1) new_endtime = new_starttime + datetime.timedelta(hours=1) self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.warning.assert_any_call( 'Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s', new_starttime, new_endtime, self.otdb_id) self.logger_mock.info.assert_any_call( 'uploading auto-generated start/end time (%s, %s) to otdb for otdb_id=%s', new_starttime, new_endtime, self.otdb_id) self.otdbrpc_mock.taskSetSpecification.assert_any_call( self.otdb_id, { 'LOFAR.ObsSW.Observation.startTime': new_starttime.strftime('%Y-%m-%d %H:%M:%S'), 'LOFAR.ObsSW.Observation.stopTime': new_endtime.strftime('%Y-%m-%d %H:%M:%S') }) def freeze_time_one_day_in_the_future(self, datetime_mock): now = datetime.datetime.utcnow() + datetime.timedelta(days=1) now = self._strip_ms(now) datetime_mock.utcnow.return_value = now datetime_mock.strptime.side_effect = \ lambda date_string, format_string: datetime.datetime.strptime(date_string, format_string) return now def _strip_ms(self, now): return datetime.datetime.strptime(now.strftime('%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S') @mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.datetime') def test_do_assignment_should_reset_observation_period_when_in_past_with_task_duration(self, datetime_mock): now = datetime.datetime.utcnow() + datetime.timedelta(days=1) datetime_mock.utcnow.return_value = now duration = 100 self.specification_tree['specification']['Observation.Scheduler.taskDuration'] = duration new_starttime = now + datetime.timedelta(minutes=1) new_endtime = new_starttime + datetime.timedelta(seconds=duration) self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.warning.assert_any_call( 'Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s', new_starttime, new_endtime, self.otdb_id) self.logger_mock.info.assert_any_call( 'uploading auto-generated start/end time (%s, %s) to otdb for otdb_id=%s', new_starttime, new_endtime, self.otdb_id) self.otdbrpc_mock.taskSetSpecification.assert_any_call( self.otdb_id, { 'LOFAR.ObsSW.Observation.startTime': new_starttime.strftime('%Y-%m-%d %H:%M:%S'), 'LOFAR.ObsSW.Observation.stopTime': new_endtime.strftime('%Y-%m-%d %H:%M:%S') }) def test_do_assignment_should_log_insertion_of_specification_and_task(self): self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.info.assert_any_call( 'doAssignment: insertSpecification momId=%s, otdb_id=%s, status=%s, taskType=%s, startTime=%s, endTime=%s' ' cluster=%s' % (self.mom_id, self.otdb_id, self.state, self.task_type, self.future_start_time, self.future_stop_time, "CEP4")) def test_do_assignment_should_log_when_insertion_of_specification_and_task_is_done(self): self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.info.assert_any_call('doAssignment: inserted specification (id=%s) and task (id=%s)' % (self.specification_id, self.task_id)) def test_do_assignment_should_log_on_non_presceduled_cep4_tasks(self): self.resourceAssigner.doAssignment(self.approved_specification_tree) self.logger_mock.info.assert_any_call( 'skipping resource assignment for CEP4 task otdb_id=%s because status=%s' % (self.approved_otdb_id, self.approved_status)) def test_do_assignment_should_not_claim_resources_on_CEP2_tasks(self): self.resourceAssigner.doAssignment(self.cep2_specification_tree) self.rarpc_mock.insertResourceClaims.assert_not_called() def test_do_assignment_should_not_claim_resources_on_non_presceduled_cep4_tasks(self): self.resourceAssigner.doAssignment(self.non_approved_or_prescheduled_specification_tree) self.rarpc_mock.insertResourceClaims.assert_not_called() def test_do_assignment_should_log_if_not_all_storageClusters_are_seen_as_CEP4(self): self.resourceAssigner.doAssignment(self.cep2_specification_tree) self.logger_mock.warn.assert_any_call("storageClusterName not CEP4, rejecting specification.") def test_do_assignment_should_log_if_all_storageClusters_are_seen_as_CEP4(self): self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.info.assert_any_call("all enabled storageClusterName's are CEP4, accepting specification.") def test_do_assginement_should_request_needed_resources(self): self.resourceAssigner.doAssignment(self.specification_tree) self.rerpc_mock.assert_any_call( {"specification_tree": self.specification_tree}, timeout=10) def test_do_assignemnet_logs_needed_resources(self): self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.info.assert_any_call('getNeededResouces: %s' % self.rerpc_replymessage) self.logger_mock.info.assert_any_call('doAssignment: getNeededResouces=%s' % self.rerpc_replymessage) def test_do_assignment_logs_when_otdb_id_not_needed_resources(self): self.specification_tree["otdb_id"] = self.otdb_id + 1 self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.error.assert_any_call("no otdb_id %s found in estimator results %s" % (self.otdb_id + 1, self.rerpc_replymessage)) def test_do_assignment_should_not_claim_resouces_when_otdb_id_not_needed_resources(self): self.specification_tree["otdb_id"] = self.otdb_id + 1 self.resourceAssigner.doAssignment(self.specification_tree) self.rarpc_mock.insertResourceClaims.assert_not_called() def test_do_assignment_logs_when_task_type_not_in_needed_resources(self): wrong_task_type = "observation" self.specification_tree["task_type"] = wrong_task_type self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.error.assert_any_call("no task type %s found in estimator results %s" % (wrong_task_type, self.rerpc_replymessage[str(self.otdb_id)])) def test_do_assignment_should_not_claim_resources_when_task_type_not_in_needed_resources(self): wrong_task_type = "observation" self.specification_tree["task_type"] = wrong_task_type self.resourceAssigner.doAssignment(self.specification_tree) self.rarpc_mock.insertResourceClaims.assert_not_called() def test_do_assignment_should_log_single_errors_in_needed_resources(self): self.specification_tree["otdb_id"] = self.resources_with_errors_otdb_id self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.error.assert_any_call("Error in estimator: %s", self.resource_error1) self.logger_mock.error.assert_any_call("Error in estimator: %s", self.resource_error2) def test_do_assignment_should_log_error_in_needed_resources(self): self.specification_tree["otdb_id"] = self.resources_with_errors_otdb_id self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.error.assert_any_call( "Error(s) in estimator for otdb_id=%s radb_id=%s, setting task status to 'error'", self.resources_with_errors_otdb_id, self.task_id) def test_do_assignment_should_update_task_with_error_on_errors_in_needed_resources(self): self.specification_tree["otdb_id"] = self.resources_with_errors_otdb_id self.resourceAssigner.doAssignment(self.specification_tree) self.rarpc_mock.updateTask.assert_any_call(self.task_id, task_status='error') def test_do_assignment_should_notify_bus_on_errors_in_needed_resources(self): content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id} subject = 'TaskError' self.specification_tree["otdb_id"] = self.resources_with_errors_otdb_id self.resourceAssigner.doAssignment(self.specification_tree) self.assertBusNotificationAndLogging(content, subject) def ra_notification_bus_send_called_with(self, content, subject): found = False for call in self.ra_notification_bus_mock.send.call_args_list: if call[0][0].subject == subject and call[0][0].content == content: found = True return found def test_do_assignment_should_log_start_of_claim_resources(self): self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.info.assert_any_call('claimResources: task %s needed_resources=%s' % (self.task, self.rerpc_replymessage[str(str(self.otdb_id))])) def test_do_assigment_should_log_when_claiming_unknown_resource_type(self): self.specification_tree["otdb_id"] = self.unknown_resource_type_otdb_id self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.warn.assert_any_call('claimResources: unknown resource_type:%s' % self.unknown_resource_type_name) def test_do_assignment_logs_claimable_resources_in_specification(self): self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.info.assert_any_call('claimResources: processing resource_type: %s contents: %s' % ('storage', self.rerpc_needed_claim_for_storage)) self.logger_mock.info.assert_any_call('claimResources: processing resource_type: %s contents: %s' % ('bandwidth', self.rerpc_needed_claim_for_bandwidth)) def test_do_assignment_logs_created_claim_per_needed_resource_type(self): self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.info.assert_any_call('claimResources: created claim:%s' % self.storage_claim) self.logger_mock.info.assert_any_call('claimResources: created claim:%s' % self.bandwith_claim) def test_do_assignment_logs_amount_of_claims_inserted_in_radb(self): self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.info.assert_any_call('claimResources: inserting %d claims in the radb' % 2) def test_do_assignment_inserts_resource_claims_in_radb(self): self.resourceAssigner.doAssignment(self.specification_tree) self.rarpc_mock.insertResourceClaims.assert_any_call(self.task_id, self.specifaction_claims, 1, 'anonymous', -1) def test_do_assignment_logs_amount_claims_inserted(self): self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.info.assert_any_call('claimResources: %d claims were inserted in the radb' % 2) def test_do_assignment_logs_unknown_property_on_needed_resources(self): self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.warn.assert_any_call('claimResources: unknown prop_type:%s' % self.resource_unknown_property) def test_do_assignment_logs_multiple_properties_on_needed_resource(self): self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.info.assert_any_call( 'claimResources: processing resource_type: %s subdict_name: \'%s\' subdict_contents: %s' % ("storage", "output_files", self.rerpc_needed_claim_for_storage_output_files)) def test_do_assignment_logs_when_it_was_unable_to_claim_all_resources(self): self.rarpc_mock.insertResourceClaims.return_value = {'ids': []} self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.warning.assert_any_call( 'doAssignment: No claims could be made. Setting task %s status to error' % self.task_id) def test_do_assignment_updates_task_when_it_was_unable_to_claim_all_resources(self): self.rarpc_mock.insertResourceClaims.return_value = {'ids': []} self.resourceAssigner.doAssignment(self.specification_tree) self.rarpc_mock.updateTask.assert_any_call(self.task_id, task_status='error') def test_do_assignment_notifies_bus_when_it_was_unable_to_claim_all_resources(self): content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id} subject = 'Task' + 'Error' self.rarpc_mock.insertResourceClaims.return_value = {'ids': []} self.resourceAssigner.doAssignment(self.specification_tree) self.assertBusNotificationAndLogging(content, subject) def test_do_assignment_logs_when_it_was_unable_to_claim_some_resources(self): self.rarpc_mock.insertResourceClaims.return_value = {'ids': [1]} self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.warning.assert_any_call( 'doAssignment: Not all claims could be inserted. Setting task %s status to conflict' % self.task_id) def test_do_assignment_updates_task_when_it_was_unable_to_claim_some_resources(self): self.rarpc_mock.insertResourceClaims.return_value = {'ids': [1]} self.resourceAssigner.doAssignment(self.specification_tree) self.rarpc_mock.updateTask.assert_any_call(self.task_id, task_status='conflict') def test_do_assignment_notifies_bus_when_it_was_unable_to_claim_some_resources(self): content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id} subject = 'Task' + 'Conflict' self.rarpc_mock.insertResourceClaims.return_value = {'ids': [1]} self.resourceAssigner.doAssignment(self.specification_tree) self.assertBusNotificationAndLogging(content, subject) def test_do_assignment_logs_when_there_are_conflicting_claims(self): conflicting_claims = [{}] self.rarpc_mock.getResourceClaims.return_value = conflicting_claims self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.warning.assert_any_call( 'doAssignment: %s conflicting claims detected. Task cannot be scheduled. %s' % (len(conflicting_claims), conflicting_claims)) def test_do_assignment_notifies_bus_when_there_are_conflicting_claims(self): content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id} subject = 'Task' + 'Conflict' conflicting_claims = [{}] self.rarpc_mock.getResourceClaims.return_value = conflicting_claims self.resourceAssigner.doAssignment(self.specification_tree) self.assertBusNotificationAndLogging(content, subject) def test_do_assignment_logs_when_all_resources_were_claimed(self): self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.info.assert_any_call( 'doAssignment: all claims for task %s were succesfully claimed. Setting claim statuses to allocated' % self.task_id) def test_do_assignment_updates_task_and_resources_as_claimed_in_radb(self): self.resourceAssigner.doAssignment(self.specification_tree) self.rarpc_mock.updateTaskAndResourceClaims.assert_any_call(self.task_id, claim_status='allocated') def test_do_assignment_logs_task_data_removal_if_task_is_pipeline(self): self.sqrpc_mock.getDiskUsageForOTDBId.return_value = {'found': True, 'disk_usage': 10} self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.info.assert_any_call("removing data on disk from previous run for otdb_id %s", self.otdb_id) def test_do_assignment_removes_task_data_if_task_is_pipeline(self): self.sqrpc_mock.getDiskUsageForOTDBId.return_value = {'found': True, 'disk_usage': 10} self.resourceAssigner.doAssignment(self.specification_tree) self.curpc_mock.removeTaskData.assert_any_call(self.task_otdb_id) def test_do_assignment_logs_when_taks_data_could_not_be_deleted(self): message = "file was locked" self.sqrpc_mock.getDiskUsageForOTDBId.return_value = {'found': True, 'disk_usage': 10} self.curpc_mock.removeTaskData.return_value = {'deleted': False, 'message': message} self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.warning.assert_any_call( "could not remove all data on disk from previous run for otdb_id %s: %s", self.otdb_id, message) def test_do_assignment_notifies_bus_when_task_is_scheduled(self): content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id} subject = 'Task' + 'Scheduled' self.resourceAssigner.doAssignment(self.specification_tree) self.assertBusNotificationAndLogging(content, subject) def assertBusNotificationAndLogging(self, content, subject): self.assertTrue(self.ra_notification_bus_send_called_with(content, ra_notification_prefix + subject)) self.logger_mock.info.assert_any_call('Sending notification %s: %s' % (subject, str(content).replace('\n', ' '))) @mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.datetime') def test_do_assignment_logs_exception_from_otdbrpc_taskSetSpecification(self, datetime_mock): self.freeze_time_one_day_in_the_future(datetime_mock) exception = Exception("Error something went wrong") self.otdbrpc_mock.taskSetSpecification.side_effect = exception self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.error.assert_any_call(exception) def test_do_assignment_logs_unparsable_start_time(self): self.specification_tree[u'specification'][u'Observation.startTime'] = "non parse" self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.warning.assert_any_call( 'cannot parse for start/end time from specification for otdb_id=%s. searching for sane defaults...', self.otdb_id) def test_do_assignment_logs_unparsable_stop_time(self): self.specification_tree[u'specification'][u'Observation.stopTime'] = "non parse" self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.warning.assert_any_call( 'cannot parse for start/end time from specification for otdb_id=%s. searching for sane defaults...', self.otdb_id) def test_do_assignment_logs_exception_from_otdbrpc_taskSetSpecification_with_mom_bug(self): exception = Exception("Error something went wrong") self.otdbrpc_mock.taskSetSpecification.side_effect = exception self.resourceAssigner.doAssignment(self.mom_bug_specification_tree) self.logger_mock.error.assert_any_call(exception) def test_do_assignment_logs_exception_from_rerpc(self): exception = Exception("Error something went wrong") self.rerpc_mock.side_effect = exception self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.error.assert_any_call(exception) def test_do_assignment_updates_task_on_exception_from_rerpc(self): exception = Exception("Error something went wrong") self.rerpc_mock.side_effect = exception self.resourceAssigner.doAssignment(self.specification_tree) self.rarpc_mock.updateTask.assert_any_call(self.task_id, task_status='error') def test_do_assignment_notifies_bus_on_exception_from_rerpc(self): content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id} subject = 'Task' + 'Error' exception = Exception("Error something went wrong") self.rerpc_mock.side_effect = exception self.resourceAssigner.doAssignment(self.specification_tree) self.assertBusNotificationAndLogging(content, subject) def test_do_assignment_logs_when_notifies_bus_thows_exception(self): exception = Exception("Error something went wrong") self.ra_notification_bus_mock.send.side_effect = exception self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.error.assert_any_call(str(exception)) def test_do_assignment_logs_when_momrpc_getPredecessorIds_throws_exception(self): exception = Exception("Error something went wrong") self.momrpc_mock.getPredecessorIds.side_effect = exception self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.error.assert_any_call(exception) def test_do_assignment_logs_when_momrpc_getSuccessorIds_throws_exception(self): exception = Exception("Error something went wrong") self.momrpc_mock.getSuccessorIds.side_effect = exception self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.error.assert_any_call(exception) @mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.datetime') def test_do_assignment_logs_exception_stop_time_parsing_on_predecessor(self, datetime_mock): self.freeze_time_one_day_in_the_future(datetime_mock) self.specification_tree[u'predecessors'][0]['specification'][u'Observation.stopTime'] = 'non parse' exception = ValueError('time data \'non parse\' does not match format \'%Y-%m-%d %H:%M:%S\'') self.resourceAssigner.doAssignment(self.specification_tree) # workarround because assert_any_call(exception) seems not to work together found = False calls = self.logger_mock.error.call_args_list for args, kwargs in calls: first_argument = args[0] if type(first_argument) is type(exception) and first_argument.args == exception.args: found = True self.assertTrue(found) @mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.datetime') def test_do_assignment_sets_start_and_end_time_on_unparsable_datetime(self, datetime_mock): now = self.freeze_time_one_day_in_the_future(datetime_mock) self.specification_tree['specification'][u'Observation.stopTime'] = 'non parse' new_starttime = now + datetime.timedelta(minutes=1) new_endtime = new_starttime + datetime.timedelta(hours=1) self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.info.assert_any_call( 'doAssignment: insertSpecification momId=%s, otdb_id=%s, status=%s, taskType=%s, startTime=%s, endTime=%s' ' cluster=%s' % (self.mom_id, self.otdb_id, self.state, self.task_type, new_starttime, new_endtime, "CEP4")) if __name__ == '__main__': unittest.main()