diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/resource_availability_checker.py b/SAS/ResourceAssignment/ResourceAssigner/lib/resource_availability_checker.py index bb2ff9118223e7202cb3834bbeb75458ab1ce6eb..067d2fcbcc55a2ca2eea1e63d523ac993c5de29d 100644 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/resource_availability_checker.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/resource_availability_checker.py @@ -377,11 +377,12 @@ class ResourceAvailabilityChecker(object): res_group = self.resource_group_relations[gids[i]] for rid in res_group['resource_ids']: if rid in available_recources: - type_id = available_recources[rid]['type_id'] - if type_id in needed_resources_by_type_id and available_recources[rid]['active'] and \ - available_recources[rid]['available_capacity'] > 0: - resources[type_id] = available_recources[rid] - type_ids_seen.add(type_id) + available_recource = available_recources[rid] + type_id = available_recource['type_id'] + if type_id in needed_resources_by_type_id and available_recource['active']: + if available_recource['available_capacity'] > 0: + resources[type_id] = available_recource + type_ids_seen.add(type_id) else: logger.debug("requested resource id %s is not available/claimable", rid) diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py index 68e6af68cf9e5409766aded26169a60cd01fdb3c..d522baeee0d0f58af80130bd77054d11b466ed78 100644 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py @@ -1,4 +1,5 @@ from datetime import datetime, timedelta +from copy import deepcopy from lofar.common.cache import cache @@ -111,8 +112,11 @@ class BasicScheduler(object): allocation_successful = True except ScheduleException, e: - logger.exception("BasicScheduler: scheduling threw exception: %s", e) + logger.exception("%s: scheduling threw ScheduleException: %s", self.__class__.__name__, e) self._handle_schedule_exception() + except Exception, e: + logger.exception("%s: scheduling threw unhandled exception: %s", self.__class__.__name__, e) + raise return allocation_successful @@ -414,13 +418,17 @@ class StationScheduler(BasicScheduler): wanted_estimates = self._get_station_estimates() # Try to schedule all of the stations. - remaining_estimates = self._schedule_resources(wanted_estimates, available_resources, need_all=False) + # make a (deep)copy of available_resources and use that, + # because _schedule_resources modifies the available_capacity of the tested wanted stations. + # we rollback the radb later in this method, so we should keep the original available_resources intact. + available_resources_copy = deepcopy(available_resources) + remaining_estimates = self._schedule_resources(wanted_estimates, available_resources_copy, need_all=False) # See if our allocation meets the minimal criteria. Note that some stations may be partially allocated, # we do not count those as claimable. unclaimable_stations = set([e["station"] for e in remaining_estimates]) if not self._requirements_satisfied_without(expanded_requirements, unclaimable_stations): - raise ScheduleException("Could not allocate enough stations") + raise ScheduleException("Could not allocate enough stations. unclaimable_stations=%s" % (unclaimable_stations,)) allocated_stations = set([e["station"] for e in wanted_estimates if e not in remaining_estimates]) @@ -635,7 +643,7 @@ class PriorityScheduler(StationScheduler): """ if conflict_claim["resource_type_id"] == self.resource_availability_checker.resource_types['storage']: - raise ScheduleException("Could not resolve conflict on storage resource") + raise ScheduleException("Cannot resolve conflict on storage resource") # find all conflicting claims & which tasks they belong to conflicting_claims, conflicting_tasks = self._get_conflicting_claims_and_tasks(conflict_claim) diff --git a/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulers.py b/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulers.py index cfe1ed60725992803aab7810e588afe58896caab..a4d2b439f8380849e9358e5ccb54b7077ab318d0 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulers.py +++ b/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulers.py @@ -31,6 +31,8 @@ from lofar.sas.resourceassignment.resourceassigner.schedulers import StationSche from lofar.sas.resourceassignment.resourceassigner.schedulers import PriorityScheduler from lofar.sas.resourceassignment.resourceassigner.schedulers import DwellScheduler +from lofar.sas.resourceassignment.database.radb import _FETCH_ONE + import logging logger = logging.getLogger(__name__) @@ -48,6 +50,20 @@ class SchedulerTest(radb_common_testing.RADBCommonTest): def setUp(self): super(SchedulerTest, self).setUp() self.resource_availability_checker = ResourceAvailabilityChecker(self.radb) + self._enforce_limited_station_group_list() + + def _enforce_limited_station_group_list(self): + # for test simplicity, create a simple virtual instrument which makes debugging easier. + # this is safe, because we are working on a test database + + LIMITED_STATION_GROUP_LIST = ('CS001', 'CS002', 'RS106', 'RS205') + + unwanted_resource_group_ids = [rg['id'] for rg in self.radb.getResourceGroups() + if rg['type'] == 'station' and rg['name'] not in LIMITED_STATION_GROUP_LIST] + + self.radb._executeQuery("DELETE FROM virtual_instrument.resource_group rg WHERE rg.id in (%s)" % ( + ', '.join([str(id) for id in unwanted_resource_group_ids])),) + self.radb.commit() class BasicSchedulerTest(SchedulerTest): @@ -56,14 +72,15 @@ class BasicSchedulerTest(SchedulerTest): insert a new test specification and task into the testing radb :param mom_otdb_id: optional mom/otdb id :param starttime: optional starttime if None, then datetime(2017, 1, 1, 1, 0, 0) is used - :param starttime: optional endtime if None, then datetime(2017, 1, 1, 2, 0, 0) is used + :param endtime: optional endtime if None, then datetime(2017, 1, 1, 2, 0, 0) is used :return: the new radb's task id """ + if starttime is None: starttime = datetime.datetime(2017, 1, 1, 1, 0, 0) if endtime is None: - endtime = datetime.datetime(2017, 1, 1, 2, 0, 0), + endtime = datetime.datetime(2017, 1, 1, 2, 0, 0) return self.radb.insertSpecificationAndTask(mom_id=mom_otdb_id, otdb_id=mom_otdb_id, @@ -77,10 +94,28 @@ class BasicSchedulerTest(SchedulerTest): def get_specification_tree(self, task_id): return {} - def new_scheduler(self, task_id, resource_estimator): - return BasicScheduler(task_id, self.get_specification_tree(task_id), resource_estimator, + def new_scheduler(self, task_id, resource_estimator=None, specification_tree=None): + """factory method returning a scheduler class specific for this test class. + In this case, in the BasicSchedulerTest class, it returns a new BasicScheduler.""" + return self.new_basic_scheduler(task_id, resource_estimator, specification_tree) + + def new_basic_scheduler(self, task_id, resource_estimator=None, specification_tree=None): + """return a new BasicScheduler""" + return BasicScheduler(task_id, + specification_tree if specification_tree else self.get_specification_tree(task_id), + resource_estimator if resource_estimator else lambda _:[], self.resource_availability_checker, self.radb.dbcreds) + def get_station_bandwidth_max_capacity(self): + resource_CS001bw0 = [r for r in self.radb.getResources(resource_types="bandwidth", include_availability=True) + if r['name']=='CS001bw0'][0] + return resource_CS001bw0['total_capacity'] + + def get_CEP4_storage_max_capacity(self): + resource_cep4_storage = [r for r in self.radb.getResources(resource_types="storage", include_availability=True) + if r['name']=='CEP4_storage:/data'][0] + return resource_cep4_storage['total_capacity'] + def test_schedule_task(self): """ Whether a task (that fits) can be scheduled. """ @@ -159,24 +194,19 @@ class BasicSchedulerTest(SchedulerTest): self.assertFalse(scheduler.radb.committed) self.assertTrue(scheduler.radb.rolled_back) - def get_cs001_max_capacity(self): - resource_CS001bw0 = [r for r in self.radb.getResources(resource_types="bandwidth", include_availability=True) - if r['name']=='CS001bw0'][0] - return resource_CS001bw0['total_capacity'] - def test_schedule_two_tasks_too_large_task(self): """ Whether two tasks that fit individually but not together will be rejected by the scheduler. """ - max_cap = self.get_cs001_max_capacity() + max_bw_cap = self.get_station_bandwidth_max_capacity() # First task must succeed # we claim two bandwidth resources because CS001 has two network lines # they should both be claimed, so that the next task cannot just take the other free line. task_id = self.new_task(0) - estimates = [{ 'resource_types': {'bandwidth': max_cap}, + estimates = [{ 'resource_types': {'bandwidth': max_bw_cap}, "root_resource_group": "CS001", "resource_count": 1 }, - {'resource_types': {'bandwidth': max_cap}, + {'resource_types': {'bandwidth': max_bw_cap}, "root_resource_group": "CS001", "resource_count": 1} ] scheduler = self.new_scheduler(task_id, lambda _: estimates) @@ -185,10 +215,10 @@ class BasicSchedulerTest(SchedulerTest): # Second task must fail, because both network lines were already filled. task2_id = self.new_task(1) - estimates = [{ 'resource_types': {'bandwidth': max_cap}, + estimates = [{ 'resource_types': {'bandwidth': max_bw_cap}, "root_resource_group": "CS001", "resource_count": 1 }, - {'resource_types': {'bandwidth': max_cap}, + {'resource_types': {'bandwidth': max_bw_cap}, "root_resource_group": "CS001", "resource_count": 1} ] scheduler = self.new_scheduler(task2_id, lambda _: estimates) @@ -200,24 +230,28 @@ class BasicSchedulerTest(SchedulerTest): class StationSchedulerTest(BasicSchedulerTest): # The StationScheduler must not regress on the BasicScheduler, so we inherit all its tests - def setUp(self): - super(StationSchedulerTest, self).setUp() - self._enfore_limited_station_list() - - def _enfore_limited_station_list(self): - # for test simplicity, remove any station not in the list below - # this is safe, because we are working on a test database - LIMITED_STATION_LIST = ('CS001', 'CS002', 'RS106', 'RS205') - for rg in self.radb.getResourceGroups(): - if rg['type'] == 'station' and rg['name'] not in LIMITED_STATION_LIST: - self.radb._executeQuery("DELETE FROM virtual_instrument.resource_group rg WHERE rg.id = %s", (rg['id'],)) - self.radb.commit() - def get_specification_tree(self, task_id): - return { "task_type": "observation", "specification": { "Observation.VirtualInstrument.stationList": [] }, "station_requirements": [] } - - def new_scheduler(self, task_id, resource_estimator): - return StationScheduler(task_id, self.get_specification_tree(task_id), resource_estimator, + return { "task_type": "observation", + "specification": { "Observation.VirtualInstrument.stationList": [] }, + "station_requirements": [] } + + def new_scheduler(self, task_id, resource_estimator=None, specification_tree=None): + """overridden factory method returning a scheduler class specific for this test class. + In this case, in the StationSchedulerTest class, it returns a new StationScheduler. + + Please note that in most/all of the tests in this StationSchedulerTest test class + we explicitly use the new_station_scheduler factory method to get the specific + StationScheduler. In derived test classes, this means that we then still use a StationScheduler + and not another scheduler type via a overridden new_scheduler method. + """ + return self.new_station_scheduler(task_id, resource_estimator, specification_tree) + + def new_station_scheduler(self, task_id, resource_estimator=None, specification_tree=None): + """factory method returning a StationScheduler. + Can be overridden in derived test classes.""" + return StationScheduler(task_id, + specification_tree if specification_tree else self.get_specification_tree(task_id), + resource_estimator if resource_estimator else self.fake_resource_estimator, self.resource_availability_checker, self.radb.dbcreds) def fake_resource_estimator(self, specification_tree): @@ -228,29 +262,27 @@ class StationSchedulerTest(BasicSchedulerTest): # We don't get here without requesting stations assert stations + max_bw_cap = self.get_station_bandwidth_max_capacity() + max_storage_cap = self.get_CEP4_storage_max_capacity() + return [ - { "resource_types": {"bandwidth": 1024}, + { "resource_types": {"bandwidth": max_bw_cap }, "resource_count": 1, "station": station_name, "root_resource_group": station_name } for station_name in stations ] + [ - { "resource_types": {"storage": 512}, + { "resource_types": {"storage": 0.4*max_storage_cap}, "resource_count": 1, "root_resource_group": "CEP4" } ] - def new_station_scheduler(self, task_id, specification_tree): - """ A new scheduler for station-specific tests. """ - return StationScheduler(task_id, specification_tree, self.fake_resource_estimator, - self.resource_availability_checker, self.radb.dbcreds) - def test_expand_station_list(self): """ Test whether _expand_station_list correctly expands the station sets we defined in our FakeRADatabase. """ task_id = self.new_task(0) - scheduler = self.new_station_scheduler(task_id, self.get_specification_tree(0)) + scheduler = self.new_station_scheduler(task_id, specification_tree=self.get_specification_tree(0)) self.assertEqual(sorted(scheduler._expand_station_list("ALL")), ["CS001","CS002","RS106","RS205"]) self.assertEqual(sorted(scheduler._expand_station_list("CORE")), ["CS001","CS002"]) @@ -280,7 +312,8 @@ class StationSchedulerTest(BasicSchedulerTest): specification_tree["station_requirements"] = [ ("RS106", 1), ] task_id = self.new_task(0) - allocation_succesful = self.new_station_scheduler(task_id, specification_tree).allocate_resources() + scheduler = self.new_station_scheduler(task_id, specification_tree=specification_tree) + allocation_succesful = scheduler.allocate_resources() # Allocation must succeed self.assertTrue(allocation_succesful) @@ -295,7 +328,8 @@ class StationSchedulerTest(BasicSchedulerTest): specification_tree["station_requirements"] = [ ("ALL", 1), ] task_id = self.new_task(0) - allocation_succesful = self.new_station_scheduler(task_id, specification_tree).allocate_resources() + scheduler = self.new_station_scheduler(task_id, specification_tree=specification_tree) + allocation_succesful = scheduler.allocate_resources() # Allocation must succeed self.assertTrue(allocation_succesful) @@ -306,21 +340,22 @@ class StationSchedulerTest(BasicSchedulerTest): def test_find_zero_stations(self): """ Test whether a requirement for a zero station cannot be satisfied if no stations are left. """ - # preparation: do a first scheduling, which should succeed and claim the core stations + # preparation: do a first scheduling, which should succeed and claim the station specification_tree = self.get_specification_tree(0) - specification_tree["station_requirements"] = [ ("CS001", 1), ] + specification_tree["station_requirements"] = [ ("RS106", 1), ] task_id = self.new_task(0) - allocation_succesful = self.new_station_scheduler(task_id, specification_tree).allocate_resources() + scheduler = self.new_station_scheduler(task_id, specification_tree=specification_tree) + allocation_succesful = scheduler.allocate_resources() self.assertTrue(allocation_succesful) self.assertEqual(2, len(self.radb.getResourceClaims(task_ids=task_id, status='claimed'))) - # real test, try to claim two core stations again. Should fail now. + # real test, try to claim same station again. Should fail now. specification_tree = self.get_specification_tree(0) - specification_tree["station_requirements"] = [ ("CS001", 0), ] + specification_tree["station_requirements"] = [ ("RS106", 0), ] task_id = self.new_task(1) - scheduler = self.new_station_scheduler(task_id, specification_tree) + scheduler = self.new_station_scheduler(task_id, specification_tree=specification_tree) allocation_succesful = scheduler.allocate_resources() # Allocation must fail @@ -337,7 +372,8 @@ class StationSchedulerTest(BasicSchedulerTest): specification_tree["station_requirements"] = [ ("CORE", 2), ("ALL", 4), ] task_id = self.new_task(0) - allocation_succesful = self.new_station_scheduler(task_id, specification_tree).allocate_resources() + scheduler = self.new_station_scheduler(task_id, specification_tree=specification_tree) + allocation_succesful = scheduler.allocate_resources() # Allocation must succeed self.assertTrue(allocation_succesful) @@ -352,7 +388,7 @@ class StationSchedulerTest(BasicSchedulerTest): specification_tree["station_requirements"] = [ ("CORE", 3), ] task_id = self.new_task(0) - scheduler = self.new_station_scheduler(task_id, specification_tree) + scheduler = self.new_station_scheduler(task_id, specification_tree=specification_tree) allocation_succesful = scheduler.allocate_resources() # Allocation must fail @@ -364,18 +400,19 @@ class StationSchedulerTest(BasicSchedulerTest): """ Test whether requiring too many stations (than are available) fails. """ specification_tree = self.get_specification_tree(0) - specification_tree["station_requirements"] = [ ("CORE", 2), ] + specification_tree["station_requirements"] = [ ("REMOTE", 2), ] - # preparation: do a first scheduling, which should succeed and claim the core stations + # preparation: do a first scheduling, which should succeed and claim the two remote stations task_id = self.new_task(0) - allocation_succesful = self.new_station_scheduler(task_id, specification_tree).allocate_resources() + scheduler = self.new_station_scheduler(task_id, specification_tree=specification_tree) + allocation_succesful = scheduler.allocate_resources() self.assertTrue(allocation_succesful) self.assertEqual(3, len(self.radb.getResourceClaims(task_ids=task_id, status='claimed'))) - # real test, try to claim two core stations again. Should fail now. + # real test, try to claim the two remote stations again. Should fail now. task_id = self.new_task(1) - scheduler = self.new_station_scheduler(task_id, specification_tree) + scheduler = self.new_station_scheduler(task_id, specification_tree=specification_tree) allocation_succesful = scheduler.allocate_resources() self.assertFalse(allocation_succesful) @@ -393,7 +430,7 @@ class StationSchedulerTest(BasicSchedulerTest): specification_tree["station_requirements"] = [ (station_set, 2), ] task_id = self.new_task(mom_id) - scheduler = self.new_station_scheduler(task_id, specification_tree) + scheduler = self.new_station_scheduler(task_id, specification_tree=specification_tree) allocation_succesful = scheduler.allocate_resources() # Allocation must succeed @@ -407,10 +444,11 @@ class StationSchedulerTest(BasicSchedulerTest): # Two observations both requesting 2 core stations for mom_id in (0,1): specification_tree = self.get_specification_tree(mom_id) - specification_tree["station_requirements"] = [ ("CORE", 2), ] + specification_tree["station_requirements"] = [ ("REMOTE", 2), ] task_id = self.new_task(mom_id) - allocation_succesful[mom_id] = self.new_station_scheduler(task_id, specification_tree).allocate_resources() + scheduler = self.new_station_scheduler(task_id, specification_tree=specification_tree) + allocation_succesful[mom_id] = scheduler.allocate_resources() # Second allocation must fail self.assertTrue(allocation_succesful[0]) @@ -427,7 +465,8 @@ class StationSchedulerTest(BasicSchedulerTest): specification_tree["station_requirements"] = [ (station_name, 1), ] task_id = self.new_task(mom_id) - allocation_succesful[mom_id] = self.new_station_scheduler(task_id, specification_tree).allocate_resources() + scheduler = self.new_station_scheduler(task_id, specification_tree=specification_tree) + allocation_succesful[mom_id] = scheduler.allocate_resources() # Second allocation must fail self.assertTrue(allocation_succesful[0]) @@ -481,12 +520,20 @@ class PrioritySchedulerTest(StationSchedulerTest): content='', cluster='CEP4')['task_id'] - def new_scheduler(self, task_id, resource_estimator): - return PriorityScheduler(task_id, self.get_specification_tree(task_id), resource_estimator, - self.resource_availability_checker, self.radb.dbcreds) + def new_scheduler(self, task_id, resource_estimator=None, specification_tree=None): + """overridden factory method returning a scheduler class specific for this test class. + In this case, in the PrioritySchedulerTest class, it returns a new PriorityScheduler.""" + return self.new_priority_scheduler(task_id, resource_estimator, specification_tree) - def new_station_scheduler(self, task_id, specification_tree): - return PriorityScheduler(task_id, specification_tree, self.fake_resource_estimator, + def new_station_scheduler(self, task_id, resource_estimator=None, specification_tree=None): + """overridden factory method returning a scheduler class specific for this test class. + In this case, in the PrioritySchedulerTest class, it returns a new PriorityScheduler.""" + return self.new_priority_scheduler(task_id, resource_estimator, specification_tree) + + def new_priority_scheduler(self, task_id, resource_estimator=None, specification_tree=None): + return PriorityScheduler(task_id, + specification_tree if specification_tree else self.get_specification_tree(task_id), + resource_estimator if resource_estimator else self.fake_resource_estimator, self.resource_availability_checker, self.radb.dbcreds) def test_unschedule_lower_priority_future_task(self): @@ -498,14 +545,15 @@ class PrioritySchedulerTest(StationSchedulerTest): # utcnow lies before the tasks we are scheduling (the tasks lie in the future) self.datetime_mock.utcnow.return_value = datetime.datetime(2017, 1, 1, 0, 0, 0) - max_cap = self.get_cs001_max_capacity() + max_bw_cap = self.get_station_bandwidth_max_capacity() # First task must succeed (for the test the mom_id determines the prio) task_id = self.new_task(0) - estimates = [{'resource_types': {'bandwidth': max_cap}, + estimates = [{'resource_types': {'bandwidth': max_bw_cap}, "root_resource_group": "CS001", "resource_count": 2 } ] - allocation_succesful = self.new_scheduler(task_id, lambda _: estimates).allocate_resources() + scheduler = self.new_scheduler(task_id, resource_estimator=lambda _: estimates) + allocation_succesful = scheduler.allocate_resources() self.assertTrue(allocation_succesful) self.assertEqual('approved', self.radb.getTask(task_id)['status']) @@ -514,11 +562,11 @@ class PrioritySchedulerTest(StationSchedulerTest): # Second task must succeed as it has a higher priority (for the test the mom_id determines the prio) task2_id = self.new_task(1000) - estimates = [{'resource_types': {'bandwidth': max_cap}, + estimates = [{'resource_types': {'bandwidth': max_bw_cap}, "root_resource_group": "CS001", "resource_count": 2 } ] - allocation_succesful = self.new_scheduler(task2_id, lambda _: estimates).allocate_resources() - self.assertTrue(allocation_succesful) + scheduler = self.new_scheduler(task2_id, resource_estimator=lambda _: estimates) + allocation_succesful = scheduler.allocate_resources() self.assertEqual(2, len(self.radb.getResourceClaims(task_ids=task2_id, status='claimed'))) # First task must have been unscheduled @@ -538,22 +586,27 @@ class PrioritySchedulerTest(StationSchedulerTest): # utcnow lies before the tasks we are scheduling (the tasks lie in the future) self.datetime_mock.utcnow.return_value = datetime.datetime(2017, 1, 1, 0, 0, 0) - max_cap = self.get_cs001_max_capacity() + max_bw_cap = self.get_station_bandwidth_max_capacity() # First task must succeed - task_id = self.new_task(0) #(for the test the mom_id determines the prio) - estimates = [{'resource_types': {'bandwidth': max_cap}, - "root_resource_group": "CS001", - "resource_count": 2 } ] - allocation_succesful = self.new_scheduler(task_id, lambda _: estimates).allocate_resources() + # (for the test the mom_id determines the prio) + task_id = self.new_task(0, starttime=datetime.datetime(2017, 1, 1, 12, 0, 0), + endtime=datetime.datetime(2017, 1, 1, 13, 0, 0)) + estimates = [{'resource_types': {'bandwidth': max_bw_cap}, + "root_resource_group": "RS106", + "resource_count": 1 } ] + scheduler = self.new_scheduler(task_id, resource_estimator=lambda _: estimates) + allocation_succesful = scheduler.allocate_resources() self.assertTrue(allocation_succesful) self.assertEqual('approved', self.radb.getTask(task_id)['status']) self.radb.updateTask(task_id, task_status='scheduled') self.assertEqual('scheduled', self.radb.getTask(task_id)['status']) + self.assertEqual(datetime.datetime(2017, 1, 1, 12, 0, 0), self.radb.getTask(task_id)['starttime']) + self.assertEqual(datetime.datetime(2017, 1, 1, 13, 0, 0), self.radb.getTask(task_id)['endtime']) # shift utcnow and fake that the task is running - self.datetime_mock.utcnow.return_value = datetime.datetime(2017, 1, 1, 1, 10, 0) + self.datetime_mock.utcnow.return_value = datetime.datetime(2017, 1, 1, 12, 10, 0) self.radb.updateTask(task_id, task_status='active') self.assertEqual('active', self.radb.getTask(task_id)['status']) @@ -561,36 +614,46 @@ class PrioritySchedulerTest(StationSchedulerTest): # start it in a minute after now # (or else it will still have overlap and conflicts with beginning of just-aborted running task) # (for the test the mom_id determines the prio) - task2_id = self.new_task(1000, starttime=datetime.datetime(2017, 1, 1, 1, 11, 0)) - estimates = [{'resource_types': {'bandwidth': max_cap}, - "root_resource_group": "CS001", - "resource_count": 2 } ] - allocation_succesful = self.new_scheduler(task2_id, lambda _: estimates).allocate_resources() + task2_id = self.new_task(1000, starttime=datetime.datetime(2017, 1, 1, 12, 11, 0), + endtime=datetime.datetime(2017, 1, 1, 13, 11, 0)) + estimates = [{'resource_types': {'bandwidth': max_bw_cap}, + "root_resource_group": "RS106", + "resource_count": 1 } ] + scheduler = self.new_scheduler(task2_id, resource_estimator=lambda _: estimates) + allocation_succesful = scheduler.allocate_resources() self.assertTrue(allocation_succesful) # First task must have been killed otdb_id = self.radb.getTask(task_id)["otdb_id"] self.obscontrol_mock.assert_called_with(otdb_id) - # First task must have its endtime cut short to utcnow or starttime - my_starttime = self.radb.getTask(task2_id)["starttime"] - for c in self.radb.getResourceClaims(task_ids=task_id): - self.assertLessEqual(c["endtime"], my_starttime) - self.assertEqual('claimed', c["status"]) + # First task must have its endtime cut short to utcnow + # and all claims should be ended (but still claimed) as well. + self.assertEqual(datetime.datetime(2017, 1, 1, 12, 10, 0), self.radb.getTask(task_id)['endtime']) + self.assertEqual(1, len(self.radb.getResourceClaims(task_ids=task_id))) + for claim in self.radb.getResourceClaims(task_ids=task_id): + self.assertLessEqual(claim["endtime"], datetime.datetime(2017, 1, 1, 12, 10, 0)) + self.assertEqual('claimed', claim["status"]) + + # and the starttime should still be the original + self.assertEqual(datetime.datetime(2017, 1, 1, 12, 0, 0), self.radb.getTask(task_id)['starttime']) + # and status should be aborted + self.assertEqual('aborted', self.radb.getTask(task_id)['status']) def test_do_not_unschedule_higher_priority_future_task(self): # utcnow lies before the tasks we are scheduling (the tasks lie in the future) self.datetime_mock.utcnow.return_value = datetime.datetime(2017, 1, 1, 0, 0, 0) - max_cap = self.get_cs001_max_capacity() + max_bw_cap = self.get_station_bandwidth_max_capacity() # First task must succeed (for the test the mom_id determines the prio) task_id = self.new_task(1000) - estimates = [{'resource_types': {'bandwidth': max_cap}, + estimates = [{'resource_types': {'bandwidth': max_bw_cap}, "root_resource_group": "CS001", "resource_count": 2 } ] - allocation_succesful = self.new_scheduler(task_id, lambda _: estimates).allocate_resources() + scheduler = self.new_scheduler(task_id, resource_estimator=lambda _: estimates) + allocation_succesful = scheduler.allocate_resources() self.assertTrue(allocation_succesful) self.assertEqual('approved', self.radb.getTask(task_id)['status']) @@ -599,10 +662,11 @@ class PrioritySchedulerTest(StationSchedulerTest): # Second task must succeed as it has a higher priority (for the test the mom_id determines the prio) task2_id = self.new_task(0) #(for the test the mom_id determines the prio) - estimates = [{'resource_types': {'bandwidth': max_cap}, + estimates = [{'resource_types': {'bandwidth': max_bw_cap}, "root_resource_group": "CS001", "resource_count": 2 } ] - allocation_succesful = self.new_scheduler(task2_id, lambda _: estimates).allocate_resources() + scheduler = self.new_scheduler(task2_id, resource_estimator=lambda _: estimates) + allocation_succesful = scheduler.allocate_resources() self.assertFalse(allocation_succesful) # the second (low-prio) task could not be scheduled @@ -622,14 +686,15 @@ class PrioritySchedulerTest(StationSchedulerTest): # utcnow lies before the tasks we are scheduling (the tasks lie in the future) self.datetime_mock.utcnow.return_value = datetime.datetime(2017, 1, 1, 0, 0, 0) - max_cap = self.get_cs001_max_capacity() + max_bw_cap = self.get_station_bandwidth_max_capacity() # First (task must succeed) task_id = self.new_task(1000) #(for the test the mom_id determines the prio) - estimates = [{'resource_types': {'bandwidth': max_cap}, + estimates = [{'resource_types': {'bandwidth': max_bw_cap}, "root_resource_group": "CS001", "resource_count": 2 } ] - allocation_succesful = self.new_scheduler(task_id, lambda _: estimates).allocate_resources() + scheduler = self.new_scheduler(task_id, resource_estimator=lambda _: estimates) + allocation_succesful = scheduler.allocate_resources() self.assertTrue(allocation_succesful) self.assertEqual('approved', self.radb.getTask(task_id)['status']) @@ -646,10 +711,11 @@ class PrioritySchedulerTest(StationSchedulerTest): # (or else it will still have overlap and conflicts with beginning of just-aborted running task) # (for the test the mom_id determines the prio) task2_id = self.new_task(0, starttime=datetime.datetime(2017, 1, 1, 1, 11, 0)) - estimates = [{'resource_types': {'bandwidth': max_cap}, + estimates = [{'resource_types': {'bandwidth': max_bw_cap}, "root_resource_group": "CS001", "resource_count": 2 } ] - allocation_succesful = self.new_scheduler(task2_id, lambda _: estimates).allocate_resources() + scheduler = self.new_scheduler(task2_id, resource_estimator=lambda _: estimates) + allocation_succesful = scheduler.allocate_resources() self.assertFalse(allocation_succesful) # the second (low-prio) task could not be scheduled @@ -666,14 +732,15 @@ class PrioritySchedulerTest(StationSchedulerTest): def test_not_unschedule_equal_priority(self): """ Whether two tasks that fit individually but not together get rejected priorities do not allow an override. """ - max_cap = self.get_cs001_max_capacity() + max_bw_cap = self.get_station_bandwidth_max_capacity() # First task must succeed task1_id = self.new_task(1) #mom_id=1 and mom_id=0 yield equal priorities - estimates = [{'resource_types': {'bandwidth': max_cap}, + estimates = [{'resource_types': {'bandwidth': max_bw_cap}, "root_resource_group": "CS001", "resource_count": 2 } ] - allocation_succesful = self.new_scheduler(task1_id, lambda _: estimates).allocate_resources() + scheduler = self.new_scheduler(task1_id, resource_estimator=lambda _: estimates) + allocation_succesful = scheduler.allocate_resources() self.assertTrue(allocation_succesful) self.assertEqual('approved', self.radb.getTask(task1_id)['status']) @@ -682,10 +749,11 @@ class PrioritySchedulerTest(StationSchedulerTest): # Second task must fail as it has a lower priority task2_id = self.new_task(0) #mom_id=1 and mom_id=0 yield equal priorities - estimates = [{'resource_types': {'bandwidth': max_cap}, + estimates = [{'resource_types': {'bandwidth': max_bw_cap}, "root_resource_group": "CS001", "resource_count": 2 } ] - allocation_succesful = self.new_scheduler(task2_id, lambda _: estimates).allocate_resources() + scheduler = self.new_scheduler(task2_id, resource_estimator=lambda _: estimates) + allocation_succesful = scheduler.allocate_resources() self.assertFalse(allocation_succesful) self.assertEqual('scheduled', self.radb.getTask(task1_id)['status']) @@ -700,18 +768,20 @@ class PrioritySchedulerTest(StationSchedulerTest): # utcnow lies before the tasks we are scheduling (the tasks lie in the future) self.datetime_mock.utcnow.return_value = datetime.datetime(2017, 1, 1, 0, 0, 0) - max_cap = self.get_cs001_max_capacity() + max_bw_cap = self.get_station_bandwidth_max_capacity() # First task must succeed (for the test the mom_id determines the prio) task_id = self.new_task(0) - estimates = [{'resource_types': {'bandwidth': 0.25*max_cap}, + estimates = [{'resource_types': {'bandwidth': 0.25*max_bw_cap}, "root_resource_group": "CS001", "resource_count": 1 }, - {'resource_types': {'bandwidth': 0.25 * max_cap}, + {'resource_types': {'bandwidth': 0.25 * max_bw_cap}, "root_resource_group": "CS001", "resource_count": 1} ] - allocation_succesful = self.new_scheduler(task_id, lambda _: estimates).allocate_resources() + + scheduler = self.new_scheduler(task_id, resource_estimator=lambda _: estimates) + allocation_succesful = scheduler.allocate_resources() self.assertTrue(allocation_succesful) self.assertEqual('approved', self.radb.getTask(task_id)['status']) @@ -720,14 +790,15 @@ class PrioritySchedulerTest(StationSchedulerTest): # Second task must succeed as it has a higher priority (for the test the mom_id determines the prio) task2_id = self.new_task(1000) - estimates = [{'resource_types': {'bandwidth': 0.25*max_cap}, + estimates = [{'resource_types': {'bandwidth': 0.25*max_bw_cap}, "root_resource_group": "CS001", "resource_count": 1 }, - {'resource_types': {'bandwidth': 0.95 * max_cap}, + {'resource_types': {'bandwidth': 0.95 * max_bw_cap}, "root_resource_group": "CS001", "resource_count": 1} ] - allocation_succesful = self.new_scheduler(task2_id, lambda _: estimates).allocate_resources() + scheduler = self.new_scheduler(task2_id, resource_estimator=lambda _: estimates) + allocation_succesful = scheduler.allocate_resources() self.assertTrue(allocation_succesful) self.assertEqual(2, len(self.radb.getResourceClaims(task_ids=task2_id, status='claimed'))) @@ -740,21 +811,23 @@ class PrioritySchedulerTest(StationSchedulerTest): self.assertEqual('conflict', self.radb.getTask(task_id)['status']) def test_should_not_kill_a_task_without_a_mom_id(self): - max_cap = self.get_cs001_max_capacity() + max_bw_cap = self.get_station_bandwidth_max_capacity() # First task must succeed task_id = self.new_task_without_momid(0) - estimates = [{'resource_types': {'bandwidth': max_cap}, - "root_resource_group": "CS001", - "resource_count": 2 }] - allocation_succesful = self.new_scheduler(task_id, lambda _: estimates).allocate_resources() + estimates = [{'resource_types': {'bandwidth': max_bw_cap}, + "root_resource_group": "RS106", + "resource_count": 1 }] + scheduler = self.new_scheduler(task_id, resource_estimator=lambda _: estimates) + allocation_succesful = scheduler.allocate_resources() self.assertTrue(allocation_succesful) task2_id = self.new_task(1000) - estimates = [{'resource_types': {'bandwidth': max_cap}, - "root_resource_group": "CS001", - "resource_count": 2 }] - allocation_succesful = self.new_scheduler(task2_id, lambda _: estimates).allocate_resources() + estimates = [{'resource_types': {'bandwidth': max_bw_cap}, + "root_resource_group": "RS106", + "resource_count": 1 }] + scheduler = self.new_scheduler(task2_id, resource_estimator=lambda _: estimates) + allocation_succesful = scheduler.allocate_resources() self.assertFalse(allocation_succesful) self.obscontrol_mock.assert_not_called() @@ -775,24 +848,35 @@ class DwellSchedulerTest(PrioritySchedulerTest): super(DwellSchedulerTest, self).setUp() self.resource_availability_checker = DwellSchedulerTest.TestResourceAvailabilityChecker(self.radb) - def new_scheduler(self, task_id, resource_estimator): - return DwellScheduler(task_id, self.get_specification_tree(task_id), resource_estimator, - datetime.datetime(2017, 1, 1, 1, 0, 0), # minstarttime - datetime.datetime(2017, 1, 1, 1, 0, 0), # maxstarttime - datetime.timedelta(hours=1), # duration - self.resource_availability_checker, self.radb.dbcreds) - - def new_station_scheduler(self, task_id, specification_tree): - return DwellScheduler(task_id, specification_tree, self.fake_resource_estimator, - datetime.datetime(2017, 1, 1, 1, 0, 0), # minstarttime - datetime.datetime(2017, 1, 1, 1, 0, 0), # maxstarttime - datetime.timedelta(hours=1), # duration - self.resource_availability_checker, self.radb.dbcreds) - - def new_dwell_scheduler(self, task_id, resource_estimator): - return DwellScheduler(task_id, self.get_specification_tree(task_id), resource_estimator, - datetime.datetime(2017, 1, 1, 1, 0, 0), # minstarttime - datetime.datetime(2017, 1, 2, 1, 0, 0), # maxstarttime + def new_scheduler(self, task_id, resource_estimator=None, specification_tree=None): + """overridden factory method returning a scheduler class specific for this test class. + In this case, in the DwellSchedulerTest class, it returns a new DwellScheduler.""" + return self.new_dwell_scheduler(task_id, resource_estimator, specification_tree, allow_dwelling=False) + + def new_station_scheduler(self, task_id, resource_estimator=None, specification_tree=None): + """overridden factory method returning a scheduler class specific for this test class. + In this case, in the DwellSchedulerTest class, it returns a new DwellScheduler.""" + return self.new_dwell_scheduler(task_id, resource_estimator, specification_tree, allow_dwelling=False) + + def new_priority_scheduler(self, task_id, resource_estimator=None, specification_tree=None): + """overridden factory method returning a scheduler class specific for this test class. + In this case, in the DwellSchedulerTest class, it returns a new DwellScheduler.""" + return self.new_dwell_scheduler(task_id, resource_estimator, specification_tree, allow_dwelling=False) + + def new_dwell_scheduler(self, task_id, resource_estimator=None, specification_tree=None, allow_dwelling=True): + if allow_dwelling: + min_starttime = datetime.datetime(2017, 1, 1, 1, 0, 0) + max_starttime = datetime.datetime(2017, 1, 2, 1, 0, 0) + else: + # we do not want dwelling, so limit the dwell starttime window to the task's actual starttime. + min_starttime = self.radb.getTask(task_id)['starttime'] + max_starttime = min_starttime + + return DwellScheduler(task_id, + specification_tree if specification_tree else self.get_specification_tree(task_id), + resource_estimator if resource_estimator else self.fake_resource_estimator, + min_starttime, + max_starttime, datetime.timedelta(hours=1), # duration self.resource_availability_checker, self.radb.dbcreds) @@ -804,31 +888,36 @@ class DwellSchedulerTest(PrioritySchedulerTest): estimates = [{ 'resource_types': {'bandwidth': 512}, "root_resource_group": "CS001", "resource_count": 1 } ] - allocation_succesful = self.new_dwell_scheduler(task_id, lambda _: estimates).allocate_resources() + scheduler = self.new_dwell_scheduler(task_id, resource_estimator=lambda _: estimates) + allocation_succesful = scheduler.allocate_resources() self.assertTrue(allocation_succesful) - # Task must NOT have been moved - self.assertEqual(self.radb.getTask(task_id)["starttime"], datetime.datetime(2017, 1, 1, 1, 0, 0)) + # Task must be positioned at start of dwelling period. + task = self.radb.getTask(task_id) + self.assertEqual(scheduler.min_starttime, task["starttime"]) + self.assertEqual(scheduler.min_starttime+scheduler.duration, task["endtime"]) def test_dwell(self): """ Whether a task will dwell after an existing task. """ - max_cap = self.get_cs001_max_capacity() + max_bw_cap = self.get_station_bandwidth_max_capacity() # First task must succeed task1_id = self.new_task(0) - estimates = [{ 'resource_types': {'bandwidth': max_cap}, + estimates = [{ 'resource_types': {'bandwidth': max_bw_cap}, "root_resource_group": "CS001", "resource_count": 2 }] - allocation_succesful = self.new_dwell_scheduler(task1_id, lambda _: estimates).allocate_resources() + scheduler = self.new_dwell_scheduler(task1_id, resource_estimator=lambda _: estimates) + allocation_succesful = scheduler.allocate_resources() self.assertTrue(allocation_succesful) # Second task must also succeed task2_id = self.new_task(1) - estimates = [{ 'resource_types': {'bandwidth': max_cap}, + estimates = [{ 'resource_types': {'bandwidth': max_bw_cap}, "root_resource_group": "CS001", "resource_count": 2 }] - allocation_succesful = self.new_dwell_scheduler(task2_id, lambda _: estimates).allocate_resources() + scheduler = self.new_dwell_scheduler(task2_id, resource_estimator=lambda _: estimates) + allocation_succesful = scheduler.allocate_resources() self.assertTrue(allocation_succesful) # Second task must have been moved, first task not @@ -840,14 +929,16 @@ class DwellSchedulerTest(PrioritySchedulerTest): def test_dwell_respect_claim_endtime(self): """ Whether a dwelling task will honour the claim endtimes, instead of the task endtime. """ - max_cap = self.get_cs001_max_capacity() + max_bw_cap = self.get_station_bandwidth_max_capacity() # First task must succeed task1_id = self.new_task(0) - estimates = [{ 'resource_types': {'bandwidth': max_cap}, + estimates = [{ 'resource_types': {'bandwidth': max_bw_cap}, "root_resource_group": "CS001", "resource_count": 2 }] - allocation_succesful = self.new_dwell_scheduler(task1_id, lambda _: estimates).allocate_resources() + # use normal basic scheduler for first normal task, which we want to schedule in a normal (non-dwell) way. + scheduler = self.new_basic_scheduler(task1_id, resource_estimator=lambda _: estimates) + allocation_succesful = scheduler.allocate_resources() self.assertTrue(allocation_succesful) self.assertEqual(2, len(self.radb.getResourceClaims(task_ids=task1_id, status='claimed'))) @@ -858,10 +949,11 @@ class DwellSchedulerTest(PrioritySchedulerTest): # Second task must also succeed task2_id = self.new_task(1) - estimates = [{ 'resource_types': {'bandwidth': max_cap}, + estimates = [{ 'resource_types': {'bandwidth': max_bw_cap}, "root_resource_group": "CS001", "resource_count": 2 }] - allocation_succesful = self.new_dwell_scheduler(task2_id, lambda _: estimates).allocate_resources() + scheduler = self.new_dwell_scheduler(task2_id, resource_estimator=lambda _: estimates) + allocation_succesful = scheduler.allocate_resources() self.assertTrue(allocation_succesful) # Second task must have been moved beyond 1st claim endtime, first task not @@ -879,28 +971,30 @@ class DwellSchedulerTest(PrioritySchedulerTest): that list get subtracted from the list handed to the resource_availability checker. This test verifies that the complete list should be provided on the second try. """ - max_cap = self.get_cs001_max_capacity() + max_bw_cap = self.get_station_bandwidth_max_capacity() # First task must succeed task1_id = self.new_task(0) - estimates = [{ 'resource_types': {'bandwidth': max_cap}, + estimates = [{ 'resource_types': {'bandwidth': max_bw_cap}, "root_resource_group": "CS001", "resource_count": 2 }] - allocation_succesful = self.new_dwell_scheduler(task1_id, lambda _: estimates).allocate_resources() + scheduler = self.new_dwell_scheduler(task1_id, resource_estimator=lambda _: estimates) + allocation_succesful = scheduler.allocate_resources() self.assertTrue(allocation_succesful) # Second task must also succeed task2_id = self.new_task(1) - estimates = [{ 'resource_types': {'bandwidth': max_cap}, + estimates = [{ 'resource_types': {'bandwidth': max_bw_cap}, "root_resource_group": "CS001", "resource_count": 2 }] - allocation_succesful = self.new_dwell_scheduler(task2_id, lambda _: estimates).allocate_resources() + scheduler = self.new_dwell_scheduler(task2_id, resource_estimator=lambda _: estimates) + allocation_succesful = scheduler.allocate_resources() self.assertTrue(allocation_succesful) # avialable resources can be limited by tracking unkillable resources. They should be # cleared on the second try like in this test. - self.assertEqual(set(self.resource_availability_checker.last_available_resources), - set(self.radb.getResources(include_availability=True))) + self.assertEqual(set(r['name'] for r in self.resource_availability_checker.last_available_resources), + set(r['name'] for r in self.radb.getResources(include_availability=True))) if __name__ == '__main__': logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG)