Skip to content
Snippets Groups Projects
Commit 1f5df8cd authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-426: fixed scheduler tests

parent 372c1ed2
No related branches found
No related tags found
No related merge requests found
......@@ -377,11 +377,12 @@ class ResourceAvailabilityChecker(object):
res_group = self.resource_group_relations[gids[i]]
for rid in res_group['resource_ids']:
if rid in available_recources:
type_id = available_recources[rid]['type_id']
if type_id in needed_resources_by_type_id and available_recources[rid]['active'] and \
available_recources[rid]['available_capacity'] > 0:
resources[type_id] = available_recources[rid]
type_ids_seen.add(type_id)
available_recource = available_recources[rid]
type_id = available_recource['type_id']
if type_id in needed_resources_by_type_id and available_recource['active']:
if available_recource['available_capacity'] > 0:
resources[type_id] = available_recource
type_ids_seen.add(type_id)
else:
logger.debug("requested resource id %s is not available/claimable", rid)
......
from datetime import datetime, timedelta
from copy import deepcopy
from lofar.common.cache import cache
......@@ -111,8 +112,11 @@ class BasicScheduler(object):
allocation_successful = True
except ScheduleException, e:
logger.exception("BasicScheduler: scheduling threw exception: %s", e)
logger.exception("%s: scheduling threw ScheduleException: %s", self.__class__.__name__, e)
self._handle_schedule_exception()
except Exception, e:
logger.exception("%s: scheduling threw unhandled exception: %s", self.__class__.__name__, e)
raise
return allocation_successful
......@@ -414,13 +418,17 @@ class StationScheduler(BasicScheduler):
wanted_estimates = self._get_station_estimates()
# Try to schedule all of the stations.
remaining_estimates = self._schedule_resources(wanted_estimates, available_resources, need_all=False)
# make a (deep)copy of available_resources and use that,
# because _schedule_resources modifies the available_capacity of the tested wanted stations.
# we rollback the radb later in this method, so we should keep the original available_resources intact.
available_resources_copy = deepcopy(available_resources)
remaining_estimates = self._schedule_resources(wanted_estimates, available_resources_copy, need_all=False)
# See if our allocation meets the minimal criteria. Note that some stations may be partially allocated,
# we do not count those as claimable.
unclaimable_stations = set([e["station"] for e in remaining_estimates])
if not self._requirements_satisfied_without(expanded_requirements, unclaimable_stations):
raise ScheduleException("Could not allocate enough stations")
raise ScheduleException("Could not allocate enough stations. unclaimable_stations=%s" % (unclaimable_stations,))
allocated_stations = set([e["station"] for e in wanted_estimates if e not in remaining_estimates])
......@@ -635,7 +643,7 @@ class PriorityScheduler(StationScheduler):
"""
if conflict_claim["resource_type_id"] == self.resource_availability_checker.resource_types['storage']:
raise ScheduleException("Could not resolve conflict on storage resource")
raise ScheduleException("Cannot resolve conflict on storage resource")
# find all conflicting claims & which tasks they belong to
conflicting_claims, conflicting_tasks = self._get_conflicting_claims_and_tasks(conflict_claim)
......
......@@ -31,6 +31,8 @@ from lofar.sas.resourceassignment.resourceassigner.schedulers import StationSche
from lofar.sas.resourceassignment.resourceassigner.schedulers import PriorityScheduler
from lofar.sas.resourceassignment.resourceassigner.schedulers import DwellScheduler
from lofar.sas.resourceassignment.database.radb import _FETCH_ONE
import logging
logger = logging.getLogger(__name__)
......@@ -48,6 +50,20 @@ class SchedulerTest(radb_common_testing.RADBCommonTest):
def setUp(self):
super(SchedulerTest, self).setUp()
self.resource_availability_checker = ResourceAvailabilityChecker(self.radb)
self._enforce_limited_station_group_list()
def _enforce_limited_station_group_list(self):
# for test simplicity, create a simple virtual instrument which makes debugging easier.
# this is safe, because we are working on a test database
LIMITED_STATION_GROUP_LIST = ('CS001', 'CS002', 'RS106', 'RS205')
unwanted_resource_group_ids = [rg['id'] for rg in self.radb.getResourceGroups()
if rg['type'] == 'station' and rg['name'] not in LIMITED_STATION_GROUP_LIST]
self.radb._executeQuery("DELETE FROM virtual_instrument.resource_group rg WHERE rg.id in (%s)" % (
', '.join([str(id) for id in unwanted_resource_group_ids])),)
self.radb.commit()
class BasicSchedulerTest(SchedulerTest):
......@@ -56,14 +72,15 @@ class BasicSchedulerTest(SchedulerTest):
insert a new test specification and task into the testing radb
:param mom_otdb_id: optional mom/otdb id
:param starttime: optional starttime if None, then datetime(2017, 1, 1, 1, 0, 0) is used
:param starttime: optional endtime if None, then datetime(2017, 1, 1, 2, 0, 0) is used
:param endtime: optional endtime if None, then datetime(2017, 1, 1, 2, 0, 0) is used
:return: the new radb's task id
"""
if starttime is None:
starttime = datetime.datetime(2017, 1, 1, 1, 0, 0)
if endtime is None:
endtime = datetime.datetime(2017, 1, 1, 2, 0, 0),
endtime = datetime.datetime(2017, 1, 1, 2, 0, 0)
return self.radb.insertSpecificationAndTask(mom_id=mom_otdb_id,
otdb_id=mom_otdb_id,
......@@ -77,10 +94,28 @@ class BasicSchedulerTest(SchedulerTest):
def get_specification_tree(self, task_id):
    """Return the specification tree for the given task.

    The basic scheduler tests need no specification content, so an empty
    dict suffices here; subclasses override this with richer trees.
    """
    return dict()
def new_scheduler(self, task_id, resource_estimator):
return BasicScheduler(task_id, self.get_specification_tree(task_id), resource_estimator,
def new_scheduler(self, task_id, resource_estimator=None, specification_tree=None):
"""factory method returning a scheduler class specific for this test class.
In this case, in the BasicSchedulerTest class, it returns a new BasicScheduler."""
return self.new_basic_scheduler(task_id, resource_estimator, specification_tree)
def new_basic_scheduler(self, task_id, resource_estimator=None, specification_tree=None):
    """Return a new BasicScheduler for the given task.

    When not supplied, the specification tree defaults to this class'
    get_specification_tree result, and the estimator defaults to one that
    yields no estimates.
    """
    if not specification_tree:
        specification_tree = self.get_specification_tree(task_id)
    if not resource_estimator:
        resource_estimator = lambda _: []
    return BasicScheduler(task_id, specification_tree, resource_estimator,
                          self.resource_availability_checker, self.radb.dbcreds)
def get_station_bandwidth_max_capacity(self):
resource_CS001bw0 = [r for r in self.radb.getResources(resource_types="bandwidth", include_availability=True)
if r['name']=='CS001bw0'][0]
return resource_CS001bw0['total_capacity']
def get_CEP4_storage_max_capacity(self):
resource_cep4_storage = [r for r in self.radb.getResources(resource_types="storage", include_availability=True)
if r['name']=='CEP4_storage:/data'][0]
return resource_cep4_storage['total_capacity']
def test_schedule_task(self):
""" Whether a task (that fits) can be scheduled. """
......@@ -159,24 +194,19 @@ class BasicSchedulerTest(SchedulerTest):
self.assertFalse(scheduler.radb.committed)
self.assertTrue(scheduler.radb.rolled_back)
def get_cs001_max_capacity(self):
resource_CS001bw0 = [r for r in self.radb.getResources(resource_types="bandwidth", include_availability=True)
if r['name']=='CS001bw0'][0]
return resource_CS001bw0['total_capacity']
def test_schedule_two_tasks_too_large_task(self):
""" Whether two tasks that fit individually but not together will be rejected by the scheduler. """
max_cap = self.get_cs001_max_capacity()
max_bw_cap = self.get_station_bandwidth_max_capacity()
# First task must succeed
# we claim two bandwidth resources because CS001 has two network lines
# they should both be claimed, so that the next task cannot just take the other free line.
task_id = self.new_task(0)
estimates = [{ 'resource_types': {'bandwidth': max_cap},
estimates = [{ 'resource_types': {'bandwidth': max_bw_cap},
"root_resource_group": "CS001",
"resource_count": 1 },
{'resource_types': {'bandwidth': max_cap},
{'resource_types': {'bandwidth': max_bw_cap},
"root_resource_group": "CS001",
"resource_count": 1} ]
scheduler = self.new_scheduler(task_id, lambda _: estimates)
......@@ -185,10 +215,10 @@ class BasicSchedulerTest(SchedulerTest):
# Second task must fail, because both network lines were already filled.
task2_id = self.new_task(1)
estimates = [{ 'resource_types': {'bandwidth': max_cap},
estimates = [{ 'resource_types': {'bandwidth': max_bw_cap},
"root_resource_group": "CS001",
"resource_count": 1 },
{'resource_types': {'bandwidth': max_cap},
{'resource_types': {'bandwidth': max_bw_cap},
"root_resource_group": "CS001",
"resource_count": 1} ]
scheduler = self.new_scheduler(task2_id, lambda _: estimates)
......@@ -200,24 +230,28 @@ class BasicSchedulerTest(SchedulerTest):
class StationSchedulerTest(BasicSchedulerTest):
# The StationScheduler must not regress on the BasicScheduler, so we inherit all its tests
def setUp(self):
    """Set up the common scheduler fixture, then restrict the stations to a small fixed set."""
    super(StationSchedulerTest, self).setUp()
    # fixed typo in the helper name: _enfore_... -> _enforce_...
    self._enforce_limited_station_list()

def _enforce_limited_station_list(self):
    """Delete every station resource group not in LIMITED_STATION_LIST.

    For test simplicity, remove any station not in the list below.
    This is safe, because we are working on a test database.
    """
    LIMITED_STATION_LIST = ('CS001', 'CS002', 'RS106', 'RS205')
    for rg in self.radb.getResourceGroups():
        if rg['type'] == 'station' and rg['name'] not in LIMITED_STATION_LIST:
            self.radb._executeQuery("DELETE FROM virtual_instrument.resource_group rg WHERE rg.id = %s", (rg['id'],))
    self.radb.commit()
def get_specification_tree(self, task_id):
return { "task_type": "observation", "specification": { "Observation.VirtualInstrument.stationList": [] }, "station_requirements": [] }
def new_scheduler(self, task_id, resource_estimator):
return StationScheduler(task_id, self.get_specification_tree(task_id), resource_estimator,
return { "task_type": "observation",
"specification": { "Observation.VirtualInstrument.stationList": [] },
"station_requirements": [] }
def new_scheduler(self, task_id, resource_estimator=None, specification_tree=None):
"""overridden factory method returning a scheduler class specific for this test class.
In this case, in the StationSchedulerTest class, it returns a new StationScheduler.
Please note that in most/all of the tests in this StationSchedulerTest test class
we explicitly use the new_station_scheduler factory method to get the specific
StationScheduler. In derived test classes, this means that we then still use a StationScheduler
and not another scheduler type via a overridden new_scheduler method.
"""
return self.new_station_scheduler(task_id, resource_estimator, specification_tree)
def new_station_scheduler(self, task_id, resource_estimator=None, specification_tree=None):
    """Factory method returning a StationScheduler.
    Can be overridden in derived test classes.

    Falls back to this class' specification tree and fake_resource_estimator
    when the respective arguments are not supplied.
    """
    if not specification_tree:
        specification_tree = self.get_specification_tree(task_id)
    if not resource_estimator:
        resource_estimator = self.fake_resource_estimator
    return StationScheduler(task_id, specification_tree, resource_estimator,
                            self.resource_availability_checker, self.radb.dbcreds)
def fake_resource_estimator(self, specification_tree):
......@@ -228,29 +262,27 @@ class StationSchedulerTest(BasicSchedulerTest):
# We don't get here without requesting stations
assert stations
max_bw_cap = self.get_station_bandwidth_max_capacity()
max_storage_cap = self.get_CEP4_storage_max_capacity()
return [
{ "resource_types": {"bandwidth": 1024},
{ "resource_types": {"bandwidth": max_bw_cap },
"resource_count": 1,
"station": station_name,
"root_resource_group": station_name
} for station_name in stations
] + [
{ "resource_types": {"storage": 512},
{ "resource_types": {"storage": 0.4*max_storage_cap},
"resource_count": 1,
"root_resource_group": "CEP4"
}
]
def new_station_scheduler(self, task_id, specification_tree):
""" A new scheduler for station-specific tests. """
return StationScheduler(task_id, specification_tree, self.fake_resource_estimator,
self.resource_availability_checker, self.radb.dbcreds)
def test_expand_station_list(self):
""" Test whether _expand_station_list correctly expands the station sets we defined in our FakeRADatabase. """
task_id = self.new_task(0)
scheduler = self.new_station_scheduler(task_id, self.get_specification_tree(0))
scheduler = self.new_station_scheduler(task_id, specification_tree=self.get_specification_tree(0))
self.assertEqual(sorted(scheduler._expand_station_list("ALL")), ["CS001","CS002","RS106","RS205"])
self.assertEqual(sorted(scheduler._expand_station_list("CORE")), ["CS001","CS002"])
......@@ -280,7 +312,8 @@ class StationSchedulerTest(BasicSchedulerTest):
specification_tree["station_requirements"] = [ ("RS106", 1), ]
task_id = self.new_task(0)
allocation_succesful = self.new_station_scheduler(task_id, specification_tree).allocate_resources()
scheduler = self.new_station_scheduler(task_id, specification_tree=specification_tree)
allocation_succesful = scheduler.allocate_resources()
# Allocation must succeed
self.assertTrue(allocation_succesful)
......@@ -295,7 +328,8 @@ class StationSchedulerTest(BasicSchedulerTest):
specification_tree["station_requirements"] = [ ("ALL", 1), ]
task_id = self.new_task(0)
allocation_succesful = self.new_station_scheduler(task_id, specification_tree).allocate_resources()
scheduler = self.new_station_scheduler(task_id, specification_tree=specification_tree)
allocation_succesful = scheduler.allocate_resources()
# Allocation must succeed
self.assertTrue(allocation_succesful)
......@@ -306,21 +340,22 @@ class StationSchedulerTest(BasicSchedulerTest):
def test_find_zero_stations(self):
""" Test whether a requirement for a zero station cannot be satisfied if no stations are left. """
# preparation: do a first scheduling, which should succeed and claim the core stations
# preparation: do a first scheduling, which should succeed and claim the station
specification_tree = self.get_specification_tree(0)
specification_tree["station_requirements"] = [ ("CS001", 1), ]
specification_tree["station_requirements"] = [ ("RS106", 1), ]
task_id = self.new_task(0)
allocation_succesful = self.new_station_scheduler(task_id, specification_tree).allocate_resources()
scheduler = self.new_station_scheduler(task_id, specification_tree=specification_tree)
allocation_succesful = scheduler.allocate_resources()
self.assertTrue(allocation_succesful)
self.assertEqual(2, len(self.radb.getResourceClaims(task_ids=task_id, status='claimed')))
# real test, try to claim two core stations again. Should fail now.
# real test, try to claim same station again. Should fail now.
specification_tree = self.get_specification_tree(0)
specification_tree["station_requirements"] = [ ("CS001", 0), ]
specification_tree["station_requirements"] = [ ("RS106", 0), ]
task_id = self.new_task(1)
scheduler = self.new_station_scheduler(task_id, specification_tree)
scheduler = self.new_station_scheduler(task_id, specification_tree=specification_tree)
allocation_succesful = scheduler.allocate_resources()
# Allocation must fail
......@@ -337,7 +372,8 @@ class StationSchedulerTest(BasicSchedulerTest):
specification_tree["station_requirements"] = [ ("CORE", 2), ("ALL", 4), ]
task_id = self.new_task(0)
allocation_succesful = self.new_station_scheduler(task_id, specification_tree).allocate_resources()
scheduler = self.new_station_scheduler(task_id, specification_tree=specification_tree)
allocation_succesful = scheduler.allocate_resources()
# Allocation must succeed
self.assertTrue(allocation_succesful)
......@@ -352,7 +388,7 @@ class StationSchedulerTest(BasicSchedulerTest):
specification_tree["station_requirements"] = [ ("CORE", 3), ]
task_id = self.new_task(0)
scheduler = self.new_station_scheduler(task_id, specification_tree)
scheduler = self.new_station_scheduler(task_id, specification_tree=specification_tree)
allocation_succesful = scheduler.allocate_resources()
# Allocation must fail
......@@ -364,18 +400,19 @@ class StationSchedulerTest(BasicSchedulerTest):
""" Test whether requiring too many stations (than are available) fails. """
specification_tree = self.get_specification_tree(0)
specification_tree["station_requirements"] = [ ("CORE", 2), ]
specification_tree["station_requirements"] = [ ("REMOTE", 2), ]
# preparation: do a first scheduling, which should succeed and claim the core stations
# preparation: do a first scheduling, which should succeed and claim the two remote stations
task_id = self.new_task(0)
allocation_succesful = self.new_station_scheduler(task_id, specification_tree).allocate_resources()
scheduler = self.new_station_scheduler(task_id, specification_tree=specification_tree)
allocation_succesful = scheduler.allocate_resources()
self.assertTrue(allocation_succesful)
self.assertEqual(3, len(self.radb.getResourceClaims(task_ids=task_id, status='claimed')))
# real test, try to claim two core stations again. Should fail now.
# real test, try to claim the two remote stations again. Should fail now.
task_id = self.new_task(1)
scheduler = self.new_station_scheduler(task_id, specification_tree)
scheduler = self.new_station_scheduler(task_id, specification_tree=specification_tree)
allocation_succesful = scheduler.allocate_resources()
self.assertFalse(allocation_succesful)
......@@ -393,7 +430,7 @@ class StationSchedulerTest(BasicSchedulerTest):
specification_tree["station_requirements"] = [ (station_set, 2), ]
task_id = self.new_task(mom_id)
scheduler = self.new_station_scheduler(task_id, specification_tree)
scheduler = self.new_station_scheduler(task_id, specification_tree=specification_tree)
allocation_succesful = scheduler.allocate_resources()
# Allocation must succeed
......@@ -407,10 +444,11 @@ class StationSchedulerTest(BasicSchedulerTest):
# Two observations both requesting 2 core stations
for mom_id in (0,1):
specification_tree = self.get_specification_tree(mom_id)
specification_tree["station_requirements"] = [ ("CORE", 2), ]
specification_tree["station_requirements"] = [ ("REMOTE", 2), ]
task_id = self.new_task(mom_id)
allocation_succesful[mom_id] = self.new_station_scheduler(task_id, specification_tree).allocate_resources()
scheduler = self.new_station_scheduler(task_id, specification_tree=specification_tree)
allocation_succesful[mom_id] = scheduler.allocate_resources()
# Second allocation must fail
self.assertTrue(allocation_succesful[0])
......@@ -427,7 +465,8 @@ class StationSchedulerTest(BasicSchedulerTest):
specification_tree["station_requirements"] = [ (station_name, 1), ]
task_id = self.new_task(mom_id)
allocation_succesful[mom_id] = self.new_station_scheduler(task_id, specification_tree).allocate_resources()
scheduler = self.new_station_scheduler(task_id, specification_tree=specification_tree)
allocation_succesful[mom_id] = scheduler.allocate_resources()
# Second allocation must fail
self.assertTrue(allocation_succesful[0])
......@@ -481,12 +520,20 @@ class PrioritySchedulerTest(StationSchedulerTest):
content='',
cluster='CEP4')['task_id']
def new_scheduler(self, task_id, resource_estimator):
return PriorityScheduler(task_id, self.get_specification_tree(task_id), resource_estimator,
self.resource_availability_checker, self.radb.dbcreds)
def new_scheduler(self, task_id, resource_estimator=None, specification_tree=None):
"""overridden factory method returning a scheduler class specific for this test class.
In this case, in the PrioritySchedulerTest class, it returns a new PriorityScheduler."""
return self.new_priority_scheduler(task_id, resource_estimator, specification_tree)
def new_station_scheduler(self, task_id, specification_tree):
return PriorityScheduler(task_id, specification_tree, self.fake_resource_estimator,
def new_station_scheduler(self, task_id, resource_estimator=None, specification_tree=None):
"""overridden factory method returning a scheduler class specific for this test class.
In this case, in the PrioritySchedulerTest class, it returns a new PriorityScheduler."""
return self.new_priority_scheduler(task_id, resource_estimator, specification_tree)
def new_priority_scheduler(self, task_id, resource_estimator=None, specification_tree=None):
    """Return a new PriorityScheduler for the given task.

    Falls back to this class' specification tree and fake_resource_estimator
    when the respective arguments are not supplied.
    """
    if not specification_tree:
        specification_tree = self.get_specification_tree(task_id)
    if not resource_estimator:
        resource_estimator = self.fake_resource_estimator
    return PriorityScheduler(task_id, specification_tree, resource_estimator,
                             self.resource_availability_checker, self.radb.dbcreds)
def test_unschedule_lower_priority_future_task(self):
......@@ -498,14 +545,15 @@ class PrioritySchedulerTest(StationSchedulerTest):
# utcnow lies before the tasks we are scheduling (the tasks lie in the future)
self.datetime_mock.utcnow.return_value = datetime.datetime(2017, 1, 1, 0, 0, 0)
max_cap = self.get_cs001_max_capacity()
max_bw_cap = self.get_station_bandwidth_max_capacity()
# First task must succeed (for the test the mom_id determines the prio)
task_id = self.new_task(0)
estimates = [{'resource_types': {'bandwidth': max_cap},
estimates = [{'resource_types': {'bandwidth': max_bw_cap},
"root_resource_group": "CS001",
"resource_count": 2 } ]
allocation_succesful = self.new_scheduler(task_id, lambda _: estimates).allocate_resources()
scheduler = self.new_scheduler(task_id, resource_estimator=lambda _: estimates)
allocation_succesful = scheduler.allocate_resources()
self.assertTrue(allocation_succesful)
self.assertEqual('approved', self.radb.getTask(task_id)['status'])
......@@ -514,11 +562,11 @@ class PrioritySchedulerTest(StationSchedulerTest):
# Second task must succeed as it has a higher priority (for the test the mom_id determines the prio)
task2_id = self.new_task(1000)
estimates = [{'resource_types': {'bandwidth': max_cap},
estimates = [{'resource_types': {'bandwidth': max_bw_cap},
"root_resource_group": "CS001",
"resource_count": 2 } ]
allocation_succesful = self.new_scheduler(task2_id, lambda _: estimates).allocate_resources()
self.assertTrue(allocation_succesful)
scheduler = self.new_scheduler(task2_id, resource_estimator=lambda _: estimates)
allocation_succesful = scheduler.allocate_resources()
self.assertEqual(2, len(self.radb.getResourceClaims(task_ids=task2_id, status='claimed')))
# First task must have been unscheduled
......@@ -538,22 +586,27 @@ class PrioritySchedulerTest(StationSchedulerTest):
# utcnow lies before the tasks we are scheduling (the tasks lie in the future)
self.datetime_mock.utcnow.return_value = datetime.datetime(2017, 1, 1, 0, 0, 0)
max_cap = self.get_cs001_max_capacity()
max_bw_cap = self.get_station_bandwidth_max_capacity()
# First task must succeed
task_id = self.new_task(0) #(for the test the mom_id determines the prio)
estimates = [{'resource_types': {'bandwidth': max_cap},
"root_resource_group": "CS001",
"resource_count": 2 } ]
allocation_succesful = self.new_scheduler(task_id, lambda _: estimates).allocate_resources()
# (for the test the mom_id determines the prio)
task_id = self.new_task(0, starttime=datetime.datetime(2017, 1, 1, 12, 0, 0),
endtime=datetime.datetime(2017, 1, 1, 13, 0, 0))
estimates = [{'resource_types': {'bandwidth': max_bw_cap},
"root_resource_group": "RS106",
"resource_count": 1 } ]
scheduler = self.new_scheduler(task_id, resource_estimator=lambda _: estimates)
allocation_succesful = scheduler.allocate_resources()
self.assertTrue(allocation_succesful)
self.assertEqual('approved', self.radb.getTask(task_id)['status'])
self.radb.updateTask(task_id, task_status='scheduled')
self.assertEqual('scheduled', self.radb.getTask(task_id)['status'])
self.assertEqual(datetime.datetime(2017, 1, 1, 12, 0, 0), self.radb.getTask(task_id)['starttime'])
self.assertEqual(datetime.datetime(2017, 1, 1, 13, 0, 0), self.radb.getTask(task_id)['endtime'])
# shift utcnow and fake that the task is running
self.datetime_mock.utcnow.return_value = datetime.datetime(2017, 1, 1, 1, 10, 0)
self.datetime_mock.utcnow.return_value = datetime.datetime(2017, 1, 1, 12, 10, 0)
self.radb.updateTask(task_id, task_status='active')
self.assertEqual('active', self.radb.getTask(task_id)['status'])
......@@ -561,36 +614,46 @@ class PrioritySchedulerTest(StationSchedulerTest):
# start it in a minute after now
# (or else it will still have overlap and conflicts with beginning of just-aborted running task)
# (for the test the mom_id determines the prio)
task2_id = self.new_task(1000, starttime=datetime.datetime(2017, 1, 1, 1, 11, 0))
estimates = [{'resource_types': {'bandwidth': max_cap},
"root_resource_group": "CS001",
"resource_count": 2 } ]
allocation_succesful = self.new_scheduler(task2_id, lambda _: estimates).allocate_resources()
task2_id = self.new_task(1000, starttime=datetime.datetime(2017, 1, 1, 12, 11, 0),
endtime=datetime.datetime(2017, 1, 1, 13, 11, 0))
estimates = [{'resource_types': {'bandwidth': max_bw_cap},
"root_resource_group": "RS106",
"resource_count": 1 } ]
scheduler = self.new_scheduler(task2_id, resource_estimator=lambda _: estimates)
allocation_succesful = scheduler.allocate_resources()
self.assertTrue(allocation_succesful)
# First task must have been killed
otdb_id = self.radb.getTask(task_id)["otdb_id"]
self.obscontrol_mock.assert_called_with(otdb_id)
# First task must have its endtime cut short to utcnow or starttime
my_starttime = self.radb.getTask(task2_id)["starttime"]
for c in self.radb.getResourceClaims(task_ids=task_id):
self.assertLessEqual(c["endtime"], my_starttime)
self.assertEqual('claimed', c["status"])
# First task must have its endtime cut short to utcnow
# and all claims should be ended (but still claimed) as well.
self.assertEqual(datetime.datetime(2017, 1, 1, 12, 10, 0), self.radb.getTask(task_id)['endtime'])
self.assertEqual(1, len(self.radb.getResourceClaims(task_ids=task_id)))
for claim in self.radb.getResourceClaims(task_ids=task_id):
self.assertLessEqual(claim["endtime"], datetime.datetime(2017, 1, 1, 12, 10, 0))
self.assertEqual('claimed', claim["status"])
# and the starttime should still be the original
self.assertEqual(datetime.datetime(2017, 1, 1, 12, 0, 0), self.radb.getTask(task_id)['starttime'])
# and status should be aborted
self.assertEqual('aborted', self.radb.getTask(task_id)['status'])
def test_do_not_unschedule_higher_priority_future_task(self):
# utcnow lies before the tasks we are scheduling (the tasks lie in the future)
self.datetime_mock.utcnow.return_value = datetime.datetime(2017, 1, 1, 0, 0, 0)
max_cap = self.get_cs001_max_capacity()
max_bw_cap = self.get_station_bandwidth_max_capacity()
# First task must succeed (for the test the mom_id determines the prio)
task_id = self.new_task(1000)
estimates = [{'resource_types': {'bandwidth': max_cap},
estimates = [{'resource_types': {'bandwidth': max_bw_cap},
"root_resource_group": "CS001",
"resource_count": 2 } ]
allocation_succesful = self.new_scheduler(task_id, lambda _: estimates).allocate_resources()
scheduler = self.new_scheduler(task_id, resource_estimator=lambda _: estimates)
allocation_succesful = scheduler.allocate_resources()
self.assertTrue(allocation_succesful)
self.assertEqual('approved', self.radb.getTask(task_id)['status'])
......@@ -599,10 +662,11 @@ class PrioritySchedulerTest(StationSchedulerTest):
# Second task must succeed as it has a higher priority (for the test the mom_id determines the prio)
task2_id = self.new_task(0) #(for the test the mom_id determines the prio)
estimates = [{'resource_types': {'bandwidth': max_cap},
estimates = [{'resource_types': {'bandwidth': max_bw_cap},
"root_resource_group": "CS001",
"resource_count": 2 } ]
allocation_succesful = self.new_scheduler(task2_id, lambda _: estimates).allocate_resources()
scheduler = self.new_scheduler(task2_id, resource_estimator=lambda _: estimates)
allocation_succesful = scheduler.allocate_resources()
self.assertFalse(allocation_succesful)
# the second (low-prio) task could not be scheduled
......@@ -622,14 +686,15 @@ class PrioritySchedulerTest(StationSchedulerTest):
# utcnow lies before the tasks we are scheduling (the tasks lie in the future)
self.datetime_mock.utcnow.return_value = datetime.datetime(2017, 1, 1, 0, 0, 0)
max_cap = self.get_cs001_max_capacity()
max_bw_cap = self.get_station_bandwidth_max_capacity()
# First (task must succeed)
task_id = self.new_task(1000) #(for the test the mom_id determines the prio)
estimates = [{'resource_types': {'bandwidth': max_cap},
estimates = [{'resource_types': {'bandwidth': max_bw_cap},
"root_resource_group": "CS001",
"resource_count": 2 } ]
allocation_succesful = self.new_scheduler(task_id, lambda _: estimates).allocate_resources()
scheduler = self.new_scheduler(task_id, resource_estimator=lambda _: estimates)
allocation_succesful = scheduler.allocate_resources()
self.assertTrue(allocation_succesful)
self.assertEqual('approved', self.radb.getTask(task_id)['status'])
......@@ -646,10 +711,11 @@ class PrioritySchedulerTest(StationSchedulerTest):
# (or else it will still have overlap and conflicts with beginning of just-aborted running task)
# (for the test the mom_id determines the prio)
task2_id = self.new_task(0, starttime=datetime.datetime(2017, 1, 1, 1, 11, 0))
estimates = [{'resource_types': {'bandwidth': max_cap},
estimates = [{'resource_types': {'bandwidth': max_bw_cap},
"root_resource_group": "CS001",
"resource_count": 2 } ]
allocation_succesful = self.new_scheduler(task2_id, lambda _: estimates).allocate_resources()
scheduler = self.new_scheduler(task2_id, resource_estimator=lambda _: estimates)
allocation_succesful = scheduler.allocate_resources()
self.assertFalse(allocation_succesful)
# the second (low-prio) task could not be scheduled
......@@ -666,14 +732,15 @@ class PrioritySchedulerTest(StationSchedulerTest):
def test_not_unschedule_equal_priority(self):
""" Whether two tasks that fit individually but not together get rejected priorities do not allow an override. """
max_cap = self.get_cs001_max_capacity()
max_bw_cap = self.get_station_bandwidth_max_capacity()
# First task must succeed
task1_id = self.new_task(1) #mom_id=1 and mom_id=0 yield equal priorities
estimates = [{'resource_types': {'bandwidth': max_cap},
estimates = [{'resource_types': {'bandwidth': max_bw_cap},
"root_resource_group": "CS001",
"resource_count": 2 } ]
allocation_succesful = self.new_scheduler(task1_id, lambda _: estimates).allocate_resources()
scheduler = self.new_scheduler(task1_id, resource_estimator=lambda _: estimates)
allocation_succesful = scheduler.allocate_resources()
self.assertTrue(allocation_succesful)
self.assertEqual('approved', self.radb.getTask(task1_id)['status'])
......@@ -682,10 +749,11 @@ class PrioritySchedulerTest(StationSchedulerTest):
# Second task must fail as it has a lower priority
task2_id = self.new_task(0) #mom_id=1 and mom_id=0 yield equal priorities
estimates = [{'resource_types': {'bandwidth': max_cap},
estimates = [{'resource_types': {'bandwidth': max_bw_cap},
"root_resource_group": "CS001",
"resource_count": 2 } ]
allocation_succesful = self.new_scheduler(task2_id, lambda _: estimates).allocate_resources()
scheduler = self.new_scheduler(task2_id, resource_estimator=lambda _: estimates)
allocation_succesful = scheduler.allocate_resources()
self.assertFalse(allocation_succesful)
self.assertEqual('scheduled', self.radb.getTask(task1_id)['status'])
......@@ -700,18 +768,20 @@ class PrioritySchedulerTest(StationSchedulerTest):
# utcnow lies before the tasks we are scheduling (the tasks lie in the future)
self.datetime_mock.utcnow.return_value = datetime.datetime(2017, 1, 1, 0, 0, 0)
max_cap = self.get_cs001_max_capacity()
max_bw_cap = self.get_station_bandwidth_max_capacity()
# First task must succeed (for the test the mom_id determines the prio)
task_id = self.new_task(0)
estimates = [{'resource_types': {'bandwidth': 0.25*max_cap},
estimates = [{'resource_types': {'bandwidth': 0.25*max_bw_cap},
"root_resource_group": "CS001",
"resource_count": 1 },
{'resource_types': {'bandwidth': 0.25 * max_cap},
{'resource_types': {'bandwidth': 0.25 * max_bw_cap},
"root_resource_group": "CS001",
"resource_count": 1}
]
allocation_succesful = self.new_scheduler(task_id, lambda _: estimates).allocate_resources()
scheduler = self.new_scheduler(task_id, resource_estimator=lambda _: estimates)
allocation_succesful = scheduler.allocate_resources()
self.assertTrue(allocation_succesful)
self.assertEqual('approved', self.radb.getTask(task_id)['status'])
......@@ -720,14 +790,15 @@ class PrioritySchedulerTest(StationSchedulerTest):
# Second task must succeed as it has a higher priority (for the test the mom_id determines the prio)
task2_id = self.new_task(1000)
estimates = [{'resource_types': {'bandwidth': 0.25*max_cap},
estimates = [{'resource_types': {'bandwidth': 0.25*max_bw_cap},
"root_resource_group": "CS001",
"resource_count": 1 },
{'resource_types': {'bandwidth': 0.95 * max_cap},
{'resource_types': {'bandwidth': 0.95 * max_bw_cap},
"root_resource_group": "CS001",
"resource_count": 1}
]
allocation_succesful = self.new_scheduler(task2_id, lambda _: estimates).allocate_resources()
scheduler = self.new_scheduler(task2_id, resource_estimator=lambda _: estimates)
allocation_succesful = scheduler.allocate_resources()
self.assertTrue(allocation_succesful)
self.assertEqual(2, len(self.radb.getResourceClaims(task_ids=task2_id, status='claimed')))
......@@ -740,21 +811,23 @@ class PrioritySchedulerTest(StationSchedulerTest):
self.assertEqual('conflict', self.radb.getTask(task_id)['status'])
def test_should_not_kill_a_task_without_a_mom_id(self):
    """A running task that has no MoM id must never be killed to make room for a
    higher-priority task; the second (higher-priority) allocation must therefore fail
    and observation control must not be contacted.

    NOTE(review): this span contained both the pre- and post-change lines of the diff
    (max_cap/CS001/count=2 next to max_bw_cap/RS106/count=1); only the updated
    post-commit statements are kept here.
    """
    max_bw_cap = self.get_station_bandwidth_max_capacity()

    # First task (without a MoM id) must succeed and fully claim the RS106 bandwidth.
    task_id = self.new_task_without_momid(0)
    estimates = [{'resource_types': {'bandwidth': max_bw_cap},
                  "root_resource_group": "RS106",
                  "resource_count": 1 }]
    scheduler = self.new_scheduler(task_id, resource_estimator=lambda _: estimates)
    allocation_succesful = scheduler.allocate_resources()
    self.assertTrue(allocation_succesful)

    # Second task wants the same (already fully claimed) resource. Even though its
    # mom_id (1000) gives it a higher priority, the first task cannot be killed
    # because it has no MoM id, so this allocation must fail.
    task2_id = self.new_task(1000)
    estimates = [{'resource_types': {'bandwidth': max_bw_cap},
                  "root_resource_group": "RS106",
                  "resource_count": 1 }]
    scheduler = self.new_scheduler(task2_id, resource_estimator=lambda _: estimates)
    allocation_succesful = scheduler.allocate_resources()
    self.assertFalse(allocation_succesful)

    # No kill command may have been sent to observation control.
    self.obscontrol_mock.assert_not_called()
......@@ -775,24 +848,35 @@ class DwellSchedulerTest(PrioritySchedulerTest):
super(DwellSchedulerTest, self).setUp()
self.resource_availability_checker = DwellSchedulerTest.TestResourceAvailabilityChecker(self.radb)
def new_scheduler(self, task_id, resource_estimator=None, specification_tree=None):
    """overridden factory method returning a scheduler class specific for this test class.
    In this case, in the DwellSchedulerTest class, it returns a new DwellScheduler
    with dwelling disabled, so it behaves like the base scheduler it replaces."""
    return self.new_dwell_scheduler(task_id, resource_estimator, specification_tree, allow_dwelling=False)

def new_station_scheduler(self, task_id, resource_estimator=None, specification_tree=None):
    """overridden factory method returning a scheduler class specific for this test class.
    In this case, in the DwellSchedulerTest class, it returns a new DwellScheduler
    with dwelling disabled, so it behaves like the base scheduler it replaces."""
    return self.new_dwell_scheduler(task_id, resource_estimator, specification_tree, allow_dwelling=False)

def new_priority_scheduler(self, task_id, resource_estimator=None, specification_tree=None):
    """overridden factory method returning a scheduler class specific for this test class.
    In this case, in the DwellSchedulerTest class, it returns a new DwellScheduler
    with dwelling disabled, so it behaves like the base scheduler it replaces."""
    return self.new_dwell_scheduler(task_id, resource_estimator, specification_tree, allow_dwelling=False)

def new_dwell_scheduler(self, task_id, resource_estimator=None, specification_tree=None, allow_dwelling=True):
    """Factory for a DwellScheduler under test.

    :param task_id: radb id of the task to schedule.
    :param resource_estimator: callable producing resource estimates; defaults to
                               self.fake_resource_estimator when None.
    :param specification_tree: task specification; fetched via
                               self.get_specification_tree(task_id) when None.
    :param allow_dwelling: when True the scheduler may shift the task within a
                           24h start-time window; when False the window collapses
                           to the task's actual starttime, i.e. no dwelling.

    NOTE(review): this span contained both the pre- and post-change factory methods
    of the diff; only the updated post-commit versions are kept here.
    """
    if allow_dwelling:
        min_starttime = datetime.datetime(2017, 1, 1, 1, 0, 0)
        max_starttime = datetime.datetime(2017, 1, 2, 1, 0, 0)
    else:
        # we do not want dwelling, so limit the dwell starttime window to the task's actual starttime.
        min_starttime = self.radb.getTask(task_id)['starttime']
        max_starttime = min_starttime

    return DwellScheduler(task_id,
                          specification_tree if specification_tree else self.get_specification_tree(task_id),
                          resource_estimator if resource_estimator else self.fake_resource_estimator,
                          min_starttime,
                          max_starttime,
                          datetime.timedelta(hours=1), # duration
                          self.resource_availability_checker, self.radb.dbcreds)
......@@ -804,31 +888,36 @@ class DwellSchedulerTest(PrioritySchedulerTest):
estimates = [{ 'resource_types': {'bandwidth': 512},
"root_resource_group": "CS001",
"resource_count": 1 } ]
allocation_succesful = self.new_dwell_scheduler(task_id, lambda _: estimates).allocate_resources()
scheduler = self.new_dwell_scheduler(task_id, resource_estimator=lambda _: estimates)
allocation_succesful = scheduler.allocate_resources()
self.assertTrue(allocation_succesful)
# Task must NOT have been moved
self.assertEqual(self.radb.getTask(task_id)["starttime"], datetime.datetime(2017, 1, 1, 1, 0, 0))
# Task must be positioned at start of dwelling period.
task = self.radb.getTask(task_id)
self.assertEqual(scheduler.min_starttime, task["starttime"])
self.assertEqual(scheduler.min_starttime+scheduler.duration, task["endtime"])
def test_dwell(self):
""" Whether a task will dwell after an existing task. """
max_cap = self.get_cs001_max_capacity()
max_bw_cap = self.get_station_bandwidth_max_capacity()
# First task must succeed
task1_id = self.new_task(0)
estimates = [{ 'resource_types': {'bandwidth': max_cap},
estimates = [{ 'resource_types': {'bandwidth': max_bw_cap},
"root_resource_group": "CS001",
"resource_count": 2 }]
allocation_succesful = self.new_dwell_scheduler(task1_id, lambda _: estimates).allocate_resources()
scheduler = self.new_dwell_scheduler(task1_id, resource_estimator=lambda _: estimates)
allocation_succesful = scheduler.allocate_resources()
self.assertTrue(allocation_succesful)
# Second task must also succeed
task2_id = self.new_task(1)
estimates = [{ 'resource_types': {'bandwidth': max_cap},
estimates = [{ 'resource_types': {'bandwidth': max_bw_cap},
"root_resource_group": "CS001",
"resource_count": 2 }]
allocation_succesful = self.new_dwell_scheduler(task2_id, lambda _: estimates).allocate_resources()
scheduler = self.new_dwell_scheduler(task2_id, resource_estimator=lambda _: estimates)
allocation_succesful = scheduler.allocate_resources()
self.assertTrue(allocation_succesful)
# Second task must have been moved, first task not
......@@ -840,14 +929,16 @@ class DwellSchedulerTest(PrioritySchedulerTest):
def test_dwell_respect_claim_endtime(self):
""" Whether a dwelling task will honour the claim endtimes, instead of the task endtime. """
max_cap = self.get_cs001_max_capacity()
max_bw_cap = self.get_station_bandwidth_max_capacity()
# First task must succeed
task1_id = self.new_task(0)
estimates = [{ 'resource_types': {'bandwidth': max_cap},
estimates = [{ 'resource_types': {'bandwidth': max_bw_cap},
"root_resource_group": "CS001",
"resource_count": 2 }]
allocation_succesful = self.new_dwell_scheduler(task1_id, lambda _: estimates).allocate_resources()
# use normal basic scheduler for first normal task, which we want to schedule in a normal (non-dwell) way.
scheduler = self.new_basic_scheduler(task1_id, resource_estimator=lambda _: estimates)
allocation_succesful = scheduler.allocate_resources()
self.assertTrue(allocation_succesful)
self.assertEqual(2, len(self.radb.getResourceClaims(task_ids=task1_id, status='claimed')))
......@@ -858,10 +949,11 @@ class DwellSchedulerTest(PrioritySchedulerTest):
# Second task must also succeed
task2_id = self.new_task(1)
estimates = [{ 'resource_types': {'bandwidth': max_cap},
estimates = [{ 'resource_types': {'bandwidth': max_bw_cap},
"root_resource_group": "CS001",
"resource_count": 2 }]
allocation_succesful = self.new_dwell_scheduler(task2_id, lambda _: estimates).allocate_resources()
scheduler = self.new_dwell_scheduler(task2_id, resource_estimator=lambda _: estimates)
allocation_succesful = scheduler.allocate_resources()
self.assertTrue(allocation_succesful)
# Second task must have been moved beyond 1st claim endtime, first task not
......@@ -879,28 +971,30 @@ class DwellSchedulerTest(PrioritySchedulerTest):
that list get subtracted from the list handed to the resource_availability checker.
This test verifies that the complete list should be provided on the second try.
"""
max_cap = self.get_cs001_max_capacity()
max_bw_cap = self.get_station_bandwidth_max_capacity()
# First task must succeed
task1_id = self.new_task(0)
estimates = [{ 'resource_types': {'bandwidth': max_cap},
estimates = [{ 'resource_types': {'bandwidth': max_bw_cap},
"root_resource_group": "CS001",
"resource_count": 2 }]
allocation_succesful = self.new_dwell_scheduler(task1_id, lambda _: estimates).allocate_resources()
scheduler = self.new_dwell_scheduler(task1_id, resource_estimator=lambda _: estimates)
allocation_succesful = scheduler.allocate_resources()
self.assertTrue(allocation_succesful)
# Second task must also succeed
task2_id = self.new_task(1)
estimates = [{ 'resource_types': {'bandwidth': max_cap},
estimates = [{ 'resource_types': {'bandwidth': max_bw_cap},
"root_resource_group": "CS001",
"resource_count": 2 }]
allocation_succesful = self.new_dwell_scheduler(task2_id, lambda _: estimates).allocate_resources()
scheduler = self.new_dwell_scheduler(task2_id, resource_estimator=lambda _: estimates)
allocation_succesful = scheduler.allocate_resources()
self.assertTrue(allocation_succesful)
# avialable resources can be limited by tracking unkillable resources. They should be
# cleared on the second try like in this test.
self.assertEqual(set(self.resource_availability_checker.last_available_resources),
set(self.radb.getResources(include_availability=True)))
self.assertEqual(set(r['name'] for r in self.resource_availability_checker.last_available_resources),
set(r['name'] for r in self.radb.getResources(include_availability=True)))
if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment