diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/resource_assigner.py b/SAS/ResourceAssignment/ResourceAssigner/lib/resource_assigner.py index 2ebb425e4353574d7b4fc90a33d495ccc8c4564f..45eb4b35ef9a5f0c57394cee5b5dde1b1a29db57 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/resource_assigner.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/resource_assigner.py @@ -284,22 +284,23 @@ class ResourceAssigner(object): return False try: - (scheduler_result, changed_tasks) = scheduler.allocate_resources() - if not scheduler_result: - # try again with basic scheduler to end up with a situation with the 'normal' conflicting resources, which can then be evaluated by users - scheduler = BasicScheduler(task_id=spec.radb_id, - specification_tree=specification_tree, - resource_estimator=self._get_resource_estimates, - resource_availability_checker=self.resource_availability_checker, - radbcreds=self.radb_creds) + with scheduler: (scheduler_result, changed_tasks) = scheduler.allocate_resources() - return scheduler_result - elif changed_tasks: - for t in changed_tasks: - if t.status == 'aborted': #MAC_Scheduler can't handle queued right now See also schedulers.py around line 600 - self._kill_task(t) # We kill the task through obscontrol and then wait for the status from OTDB. - else: # should be approved (unscheduled) - self._send_task_status_notification(t, t.status) # Tell OTDB + if not scheduler_result: + # try again with basic scheduler to end up with a situation with the 'normal' conflicting resources, which can then be evaluated by users + with BasicScheduler(task_id=spec.radb_id, + specification_tree=specification_tree, + resource_estimator=self._get_resource_estimates, + resource_availability_checker=self.resource_availability_checker, + radbcreds=self.radb_creds) as basic_scheduler: + (scheduler_result, changed_tasks) = basic_scheduler.allocate_resources() + return scheduler_result + elif changed_tasks: + for t in changed_tasks: + if t.status == 'aborted': #MAC_Scheduler can't handle queued right now See also schedulers.py around line 600 + self._kill_task(t) # We kill the task through obscontrol and then wait for the status from OTDB. + else: # should be approved (unscheduled) + self._send_task_status_notification(t, t.status) # Tell OTDB except Exception as e: logger.exception('Error in calling scheduler.allocate_resources: %s', e) diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py index 41bbead93b8587138c33c4c4f2158d151bdf5c01..b416fad896187373a32d53b36831b83b068cae99 100644 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py @@ -502,6 +502,13 @@ class PriorityScheduler(StationScheduler): """ Close the connections with the used services """ self.momqueryservice.close() + def __enter__(self): + self.open() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + def _pre_process_allocation(self): """ Take care of actions to be taken prior to the scheduling of resources """ super(PriorityScheduler, self)._pre_process_allocation() diff --git a/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py b/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py index 2061b5a7ce1ff400904517e6dec312cf5697d3c5..014282619181b13083d2c703b72ae5dbe13e0d2a 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py +++ b/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py @@ -2018,6 +2018,13 @@ class ResourceAssignerTest(unittest.TestCase): self.resource_assigner._kill_task(spec) self.obscontrol_mock.abort_observation.assert_called_with(spec.otdb_id) + # SW-800 The schedulers need open and close called (using context manager) + def test_do_assignement_uses_context_manager_on_dwell_scheduler(self): + self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + + self.dwell_scheduler_mock().__enter__.assert_called() + self.dwell_scheduler_mock().__exit__.assert_called() + #This class is currently missing any tests of interaction between tasks already scheduled and new tasks, # e.g. triggered ones. It would require a totally different way to set up the tests to be able to test this. diff --git a/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulers.py b/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulers.py index d9231b77d4246b409e61163a664fe7506e97276d..3a3d9efaea801271e6cbf8848f3112ffbc50cd39 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulers.py +++ b/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulers.py @@ -37,14 +37,20 @@ from lofar.sas.resourceassignment.database.radb import FETCH_ONE import logging logger = logging.getLogger(__name__) -import radb_common_testing +try: + from . import radb_common_testing +except ImportError: + import radb_common_testing + def setUpModule(): return radb_common_testing.setUpModule() + def tearDownModule(): return radb_common_testing.tearDownModule() + class SchedulerTest(radb_common_testing.RADBCommonTest): """ create test radb postgres instance, and use that in a ResourceAvailabilityChecker""" @@ -813,6 +819,50 @@ class PrioritySchedulerTest(StationSchedulerTest): self.assertFalse(allocation_successful) self.assertTrue(len(changed_tasks) == 0) + def test_open_should_call_open_on_momqueryrpc(self): + max_bw_cap = self.get_station_bandwidth_max_capacity() + task_id = self.new_task(0) + estimates = [{'resource_types': {'bandwidth': max_bw_cap}, + "root_resource_group": "CS001", + "resource_count": 2 } ] + scheduler = self.new_scheduler(task_id, resource_estimator=lambda _: estimates) + scheduler.open() + + self.assertTrue(self.momrpc_mock.open.called) + + def test_close_should_call_close_on_momqueryrpc(self): + max_bw_cap = self.get_station_bandwidth_max_capacity() + task_id = self.new_task(0) + estimates = [{'resource_types': {'bandwidth': max_bw_cap}, + "root_resource_group": "CS001", + "resource_count": 2 } ] + scheduler = self.new_scheduler(task_id, resource_estimator=lambda _: estimates) + scheduler.close() + + self.assertTrue(self.momrpc_mock.close.called) + + def test_context_manager_use_calls_open_on_momqueryrpc(self): + max_bw_cap = self.get_station_bandwidth_max_capacity() + task_id = self.new_task(0) + estimates = [{'resource_types': {'bandwidth': max_bw_cap}, + "root_resource_group": "CS001", + "resource_count": 2 } ] + with self.new_scheduler(task_id, resource_estimator=lambda _: estimates) as scheduler: + pass + + self.assertTrue(self.momrpc_mock.open.called) + + def test_context_manager_use_calls_close_on_momqueryrpc(self): + max_bw_cap = self.get_station_bandwidth_max_capacity() + task_id = self.new_task(0) + estimates = [{'resource_types': {'bandwidth': max_bw_cap}, + "root_resource_group": "CS001", + "resource_count": 2 } ] + with self.new_scheduler(task_id, resource_estimator=lambda _: estimates) as scheduler: + pass + + self.assertTrue(self.momrpc_mock.close.called) + class DwellSchedulerTest(PrioritySchedulerTest): # The DwellScheduler must not regress on the PriorityScheduler, so we inherit all its tests