diff --git a/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py b/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py index 85f2916162038eb1482cb80a2dcdb042744b339e..f6d5ce74ad14cbea8e323728674f88d46c1425fa 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py +++ b/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py @@ -196,6 +196,8 @@ class ResourceAssignerTest(unittest.TestCase): } resources_with_rcus_otdb_id = 1290495 + resources_with_no_resource_types_otdb_id = 1290497 + resources_with_negative_estimates_otdb_id = 1290488 resources_with_errors_otdb_id = 1290496 resource_error1 = "error 1" resource_error2 = "error 2" @@ -254,6 +256,20 @@ class ResourceAssignerTest(unittest.TestCase): }] } }, + str(resources_with_negative_estimates_otdb_id):{ + 'pipeline': { + 'errors': [], + 'estimates': [{ + 'resource_types': {'bandwidth': -2, 'storage': -2}, + 'resource_count': 2, 'root_resource_group': 'CEP4', + 'output_files': { + 'uv': [{'sap_nr': 0, 'identifications': [], + 'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 0} + }] + } + }] + } + }, str(resources_with_rcus_otdb_id): { 'observation': { 'errors': [], @@ -283,6 +299,23 @@ class ResourceAssignerTest(unittest.TestCase): 'errors': [resource_error1, resource_error2] } }, + str(resources_with_no_resource_types_otdb_id): { + 'pipeline': { + 'estimates': [{ + 'output_files': { + 'uv': [{'sap_nr': 0, + 'properties': {'nr_of_uv_files': 319, 'uv_file_size': 1482951104} + }, + {'sap_nr': 1, + 'properties': {'nr_of_uv_files': 81, 'uv_file_size': 1482951104} + }, + {'sap_nr': 2, + 'properties': {'nr_of_uv_files': 81, 'uv_file_size': 1482951104} + }] + } + }], + } + }, str(unknown_resource_type_otdb_id): { 'pipeline': { str(unknown_resource_type_name): { @@ -329,7 +362,7 @@ class ResourceAssignerTest(unittest.TestCase): storage_claim = { 'resource_id': cep4storage_resource_id, - 'resource_type_id': 5, + 'resource_type_id': 5, 'starttime': task_start_time, 'used_rcus': None, 'endtime': task_end_time + datetime.timedelta(days=365), @@ -344,7 +377,7 @@ class ResourceAssignerTest(unittest.TestCase): bandwidth_claim = { 'resource_id': cep4bandwidth_resource_id, - 'resource_type_id': 3, + 'resource_type_id': 3, 'starttime': task_start_time, 'used_rcus': None, 'endtime': task_end_time, @@ -358,8 +391,18 @@ class ResourceAssignerTest(unittest.TestCase): def reset_specification_tree(self): self.specification_tree = { u'otdb_id': self.otdb_id, + u'mom_id': self.mom_id, + u'task_id': self.task_id, + u'trigger_id': None, + u'status': 'approved', u'task_type': self.task_type, - u'state': self.state, + u'min_starttime': u'2016-03-26 00:31:31', + u'endtime': u'2016-03-26 01:31:31', + u'min_duration': 0, + u'max_duration': 0, + u'duration': 60, + u'cluster': "CEP4", + u'task_subtype': u'long baseline pipeline', u'specification': { u'Observation.momID': str(self.mom_id), u'Observation.startTime': self.future_start_time, @@ -387,8 +430,18 @@ class ResourceAssignerTest(unittest.TestCase): u'Observation.sampleClock': u'200', u'Observation.Cluster.ProcessingCluster.clusterName': 'CEP4' }, - u'task_subtype': u'long baseline pipeline', u'predecessors': [{ + u'mom_id': self.predecessor_task_mom_id, + u'task_id': self.predecessor_task_id, + u'trigger_id': None, + u'status': None, + u'min_starttime': u'2016-03-25 00:31:31', + u'endtime': u'2016-03-25 01:31:31', + u'duration': 60, + u'min_duration': 60, + u'max_duration': 60, + u'cluster': "CEP4", + u'task_subtype': u'averaging pipeline', u'specification': { u'Observation.DataProducts.Output_InstrumentModel.enabled': False, @@ -421,6 +474,17 @@ class ResourceAssignerTest(unittest.TestCase): u'otdb_id': 1290496, u'predecessors': [{ u'task_subtype': u'bfmeasurement', + u'mom_id': 351539, + u'task_id': 323, + u'trigger_id': None, + u'status': None, + u'min_starttime': u'2016-03-24 00:31:31', + u'endtime': u'2016-03-24 01:31:31', + u'duration': 60, + u'min_duration': 60, + u'max_duration': 60, + u'cluster': "CEP4", + u'specification': { u'Observation.DataProducts.Output_InstrumentModel.enabled': False, u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.timeIntegrationFactor': u'1', @@ -459,9 +523,12 @@ class ResourceAssignerTest(unittest.TestCase): }, u'task_type': u'observation', u'otdb_id': 1290476, - u'predecessors': [] - }] - }] + u'predecessors': [], + u'successors': [] + }], + u'successors': [] + }], + u'successors': [] } def reset_task(self): @@ -480,7 +547,9 @@ class ResourceAssignerTest(unittest.TestCase): "status_id": 350, "successor_ids": [], "type": "pipeline", - "type_id": 0 + "type_id": 0, + "duration": 60, + "cluster": "CEP4" } def setUp(self): @@ -1576,10 +1645,6 @@ class ResourceAssignerTest(unittest.TestCase): self.addCleanup(logger_patcher.stop) self.logger_mock = logger_patcher.start() - # ra_checker_patcher = mock.patch('lofar.sas.resourceassignment.resourceassigner.resource_availability_checker') - # self.addCleanup(ra_checker_patcher.stop) - # self.ra_checker_mock = ra_checker_patcher.start() - dwell_scheduler_patcher = mock.patch( 'lofar.sas.resourceassignment.resourceassigner.resource_assigner.DwellScheduler' ) @@ -1587,6 +1652,34 @@ class ResourceAssignerTest(unittest.TestCase): self.dwell_scheduler_mock = dwell_scheduler_patcher.start() self.dwell_scheduler_mock().allocate_resources.return_value = True + prio_scheduler_patcher = mock.patch( + 'lofar.sas.resourceassignment.resourceassigner.resource_assigner.PriorityScheduler' + ) + self.addCleanup(prio_scheduler_patcher.stop) + self.prio_scheduler_mock = prio_scheduler_patcher.start() + self.prio_scheduler_mock().allocate_resources.return_value = True + + self.spec_mock = mock.MagicMock() + self.spec_mock.status = 'prescheduled' + self.spec_mock.radb_id = self.task_id + self.spec_mock.mom_id = self.task_mom_id + self.spec_mock.otdb_id = self.task_otdb_id + self.spec_mock.type = self.task_type + self.spec_mock.duration = datetime.timedelta(seconds=60) + self.spec_mock.starttime = datetime.datetime.utcnow() + self.spec_mock.endtime = datetime.datetime.utcnow() + self.spec_mock.min_starttime = datetime.datetime.utcnow() + self.spec_mock.max_endtime = datetime.datetime.utcnow() + self.spec_mock.calculate_dwell_values.return_value = (self.spec_mock.min_starttime, + self.spec_mock.max_endtime, + self.spec_mock.duration) + + specification_patcher = mock.patch( + 'lofar.sas.resourceassignment.resourceassigner.resource_assigner.Specification') + self.addCleanup(specification_patcher.stop) + self.specification_mock = specification_patcher.start() + self.specification_mock.return_value = self.spec_mock + # Select logger output to see def myprint(s, *args): print >>sys.stderr, s % args if args else s @@ -1596,12 +1689,6 @@ class ResourceAssignerTest(unittest.TestCase): self.logger_mock.warn.side_effect = myprint self.logger_mock.error.side_effect = myprint - move_pipeline_after_its_predecessors_patcher = mock.patch( - 'lofar.sas.resourceassignment.resourceassigner.resource_assigner.movePipelineAfterItsPredecessors' - ) - self.addCleanup(move_pipeline_after_its_predecessors_patcher.stop) - self.movePipelineAfterItsPredecessors_mock = move_pipeline_after_its_predecessors_patcher.start() - self.resource_assigner = TestingResourceAssigner(self.rarpc_mock, self.rerpc_mock, self.otdbrpc_mock, self.momrpc_mock, self.curpc_mock, self.sqrpc_mock, @@ -1664,181 +1751,23 @@ class ResourceAssignerTest(unittest.TestCase): 'Task otdb_id=%s with status \'%s\' is not assignable. Allowed statuses are %s' % (otdb_id, status, assignable_task_states_str)) - def test_do_assignment_non_approved_or_prescheduled_states_should_be_skipped(self): - with self.assertRaises(Exception): - self.resource_assigner.do_assignment(self.non_approved_or_prescheduled_specification_tree['otdb_id'], - self.non_approved_or_prescheduled_specification_tree) - def test_do_assignment_approved_task_should_not_be_rescheduled(self): - otdb_id = self.specification_tree['otdb_id'] - self.specification_tree['state'] = 'approved' - - self.resource_assigner.do_assignment(otdb_id, self.specification_tree) - - self.logger_mock.info.assert_any_call('Task otdb_id=%s is already approved, no resource assignment needed' % - otdb_id) - - def test_do_assignment_inserts_specification_and_task_in_radb(self): - self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - - start_time = datetime.datetime.strptime(self.future_start_time, '%Y-%m-%d %H:%M:%S') - stop_time = datetime.datetime.strptime(self.future_stop_time, '%Y-%m-%d %H:%M:%S') - parset = parameterset(self.specification_tree['specification']) - - self.rarpc_mock.insertSpecificationAndTask.assert_any_call(self.mom_id, self.otdb_id, self.state, - self.task_type, start_time, stop_time, str(parset), - "CEP4") - - def test_do_assignment_logs_when_no_predecessors_found(self): - self.momrpc_mock.getPredecessorIds.return_value = {str(self.task_mom_id): []} - - self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - - self.logger_mock.info.assert_any_call('no predecessors for otdb_id=%s mom_id=%s', self.task_otdb_id, - self.task_mom_id) + self.spec_mock.status = 'approved' - def test_do_assignment_logs_when_predecessors_are_found(self): self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - self.logger_mock.info.assert_any_call('processing predecessor mom_ids=%s for mom_id=%s otdb_id=%s', - self.predecessor_task_mom_ids, self.task_mom_id, self.task_otdb_id) - - def test_do_assignment_logs_when_predecessors_are_found_but_its_task_is_missing_in_radb(self): - self.momrpc_mock.getPredecessorIds.return_value = {str(self.task_mom_id): [self.non_existing_task_mom_id]} - - self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - - self.logger_mock.warning.assert_any_call( - 'could not find predecessor task with mom_id=%s in radb for task otdb_id=%s', self.non_existing_task_mom_id, - self.task_otdb_id) - - def test_do_assignment_logs_when_predecessors_are_found_that_need_to_be_linked_to_task(self): - self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - - self.logger_mock.info.assert_any_call( - 'connecting predecessor task with mom_id=%s otdb_id=%s to its successor with mom_id=%s otdb_id=%s', - self.predecessor_task_mom_id, - self.predecessor_task_otdb_id, - self.task_mom_id, - self.task_otdb_id) - - def test_do_assignment_insert_predecessor_into_task_when_not_linked_to_task(self): - self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - - self.rarpc_mock.insertTaskPredecessor.assert_any_call(self.task_id, self.predecessor_task_id) - - def test_do_assignment_logs_when_no_successors_found(self): - self.momrpc_mock.getSuccessorIds.return_value = {str(self.task_mom_id): []} - - self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - - self.logger_mock.info.assert_any_call('no successors for otdb_id=%s mom_id=%s', self.task_otdb_id, - self.task_mom_id) - - def test_do_assignment_logs_when_successors_are_found(self): - self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - - self.logger_mock.info.assert_any_call('processing successor mom_ids=%s for mom_id=%s otdb_id=%s', - self.successor_task_mom_ids, self.task_mom_id, self.task_otdb_id) - - def test_do_assignment_logs_when_successors_are_found_but_its_task_is_missing_in_radb(self): - self.momrpc_mock.getSuccessorIds.return_value = {str(self.task_mom_id): [self.non_existing_task_mom_id]} - - self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - - self.logger_mock.warning.assert_any_call( - 'could not find successor task with mom_id=%s in radb for task otdb_id=%s', - self.non_existing_task_mom_id, self.task_otdb_id) - - def test_do_assignment_logs_when_successors_are_found_that_need_to_be_linked_to_task(self): - self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - - self.logger_mock.info.assert_any_call( - 'connecting successor task with mom_id=%s otdb_id=%s to its predecessor with mom_id=%s otdb_id=%s', - self.successor_task_mom_id, - self.successor_task_otdb_id, - self.task_mom_id, - self.task_otdb_id) - - def test_do_assignment_insert_successor_into_task_when_not_linked_to_task(self): - self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - - self.rarpc_mock.insertTaskPredecessor.assert_any_call(self.successor_task_id, self.task_id) - - def test_do_assignment_moves_pipeline_of_successor_after_predecessor(self): - self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - - self.assertTrue(self.movePipelineAfterItsPredecessors_mock.called) - - def test_do_assignment_logs_mom_bug(self): - self.resource_assigner.do_assignment(self.mom_bug_specification_tree['otdb_id'], - self.mom_bug_specification_tree) - - self.logger_mock.info.assert_any_call( - 'overwriting and uploading processingClusterName to otdb from \'%s\' to \'%s\' for otdb_id=%s', - self.mom_bug_processing_cluster_name, 'CEP4', self.mom_bug_otdb_id) - - def test_do_assignment_resets_ProcessingCluster_clusterName_on_mom_bug(self): - self.resource_assigner.do_assignment(self.mom_bug_specification_tree['otdb_id'], - self.mom_bug_specification_tree) - - self.otdbrpc_mock.taskSetSpecification.assert_any_call( - self.mom_bug_otdb_id, - {'LOFAR.ObsSW.Observation.Cluster.ProcessingCluster.clusterName': 'CEP4'}) - - @mock.patch('lofar.sas.resourceassignment.resourceassigner.resource_assigner.datetime') - def test_do_assignment_should_reset_observation_period_when_in_past_without_predecessor_and_duration( - self, datetime_mock): - now = datetime.datetime.utcnow() + datetime.timedelta(days=1) - datetime_mock.utcnow.return_value = now - - new_starttime = now + datetime.timedelta(minutes=3) - new_endtime = new_starttime + datetime.timedelta(seconds=self.task_duration) + self.logger_mock.info.assert_any_call('Task otdb_id=%s is only approved, no resource assignment needed yet' % + self.specification_tree['otdb_id']) + def test_do_assignment_inserts_specification_and_task_in_radb(self): self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - self.logger_mock.warning.assert_any_call( - 'Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s', - new_starttime, new_endtime, self.otdb_id) - self.logger_mock.info.assert_any_call( - 'uploading auto-generated start/end time (%s, %s) to otdb for otdb_id=%s', - new_starttime, - new_endtime, - self.otdb_id) - self.otdbrpc_mock.taskSetSpecification.assert_any_call( - self.otdb_id, - { - 'LOFAR.ObsSW.Observation.startTime': new_starttime.strftime('%Y-%m-%d %H:%M:%S'), - 'LOFAR.ObsSW.Observation.stopTime': new_endtime.strftime('%Y-%m-%d %H:%M:%S') - }) - - @mock.patch('lofar.sas.resourceassignment.resourceassigner.resource_assigner.datetime') - def test_do_assignment_should_reset_observation_period_when_in_past_with_predecessor_in_future(self, datetime_mock): - now = self.freeze_time_one_day_in_the_future(datetime_mock) - - future_predecessor_stop_time = now + datetime.timedelta(hours=1) - self.specification_tree['predecessors'][0]['specification']['Observation.stopTime'] = \ - future_predecessor_stop_time.strftime('%Y-%m-%d %H:%M:%S') - - new_starttime = future_predecessor_stop_time + datetime.timedelta(minutes=3) - new_endtime = new_starttime + datetime.timedelta(seconds=self.task_duration) + self.spec_mock.insert_into_radb.assert_called() + def test_do_assignment_fills_specification_based_on_specication_dict(self): self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - self.logger_mock.warning.assert_any_call( - 'Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s', - new_starttime, new_endtime, self.otdb_id) - self.logger_mock.info.assert_any_call( - 'uploading auto-generated start/end time (%s, %s) to otdb for otdb_id=%s', - new_starttime, - new_endtime, - self.otdb_id) - self.otdbrpc_mock.taskSetSpecification.assert_any_call( - self.otdb_id, - { - 'LOFAR.ObsSW.Observation.startTime': new_starttime.strftime('%Y-%m-%d %H:%M:%S'), - 'LOFAR.ObsSW.Observation.stopTime': new_endtime.strftime('%Y-%m-%d %H:%M:%S') - }) + self.spec_mock.from_dict.assert_called_with(self.specification_tree) def freeze_time_one_day_in_the_future(self, datetime_mock): now = datetime.datetime.utcnow() + datetime.timedelta(days=1) @@ -1851,125 +1780,13 @@ class ResourceAssignerTest(unittest.TestCase): def _strip_ms(self, now): return datetime.datetime.strptime(now.strftime('%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S') - @mock.patch('lofar.sas.resourceassignment.resourceassigner.resource_assigner.datetime') - def test_do_assignment_pushes_back_observation_start_and_end_times_when_in_past(self, datetime_mock): - now = datetime.datetime.utcnow() + datetime.timedelta(days=1) - - datetime_mock.utcnow.return_value = now - - new_starttime = now + datetime.timedelta(minutes=3) - new_endtime = new_starttime + datetime.timedelta(seconds=self.task_duration) - - self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - - self.logger_mock.warning.assert_any_call( - 'Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s', - new_starttime, new_endtime, self.otdb_id) - self.logger_mock.info.assert_any_call( - 'uploading auto-generated start/end time (%s, %s) to otdb for otdb_id=%s', - new_starttime, - new_endtime, - self.otdb_id) - self.otdbrpc_mock.taskSetSpecification.assert_any_call( - self.otdb_id, - { - 'LOFAR.ObsSW.Observation.startTime': new_starttime.strftime('%Y-%m-%d %H:%M:%S'), - 'LOFAR.ObsSW.Observation.stopTime': new_endtime.strftime('%Y-%m-%d %H:%M:%S') - }) - - def test_get_main_task_start_and_end_times_with_unspecified_start_and_end_times(self): - """ - Verify that get_main_task_start_and_end_times() returns start/end times in the future with the default duration - """ - - self.specification_tree['specification']['Observation.startTime'] = None - self.specification_tree['specification']['Observation.stopTime'] = None - expected_duration = datetime.timedelta(hours=1) - - start_time, end_time = self.resource_assigner._get_main_task_start_and_end_times(self.specification_tree) - - duration = end_time - start_time - self.assertEqual(expected_duration, duration) - self.assertGreater(start_time, datetime.datetime.utcnow()) - - def test_get_main_task_start_and_end_times_with_unspecified_start_and_end_times_and_specified_duration(self): - """ - Verify that get_main_task_start_and_end_times() returns start/end times in the future with the specified - duration - """ - - self.specification_tree['specification']['Observation.startTime'] = None - self.specification_tree['specification']['Observation.stopTime'] = None - self.specification_tree['specification']['Observation.Scheduler.taskDuration'] = 300 # seconds - expected_duration = datetime.timedelta(seconds=300) - - start_time, end_time = self.resource_assigner._get_main_task_start_and_end_times(self.specification_tree) - - duration = end_time - start_time - self.assertEqual(expected_duration, duration) - self.assertGreater(start_time, datetime.datetime.utcnow()) - - def test_get_main_task_start_and_end_times_with_start_and_end_times_in_the_past(self): - """ - Verify that get_main_task_start_and_end_times() returns start/end times in the future but retains the original - duration. - """ - - specified_duration = datetime.timedelta(hours=5) - _start_time = datetime.datetime.utcnow() - datetime.timedelta(hours=7) - _end_time = _start_time + specified_duration - self.specification_tree['specification']['Observation.startTime'] = _start_time.strftime('%Y-%m-%d %H:%M:%S') - self.specification_tree['specification']['Observation.stopTime'] = _end_time.strftime('%Y-%m-%d %H:%M:%S') - - start_time, end_time = self.resource_assigner._get_main_task_start_and_end_times(self.specification_tree) - - duration = end_time - start_time - self.assertEqual(specified_duration, duration) - self.assertGreater(start_time, datetime.datetime.utcnow()) - - def test_do_assignment_should_log_insertion_of_specification_and_task(self): - self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - - self.logger_mock.info.assert_any_call( - 'insertSpecification mom_id=%s, otdb_id=%s, status=%s, task_type=%s, start_time=%s, end_time=%s' - ' cluster=%s' % - (self.mom_id, self.otdb_id, self.state, self.task_type, self.future_start_time, self.future_stop_time, - "CEP4")) - - def test_do_assignment_should_log_when_insertion_of_specification_and_task_is_done(self): - self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - - self.logger_mock.info.assert_any_call('inserted specification (id=%s) and task (id=%s)' % - (self.specification_id, self.task_id)) - - def test_do_assignment_inserts_maintenance_resource_claims_in_radb(self): - self.resource_assigner.do_assignment(self.maintenance_specification_tree['otdb_id'], - self.maintenance_specification_tree) - - subject = 'TaskScheduled' - content = "{'mom_id': 351543, 'radb_id': 2299, 'otdb_id': 1290472}" - self.logger_mock.info.assert_any_call('Sending notification % s: % s' % (subject, content)) - - def test_do_assignment_inserts_projectreservation_resource_claims_in_radb(self): - self.resource_assigner.do_assignment(self.projectreservation_specification_tree['otdb_id'], - self.projectreservation_specification_tree) - - subject = 'TaskScheduled' - content = "{'mom_id': 351543, 'radb_id': 2299, 'otdb_id': 1290472}" - self.logger_mock.info.assert_any_call('Sending notification % s: % s' % (subject, content)) - - def test_do_assignment_should_not_claim_resources_on_CEP2_tasks(self): - exception_regex = "skipping resource assignment for task with cluster name" - with self.assertRaisesRegexp(Exception, exception_regex): - self.resource_assigner.do_assignment(self.cep2_specification_tree['otdb_id'], - self.cep2_specification_tree) - def test_do_assignment_should_request_needed_resources(self): self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.rerpc_mock.assert_any_call({"specification_tree": self.specification_tree}, timeout=10) def test_do_assignment_logs_when_otdb_id_not_needed_resources(self): + self.spec_mock.radb_id = self.otdb_id + 11 self.specification_tree["otdb_id"] = self.otdb_id + 11 self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) @@ -1979,6 +1796,13 @@ class ResourceAssignerTest(unittest.TestCase): (self.otdb_id + 11, self.rerpc_replymessage) ) + def test_do_assignment_puts_spec_to_error_when_resource_estimation_gives_an_error(self): + self.specification_tree["otdb_id"] = self.otdb_id + 11 + + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + + self.spec_mock.set_status.assert_called_with('error') + def test_do_assignment_should_not_claim_resouces_when_otdb_id_not_needed_resources(self): self.specification_tree["otdb_id"] = self.otdb_id + 1 @@ -1988,6 +1812,7 @@ class ResourceAssignerTest(unittest.TestCase): def test_do_assignment_logs_when_task_type_not_in_needed_resources(self): wrong_task_type = "observation" + self.spec_mock.type = wrong_task_type self.specification_tree["task_type"] = wrong_task_type self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) @@ -1999,6 +1824,7 @@ class ResourceAssignerTest(unittest.TestCase): def test_do_assignment_should_not_claim_resources_when_task_type_not_in_needed_resources(self): wrong_task_type = "observation" + self.spec_mock.type = wrong_task_type self.specification_tree["task_type"] = wrong_task_type self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) @@ -2013,22 +1839,21 @@ class ResourceAssignerTest(unittest.TestCase): self.logger_mock.error.assert_any_call("Error from Resource Estimator: %s", self.resource_error1) self.logger_mock.error.assert_any_call("Error from Resource Estimator: %s", self.resource_error2) - def test_do_assignment_should_log_error_in_needed_resources(self): - self.specification_tree["otdb_id"] = self.resources_with_errors_otdb_id + def test_do_assignment_should_log_missing_resource_types_in_estimates(self): + self.specification_tree["otdb_id"] = self.resources_with_no_resource_types_otdb_id self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.logger_mock.error.assert_any_call( - "An exception occurred while obtaining resource estimates. Exception=Error(s) in estimator for otdb_id=%s radb_id=%s" % - (self.resources_with_errors_otdb_id, self.task_id) - ) + "An exception occurred while obtaining resource estimates. Exception=missing 'resource_types' in 'estimates' in estimator results: %s" % self.rerpc_replymessage[str(self.resources_with_no_resource_types_otdb_id)][self.task_type]) - def test_do_assignment_should_update_task_with_error_on_errors_in_needed_resources(self): - self.specification_tree["otdb_id"] = self.resources_with_errors_otdb_id + def test_do_assignment_should_log_if_estimates_are_negative(self): + self.specification_tree["otdb_id"] = self.resources_with_negative_estimates_otdb_id self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - self.rarpc_mock.updateTask.assert_any_call(self.task_id, task_status='error') + self.logger_mock.error.assert_any_call( + "An exception occurred while obtaining resource estimates. Exception=at least one of the estimates is not a positive number") def test_do_assignment_should_notify_bus_on_errors_in_needed_resources(self): content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id} @@ -2052,36 +1877,52 @@ class ResourceAssignerTest(unittest.TestCase): self.logger_mock.info.assert_any_call('Resource Estimator reply = %s', self.rerpc_replymessage) - def test_do_assignment_updates_task_when_it_was_unable_to_claim_some_or_all_resources(self): + def test_do_assignment_notifies_bus_when_it_was_unable_to_schedule(self): + content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id} + subject = 'Task' + 'Conflict' + + self.spec_mock.status = 'conflict' self.dwell_scheduler_mock().allocate_resources.return_value = False - self.task["status"] = "conflict" self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - self.rarpc_mock.updateTask.assert_any_call(self.task_id, task_status='conflict') + self.assertBusNotificationAndLogging(content, subject) - def test_do_assignment_notifies_bus_when_it_was_unable_to_claim_some_or_all_resources(self): - content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id} - subject = 'Task' + 'Conflict' + def test_do_assignment_should_read_spec_from_radb(self): + self.spec_mock.status = 'conflict' + self.dwell_scheduler_mock().allocate_resources.return_value = False + + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + self.spec_mock.read_from_radb.assert_called_with(self.spec_mock.radb_id) + + def test_do_assignment_should_set_status_to_conflict_again_when_cant_schedule_and_in_conflict(self): + self.spec_mock.status = 'conflict' self.dwell_scheduler_mock().allocate_resources.return_value = False - self.task["status"] = "conflict" self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - self.assertBusNotificationAndLogging(content, subject) + self.spec_mock.set_status.assert_called_with('conflict') - def test_do_assignment_notifies_bus_when_there_are_conflicting_claims(self): + def test_do_assignment_notifies_bus_when_it_was_unable_to_schedule(self): content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id} - subject = 'Task' + 'Conflict' + subject = 'Task' + 'Error' + self.spec_mock.status = 'error' self.dwell_scheduler_mock().allocate_resources.return_value = False - self.task["status"] = "conflict" self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) self.assertBusNotificationAndLogging(content, subject) + def test_do_assignment_should_set_status_to_error_again_when_cant_schedule_and_not_in_conflict(self): + self.spec_mock.status = 'error' + self.dwell_scheduler_mock().allocate_resources.return_value = False + + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + + self.spec_mock.set_status.assert_called_with('error') + def test_do_assignment_logs_task_data_removal_if_task_is_pipeline(self): self.sqrpc_mock.getDiskUsageForOTDBId.return_value = {'found': True, 'disk_usage': 10} @@ -2106,6 +1947,15 @@ class ResourceAssignerTest(unittest.TestCase): self.logger_mock.warning.assert_any_call( "could not remove all data on disk from previous run for otdb_id %s: %s", self.otdb_id, message) + def test_do_assignment_logs_exception_from_curcp_removeTaskData(self): + exception_str = "Error something went wrong" + self.curpc_mock.removeTaskData.side_effect = Exception(exception_str) + + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], + self.specification_tree) + + self.logger_mock.error.assert_any_call("Exception in cleaning up earlier data: %s", exception_str) + def test_do_assignment_notifies_bus_when_task_is_scheduled(self): content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id} subject = 'Task' + 'Scheduled' @@ -2114,23 +1964,16 @@ class ResourceAssignerTest(unittest.TestCase): self.assertBusNotificationAndLogging(content, subject) + def test_do_assignement_set_status_on_spec_when_scheduleable(self): + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + + self.spec_mock.set_status.assert_called_with('scheduled') + def assertBusNotificationAndLogging(self, content, subject): self.assertTrue(self.ra_notification_bus_send_called_with(content, ra_notification_prefix + subject)) self.logger_mock.info.assert_any_call('Sending notification %s: %s' % (subject, str(content).replace('\n', ' '))) - @mock.patch('lofar.sas.resourceassignment.resourceassigner.resource_assigner.datetime') - def test_do_assignment_logs_exception_from_otdbrpc_taskSetSpecification(self, datetime_mock): - self.freeze_time_one_day_in_the_future(datetime_mock) - - exception_str = "Error something went wrong" - self.otdbrpc_mock.taskSetSpecification.side_effect = Exception(exception_str) - - with self.assertRaisesRegexp(Exception, exception_str): - self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - - self.logger_mock.error.assert_any_call(exception_str) - def test_do_assignment_logs_exception_from_otdbrpc_taskSetSpecification_with_mom_bug(self): exception_str = "Error something went wrong" self.otdbrpc_mock.taskSetSpecification.side_effect = Exception(exception_str) @@ -2150,25 +1993,6 @@ class ResourceAssignerTest(unittest.TestCase): self.logger_mock.error.assert_any_call(exception_msg) - def test_do_assignment_updates_task_on_exception_from_rerpc(self): - exception = Exception("Error something went wrong") - self.rerpc_mock.side_effect = exception - - self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - - self.rarpc_mock.updateTask.assert_any_call(self.task_id, task_status='error') - - def test_do_assignment_notifies_bus_on_exception_from_rerpc(self): - content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id} - subject = 'Task' + 'Error' - - exception = Exception("Error something went wrong") - self.rerpc_mock.side_effect = exception - - self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - - self.assertBusNotificationAndLogging(content, subject) - def test_do_assignment_logs_when_notifies_bus_thows_exception(self): exception_msg = "Error something went wrong" self.ra_notification_bus_mock.send.side_effect = Exception(exception_msg) @@ -2196,17 +2020,5 @@ class ResourceAssignerTest(unittest.TestCase): self.logger_mock.error.assert_any_call(exception_msg) - @mock.patch('lofar.sas.resourceassignment.resourceassigner.resource_assigner.datetime') - def test_do_assignment_logs_exception_stop_time_parsing_on_predecessor(self, datetime_mock): - self.freeze_time_one_day_in_the_future(datetime_mock) - - self.specification_tree[u'predecessors'][0]['specification'][u'Observation.stopTime'] = 'non parse' - exception_str = 'time data \'non parse\' does not match format \'%Y-%m-%d %H:%M:%S\'' - - with self.assertRaisesRegexp(Exception, exception_str): - self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) - - self.logger_mock.error.assert_any_call(exception_str) - if __name__ == '__main__': unittest.main()