Select Git revision
t_resourceassigner.py
-
Alexander van Amesfoort authored
Fix merge conflict on momqueryservice.py. Moved on trunk, but changed on this branch. Solve by replacing the moved file with the changed file (thus, move the changed file over the moved file). Careful checking of (JD's) changes shows this is correct; the changes are preserved. Other moved files did not cause conflicts, since they were not changed.
Alexander van Amesfoort authoredFix merge conflict on momqueryservice.py. Moved on trunk, but changed on this branch. Solve by replacing the moved file with the changed file (thus, move the changed file over the moved file). Careful checking of (JD's) changes shows this is correct; the changes are preserved. Other moved files did not cause conflicts, since they were not changed.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
t_resourceassigner.py 55.89 KiB
#!/usr/bin/env python
import unittest
import mock
import datetime
from lofar.sas.resourceassignment.resourceassigner.assignment import ResourceAssigner
from lofar.parameterset import parameterset
ra_notification_prefix = "ra_notification_prefix"
class TestingResourceAssigner(ResourceAssigner):
def __init__(self, rarpc, rerpc, otdbrpc, momrpc, curpc, sqrpc, ra_notification_bus):
# super gets not done to be able to insert mocks as early as possible otherwise the RPC block unittesting
self.radbrpc = rarpc
self.rerpc = rerpc
self.otdbrpc = otdbrpc
self.momrpc = momrpc
self.curpc = curpc
self.sqrpc = sqrpc
self.ra_notification_bus = ra_notification_bus
self.ra_notification_prefix = ra_notification_prefix
class ResourceAssignerTest(unittest.TestCase):
mom_id = 351557
otdb_id = 1290494
specification_id = 2323
state = u'prescheduled'
task_type = u'pipeline'
specification_tree = {}
non_approved_or_prescheduled_status = u'opened'
non_approved_or_prescheduled_otdb_id = 1
future_start_time = (datetime.datetime.utcnow() + datetime.timedelta(hours=1)).strftime('%Y-%m-%d %H:%M:%S')
future_stop_time = (datetime.datetime.utcnow() + datetime.timedelta(hours=2)).strftime('%Y-%m-%d %H:%M:%S')
non_approved_or_prescheduled_specification_tree = {
u'otdb_id': non_approved_or_prescheduled_otdb_id,
u'task_type': u'pipeline',
u'state': non_approved_or_prescheduled_status,
u'specification': {
u'Observation.startTime': future_start_time,
u'Observation.stopTime': future_stop_time
}
}
approved_status = u'approved'
approved_otdb_id = 22
approved_specification_tree = {
u'otdb_id': approved_otdb_id,
u'task_type': u'pipeline',
u'state': approved_status,
u'specification': {
u'Observation.startTime': future_start_time,
u'Observation.stopTime': future_stop_time
}
}
cep2_specification_tree = {
u'otdb_id': otdb_id,
u'task_type': u'pipeline',
u'state': u'prescheduled',
u'specification': {
u'Observation.startTime': future_start_time,
u'Observation.stopTime': future_stop_time,
u'Observation.DataProducts.Output_Pulsar.enabled': True,
u'Observation.DataProducts.Output_Pulsar.storageClusterName': u'CEP2'
}
}
mom_bug_processing_cluster_name = 'CEP2'
mom_bug_otbd_id = 1234
mom_bug_specification_tree = {
u'otdb_id': mom_bug_otbd_id,
u'task_type': u'pipeline',
u'state': u'prescheduled',
u'specification': {
u'Observation.startTime': future_start_time,
u'Observation.stopTime': future_stop_time,
u'Observation.DataProducts.Output_Pulsar.enabled': True,
u'Observation.DataProducts.Output_Pulsar.storageClusterName': u'CEP4',
u'Observation.Cluster.ProcessingCluster.clusterName': mom_bug_processing_cluster_name
}
}
task_mom_id = 351543
task_otdb_id = 1290472
task_id = 2299
task_end_time = datetime.datetime(2016, 3, 25, 22, 47, 31)
task_start_time = datetime.datetime(2016, 3, 25, 21, 47, 31)
task = {
"mom_id": task_mom_id,
"otdb_id": task_otdb_id,
"id": task_id,
"endtime": task_end_time,
"name": "IS HBA_DUAL",
"predecessor_ids": [],
"project_mom_id": 2,
"project_name": "test-lofar",
"specification_id": specification_id,
"starttime": task_start_time,
"status": "prescheduled",
"status_id": 350,
"successor_ids": [],
"type": "pipeline",
"type_id": 0
}
non_existing_task_mom_id = -1
predecessor_task_mom_id = 1
predecessor_task_otdb_id = 2
predecessor_task_id = 3
predecessor_task = {
"mom_id": predecessor_task_mom_id,
"otdb_id": predecessor_task_otdb_id,
"id": predecessor_task_id,
"endtime": datetime.datetime(2016, 3, 25, 22, 47, 31),
"name": "IS HBA_DUAL",
"predecessor_ids": [],
"project_mom_id": 2,
"project_name": "test-lofar",
"specification_id": 2323,
"starttime": datetime.datetime(2016, 3, 25, 21, 47, 31),
"status": "prescheduled",
"status_id": 350,
"successor_ids": [],
"type": "pipeline",
"type_id": 0
}
successor_task_mom_id = 4
successor_task_otdb_id = 5
successor_task_id = 6
successor_task = {
"mom_id": successor_task_mom_id,
"otdb_id": successor_task_otdb_id,
"id": successor_task_id,
"endtime": datetime.datetime(2016, 3, 25, 22, 47, 31),
"name": "IS HBA_DUAL",
"predecessor_ids": [],
"project_mom_id": 2,
"project_name": "test-lofar",
"specification_id": 2323,
"starttime": datetime.datetime(2016, 3, 25, 21, 47, 31),
"status": "prescheduled",
"status_id": 350,
"successor_ids": [],
"type": "pipeline",
"type_id": 0
}
resources_with_errors_otdb_id = 1290496
resource_error1 = "error 1"
resource_error2 = "error 2"
unknown_resource_type_name = "fuel"
unknown_resource_type_otdb_id = 123489
resource_unknown_property = 'unknown_property'
rerpc_status = 0
rerpc_needed_claim_for_bandwidth_size = 19021319494
rerpc_needed_claim_for_bandwidth = {
'total_size': rerpc_needed_claim_for_bandwidth_size
}
rerpc_needed_claim_for_storage_output_files = {
'uv': {
'nr_of_uv_files': 481,
'uv_file_size': 1482951104
},
'saps': [
{
'sap_nr': 0,
'properties': {
'nr_of_uv_files': 319
}
},
{
'sap_nr': 1,
'properties': {
'nr_of_uv_files': 81,
resource_unknown_property: -1
}
},
{
'sap_nr': 2,
'properties': {
'nr_of_uv_files': 81
}
}
]
}
rerpc_needed_claim_for_storage_size = 713299481024
rerpc_needed_claim_for_storage = {
'total_size': rerpc_needed_claim_for_storage_size,
'output_files': rerpc_needed_claim_for_storage_output_files
}
rerpc_replymessage = {
str(otdb_id): {
'pipeline': {
'bandwidth': rerpc_needed_claim_for_bandwidth,
'storage': rerpc_needed_claim_for_storage
}
},
str(resources_with_errors_otdb_id): {
'pipeline': {
'bandwidth': {
'total_size': 19021319494
},
'storage': {
'total_size': 713299481024,
'output_files': {
'uv': {
'nr_of_uv_files': 481,
'uv_file_size': 1482951104
},
'saps': [{
'sap_nr': 0,
'properties': {
'nr_of_uv_files': 319
}
},
{
'sap_nr': 1,
'properties': {
'nr_of_uv_files': 81
}
},
{
'sap_nr': 2,
'properties': {
'nr_of_uv_files': 81
}
}]
}
},
},
'errors': [resource_error1, resource_error2]
},
str(unknown_resource_type_otdb_id): {
'pipeline': {
str(unknown_resource_type_name): {
}
}
}
}
cep4bandwidth_resource_id = 116
cep4storage_resource_id = 117
storage_claim = {
'resource_id': cep4storage_resource_id,
'starttime': task_start_time,
'endtime': task_end_time + datetime.timedelta(days=365),
'status': 'claimed',
'claim_size': rerpc_needed_claim_for_storage_size,
'properties': [
{'type': 2, 'io_type': 'output', 'value': 481},
{'type': 10, 'io_type': 'output', 'value': 1482951104},
{'type': 2, 'io_type': 'output', 'sap_nr': 0, 'value': 319},
{'type': 2, 'io_type': 'output', 'sap_nr': 1, 'value': 81},
{'type': 2, 'io_type': 'output', 'sap_nr': 2, 'value': 81}
]
}
bandwith_claim = {
'resource_id': cep4bandwidth_resource_id,
'starttime': task_start_time,
'endtime': task_end_time,
'status': 'claimed',
'claim_size': rerpc_needed_claim_for_bandwidth_size
}
specifaction_claims = [bandwith_claim, storage_claim]
def reset_specification_tree(self):
self.specification_tree = {
u'otdb_id': self.otdb_id,
u'task_type': self.task_type,
u'state': self.state,
u'specification': {
u'Observation.momID': str(self.mom_id),
u'Observation.startTime': self.future_start_time,
u'Observation.stopTime': self.future_stop_time,
u'Observation.DataProducts.Output_InstrumentModel.enabled': False,
u'Observation.VirtualInstrument.stationList': [],
u'Observation.DataProducts.Input_CoherentStokes.enabled': False,
u'Observation.DataProducts.Output_CoherentStokes.enabled': False,
u'Observation.DataProducts.Input_Correlated.skip': [0, 0, 0, 0],
u'Observation.antennaSet': u'LBA_INNER',
u'Observation.nrBitsPerSample': u'16',
u'Observation.ObservationControl.PythonControl.LongBaseline.subbandgroups_per_ms': u'2',
u'Observation.DataProducts.Output_IncoherentStokes.enabled': False,
u'Observation.DataProducts.Input_IncoherentStokes.enabled': False,
u'Observation.DataProducts.Input_Correlated.enabled': True,
u'Observation.DataProducts.Output_Pulsar.enabled': False,
u'Observation.DataProducts.Input_CoherentStokes.skip': [],
u'Observation.DataProducts.Output_SkyImage.enabled': False,
u'Version.number': u'33774',
u'Observation.ObservationControl.PythonControl.LongBaseline.subbands_per_subbandgroup': u'2',
u'Observation.nrBeams': u'0',
u'Observation.DataProducts.Input_IncoherentStokes.skip': [],
u'Observation.DataProducts.Output_Correlated.enabled': True,
u'Observation.DataProducts.Output_Correlated.storageClusterName': 'CEP4',
u'Observation.sampleClock': u'200',
u'Observation.Cluster.ProcessingCluster.clusterName': 'CEP4'
},
u'task_subtype': u'long baseline pipeline',
u'predecessors': [{
u'task_subtype': u'averaging pipeline',
u'specification': {
u'Observation.DataProducts.Output_InstrumentModel.enabled': False,
u'Observation.stopTime': u'2016-03-25 13:51:05',
u'Observation.VirtualInstrument.stationList': [],
u'Observation.DataProducts.Input_CoherentStokes.enabled': False,
u'Observation.DataProducts.Output_CoherentStokes.enabled': False,
u'Observation.DataProducts.Output_SkyImage.enabled': False,
u'Observation.DataProducts.Input_Correlated.skip': [0, 0, 0, 0],
u'Observation.antennaSet': u'LBA_INNER',
u'Observation.nrBitsPerSample': u'16',
u'Observation.ObservationControl.PythonControl.LongBaseline.subbandgroups_per_ms': u'1',
u'Observation.DataProducts.Output_IncoherentStokes.enabled': False,
u'Observation.DataProducts.Input_IncoherentStokes.enabled': False,
u'Observation.DataProducts.Input_Correlated.enabled': True,
u'Observation.DataProducts.Output_Pulsar.enabled': False,
u'Observation.DataProducts.Input_CoherentStokes.skip': [],
u'Observation.ObservationControl.PythonControl.DPPP.demixer.demixtimestep': u'10',
u'Version.number': u'33774',
u'Observation.momID': u'351556',
u'Observation.startTime': u'2016-03-25 13:49:55',
u'Observation.ObservationControl.PythonControl.LongBaseline.subbands_per_subbandgroup': u'1',
u'Observation.nrBeams': u'0',
u'Observation.DataProducts.Input_IncoherentStokes.skip': [],
u'Observation.ObservationControl.PythonControl.DPPP.demixer.demixfreqstep': u'64',
u'Observation.DataProducts.Output_Correlated.enabled': True,
u'Observation.sampleClock': u'200'
},
u'task_type': u'pipeline',
u'otdb_id': 1290496,
u'predecessors': [{
u'task_subtype': u'bfmeasurement',
u'specification': {
u'Observation.DataProducts.Output_InstrumentModel.enabled': False,
u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.timeIntegrationFactor': u'1',
u'Observation.stopTime': u'2016-03-26 00:33:31',
u'Observation.VirtualInstrument.stationList': [u'RS205', u'RS503', u'CS013', u'RS508',
u'RS106'],
u'Observation.DataProducts.Input_CoherentStokes.enabled': False,
u'Observation.DataProducts.Output_CoherentStokes.enabled': False,
u'Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrChannelsPerSubband': u'64',
u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.which': u'I',
u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.which': u'I',
u'Observation.Beam[0].subbandList': [100, 101, 102, 103],
u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.subbandsPerFile': u'512',
u'Observation.DataProducts.Input_Correlated.skip': [],
u'Observation.antennaSet': u'HBA_DUAL',
u'Observation.nrBitsPerSample': u'8',
u'Observation.Beam[0].nrTabRings': u'0',
u'Observation.Beam[0].nrTiedArrayBeams': u'0',
u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.flysEye': False,
u'Observation.nrBeams': u'1',
u'Observation.ObservationControl.OnlineControl.Cobalt.Correlator.integrationTime': u'1.0',
u'Observation.DataProducts.Output_IncoherentStokes.enabled': False,
u'Observation.DataProducts.Input_IncoherentStokes.enabled': False,
u'Observation.DataProducts.Input_Correlated.enabled': False,
u'Observation.DataProducts.Output_Pulsar.enabled': False,
u'Observation.DataProducts.Input_CoherentStokes.skip': [],
u'Observation.DataProducts.Output_SkyImage.enabled': False,
u'Version.number': u'33774',
u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.timeIntegrationFactor': u'1',
u'Observation.momID': u'351539',
u'Observation.startTime': u'2016-03-26 00:31:31',
u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.subbandsPerFile': u'512',
u'Observation.DataProducts.Input_IncoherentStokes.skip': [],
u'Observation.DataProducts.Output_Correlated.enabled': True,
u'Observation.sampleClock': u'200'
},
u'task_type': u'observation',
u'otdb_id': 1290476,
u'predecessors': []
}]
}]
}
def setUp(self):
def get_task_side_effect(*args, **kwargs):
if 'mom_id' in kwargs:
if kwargs['mom_id'] == self.successor_task_mom_id:
return self.successor_task
elif kwargs['mom_id'] == self.predecessor_task_mom_id:
return self.predecessor_task
elif kwargs['mom_id'] == self.non_existing_task_mom_id:
return None
else:
return self.task
else:
return self.task
self.successor_task_mom_ids = [self.successor_task_mom_id]
self.predecessor_task_mom_ids = [self.predecessor_task_mom_id]
rarpc_patcher = mock.patch('lofar.sas.resourceassignment.resourceassignmentservice.rpc.RARPC')
self.addCleanup(rarpc_patcher.stop)
self.rarpc_mock = rarpc_patcher.start()
self.rarpc_mock.getTask.side_effect = get_task_side_effect
self.rarpc_mock.insertSpecificationAndTask.return_value = {
'inserted': True,
'specification_id': self.specification_id,
'task_id': self.task_id
}
self.rarpc_mock.getResourceClaimPropertyTypes.return_value = [
{'id': 0, 'name': 'nr_of_is_files'},
{'id': 1, 'name': 'nr_of_cs_files'},
{'id': 2, 'name': 'nr_of_uv_files'},
{'id': 3, 'name': 'nr_of_im_files'},
{'id': 4, 'name': 'nr_of_img_files'},
{'id': 5, 'name': 'nr_of_pulp_files'},
{'id': 6, 'name': 'nr_of_cs_stokes'},
{'id': 7, 'name': 'nr_of_is_stokes'},
{'id': 8, 'name': 'is_file_size'},
{'id': 9, 'name': 'cs_file_size'},
{'id': 10, 'name': 'uv_file_size'},
{'id': 11, 'name': 'im_file_size'},
{'id': 12, 'name': 'img_file_size'},
{'id': 13, 'name': 'nr_of_pulp_files'},
{'id': 14, 'name': 'nr_of_tabs'},
{'id': 15, 'name': 'start_sb_nr'},
{'id': 16, 'name': 'uv_otdb_id'},
{'id': 17, 'name': 'cs_otdb_id'},
{'id': 18, 'name': 'is_otdb_id'},
{'id': 19, 'name': 'im_otdb_id'},
{'id': 20, 'name': 'img_otdb_id'},
{'id': 21, 'name': 'pulp_otdb_id'},
{'id': 22, 'name': 'is_tab_nr'},
{'id': 23, 'name': 'start_sbg_nr'}
]
self.rarpc_mock.getResourceTypes.return_value = [
{'id': 0, 'name': 'rsp', 'unit_id': 0, 'units': 'rsp_channel_bit'},
{'id': 1, 'name': 'tbb', 'unit_id': 1, 'units': 'bytes'},
{'id': 2, 'name': 'rcu', 'unit_id': 2, 'units': 'rcu_board'},
{'id': 3, 'name': 'bandwidth', 'unit_id': 3, 'units': 'bits/second'},
{'id': 4, 'name': 'processor', 'unit_id': 4, 'units': 'cores'},
{'id': 5, 'name': 'storage', 'unit_id': 1, 'units': 'bytes'},
]
self.rarpc_mock.insertResourceClaims.return_value = {'ids': [1, 2]}
# incomplete response but good enough for tests
self.rarpc_mock.getResources.return_value = [
{'id': self.cep4bandwidth_resource_id, 'name': 'cep4bandwidth', 'type_id': 3, 'type_name': 'bandwidth', 'unit_id': 3, 'unit': 'bits/second'},
{'id': self.cep4storage_resource_id, 'name': 'cep4storage', 'type_id': 5, 'type_name': 'storage', 'unit_id': 1, 'unit': 'bytes'}
]
self.rarpc_mock.getResourceClaims.return_value = []
rerpc_patcher = mock.patch('lofar.messaging.RPC')
self.addCleanup(rerpc_patcher.stop)
self.rerpc_mock = rerpc_patcher.start()
self.rerpc_mock.return_value = self.rerpc_replymessage, self.rerpc_status
otdbrpc_patcher = mock.patch('lofar.sas.otdb.otdbrpc')
self.addCleanup(otdbrpc_patcher.stop)
self.otdbrpc_mock = otdbrpc_patcher.start()
momrpc_patcher = mock.patch('lofar.mom.momqueryservice.momqueryrpc')
self.addCleanup(momrpc_patcher.stop)
self.momrpc_mock = momrpc_patcher.start()
self.momrpc_mock.getPredecessorIds.return_value = {str(self.task_mom_id): self.predecessor_task_mom_ids}
self.momrpc_mock.getSuccessorIds.return_value = {str(self.task_mom_id): self.successor_task_mom_ids}
curpc_patcher = mock.patch('lofar.sas.datamanagement.cleanup.rpc')
self.addCleanup(curpc_patcher.stop)
self.curpc_mock = curpc_patcher.start()
sqrpc_patcher = mock.patch('lofar.sas.datamanagement.storagequery.rpc')
self.addCleanup(sqrpc_patcher.stop)
self.sqrpc_mock = sqrpc_patcher.start()
ra_notification_bus_patcher = mock.patch('lofar.messaging.messagebus')
self.addCleanup(ra_notification_bus_patcher.stop)
self.ra_notification_bus_mock = ra_notification_bus_patcher.start()
logger_patcher = mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.logger')
self.addCleanup(logger_patcher.stop)
self.logger_mock = logger_patcher.start()
move_pipeline_after_its_predecessors_patcher = mock.patch(
'lofar.sas.resourceassignment.resourceassigner.assignment.movePipelineAfterItsPredecessors')
self.addCleanup(move_pipeline_after_its_predecessors_patcher.stop)
self.movePipelineAfterItsPredecessors_mock = move_pipeline_after_its_predecessors_patcher.start()
self.resourceAssigner = TestingResourceAssigner(self.rarpc_mock, self.rerpc_mock,
self.otdbrpc_mock, self.momrpc_mock,
self.curpc_mock, self.sqrpc_mock,
self.ra_notification_bus_mock)
self.reset_specification_tree()
def assert_all_services_opened(self):
self.assertTrue(self.rarpc_mock.open.called, "RARPC.open was not called")
self.assertTrue(self.rerpc_mock.open.called, "RPC.open was not called")
self.assertTrue(self.otdbrpc_mock.open.called, "OTDBRPC.open was not called")
self.assertTrue(self.momrpc_mock.open.called, "MOMRPC.open was not called")
self.assertTrue(self.curpc_mock.open.called, "CURPC.open was not called")
self.assertTrue(self.ra_notification_bus_mock.open.called, "ra_notification_bus.open was not called")
def assert_all_services_closed(self):
self.assertTrue(self.rarpc_mock.close.called, "RARPC.close was not called")
self.assertTrue(self.rerpc_mock.close.called, "RPC.close was not called")
self.assertTrue(self.otdbrpc_mock.close.called, "OTDBRPC.close was not called")
self.assertTrue(self.momrpc_mock.close.called, "MOMRPC.close was not called")
self.assertTrue(self.curpc_mock.close.called, "CURPC.close was not called")
self.assertTrue(self.ra_notification_bus_mock.close.called, "ra_notification_bus.close was not called")
def test_open_opens_all_services(self):
self.resourceAssigner.open()
self.assert_all_services_opened()
def test_close_closes_all_services(self):
self.resourceAssigner.close()
self.assert_all_services_closed()
def test_contextManager_opens_and_closes_all_services(self):
with TestingResourceAssigner(self.rarpc_mock, self.rerpc_mock,
self.otdbrpc_mock, self.momrpc_mock,
self.curpc_mock, self.sqrpc_mock,
self.ra_notification_bus_mock):
self.assert_all_services_opened()
self.assert_all_services_closed()
def test_do_assignment_logs_specification(self):
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.info.assert_any_call('doAssignment: specification_tree=%s' % self.specification_tree)
def test_do_assignment_log_non_approved_or_prescheduled_states(self):
self.resourceAssigner.doAssignment(self.non_approved_or_prescheduled_specification_tree)
self.logger_mock.info.assert_any_call(
'skipping specification for otdb_id=%s because status=%s',
(self.non_approved_or_prescheduled_otdb_id, self.non_approved_or_prescheduled_status))
def test_do_assignment_non_approved_or_prescheduled_states_should_be_skipped(self):
self.resourceAssigner.doAssignment(self.non_approved_or_prescheduled_specification_tree)
self.assertEqual(len(self.otdbrpc_mock.method_calls), 0,
"OTDBRPC was called for non approved or scheduled specification tree")
self.assertEqual(len(self.rarpc_mock.method_calls), 0,
"RARPC was called for non approved or scheduled specification tree")
self.assertEqual(len(self.momrpc_mock.method_calls), 0,
"MOMRPC was called for non approved or scheduled specification tree")
self.assertEqual(len(self.rerpc_mock.method_calls), 0,
"RERPC was called for non approved or scheduled specification tree")
self.assertEqual(len(self.curpc_mock.method_calls), 0,
"CURPC was called for non approved or scheduled specification tree")
self.assertEqual(len(self.ra_notification_bus_mock.method_calls), 0,
"RA notification bus was called for non approved or scheduled specification tree")
def test_do_assignment_inserts_specification_and_task_in_radb(self):
self.resourceAssigner.doAssignment(self.specification_tree)
start_time = datetime.datetime.strptime(self.future_start_time, '%Y-%m-%d %H:%M:%S')
stop_time = datetime.datetime.strptime(self.future_stop_time, '%Y-%m-%d %H:%M:%S')
parset = parameterset(self.specification_tree['specification'])
self.rarpc_mock.insertSpecificationAndTask.assert_any_call(self.mom_id, self.otdb_id, self.state,
self.task_type, start_time, stop_time, str(parset),
"CEP4")
def test_do_assignment_logs_when_insertion_of_specification_and_task_in_radb_failed(self):
self.rarpc_mock.insertSpecificationAndTask.return_value = {'inserted': False}
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.error.assert_any_call('could not insert specification and task')
def test_do_assignment_logs_when_no_predecessors_found(self):
self.momrpc_mock.getPredecessorIds.return_value = {str(self.task_mom_id): []}
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.info.assert_any_call('no predecessors for otdb_id=%s', self.task_otdb_id)
def test_do_assignment_logs_when_predecessors_are_found(self):
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.info.assert_any_call('proccessing predecessor mom_ids=%s for mom_id=%s otdb_id=%s',
self.predecessor_task_mom_ids, self.task_mom_id, self.task_otdb_id)
def test_do_assignment_logs_when_predecessors_are_found_but_its_task_is_missing_in_radb(self):
self.momrpc_mock.getPredecessorIds.return_value = {str(self.task_mom_id): [self.non_existing_task_mom_id]}
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.warning.assert_any_call(
'could not find predecessor task with mom_id=%s in radb for task otdb_id=%s', self.non_existing_task_mom_id, self.task_otdb_id)
def test_do_assignment_logs_when_predecessors_are_found_that_need_to_be_linked_to_task(self):
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.info.assert_any_call(
'connecting predecessor task with mom_id=%s otdb_id=%s to it\'s successor with mom_id=%s otdb_id=%s',
self.predecessor_task_mom_id,
self.predecessor_task_otdb_id,
self.task_mom_id,
self.task_otdb_id)
def test_do_assignment_insert_predecessor_into_task_when_not_linked_to_task(self):
self.resourceAssigner.doAssignment(self.specification_tree)
self.rarpc_mock.insertTaskPredecessor.assert_any_call(self.task_id, self.predecessor_task_id)
def test_do_assignment_logs_when_no_successors_found(self):
self.momrpc_mock.getSuccessorIds.return_value = {str(self.task_mom_id): []}
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.info.assert_any_call('no successors for otdb_id=%s', self.task_otdb_id)
def test_do_assignment_logs_when_successors_are_found(self):
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.info.assert_any_call('proccessing successor mom_ids=%s for mom_id=%s otdb_id=%s',
self.successor_task_mom_ids, self.task_mom_id, self.task_otdb_id)
def test_do_assignment_logs_when_successors_are_found_but_its_task_is_missing_in_radb(self):
self.momrpc_mock.getSuccessorIds.return_value = {str(self.task_mom_id): [self.non_existing_task_mom_id]}
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.warning.assert_any_call(
'could not find successor task with mom_id=%s in radb for task mom_id=%s otdb_id=%s',
self.non_existing_task_mom_id, self.task_mom_id, self.task_otdb_id)
def test_do_assignment_logs_when_successors_are_found_that_need_to_be_linked_to_task(self):
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.info.assert_any_call(
'connecting successor task with mom_id=%s otdb_id=%s to it\'s predecessor with mom_id=%s otdb_id=%s',
self.successor_task_mom_id,
self.successor_task_otdb_id,
self.task_mom_id,
self.task_otdb_id)
def test_do_assignment_insert_successor_into_task_when_not_linked_to_task(self):
self.resourceAssigner.doAssignment(self.specification_tree)
self.rarpc_mock.insertTaskPredecessor.assert_any_call(self.successor_task_id, self.task_id)
def test_do_assignment_moves_pipeline_of_successor_after_predecessor(self):
self.resourceAssigner.doAssignment(self.specification_tree)
self.assertTrue(self.movePipelineAfterItsPredecessors_mock.called)
def test_do_assignment_logs_mom_bug(self):
self.resourceAssigner.doAssignment(self.mom_bug_specification_tree)
self.logger_mock.info.assert_any_call(
'overwriting and uploading processingClusterName to otdb from %s to %s for otdb_id=%s',
self.mom_bug_processing_cluster_name, 'CEP4', self.mom_bug_otbd_id)
def test_do_assignment_resets_ProcessingCluster_clusterName_on_mom_bug(self):
self.resourceAssigner.doAssignment(self.mom_bug_specification_tree)
self.otdbrpc_mock.taskSetSpecification.assert_any_call(
self.mom_bug_otbd_id,
{'LOFAR.ObsSW.Observation.Cluster.ProcessingCluster.clusterName': 'CEP4'})
@mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.datetime')
def test_do_assignment_should_reset_observation_period_when_in_past_without_predecessor_and_duration(
self, datetime_mock):
now = datetime.datetime.utcnow() + datetime.timedelta(days=1)
datetime_mock.utcnow.return_value = now
new_starttime = now + datetime.timedelta(minutes=1)
new_endtime = new_starttime + datetime.timedelta(hours=1)
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.warning.assert_any_call(
'Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s',
new_starttime, new_endtime, self.otdb_id)
self.logger_mock.info.assert_any_call(
'uploading auto-generated start/end time (%s, %s) to otdb for otdb_id=%s',
new_starttime,
new_endtime,
self.otdb_id)
self.otdbrpc_mock.taskSetSpecification.assert_any_call(
self.otdb_id,
{
'LOFAR.ObsSW.Observation.startTime': new_starttime.strftime('%Y-%m-%d %H:%M:%S'),
'LOFAR.ObsSW.Observation.stopTime': new_endtime.strftime('%Y-%m-%d %H:%M:%S')
})
@mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.datetime')
def test_do_assignment_should_reset_observation_period_when_in_past_with_predecessor_in_future(self, datetime_mock):
now = self.freeze_time_one_day_in_the_future(datetime_mock)
future_predecessor_stop_time = now + datetime.timedelta(hours=1)
self.specification_tree['predecessors'][0]['specification']['Observation.stopTime'] = \
future_predecessor_stop_time.strftime('%Y-%m-%d %H:%M:%S')
new_starttime = future_predecessor_stop_time + datetime.timedelta(minutes=1)
new_endtime = new_starttime + datetime.timedelta(hours=1)
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.warning.assert_any_call(
'Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s',
new_starttime, new_endtime, self.otdb_id)
self.logger_mock.info.assert_any_call(
'uploading auto-generated start/end time (%s, %s) to otdb for otdb_id=%s',
new_starttime,
new_endtime,
self.otdb_id)
self.otdbrpc_mock.taskSetSpecification.assert_any_call(
self.otdb_id,
{
'LOFAR.ObsSW.Observation.startTime': new_starttime.strftime('%Y-%m-%d %H:%M:%S'),
'LOFAR.ObsSW.Observation.stopTime': new_endtime.strftime('%Y-%m-%d %H:%M:%S')
})
def freeze_time_one_day_in_the_future(self, datetime_mock):
now = datetime.datetime.utcnow() + datetime.timedelta(days=1)
now = self._strip_ms(now)
datetime_mock.utcnow.return_value = now
datetime_mock.strptime.side_effect = \
lambda date_string, format_string: datetime.datetime.strptime(date_string, format_string)
return now
def _strip_ms(self, now):
return datetime.datetime.strptime(now.strftime('%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S')
@mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.datetime')
def test_do_assignment_should_reset_observation_period_when_in_past_with_task_duration(self, datetime_mock):
now = datetime.datetime.utcnow() + datetime.timedelta(days=1)
datetime_mock.utcnow.return_value = now
duration = 100
self.specification_tree['specification']['Observation.Scheduler.taskDuration'] = duration
new_starttime = now + datetime.timedelta(minutes=1)
new_endtime = new_starttime + datetime.timedelta(seconds=duration)
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.warning.assert_any_call(
'Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s',
new_starttime, new_endtime, self.otdb_id)
self.logger_mock.info.assert_any_call(
'uploading auto-generated start/end time (%s, %s) to otdb for otdb_id=%s',
new_starttime,
new_endtime,
self.otdb_id)
self.otdbrpc_mock.taskSetSpecification.assert_any_call(
self.otdb_id,
{
'LOFAR.ObsSW.Observation.startTime': new_starttime.strftime('%Y-%m-%d %H:%M:%S'),
'LOFAR.ObsSW.Observation.stopTime': new_endtime.strftime('%Y-%m-%d %H:%M:%S')
})
def test_do_assignment_should_log_insertion_of_specification_and_task(self):
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.info.assert_any_call(
'doAssignment: insertSpecification momId=%s, otdb_id=%s, status=%s, taskType=%s, startTime=%s, endTime=%s'
' cluster=%s' %
(self.mom_id, self.otdb_id, self.state, self.task_type, self.future_start_time, self.future_stop_time,
"CEP4"))
def test_do_assignment_should_log_when_insertion_of_specification_and_task_is_done(self):
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.info.assert_any_call('doAssignment: inserted specification (id=%s) and task (id=%s)' %
(self.specification_id, self.task_id))
def test_do_assignment_should_log_on_non_presceduled_cep4_tasks(self):
self.resourceAssigner.doAssignment(self.approved_specification_tree)
self.logger_mock.info.assert_any_call(
'skipping resource assignment for CEP4 task otdb_id=%s because status=%s' %
(self.approved_otdb_id,
self.approved_status))
def test_do_assignment_should_not_claim_resources_on_CEP2_tasks(self):
self.resourceAssigner.doAssignment(self.cep2_specification_tree)
self.rarpc_mock.insertResourceClaims.assert_not_called()
def test_do_assignment_should_not_claim_resources_on_non_presceduled_cep4_tasks(self):
self.resourceAssigner.doAssignment(self.non_approved_or_prescheduled_specification_tree)
self.rarpc_mock.insertResourceClaims.assert_not_called()
def test_do_assignment_should_log_if_not_all_storageClusters_are_seen_as_CEP4(self):
self.resourceAssigner.doAssignment(self.cep2_specification_tree)
self.logger_mock.warn.assert_any_call("storageClusterName not CEP4, rejecting specification.")
def test_do_assignment_should_log_if_all_storageClusters_are_seen_as_CEP4(self):
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.info.assert_any_call("all enabled storageClusterName's are CEP4, accepting specification.")
def test_do_assginement_should_request_needed_resources(self):
self.resourceAssigner.doAssignment(self.specification_tree)
self.rerpc_mock.assert_any_call(
{"specification_tree": self.specification_tree}, timeout=10)
def test_do_assignemnet_logs_needed_resources(self):
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.info.assert_any_call('getNeededResouces: %s' % self.rerpc_replymessage)
self.logger_mock.info.assert_any_call('doAssignment: getNeededResouces=%s' % self.rerpc_replymessage)
def test_do_assignment_logs_when_otdb_id_not_needed_resources(self):
self.specification_tree["otdb_id"] = self.otdb_id + 1
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.error.assert_any_call("no otdb_id %s found in estimator results %s" %
(self.otdb_id + 1, self.rerpc_replymessage))
def test_do_assignment_should_not_claim_resouces_when_otdb_id_not_needed_resources(self):
self.specification_tree["otdb_id"] = self.otdb_id + 1
self.resourceAssigner.doAssignment(self.specification_tree)
self.rarpc_mock.insertResourceClaims.assert_not_called()
def test_do_assignment_logs_when_task_type_not_in_needed_resources(self):
wrong_task_type = "observation"
self.specification_tree["task_type"] = wrong_task_type
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.error.assert_any_call("no task type %s found in estimator results %s" %
(wrong_task_type,
self.rerpc_replymessage[str(self.otdb_id)]))
def test_do_assignment_should_not_claim_resources_when_task_type_not_in_needed_resources(self):
wrong_task_type = "observation"
self.specification_tree["task_type"] = wrong_task_type
self.resourceAssigner.doAssignment(self.specification_tree)
self.rarpc_mock.insertResourceClaims.assert_not_called()
def test_do_assignment_should_log_single_errors_in_needed_resources(self):
self.specification_tree["otdb_id"] = self.resources_with_errors_otdb_id
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.error.assert_any_call("Error in estimator: %s", self.resource_error1)
self.logger_mock.error.assert_any_call("Error in estimator: %s", self.resource_error2)
def test_do_assignment_should_log_error_in_needed_resources(self):
self.specification_tree["otdb_id"] = self.resources_with_errors_otdb_id
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.error.assert_any_call(
"Error(s) in estimator for otdb_id=%s radb_id=%s, setting task status to 'error'",
self.resources_with_errors_otdb_id,
self.task_id)
def test_do_assignment_should_update_task_with_error_on_errors_in_needed_resources(self):
self.specification_tree["otdb_id"] = self.resources_with_errors_otdb_id
self.resourceAssigner.doAssignment(self.specification_tree)
self.rarpc_mock.updateTask.assert_any_call(self.task_id, task_status='error')
def test_do_assignment_should_notify_bus_on_errors_in_needed_resources(self):
content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id}
subject = 'TaskError'
self.specification_tree["otdb_id"] = self.resources_with_errors_otdb_id
self.resourceAssigner.doAssignment(self.specification_tree)
self.assertBusNotificationAndLogging(content, subject)
def ra_notification_bus_send_called_with(self, content, subject):
found = False
for call in self.ra_notification_bus_mock.send.call_args_list:
if call[0][0].subject == subject and call[0][0].content == content:
found = True
return found
def test_do_assignment_should_log_start_of_claim_resources(self):
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.info.assert_any_call('claimResources: task %s needed_resources=%s' %
(self.task, self.rerpc_replymessage[str(str(self.otdb_id))]))
def test_do_assigment_should_log_when_claiming_unknown_resource_type(self):
self.specification_tree["otdb_id"] = self.unknown_resource_type_otdb_id
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.warn.assert_any_call('claimResources: unknown resource_type:%s' %
self.unknown_resource_type_name)
def test_do_assignment_logs_claimable_resources_in_specification(self):
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.info.assert_any_call('claimResources: processing resource_type: %s contents: %s'
% ('storage', self.rerpc_needed_claim_for_storage))
self.logger_mock.info.assert_any_call('claimResources: processing resource_type: %s contents: %s'
% ('bandwidth', self.rerpc_needed_claim_for_bandwidth))
def test_do_assignment_logs_created_claim_per_needed_resource_type(self):
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.info.assert_any_call('claimResources: created claim:%s' % self.storage_claim)
self.logger_mock.info.assert_any_call('claimResources: created claim:%s' % self.bandwith_claim)
def test_do_assignment_logs_amount_of_claims_inserted_in_radb(self):
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.info.assert_any_call('claimResources: inserting %d claims in the radb' % 2)
def test_do_assignment_inserts_resource_claims_in_radb(self):
self.resourceAssigner.doAssignment(self.specification_tree)
self.rarpc_mock.insertResourceClaims.assert_any_call(self.task_id, self.specifaction_claims, 1, 'anonymous', -1)
def test_do_assignment_logs_amount_claims_inserted(self):
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.info.assert_any_call('claimResources: %d claims were inserted in the radb' % 2)
def test_do_assignment_logs_unknown_property_on_needed_resources(self):
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.warn.assert_any_call('claimResources: unknown prop_type:%s' % self.resource_unknown_property)
def test_do_assignment_logs_multiple_properties_on_needed_resource(self):
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.info.assert_any_call(
'claimResources: processing resource_type: %s subdict_name: \'%s\' subdict_contents: %s' %
("storage", "output_files", self.rerpc_needed_claim_for_storage_output_files))
def test_do_assignment_logs_when_it_was_unable_to_claim_all_resources(self):
self.rarpc_mock.insertResourceClaims.return_value = {'ids': []}
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.warning.assert_any_call(
'doAssignment: No claims could be made. Setting task %s status to error' % self.task_id)
def test_do_assignment_updates_task_when_it_was_unable_to_claim_all_resources(self):
self.rarpc_mock.insertResourceClaims.return_value = {'ids': []}
self.resourceAssigner.doAssignment(self.specification_tree)
self.rarpc_mock.updateTask.assert_any_call(self.task_id, task_status='error')
def test_do_assignment_notifies_bus_when_it_was_unable_to_claim_all_resources(self):
content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id}
subject = 'Task' + 'Error'
self.rarpc_mock.insertResourceClaims.return_value = {'ids': []}
self.resourceAssigner.doAssignment(self.specification_tree)
self.assertBusNotificationAndLogging(content, subject)
def test_do_assignment_logs_when_it_was_unable_to_claim_some_resources(self):
self.rarpc_mock.insertResourceClaims.return_value = {'ids': [1]}
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.warning.assert_any_call(
'doAssignment: Not all claims could be inserted. Setting task %s status to conflict' % self.task_id)
def test_do_assignment_updates_task_when_it_was_unable_to_claim_some_resources(self):
self.rarpc_mock.insertResourceClaims.return_value = {'ids': [1]}
self.resourceAssigner.doAssignment(self.specification_tree)
self.rarpc_mock.updateTask.assert_any_call(self.task_id, task_status='conflict')
def test_do_assignment_notifies_bus_when_it_was_unable_to_claim_some_resources(self):
content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id}
subject = 'Task' + 'Conflict'
self.rarpc_mock.insertResourceClaims.return_value = {'ids': [1]}
self.resourceAssigner.doAssignment(self.specification_tree)
self.assertBusNotificationAndLogging(content, subject)
def test_do_assignment_logs_when_there_are_conflicting_claims(self):
conflicting_claims = [{}]
self.rarpc_mock.getResourceClaims.return_value = conflicting_claims
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.warning.assert_any_call(
'doAssignment: %s conflicting claims detected. Task cannot be scheduled. %s' %
(len(conflicting_claims), conflicting_claims))
def test_do_assignment_notifies_bus_when_there_are_conflicting_claims(self):
content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id}
subject = 'Task' + 'Conflict'
conflicting_claims = [{}]
self.rarpc_mock.getResourceClaims.return_value = conflicting_claims
self.resourceAssigner.doAssignment(self.specification_tree)
self.assertBusNotificationAndLogging(content, subject)
def test_do_assignment_logs_when_all_resources_were_claimed(self):
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.info.assert_any_call(
'doAssignment: all claims for task %s were succesfully claimed. Setting claim statuses to allocated' %
self.task_id)
def test_do_assignment_updates_task_and_resources_as_claimed_in_radb(self):
self.resourceAssigner.doAssignment(self.specification_tree)
self.rarpc_mock.updateTaskAndResourceClaims.assert_any_call(self.task_id, claim_status='allocated')
def test_do_assignment_logs_task_data_removal_if_task_is_pipeline(self):
self.sqrpc_mock.getDiskUsageForOTDBId.return_value = {'found': True, 'disk_usage': 10}
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.info.assert_any_call("removing data on disk from previous run for otdb_id %s", self.otdb_id)
def test_do_assignment_removes_task_data_if_task_is_pipeline(self):
self.sqrpc_mock.getDiskUsageForOTDBId.return_value = {'found': True, 'disk_usage': 10}
self.resourceAssigner.doAssignment(self.specification_tree)
self.curpc_mock.removeTaskData.assert_any_call(self.task_otdb_id)
def test_do_assignment_logs_when_taks_data_could_not_be_deleted(self):
message = "file was locked"
self.sqrpc_mock.getDiskUsageForOTDBId.return_value = {'found': True, 'disk_usage': 10}
self.curpc_mock.removeTaskData.return_value = {'deleted': False, 'message': message}
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.warning.assert_any_call(
"could not remove all data on disk from previous run for otdb_id %s: %s", self.otdb_id, message)
def test_do_assignment_notifies_bus_when_task_is_scheduled(self):
content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id}
subject = 'Task' + 'Scheduled'
self.resourceAssigner.doAssignment(self.specification_tree)
self.assertBusNotificationAndLogging(content, subject)
def assertBusNotificationAndLogging(self, content, subject):
self.assertTrue(self.ra_notification_bus_send_called_with(content, ra_notification_prefix + subject))
self.logger_mock.info.assert_any_call('Sending notification %s: %s' %
(subject, str(content).replace('\n', ' ')))
@mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.datetime')
def test_do_assignment_logs_exception_from_otdbrpc_taskSetSpecification(self, datetime_mock):
self.freeze_time_one_day_in_the_future(datetime_mock)
exception = Exception("Error something went wrong")
self.otdbrpc_mock.taskSetSpecification.side_effect = exception
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.error.assert_any_call(exception)
def test_do_assignment_logs_unparsable_start_time(self):
self.specification_tree[u'specification'][u'Observation.startTime'] = "non parse"
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.warning.assert_any_call(
'cannot parse for start/end time from specification for otdb_id=%s. searching for sane defaults...',
self.otdb_id)
def test_do_assignment_logs_unparsable_stop_time(self):
self.specification_tree[u'specification'][u'Observation.stopTime'] = "non parse"
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.warning.assert_any_call(
'cannot parse for start/end time from specification for otdb_id=%s. searching for sane defaults...',
self.otdb_id)
def test_do_assignment_logs_exception_from_otdbrpc_taskSetSpecification_with_mom_bug(self):
exception = Exception("Error something went wrong")
self.otdbrpc_mock.taskSetSpecification.side_effect = exception
self.resourceAssigner.doAssignment(self.mom_bug_specification_tree)
self.logger_mock.error.assert_any_call(exception)
def test_do_assignment_logs_exception_from_rerpc(self):
exception = Exception("Error something went wrong")
self.rerpc_mock.side_effect = exception
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.error.assert_any_call(exception)
def test_do_assignment_updates_task_on_exception_from_rerpc(self):
exception = Exception("Error something went wrong")
self.rerpc_mock.side_effect = exception
self.resourceAssigner.doAssignment(self.specification_tree)
self.rarpc_mock.updateTask.assert_any_call(self.task_id, task_status='error')
def test_do_assignment_notifies_bus_on_exception_from_rerpc(self):
content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id}
subject = 'Task' + 'Error'
exception = Exception("Error something went wrong")
self.rerpc_mock.side_effect = exception
self.resourceAssigner.doAssignment(self.specification_tree)
self.assertBusNotificationAndLogging(content, subject)
def test_do_assignment_logs_when_notifies_bus_thows_exception(self):
exception = Exception("Error something went wrong")
self.ra_notification_bus_mock.send.side_effect = exception
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.error.assert_any_call(str(exception))
def test_do_assignment_logs_when_momrpc_getPredecessorIds_throws_exception(self):
exception = Exception("Error something went wrong")
self.momrpc_mock.getPredecessorIds.side_effect = exception
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.error.assert_any_call(exception)
def test_do_assignment_logs_when_momrpc_getSuccessorIds_throws_exception(self):
exception = Exception("Error something went wrong")
self.momrpc_mock.getSuccessorIds.side_effect = exception
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.error.assert_any_call(exception)
@mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.datetime')
def test_do_assignment_logs_exception_stop_time_parsing_on_predecessor(self, datetime_mock):
self.freeze_time_one_day_in_the_future(datetime_mock)
self.specification_tree[u'predecessors'][0]['specification'][u'Observation.stopTime'] = 'non parse'
exception = ValueError('time data \'non parse\' does not match format \'%Y-%m-%d %H:%M:%S\'')
self.resourceAssigner.doAssignment(self.specification_tree)
# workarround because assert_any_call(exception) seems not to work together
found = False
calls = self.logger_mock.error.call_args_list
for args, kwargs in calls:
first_argument = args[0]
if type(first_argument) is type(exception) and first_argument.args == exception.args:
found = True
self.assertTrue(found)
@mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.datetime')
def test_do_assignment_sets_start_and_end_time_on_unparsable_datetime(self, datetime_mock):
now = self.freeze_time_one_day_in_the_future(datetime_mock)
self.specification_tree['specification'][u'Observation.stopTime'] = 'non parse'
new_starttime = now + datetime.timedelta(minutes=1)
new_endtime = new_starttime + datetime.timedelta(hours=1)
self.resourceAssigner.doAssignment(self.specification_tree)
self.logger_mock.info.assert_any_call(
'doAssignment: insertSpecification momId=%s, otdb_id=%s, status=%s, taskType=%s, startTime=%s, endTime=%s'
' cluster=%s' %
(self.mom_id, self.otdb_id, self.state, self.task_type, new_starttime, new_endtime, "CEP4"))
if __name__ == '__main__':
unittest.main()