diff --git a/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py b/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py index 897d364ed8adac7f7b621b3bdace5b272c186735..f9106c1c1b2caa42fbe0e49a5ae6ecf6230b1b0d 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py +++ b/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py @@ -3,11 +3,11 @@ import unittest import mock import datetime -from copy import deepcopy from lofar.sas.resourceassignment.resourceassigner.assignment import ResourceAssigner from lofar.parameterset import parameterset +ra_notification_prefix = "ra_notification_prefix" class TestingResourceAssigner(ResourceAssigner): def __init__(self, rarpc, rerpc, otdbrpc, momrpc, curpc, ra_notification_bus): @@ -18,9 +18,17 @@ class TestingResourceAssigner(ResourceAssigner): self.momrpc = momrpc self.curpc = curpc self.ra_notification_bus = ra_notification_bus - + self.ra_notification_prefix = ra_notification_prefix class ResourceAssignerTest(unittest.TestCase): + mom_id = 351557 + otdb_id = 1290494 + specification_id = 2323 + state = u'prescheduled' + task_type = u'pipeline' + + specification_tree = {} + non_approved_or_prescheduled_status = u'opened' non_approved_or_prescheduled_otdb_id = 1 @@ -67,21 +75,23 @@ class ResourceAssignerTest(unittest.TestCase): task_mom_id = 351543 task_otdb_id = 1290472 task_id = 2299 + task_end_time = datetime.datetime(2016, 3, 25, 22, 47, 31) + task_start_time = datetime.datetime(2016, 3, 25, 21, 47, 31) task = { "mom_id": task_mom_id, "otdb_id": task_otdb_id, "id": task_id, - "endtime": datetime.datetime(2016, 3, 25, 22, 47, 31), + "endtime": task_end_time, "name": "IS HBA_DUAL", "predecessor_ids": [], "project_mom_id": 2, "project_name": "test-lofar", - "specification_id": 2323, - "starttime": datetime.datetime(2016, 3, 25, 21, 47, 31), + "specification_id": specification_id, + "starttime": task_start_time, "status": "prescheduled", "status_id": 350, "successor_ids": [], - "type": "observation", + "type": "pipeline", "type_id": 0 } @@ -102,7 +112,7 @@ class ResourceAssignerTest(unittest.TestCase): "status": "prescheduled", "status_id": 350, "successor_ids": [], - "type": "observation", + "type": "pipeline", "type_id": 0 } @@ -123,120 +133,241 @@ class ResourceAssignerTest(unittest.TestCase): "status": "prescheduled", "status_id": 350, "successor_ids": [], - "type": "observation", + "type": "pipeline", "type_id": 0 } - mom_id = 351557 - otdb_id = 1290494 - state = u'prescheduled' - task_type = u'pipeline' + resources_with_errors_otdb_id = 1290496 + resource_error1 = "error 1" + resource_error2 = "error 2" + unknown_resource_type_name = "fuel" + unknown_resource_type_otdb_id = 123489 - specification_tree = { - u'otdb_id': otdb_id, - u'task_type': task_type, - u'state': state, - u'specification': { - u'Observation.momID': str(mom_id), - u'Observation.startTime': future_start_time, - u'Observation.stopTime': future_stop_time, - u'Observation.DataProducts.Output_InstrumentModel.enabled': False, - u'Observation.VirtualInstrument.stationList': [], - u'Observation.DataProducts.Input_CoherentStokes.enabled': False, - u'Observation.DataProducts.Output_CoherentStokes.enabled': False, - u'Observation.DataProducts.Input_Correlated.skip': [0, 0, 0, 0], - u'Observation.antennaSet': u'LBA_INNER', - u'Observation.nrBitsPerSample': u'16', - u'Observation.ObservationControl.PythonControl.LongBaseline.subbandgroups_per_ms': u'2', - u'Observation.DataProducts.Output_IncoherentStokes.enabled': False, - u'Observation.DataProducts.Input_IncoherentStokes.enabled': False, - u'Observation.DataProducts.Input_Correlated.enabled': True, - u'Observation.DataProducts.Output_Pulsar.enabled': False, - u'Observation.DataProducts.Input_CoherentStokes.skip': [], - u'Observation.DataProducts.Output_SkyImage.enabled': False, - u'Version.number': u'33774', - u'Observation.ObservationControl.PythonControl.LongBaseline.subbands_per_subbandgroup': u'2', - u'Observation.nrBeams': u'0', - u'Observation.DataProducts.Input_IncoherentStokes.skip': [], - u'Observation.DataProducts.Output_Correlated.enabled': True, - u'Observation.DataProducts.Output_Correlated.storageClusterName': 'CEP4', - u'Observation.sampleClock': u'200' + resource_unknown_property = 'unknown_property' + + rerpc_status = 0 + rerpc_needed_claim_for_bandwidth_size = 19021319494 + rerpc_needed_claim_for_bandwidth = { + 'total_size': rerpc_needed_claim_for_bandwidth_size + } + + rerpc_needed_claim_for_storage_output_files = { + 'uv': { + 'nr_of_uv_files': 481, + 'uv_file_size': 1482951104 + }, + 'saps': [ + { + 'sap_nr': 0, + 'properties': { + 'nr_of_uv_files': 319 + } + }, + { + 'sap_nr': 1, + 'properties': { + 'nr_of_uv_files': 81, + resource_unknown_property: -1 + } + }, + { + 'sap_nr': 2, + 'properties': { + 'nr_of_uv_files': 81 + } + } + ] + } + rerpc_needed_claim_for_storage_size = 713299481024 + rerpc_needed_claim_for_storage = { + 'total_size': rerpc_needed_claim_for_storage_size, + 'output_files': rerpc_needed_claim_for_storage_output_files + } + rerpc_replymessage = { + str(otdb_id): { + 'pipeline': { + 'bandwidth': rerpc_needed_claim_for_bandwidth, + 'storage': rerpc_needed_claim_for_storage + } + }, + str(resources_with_errors_otdb_id): { + 'pipeline': { + 'bandwidth': { + 'total_size': 19021319494 + }, + 'storage': { + 'total_size': 713299481024, + 'output_files': { + 'uv': { + 'nr_of_uv_files': 481, + 'uv_file_size': 1482951104 + }, + 'saps': [{ + 'sap_nr': 0, + 'properties': { + 'nr_of_uv_files': 319 + } + }, + { + 'sap_nr': 1, + 'properties': { + 'nr_of_uv_files': 81 + } + }, + { + 'sap_nr': 2, + 'properties': { + 'nr_of_uv_files': 81 + } + }] + } + }, + }, + 'errors': [resource_error1, resource_error2] }, - u'task_subtype': u'long baseline pipeline', - u'predecessors': [{ - u'task_subtype': u'averaging pipeline', + str(unknown_resource_type_otdb_id): { + 'pipeline': { + str(unknown_resource_type_name): { + } + } + } + } + + cep4bandwidth_resource_id = 116 + cep4storage_resource_id = 117 + + storage_claim = { + 'resource_id': cep4storage_resource_id, + 'starttime': task_start_time, + 'endtime': task_end_time + datetime.timedelta(days=365), + 'status': 'claimed', + 'claim_size': rerpc_needed_claim_for_storage_size, + 'properties': [ + {'type': 2, 'io_type': 'output', 'value': 481}, + {'type': 10, 'io_type': 'output', 'value': 1482951104}, + {'type': 2, 'io_type': 'output', 'sap_nr': 0, 'value': 319}, + {'type': 2, 'io_type': 'output', 'sap_nr': 1, 'value': 81}, + {'type': 2, 'io_type': 'output', 'sap_nr': 2, 'value': 81} + ] + } + + bandwith_claim = { + 'resource_id': cep4bandwidth_resource_id, + 'starttime': task_start_time, + 'endtime': task_end_time, + 'status': 'claimed', + 'claim_size': rerpc_needed_claim_for_bandwidth_size + } + + specifaction_claims = [bandwith_claim, storage_claim] + + def reset_specification_tree(self): + self.specification_tree = { + u'otdb_id': self.otdb_id, + u'task_type': self.task_type, + u'state': self.state, u'specification': { + u'Observation.momID': str(self.mom_id), + u'Observation.startTime': self.future_start_time, + u'Observation.stopTime': self.future_stop_time, u'Observation.DataProducts.Output_InstrumentModel.enabled': False, - u'Observation.stopTime': u'2016-03-25 13:51:05', u'Observation.VirtualInstrument.stationList': [], u'Observation.DataProducts.Input_CoherentStokes.enabled': False, u'Observation.DataProducts.Output_CoherentStokes.enabled': False, - u'Observation.DataProducts.Output_SkyImage.enabled': False, u'Observation.DataProducts.Input_Correlated.skip': [0, 0, 0, 0], u'Observation.antennaSet': u'LBA_INNER', u'Observation.nrBitsPerSample': u'16', - u'Observation.ObservationControl.PythonControl.LongBaseline.subbandgroups_per_ms': u'1', + u'Observation.ObservationControl.PythonControl.LongBaseline.subbandgroups_per_ms': u'2', u'Observation.DataProducts.Output_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_Correlated.enabled': True, u'Observation.DataProducts.Output_Pulsar.enabled': False, u'Observation.DataProducts.Input_CoherentStokes.skip': [], - u'Observation.ObservationControl.PythonControl.DPPP.demixer.demixtimestep': u'10', + u'Observation.DataProducts.Output_SkyImage.enabled': False, u'Version.number': u'33774', - u'Observation.momID': u'351556', - u'Observation.startTime': u'2016-03-25 13:49:55', - u'Observation.ObservationControl.PythonControl.LongBaseline.subbands_per_subbandgroup': u'1', + u'Observation.ObservationControl.PythonControl.LongBaseline.subbands_per_subbandgroup': u'2', u'Observation.nrBeams': u'0', u'Observation.DataProducts.Input_IncoherentStokes.skip': [], - u'Observation.ObservationControl.PythonControl.DPPP.demixer.demixfreqstep': u'64', u'Observation.DataProducts.Output_Correlated.enabled': True, + u'Observation.DataProducts.Output_Correlated.storageClusterName': 'CEP4', u'Observation.sampleClock': u'200' }, - u'task_type': u'pipeline', - u'otdb_id': 1290496, + u'task_subtype': u'long baseline pipeline', u'predecessors': [{ - u'task_subtype': u'bfmeasurement', + u'task_subtype': u'averaging pipeline', u'specification': { u'Observation.DataProducts.Output_InstrumentModel.enabled': False, - u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.timeIntegrationFactor': u'1', - u'Observation.stopTime': u'2016-03-26 00:33:31', - u'Observation.VirtualInstrument.stationList': [u'RS205', u'RS503', u'CS013', u'RS508', u'RS106'], + u'Observation.stopTime': u'2016-03-25 13:51:05', + u'Observation.VirtualInstrument.stationList': [], u'Observation.DataProducts.Input_CoherentStokes.enabled': False, u'Observation.DataProducts.Output_CoherentStokes.enabled': False, - u'Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrChannelsPerSubband': u'64', - u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.which': u'I', - u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.which': u'I', - u'Observation.Beam[0].subbandList': [100, 101, 102, 103], - u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.subbandsPerFile': u'512', - u'Observation.DataProducts.Input_Correlated.skip': [], - u'Observation.antennaSet': u'HBA_DUAL', - u'Observation.nrBitsPerSample': u'8', - u'Observation.Beam[0].nrTabRings': u'0', - u'Observation.Beam[0].nrTiedArrayBeams': u'0', - u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.flysEye': False, - u'Observation.nrBeams': u'1', - u'Observation.ObservationControl.OnlineControl.Cobalt.Correlator.integrationTime': u'1.0', + u'Observation.DataProducts.Output_SkyImage.enabled': False, + u'Observation.DataProducts.Input_Correlated.skip': [0, 0, 0, 0], + u'Observation.antennaSet': u'LBA_INNER', + u'Observation.nrBitsPerSample': u'16', + u'Observation.ObservationControl.PythonControl.LongBaseline.subbandgroups_per_ms': u'1', u'Observation.DataProducts.Output_IncoherentStokes.enabled': False, u'Observation.DataProducts.Input_IncoherentStokes.enabled': False, - u'Observation.DataProducts.Input_Correlated.enabled': False, + u'Observation.DataProducts.Input_Correlated.enabled': True, u'Observation.DataProducts.Output_Pulsar.enabled': False, u'Observation.DataProducts.Input_CoherentStokes.skip': [], - u'Observation.DataProducts.Output_SkyImage.enabled': False, + u'Observation.ObservationControl.PythonControl.DPPP.demixer.demixtimestep': u'10', u'Version.number': u'33774', - u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.timeIntegrationFactor': u'1', - u'Observation.momID': u'351539', - u'Observation.startTime': u'2016-03-26 00:31:31', - u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.subbandsPerFile': u'512', + u'Observation.momID': u'351556', + u'Observation.startTime': u'2016-03-25 13:49:55', + u'Observation.ObservationControl.PythonControl.LongBaseline.subbands_per_subbandgroup': u'1', + u'Observation.nrBeams': u'0', u'Observation.DataProducts.Input_IncoherentStokes.skip': [], + u'Observation.ObservationControl.PythonControl.DPPP.demixer.demixfreqstep': u'64', u'Observation.DataProducts.Output_Correlated.enabled': True, u'Observation.sampleClock': u'200' }, - u'task_type': u'observation', - u'otdb_id': 1290476, - u'predecessors': [] + u'task_type': u'pipeline', + u'otdb_id': 1290496, + u'predecessors': [{ + u'task_subtype': u'bfmeasurement', + u'specification': { + u'Observation.DataProducts.Output_InstrumentModel.enabled': False, + u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.timeIntegrationFactor': u'1', + u'Observation.stopTime': u'2016-03-26 00:33:31', + u'Observation.VirtualInstrument.stationList': [u'RS205', u'RS503', u'CS013', u'RS508', + u'RS106'], + u'Observation.DataProducts.Input_CoherentStokes.enabled': False, + u'Observation.DataProducts.Output_CoherentStokes.enabled': False, + u'Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrChannelsPerSubband': u'64', + u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.which': u'I', + u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.which': u'I', + u'Observation.Beam[0].subbandList': [100, 101, 102, 103], + u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.subbandsPerFile': u'512', + u'Observation.DataProducts.Input_Correlated.skip': [], + u'Observation.antennaSet': u'HBA_DUAL', + u'Observation.nrBitsPerSample': u'8', + u'Observation.Beam[0].nrTabRings': u'0', + u'Observation.Beam[0].nrTiedArrayBeams': u'0', + u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.flysEye': False, + u'Observation.nrBeams': u'1', + u'Observation.ObservationControl.OnlineControl.Cobalt.Correlator.integrationTime': u'1.0', + u'Observation.DataProducts.Output_IncoherentStokes.enabled': False, + u'Observation.DataProducts.Input_IncoherentStokes.enabled': False, + u'Observation.DataProducts.Input_Correlated.enabled': False, + u'Observation.DataProducts.Output_Pulsar.enabled': False, + u'Observation.DataProducts.Input_CoherentStokes.skip': [], + u'Observation.DataProducts.Output_SkyImage.enabled': False, + u'Version.number': u'33774', + u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.timeIntegrationFactor': u'1', + u'Observation.momID': u'351539', + u'Observation.startTime': u'2016-03-26 00:31:31', + u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.subbandsPerFile': u'512', + u'Observation.DataProducts.Input_IncoherentStokes.skip': [], + u'Observation.DataProducts.Output_Correlated.enabled': True, + u'Observation.sampleClock': u'200' + }, + u'task_type': u'observation', + u'otdb_id': 1290476, + u'predecessors': [] + }] }] - }] - } + } def setUp(self): def get_task_side_effect(*args, **kwargs): @@ -257,10 +388,58 @@ class ResourceAssignerTest(unittest.TestCase): self.addCleanup(rarpc_patcher.stop) self.rarpc_mock = rarpc_patcher.start() self.rarpc_mock.getTask.side_effect = get_task_side_effect + self.rarpc_mock.insertSpecificationAndTask.return_value = { + 'inserted': True, + 'specification_id': self.specification_id, + 'task_id': self.task_id + } + self.rarpc_mock.getResourceClaimPropertyTypes.return_value = [ + {'id': 0, 'name': 'nr_of_is_files'}, + {'id': 1, 'name': 'nr_of_cs_files'}, + {'id': 2, 'name': 'nr_of_uv_files'}, + {'id': 3, 'name': 'nr_of_im_files'}, + {'id': 4, 'name': 'nr_of_img_files'}, + {'id': 5, 'name': 'nr_of_pulp_files'}, + {'id': 6, 'name': 'nr_of_cs_stokes'}, + {'id': 7, 'name': 'nr_of_is_stokes'}, + {'id': 8, 'name': 'is_file_size'}, + {'id': 9, 'name': 'cs_file_size'}, + {'id': 10, 'name': 'uv_file_size'}, + {'id': 11, 'name': 'im_file_size'}, + {'id': 12, 'name': 'img_file_size'}, + {'id': 13, 'name': 'nr_of_pulp_files'}, + {'id': 14, 'name': 'nr_of_tabs'}, + {'id': 15, 'name': 'start_sb_nr'}, + {'id': 16, 'name': 'uv_otdb_id'}, + {'id': 17, 'name': 'cs_otdb_id'}, + {'id': 18, 'name': 'is_otdb_id'}, + {'id': 19, 'name': 'im_otdb_id'}, + {'id': 20, 'name': 'img_otdb_id'}, + {'id': 21, 'name': 'pulp_otdb_id'}, + {'id': 22, 'name': 'is_tab_nr'}, + {'id': 23, 'name': 'start_sbg_nr'} + ] + self.rarpc_mock.getResourceTypes.return_value = [ + {'id': 0, 'name': 'rsp', 'unit_id': 0, 'units': 'rsp_channel_bit'}, + {'id': 1, 'name': 'tbb', 'unit_id': 1, 'units': 'bytes'}, + {'id': 2, 'name': 'rcu', 'unit_id': 2, 'units': 'rcu_board'}, + {'id': 3, 'name': 'bandwidth', 'unit_id': 3, 'units': 'bits/second'}, + {'id': 4, 'name': 'processor', 'unit_id': 4, 'units': 'cores'}, + {'id': 5, 'name': 'storage', 'unit_id': 1, 'units': 'bytes'}, + ] + self.rarpc_mock.insertResourceClaims.return_value = {'ids': [1, 2]} + + # incomplete response but good enough for tests + self.rarpc_mock.getResources.return_value = [ + {'id': self.cep4bandwidth_resource_id, 'name': 'cep4bandwidth', 'type_id': 3, 'type_name': 'bandwidth', 'unit_id': 3, 'unit': 'bits/second'}, + {'id': self.cep4storage_resource_id, 'name': 'cep4storage', 'type_id': 5, 'type_name': 'storage', 'unit_id': 1, 'unit': 'bytes'} + ] + self.rarpc_mock.getResourceClaims.return_value = [] rerpc_patcher = mock.patch('lofar.messaging.RPC') self.addCleanup(rerpc_patcher.stop) self.rerpc_mock = rerpc_patcher.start() + self.rerpc_mock.return_value = self.rerpc_replymessage, self.rerpc_status otdbrpc_patcher = mock.patch('lofar.sas.otdb.otdbrpc') self.addCleanup(otdbrpc_patcher.stop) @@ -292,6 +471,7 @@ class ResourceAssignerTest(unittest.TestCase): self.resourceAssigner = TestingResourceAssigner(self.rarpc_mock, self.rerpc_mock, self.otdbrpc_mock, self.momrpc_mock, self.curpc_mock, self.ra_notification_bus_mock) + self.reset_specification_tree() def assert_all_services_opened(self): self.assertTrue(self.rarpc_mock.open.called, "RARPC.open was not called") @@ -494,21 +674,20 @@ class ResourceAssignerTest(unittest.TestCase): @mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.datetime') def test_do_assignment_should_reset_observation_period_when_in_past_with_predecessor_in_future(self, datetime_mock): now = datetime.datetime.utcnow() + datetime.timedelta(days=1) - now = self.strip_ms(now) + now = self._strip_ms(now) datetime_mock.utcnow.return_value = now datetime_mock.strptime.side_effect = \ lambda date_string, format_string: datetime.datetime.strptime(date_string, format_string) future_predecessor_stop_time = now + datetime.timedelta(hours=1) - tree_with_predecessor_observation_stop_time = deepcopy(self.specification_tree) - tree_with_predecessor_observation_stop_time['predecessors'][0]['specification']['Observation.stopTime'] = \ + self.specification_tree['predecessors'][0]['specification']['Observation.stopTime'] = \ future_predecessor_stop_time.strftime('%Y-%m-%d %H:%M:%S') new_starttime = future_predecessor_stop_time + datetime.timedelta(minutes=1) new_endtime = new_starttime + datetime.timedelta(hours=1) - self.resourceAssigner.doAssignment(tree_with_predecessor_observation_stop_time) + self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.warning.assert_any_call( 'Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s', @@ -525,7 +704,7 @@ class ResourceAssignerTest(unittest.TestCase): 'LOFAR.ObsSW.Observation.stopTime': new_endtime.strftime('%Y-%m-%d %H:%M:%S') }) - def strip_ms(self, now): + def _strip_ms(self, now): return datetime.datetime.strptime(now.strftime('%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S') @mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.datetime') @@ -534,14 +713,13 @@ class ResourceAssignerTest(unittest.TestCase): datetime_mock.utcnow.return_value = now - tree_with_observation_duraction = deepcopy(self.specification_tree) duration = 100 - tree_with_observation_duraction['specification']['Observation.Scheduler.taskDuration'] = duration + self.specification_tree['specification']['Observation.Scheduler.taskDuration'] = duration new_starttime = now + datetime.timedelta(minutes=1) new_endtime = new_starttime + datetime.timedelta(seconds=duration) - self.resourceAssigner.doAssignment(tree_with_observation_duraction) + self.resourceAssigner.doAssignment(self.specification_tree) self.logger_mock.warning.assert_any_call( 'Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s', @@ -558,18 +736,276 @@ class ResourceAssignerTest(unittest.TestCase): 'LOFAR.ObsSW.Observation.stopTime': new_endtime.strftime('%Y-%m-%d %H:%M:%S') }) + def test_do_assignment_should_log_insertion_of_specification_and_task(self): + self.resourceAssigner.doAssignment(self.specification_tree) + + self.logger_mock.info.assert_any_call( + 'doAssignment: insertSpecification momId=%s, otdb_id=%s, status=%s, taskType=%s, startTime=%s, endTime=%s' + ' cluster=%s' % + (self.mom_id, self.otdb_id, self.state, self.task_type, self.future_start_time, self.future_stop_time, + "CEP4")) + + def test_do_assignment_should_log_when_insertion_of_specification_and_task_is_done(self): + self.resourceAssigner.doAssignment(self.specification_tree) + + self.logger_mock.info.assert_any_call('doAssignment: inserted specification (id=%s) and task (id=%s)' % + (self.specification_id, self.task_id)) + def test_do_assignment_should_log_on_non_presceduled_cep4_tasks(self): self.resourceAssigner.doAssignment(self.non_approved_or_prescheduled_specification_tree) self.logger_mock.info.assert_any_call( 'skipping resource assignment for CEP4 task otdb_id=%s because status=%s' % (self.non_approved_or_prescheduled_otdb_id, - self.non_approved_or_prescheduled_status)) + self.non_approved_or_prescheduled_status)) def test_do_assignment_should_not_claim_resources_on_non_presceduled_cep4_tasks(self): - self.resourceAssigner.doAssignment(self.non_approved_or_prescheduled_specification_tree) + self.resourceAssigner.doAssignment( + self.non_approved_or_prescheduled_specification_tree) + + self.rarpc_mock.insertResourceClaims.assert_not_called() + + def test_do_assginement_should_request_needed_resources(self): + self.resourceAssigner.doAssignment(self.specification_tree) + + self.rerpc_mock.assert_any_call( + {"specification_tree": self.specification_tree}, timeout=10) + + def test_do_assignemnet_logs_needed_resources(self): + self.resourceAssigner.doAssignment(self.specification_tree) + + self.logger_mock.info('getNeededResouces: %s' % self.rerpc_replymessage) + + def test_do_assignment_logs_when_otdb_id_not_needed_resources(self): + self.specification_tree["otdb_id"] = self.otdb_id + 1 + + self.resourceAssigner.doAssignment(self.specification_tree) + + self.logger_mock.error.assert_any_call("no otdb_id %s found in estimator results %s" % + (self.otdb_id + 1, self.rerpc_replymessage)) + + def test_do_assignment_should_not_claim_resouces_when_otdb_id_not_needed_resources(self): + self.specification_tree["otdb_id"] = self.otdb_id + 1 + + self.resourceAssigner.doAssignment(self.specification_tree) + + self.rarpc_mock.insertResourceClaims.assert_not_called() + + def test_do_assignment_logs_when_task_type_not_in_needed_resources(self): + wrong_task_type = "observation" + self.specification_tree["task_type"] = wrong_task_type + + self.resourceAssigner.doAssignment(self.specification_tree) + + self.logger_mock.error.assert_any_call("no task type %s found in estimator results %s" % + (wrong_task_type, + self.rerpc_replymessage[str(self.otdb_id)])) + + def test_do_assignment_should_not_claim_resources_when_task_type_not_in_needed_resources(self): + wrong_task_type = "observation" + self.specification_tree["task_type"] = wrong_task_type + + self.resourceAssigner.doAssignment(self.specification_tree) self.rarpc_mock.insertResourceClaims.assert_not_called() + def test_do_assignment_should_log_error_in_needed_resources(self): + self.specification_tree["otdb_id"] = self.resources_with_errors_otdb_id + + self.resourceAssigner.doAssignment(self.specification_tree) + + self.logger_mock.error.assert_any_call( + "Error(s) in estimator for otdb_id=%s radb_id=%s, setting task status to 'error'", + self.resources_with_errors_otdb_id, + self.task_id) + + def test_do_assignment_should_update_task_with_error_on_errors_in_needed_resources(self): + self.specification_tree["otdb_id"] = self.resources_with_errors_otdb_id + + self.resourceAssigner.doAssignment(self.specification_tree) + + self.rarpc_mock.updateTask.assert_any_call(self.task_id, status='error') + + def test_do_assignment_should_notify_bus_on_errors_in_needed_resources(self): + content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id} + subject = ra_notification_prefix + 'TaskError' + + self.specification_tree["otdb_id"] = self.resources_with_errors_otdb_id + + self.resourceAssigner.doAssignment(self.specification_tree) + + self.assertTrue(self.ra_notification_bus_send_called_with(content, subject)) + + def ra_notification_bus_send_called_with(self, content, subject): + found = False + for call in self.ra_notification_bus_mock.send.call_args_list: + if call[0][0].subject == subject and call[0][0].content == content: + found = True + return found + + def test_do_assignment_should_log_start_of_claim_resources(self): + self.resourceAssigner.doAssignment(self.specification_tree) + + self.logger_mock.info.assert_any_call('claimResources: task %s needed_resources=%s' % + (self.task, self.rerpc_replymessage[str(str(self.otdb_id))])) + + def test_do_assigment_should_log_when_claiming_unknown_resource_type(self): + self.specification_tree["otdb_id"] = self.unknown_resource_type_otdb_id + + self.resourceAssigner.doAssignment(self.specification_tree) + + self.logger_mock.error.assert_any_call('claimResources: unknown resource_type:%s' % + self.unknown_resource_type_name) + + def test_do_assignment_logs_claimable_resources_in_specification(self): + self.resourceAssigner.doAssignment(self.specification_tree) + + self.logger_mock.info.assert_any_call('claimResources: processing resource_type: %s contents: %s' + % ('storage', self.rerpc_needed_claim_for_storage)) + self.logger_mock.info.assert_any_call('claimResources: processing resource_type: %s contents: %s' + % ('bandwidth', self.rerpc_needed_claim_for_bandwidth)) + + def test_do_assignment_logs_created_claim_per_needed_resource_type(self): + self.resourceAssigner.doAssignment(self.specification_tree) + + self.logger_mock.info.assert_any_call('claimResources: created claim:%s' % self.storage_claim) + self.logger_mock.info.assert_any_call('claimResources: created claim:%s' % self.bandwith_claim) + + def test_do_assignment_logs_amount_of_claims_inserted_in_radb(self): + self.resourceAssigner.doAssignment(self.specification_tree) + + self.logger_mock.info.assert_any_call('claimResources: inserting %d claims in the radb' % 2) + + def test_do_assignment_inserts_resource_claims_in_radb(self): + self.resourceAssigner.doAssignment(self.specification_tree) + + self.rarpc_mock.insertResourceClaims.assert_any_call(self.task_id, self.specifaction_claims, 1, 'anonymous', -1) + + def test_do_assignment_logs_amount_claims_inserted(self): + self.resourceAssigner.doAssignment(self.specification_tree) + + self.logger_mock.info.assert_any_call('claimResources: %d claims were inserted in the radb' % 2) + + def test_do_assignment_logs_unknown_property_on_needed_resources(self): + self.resourceAssigner.doAssignment(self.specification_tree) + + self.logger_mock.error.assert_any_call('claimResources: unknown prop_type:%s' % self.resource_unknown_property) + + def test_do_assignment_logs_multiple_properties_on_needed_resource(self): + self.resourceAssigner.doAssignment(self.specification_tree) + + self.logger_mock.info.assert_any_call( + 'claimResources: processing resource_type: %s subdict_name: \'%s\' subdict_contents: %s' % + ("storage", "output_files", self.rerpc_needed_claim_for_storage_output_files)) + + def test_do_assignment_logs_when_it_was_unable_to_claim_all_resources(self): + self.rarpc_mock.insertResourceClaims.return_value = {'ids': []} + + self.resourceAssigner.doAssignment(self.specification_tree) + + self.logger_mock.warning.assert_any_call( + 'doAssignment: No claims could be made. Setting task %s status to error' % self.task_id) + + def test_do_assignment_updates_task_when_it_was_unable_to_claim_all_resources(self): + self.rarpc_mock.insertResourceClaims.return_value = {'ids': []} + + self.resourceAssigner.doAssignment(self.specification_tree) + + self.rarpc_mock.updateTask.assert_any_call(self.task_id, status='error') + + def test_do_assignment_notifies_bus_when_it_was_unable_to_claim_all_resources(self): + content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id} + subject = ra_notification_prefix + 'Task' + 'Error' + + self.rarpc_mock.insertResourceClaims.return_value = {'ids': []} + + self.resourceAssigner.doAssignment(self.specification_tree) + + self.assertTrue(self.ra_notification_bus_send_called_with(content, subject)) + + def test_do_assignment_logs_when_it_was_unable_to_claim_some_resources(self): + self.rarpc_mock.insertResourceClaims.return_value = {'ids': [1]} + + self.resourceAssigner.doAssignment(self.specification_tree) + + self.logger_mock.warning.assert_any_call( + 'doAssignment: Not all claims could be inserted. Setting task %s status to conflict' % self.task_id) + + def test_do_assignment_updates_task_when_it_was_unable_to_claim_some_resources(self): + self.rarpc_mock.insertResourceClaims.return_value = {'ids': [1]} + + self.resourceAssigner.doAssignment(self.specification_tree) + + self.rarpc_mock.updateTask.assert_any_call(self.task_id, status='conflict') + + def test_do_assignment_notifies_bus_when_it_was_unable_to_claim_some_resources(self): + content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id} + subject = ra_notification_prefix + 'Task' + 'Conflict' + + self.rarpc_mock.insertResourceClaims.return_value = {'ids': [1]} + + self.resourceAssigner.doAssignment(self.specification_tree) + + self.assertTrue(self.ra_notification_bus_send_called_with(content, subject)) + + def test_do_assignment_logs_when_there_are_conflicting_claims(self): + conflicting_claims = [{}] + + self.rarpc_mock.getResourceClaims.return_value = conflicting_claims + + self.resourceAssigner.doAssignment(self.specification_tree) + + self.logger_mock.warning.assert_any_call( + 'doAssignment: %s conflicting claims detected. Task cannot be scheduled. %s' % + (len(conflicting_claims), conflicting_claims)) + + def test_do_assignment_notifies_bus_when_there_are_conflicting_claims(self): + content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id} + subject = ra_notification_prefix + 'Task' + 'Conflict' + + conflicting_claims = [{}] + self.rarpc_mock.getResourceClaims.return_value = conflicting_claims + + self.resourceAssigner.doAssignment(self.specification_tree) + + self.assertTrue(self.ra_notification_bus_send_called_with(content, subject)) + + def test_do_assignment_logs_when_all_resources_were_claimed(self): + self.resourceAssigner.doAssignment(self.specification_tree) + + self.logger_mock.info.assert_any_call( + 'doAssignment: all claims for task %s were succesfully claimed. Setting claim statuses to allocated' % + self.task_id) + + def test_do_assignment_updates_task_and_resources_as_claimed_in_radb(self): + self.resourceAssigner.doAssignment(self.specification_tree) + + self.rarpc_mock.updateTaskAndResourceClaims.assert_any_call(self.task_id, claim_status='allocated') + + def test_do_assignment_removes_task_data_if_task_is_pipeline(self): + self.curpc_mock.getPathForOTDBId.return_value = {'found': True} + + self.resourceAssigner.doAssignment(self.specification_tree) + + self.curpc_mock.removeTaskData.assert_any_call(self.task_otdb_id) + + def test_do_assignment_logs_when_taks_data_could_not_be_deleted(self): + message = "file was locked" + self.curpc_mock.getPathForOTDBId.return_value = {'found': True} + self.curpc_mock.removeTaskData.return_value = {'deleted': False, 'message': message} + + self.resourceAssigner.doAssignment(self.specification_tree) + + self.logger_mock.warning.assert_any_call( + "could not remove all data on disk from previous run for otdb_id %s: %s", self.otdb_id, message) + + def test_do_assignment_notifies_bus_when_task_is_scheduled(self): + content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id} + subject = ra_notification_prefix + 'Task' + 'Scheduled' + + self.resourceAssigner.doAssignment(self.specification_tree) + + self.assertTrue(self.ra_notification_bus_send_called_with(content, subject)) + if __name__ == '__main__': unittest.main()