diff --git a/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulers.py b/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulers.py index 94a43b5c872a56fec7f6466aa01b92b79baad94c..cfe1ed60725992803aab7810e588afe58896caab 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulers.py +++ b/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulers.py @@ -21,11 +21,9 @@ import unittest import mock - import datetime -from copy import deepcopy -from lofar.sas.resourceassignment.resourceassigner.resource_availability_checker import CouldNotFindClaimException +from lofar.sas.resourceassignment.resourceassigner.resource_availability_checker import ResourceAvailabilityChecker, CouldNotFindClaimException from lofar.sas.resourceassignment.resourceassigner.schedulers import ScheduleException from lofar.sas.resourceassignment.resourceassigner.schedulers import BasicScheduler @@ -36,372 +34,191 @@ from lofar.sas.resourceassignment.resourceassigner.schedulers import DwellSchedu import logging logger = logging.getLogger(__name__) -class FakeRADatabase(object): - """ Mimic an RA Database, assuming claims overlap fully or not at all. """ - - def __init__(self, resource_capacity): - # database - self.tasks = {} - self.claims = {} - self.next_claim_id = 0 - - # cache committed state here - self.committed_tasks = {} - self.committed_claims = {} - - # maximum capacity of our resource - self.resource_capacity = resource_capacity - - self.resources = [{"id": x} for x in xrange(6)] - - def addTask(self, id, task): - self.tasks[id] = task - self.tasks[id]["id"] = id - self.tasks[id]["specification_id"] = id - self.claims[id] = [] - - self.committed = False - self.rolled_back = False - - def _fits(self, claim): - usage = 0 - resource_id = claim["resource_id"] - - for claims in self.claims.values(): - for c in claims: - overlap_in_time = claim["starttime"] < c["endtime"] and claim["endtime"] > c["starttime"] - overlap_in_resource = c["resource_id"] == resource_id - - if c["status"] != "conflict" and \ - c["id"] != claim.get("id",None) and \ - overlap_in_resource and \ - overlap_in_time: - usage += c["claim_size"] - - return usage + claim["claim_size"] <= self.resource_capacity - - """ Methods to mock radb. 
""" - - def getTask(self, id): - return self.tasks[id] - - def getTasks(self, task_ids): - return [t for id, t in self.tasks.iteritems() if id in task_ids] - - def updateSpecification(self, specification_id, starttime=None, endtime=None, content=None, cluster=None, - commit=True): - - for task_id, task in self.tasks.iteritems(): - if self.tasks[task_id]["specification_id"] == specification_id: - if starttime is not None: - self.tasks[task_id]["starttime"] = starttime - if endtime is not None: - self.tasks[task_id]["endtime"] = endtime - - return True - - def getResources(self, *args, **kwargs): - # we model six resources, can expand if needed - return self.resources - - def getResourceGroupMemberships(self): - # We model 4 stations: 2 remote, and 2 core - station_groups = { - 100: { - "resource_group_id": 100, - "resource_group_name": "ALL", - "resource_group_type": "", - "child_ids": [101, 102] - }, - 101: { - "resource_group_id": 101, - "resource_group_name": "CORE", - "resource_group_type": "", - "child_ids": [1, 2] - }, - 102: { - "resource_group_id": 102, - "resource_group_name": "REMOTE", - "resource_group_type": "", - "child_ids": [3, 4] - } - } - - def station_name(nr): - if nr < 3: - return "CS%03d" % nr - else: - return "RS%03d" % nr - - stations = { - station_nr: { - "resource_group_id": station_nr, - "resource_group_name": station_name(station_nr), - "resource_group_type": "station", - "child_ids": [], - } for station_nr in xrange(1,5) - } - - resources = station_groups; - resources.update(stations) - - return {"groups": resources} - - def getResourceClaims(self, task_ids, status, extended): - for tid in task_ids: - assert tid in self.tasks - assert tid in self.claims - - return [claim for tid in task_ids for claim in self.claims[tid] if claim["status"] == status] - - def deleteResourceClaims(self, claim_ids, commit): - logger.info("Deleting claims %s", claim_ids) - - for tid in self.claims: - self.claims[tid] = [c for c in self.claims[tid] if c["id"] not in claim_ids] - - def updateResourceClaims(self, where_task_ids, status, commit): - # this is what we support - assert status == "claimed" - - for task_id in where_task_ids: - # can't update conflict claims to claimed - for c in self.claims[task_id]: - if c["status"] != "tentative": - return False - - # update statusses - for c in self.claims[task_id]: - c["status"] = "claimed" - - return True - - def updateTaskAndResourceClaims(self, task_id, starttime=None, endtime=None, **kwargs): - if starttime: - logger.info("Setting starttime of task %s to %s", task_id, starttime) - - self.tasks[task_id]["starttime"] = starttime - - for c in self.claims[task_id]: - c["starttime"] = starttime - - if endtime: - logger.info("Setting endtime of task %s to %s", task_id, endtime) - - self.tasks[task_id]["endtime"] = endtime - - for c in self.claims[task_id]: - c["endtime"] = endtime - - def insertResourceClaims(self, task_id, claims, *args, **kwargs): - for c in claims: - # check whether tasks do not get two claims of the same resource - assert c["resource_id"] not in [d["resource_id"] for d in self.claims[task_id]], "Resource %s claimed twice by task %s" % (c["resource_id"], task_id) - - # derive claim status - c["status"] = "tentative" if self._fits(c) else "conflict" - - # assign ids - c["task_id"] = task_id - c["id"] = self.next_claim_id - self.next_claim_id += 1 - - # add it to our claim list - self.claims[task_id].append(c) - - claim_ids = [c["id"] for c in claims] - logger.info("Added claims %s", claim_ids) - - return claim_ids - - def 
get_overlapping_claims(self, claim_id, claim_status="claimed"): - overlapping_claims = [] - - logger.info('get_overlapping_claims(claim_id=%s, claim_status=%s) self.claims content:', claim_id, claim_status) - for claim_id, claim_value in self.claims.iteritems(): - logger.info('%s: %s', claim_id, claim_value) - - # all claims overlap - claims_for_id = self.claims[claim_id] - for claim in claims_for_id: - overlapping_claims += [c for _, claims in self.claims.iteritems() for c in claims if - # overlap in space - c["resource_id"] == claim["resource_id"] and - # "conflict" claims do not actually claim resources - c["status"] == claim_status and - # be antireflexive - c["id"] != claim["id"]] - - return overlapping_claims - - def commit(self): - logger.info("Commit") - - self.rolled_back = False - self.committed = True - self.committed_claims = deepcopy(self.claims) - self.committed_tasks = deepcopy(self.tasks) - - def rollback(self): - logger.info("Rollback") - - self.rolled_back = True - self.claims = deepcopy(self.committed_claims) - self.tasks = deepcopy(self.committed_tasks) - - -class FakeResourceAvailabilityChecker(object): - resource_types = { - "storage": 0, - "bandwidth": 1} +import radb_common_testing - requested_resources = [] - available_resources = [] +def setUpModule(): + return radb_common_testing.setUpModule() - def get_is_claimable(self, requested_resources, available_resources): - self.requested_resources = requested_resources - self.available_resources = available_resources +def tearDownModule(): + return radb_common_testing.tearDownModule() - if not available_resources: - raise CouldNotFindClaimException - - # fullfil one request at a time to keep the code simple. We map it on - # the first available resource - r = requested_resources[0] - - # use resource 0, or resource #stationnr - rid = int(r["station"][2:]) if "station" in r else available_resources[0]["id"] - if rid not in [x["id"] for x in available_resources]: - raise CouldNotFindClaimException - - rtype = r["resource_types"].keys()[0] - return [{ - 'requested_resources': [r], - 'claim_size': r["resource_types"][rtype], - 'resource_id': rid, - 'resource_type_id': self.resource_types[rtype] - }] - - -class SchedulerTest(unittest.TestCase): - """ Setup mechanics to use a FakeRADatabase and FakeResourceAvailabilityChecker to simulate a system with - one resource at one point in time. 
""" - - def mock_ra_database(self): - self.fake_ra_database = FakeRADatabase(resource_capacity=1024) - - ra_database_patcher = mock.patch('lofar.sas.resourceassignment.resourceassigner.schedulers.RADatabase') - self.addCleanup(ra_database_patcher.stop) - self.ra_database_mock = ra_database_patcher.start() - self.ra_database_mock.return_value = self.fake_ra_database - - def mock_resource_availability_checker(self): - self.fake_resource_availability_checker = FakeResourceAvailabilityChecker() +class SchedulerTest(radb_common_testing.RADBCommonTest): + """ create test radb postgres instance, and use that in a ResourceAvailabilityChecker""" def setUp(self): - self.mock_ra_database() - self.mock_resource_availability_checker() + super(SchedulerTest, self).setUp() + self.resource_availability_checker = ResourceAvailabilityChecker(self.radb) class BasicSchedulerTest(SchedulerTest): - def new_task(self, task_id): - self.fake_ra_database.addTask(task_id, { - "starttime": datetime.datetime(2017, 1, 1, 1, 0, 0), - "endtime": datetime.datetime(2017, 1, 1, 2, 0, 0), - }) + def new_task(self, mom_otdb_id=0, starttime=None, endtime=None): + """ + insert a new test specification and task into the testing radb + :param mom_otdb_id: optional mom/otdb id + :param starttime: optional starttime if None, then datetime(2017, 1, 1, 1, 0, 0) is used + :param starttime: optional endtime if None, then datetime(2017, 1, 1, 2, 0, 0) is used + :return: the new radb's task id + """ + if starttime is None: + starttime = datetime.datetime(2017, 1, 1, 1, 0, 0) + + if endtime is None: + endtime = datetime.datetime(2017, 1, 1, 2, 0, 0), - self.fake_ra_database.commit() - self.fake_ra_database.committed = False # dont confuse subsequent checks on whether the scheduler committed + return self.radb.insertSpecificationAndTask(mom_id=mom_otdb_id, + otdb_id=mom_otdb_id, + task_status='approved', + task_type='observation', + starttime=starttime, + endtime=endtime, + content='', + cluster='CEP4')['task_id'] def get_specification_tree(self, task_id): return {} def new_scheduler(self, task_id, resource_estimator): - return BasicScheduler(task_id, self.get_specification_tree(task_id), resource_estimator, self.fake_resource_availability_checker, None) + return BasicScheduler(task_id, self.get_specification_tree(task_id), resource_estimator, + self.resource_availability_checker, self.radb.dbcreds) def test_schedule_task(self): """ Whether a task (that fits) can be scheduled. 
""" # Resources we need - self.new_task(0) - estimates = [{ 'resource_types': {'bandwidth': 512} }] - allocation_succesful = self.new_scheduler(0, lambda _: estimates).allocate_resources() + task_id = self.new_task(0) + estimates = [{ 'resource_types': {'bandwidth': 512}, + "root_resource_group": "CS001", + "resource_count": 1 } ] + scheduler = self.new_scheduler(task_id, lambda _: estimates) + allocation_succesful = scheduler.allocate_resources() # Allocation must succeed and be committed self.assertTrue(allocation_succesful) - self.assertTrue(self.fake_ra_database.committed) - self.assertFalse(self.fake_ra_database.rolled_back) + self.assertTrue(scheduler.radb.committed) + self.assertFalse(scheduler.radb.rolled_back) # Claim must be present in database - claims = self.fake_ra_database.claims[0] + claims = self.radb.getResourceClaims(task_ids=task_id, extended=True) self.assertTrue(claims) self.assertEqual(len(claims), 1) # Claim must be valid claim = claims[0] - task = self.fake_ra_database.tasks[0] + task = self.radb.getTask(task_id) - self.assertEqual(claim["status"], "claimed") - self.assertEqual(claim["starttime"], task["starttime"]) - self.assertEqual(claim["endtime"], task["endtime"]) - self.assertEqual(claim["claim_size"], 512) - self.assertEqual(claim["resource_type_id"], FakeResourceAvailabilityChecker.resource_types["bandwidth"]) + self.assertEqual(claim["status"], "claimed") + self.assertEqual(claim["starttime"], task["starttime"]) + self.assertEqual(claim["endtime"], task["endtime"]) + self.assertEqual(claim["claim_size"], 512) + self.assertEqual(claim["resource_type_name"], "bandwidth") def test_multiple_resources(self): """ Whether a task (that fits) can be scheduled. """ # Resources we need - self.new_task(0) - estimates = [{ 'resource_types': {'bandwidth': 512} }, - { 'resource_types': {'bandwidth': 512} }] - allocation_succesful = self.new_scheduler(0, lambda _: estimates).allocate_resources() + task_id = self.new_task(0) + estimates = [{ 'resource_types': {'bandwidth': 512}, + "root_resource_group": "CS001", + "resource_count": 1 }, + {'resource_types': {'bandwidth': 512}, + "root_resource_group": "CS002", + "resource_count": 1} ] + + scheduler = self.new_scheduler(task_id, lambda _: estimates) + allocation_succesful = scheduler.allocate_resources() + self.assertTrue(scheduler.radb.committed) + self.assertFalse(scheduler.radb.rolled_back) # Allocation must succeed self.assertTrue(allocation_succesful) + # Claim must be present in database + claims = self.radb.getResourceClaims(task_ids=task_id, extended=True) + self.assertTrue(claims) + self.assertEqual(len(claims), 2) + def test_schedule_too_large_task(self): """ Whether a task with too large claims will be rejected by the scheduler. 
""" # Resources we need - self.new_task(0) - estimates = [{ 'resource_types': {'bandwidth': 2048} }] - allocation_succesful = self.new_scheduler(0, lambda _: estimates).allocate_resources() + task_id = self.new_task(0) + estimates = [{ 'resource_types': {'bandwidth': 1e99}, + "root_resource_group": "CS001", + "resource_count": 1 } ] + scheduler = self.new_scheduler(task_id, lambda _: estimates) + allocation_succesful = scheduler.allocate_resources() if self.__class__ == BasicSchedulerTest: # This inheritence of test is not ideal # Allocation must fail, and commit called so we get a conflicted state self.assertFalse(allocation_succesful) - self.assertTrue(self.fake_ra_database.committed) - self.assertFalse(self.fake_ra_database.rolled_back) + self.assertTrue(scheduler.radb.committed) + self.assertFalse(scheduler.radb.rolled_back) else: # Allocation must fail, and rollback called self.assertFalse(allocation_succesful) - self.assertFalse(self.fake_ra_database.committed) - self.assertTrue(self.fake_ra_database.rolled_back) + self.assertFalse(scheduler.radb.committed) + self.assertTrue(scheduler.radb.rolled_back) + + def get_cs001_max_capacity(self): + resource_CS001bw0 = [r for r in self.radb.getResources(resource_types="bandwidth", include_availability=True) + if r['name']=='CS001bw0'][0] + return resource_CS001bw0['total_capacity'] def test_schedule_two_tasks_too_large_task(self): """ Whether two tasks that fit individually but not together will be rejected by the scheduler. """ + max_cap = self.get_cs001_max_capacity() + # First task must succeed - self.new_task(0) - estimates = [{ 'resource_types': {'bandwidth': 512} }] - allocation_succesful = self.new_scheduler(0, lambda _: estimates).allocate_resources() + # we claim two bandwidth resources because CS001 has two network lines + # they should both be claimed, so that the next task cannot just take the other free line. + task_id = self.new_task(0) + estimates = [{ 'resource_types': {'bandwidth': max_cap}, + "root_resource_group": "CS001", + "resource_count": 1 }, + {'resource_types': {'bandwidth': max_cap}, + "root_resource_group": "CS001", + "resource_count": 1} ] + scheduler = self.new_scheduler(task_id, lambda _: estimates) + allocation_succesful = scheduler.allocate_resources() self.assertTrue(allocation_succesful) - # Second task must fail - self.new_task(1) - estimates = [{ 'resource_types': {'bandwidth': 513} }] - allocation_succesful = self.new_scheduler(1, lambda _: estimates).allocate_resources() + # Second task must fail, because both network lines were already filled. 
+        task2_id = self.new_task(1)
+        estimates = [{ 'resource_types': {'bandwidth': max_cap},
+                       "root_resource_group": "CS001",
+                       "resource_count": 1 },
+                     {'resource_types': {'bandwidth': max_cap},
+                      "root_resource_group": "CS001",
+                      "resource_count": 1} ]
+        scheduler = self.new_scheduler(task2_id, lambda _: estimates)
+        allocation_succesful = scheduler.allocate_resources()
         self.assertFalse(allocation_succesful)
 
+
 class StationSchedulerTest(BasicSchedulerTest):
     # The StationScheduler must not regress on the BasicScheduler, so we inherit all its tests
 
+    def setUp(self):
+        super(StationSchedulerTest, self).setUp()
+        self._enforce_limited_station_list()
+
+    def _enforce_limited_station_list(self):
+        # for test simplicity, remove any station not in the list below
+        # this is safe, because we are working on a test database
+        LIMITED_STATION_LIST = ('CS001', 'CS002', 'RS106', 'RS205')
+        for rg in self.radb.getResourceGroups():
+            if rg['type'] == 'station' and rg['name'] not in LIMITED_STATION_LIST:
+                self.radb._executeQuery("DELETE FROM virtual_instrument.resource_group rg WHERE rg.id = %s", (rg['id'],))
+        self.radb.commit()
+
     def get_specification_tree(self, task_id):
         return { "task_type": "observation", "specification": { "Observation.VirtualInstrument.stationList": [] }, "station_requirements": [] }
 
     def new_scheduler(self, task_id, resource_estimator):
-        return StationScheduler(task_id, self.get_specification_tree(task_id), resource_estimator, self.fake_resource_availability_checker, None)
+        return StationScheduler(task_id, self.get_specification_tree(task_id), resource_estimator,
+                                self.resource_availability_checker, self.radb.dbcreds)
 
     def fake_resource_estimator(self, specification_tree):
         """ Return an estimate for each station, plus a fixed storage claim of half the available storage capacity. """
@@ -415,28 +232,29 @@ class StationSchedulerTest(BasicSchedulerTest):
                 { "resource_types": {"bandwidth": 1024},
                   "resource_count": 1,
                   "station": station_name,
-                  "root_resource_group": "%sLBA" % (station_name,)
+                  "root_resource_group": station_name
                 } for station_name in stations
             ] + [
                 { "resource_types": {"storage": 512},
                   "resource_count": 1,
-                }
+                  "root_resource_group": "CEP4"
+                }
             ]
 
     def new_station_scheduler(self, task_id, specification_tree):
         """ A new scheduler for station-specific tests. """
-
-        return StationScheduler(task_id, specification_tree, self.fake_resource_estimator, FakeResourceAvailabilityChecker(), None)
+        return StationScheduler(task_id, specification_tree, self.fake_resource_estimator,
+                                self.resource_availability_checker, self.radb.dbcreds)
 
     def test_expand_station_list(self):
         """ Test whether _expand_station_list correctly expands the station sets we defined in our FakeRADatabase. 
""" - self.new_task(0) - scheduler = self.new_station_scheduler(0, self.get_specification_tree(0)) + task_id = self.new_task(0) + scheduler = self.new_station_scheduler(task_id, self.get_specification_tree(0)) - self.assertEqual(sorted(scheduler._expand_station_list("ALL")), ["CS001","CS002","RS003","RS004"]) + self.assertEqual(sorted(scheduler._expand_station_list("ALL")), ["CS001","CS002","RS106","RS205"]) self.assertEqual(sorted(scheduler._expand_station_list("CORE")), ["CS001","CS002"]) - self.assertEqual(sorted(scheduler._expand_station_list("REMOTE")), ["RS003","RS004"]) + self.assertEqual(sorted(scheduler._expand_station_list("REMOTE")), ["RS106","RS205"]) self.assertEqual(sorted(scheduler._expand_station_list("CS002")), ["CS002"]) with self.assertRaises(ScheduleException): @@ -459,16 +277,16 @@ class StationSchedulerTest(BasicSchedulerTest): """ Test whether a requirement for a single station can be satisfied. """ specification_tree = self.get_specification_tree(0) - specification_tree["station_requirements"] = [ ("RS003", 1), ] + specification_tree["station_requirements"] = [ ("RS106", 1), ] - self.new_task(0) - allocation_succesful = self.new_station_scheduler(0, specification_tree).allocate_resources() + task_id = self.new_task(0) + allocation_succesful = self.new_station_scheduler(task_id, specification_tree).allocate_resources() # Allocation must succeed self.assertTrue(allocation_succesful) # The specified station must be allocated, plus storage claim - self.assertTrue(len(self.fake_ra_database.claims[0]) == 2) + self.assertTrue(len(self.radb.getResourceClaims(task_ids=task_id, status='claimed')) == 2) def test_find_any_station(self): """ Test whether a requirement for a single station can be satisfied. """ @@ -476,39 +294,41 @@ class StationSchedulerTest(BasicSchedulerTest): specification_tree = self.get_specification_tree(0) specification_tree["station_requirements"] = [ ("ALL", 1), ] - self.new_task(0) - allocation_succesful = self.new_station_scheduler(0, specification_tree).allocate_resources() + task_id = self.new_task(0) + allocation_succesful = self.new_station_scheduler(task_id, specification_tree).allocate_resources() # Allocation must succeed self.assertTrue(allocation_succesful) # All 4 stations must be allocated (allocation is greedy), plus storage claim - self.assertTrue(len(self.fake_ra_database.claims[0]) == 5) + self.assertTrue(len(self.radb.getResourceClaims(task_ids=task_id, status='claimed')) == 5) def test_find_zero_stations(self): """ Test whether a requirement for a zero station cannot be satisfied if no stations are left. """ + # preparation: do a first scheduling, which should succeed and claim the core stations specification_tree = self.get_specification_tree(0) - specification_tree["station_requirements"] = [ ("CS001", 0), ] + specification_tree["station_requirements"] = [ ("CS001", 1), ] + task_id = self.new_task(0) + allocation_succesful = self.new_station_scheduler(task_id, specification_tree).allocate_resources() - self.new_task(0) - task = self.fake_ra_database.tasks[0] + self.assertTrue(allocation_succesful) + self.assertEqual(2, len(self.radb.getResourceClaims(task_ids=task_id, status='claimed'))) - # allocate CS001 by hand - self.fake_ra_database.claims["hidden"] = [{ - "id": "hidden", - "resource_id": 1, - "claim_size": 1024, - "starttime": task["starttime"], - "endtime": task["endtime"], - "status": "claimed", - "task_id": "hidden" - }] + # real test, try to claim two core stations again. Should fail now. 
+ specification_tree = self.get_specification_tree(0) + specification_tree["station_requirements"] = [ ("CS001", 0), ] - allocation_succesful = self.new_station_scheduler(0, specification_tree).allocate_resources() + task_id = self.new_task(1) + scheduler = self.new_station_scheduler(task_id, specification_tree) + allocation_succesful = scheduler.allocate_resources() - # Allocation must succeed + # Allocation must fail self.assertFalse(allocation_succesful) + self.assertEqual(0, len(self.radb.getResourceClaims(task_ids=task_id, status='claimed'))) + self.assertFalse(scheduler.radb.committed) + self.assertTrue(scheduler.radb.rolled_back) + def test_find_overlap_stations(self): """ Test whether requirements for overlapping station sets can be satisfied. """ @@ -516,14 +336,14 @@ class StationSchedulerTest(BasicSchedulerTest): specification_tree = self.get_specification_tree(0) specification_tree["station_requirements"] = [ ("CORE", 2), ("ALL", 4), ] - self.new_task(0) - allocation_succesful = self.new_station_scheduler(0, specification_tree).allocate_resources() + task_id = self.new_task(0) + allocation_succesful = self.new_station_scheduler(task_id, specification_tree).allocate_resources() # Allocation must succeed self.assertTrue(allocation_succesful) # All 4 stations must be allocated (allocation is greedy), plus storage claim - self.assertTrue(len(self.fake_ra_database.claims[0]) == 5) + self.assertTrue(len(self.radb.getResourceClaims(task_ids=task_id, status='claimed')) == 5) def test_require_too_many_stations(self): """ Test whether requiring too many stations (than exist) fails. """ @@ -531,13 +351,14 @@ class StationSchedulerTest(BasicSchedulerTest): specification_tree = self.get_specification_tree(0) specification_tree["station_requirements"] = [ ("CORE", 3), ] - self.new_task(0) - allocation_succesful = self.new_station_scheduler(0, specification_tree).allocate_resources() + task_id = self.new_task(0) + scheduler = self.new_station_scheduler(task_id, specification_tree) + allocation_succesful = scheduler.allocate_resources() # Allocation must fail self.assertFalse(allocation_succesful) - self.assertFalse(self.fake_ra_database.committed) - self.assertTrue(self.fake_ra_database.rolled_back) + self.assertFalse(scheduler.radb.committed) + self.assertTrue(scheduler.radb.rolled_back) def test_require_more_stations_than_available(self): """ Test whether requiring too many stations (than are available) fails. 
""" @@ -545,57 +366,51 @@ class StationSchedulerTest(BasicSchedulerTest): specification_tree = self.get_specification_tree(0) specification_tree["station_requirements"] = [ ("CORE", 2), ] - self.new_task(0) - task = self.fake_ra_database.tasks[0] + # preparation: do a first scheduling, which should succeed and claim the core stations + task_id = self.new_task(0) + allocation_succesful = self.new_station_scheduler(task_id, specification_tree).allocate_resources() - # allocate CS001 by hand - self.fake_ra_database.claims["hidden"] = [{ - "id": "hidden", - "resource_id": 1, - "claim_size": 1024, - "starttime": task["starttime"], - "endtime": task["endtime"], - "status": "claimed", - "task_id": "hidden" - }] - - self.fake_ra_database.commit() - self.fake_ra_database.committed = False # dont confuse subsequent checks on whether the scheduler committed + self.assertTrue(allocation_succesful) + self.assertEqual(3, len(self.radb.getResourceClaims(task_ids=task_id, status='claimed'))) - # try to allocate our task - allocation_succesful = self.new_station_scheduler(0, specification_tree).allocate_resources() + # real test, try to claim two core stations again. Should fail now. + task_id = self.new_task(1) + scheduler = self.new_station_scheduler(task_id, specification_tree) + allocation_succesful = scheduler.allocate_resources() - # Allocation must fail self.assertFalse(allocation_succesful) - self.assertFalse(self.fake_ra_database.committed) - self.assertTrue(self.fake_ra_database.rolled_back) + self.assertEqual(0, len(self.radb.getResourceClaims(task_ids=task_id, status='claimed'))) + self.assertFalse(scheduler.radb.committed) + self.assertTrue(scheduler.radb.rolled_back) + def test_2obs_coexist(self): """ Test whether 2 obs requiring different station sets can be scheduled in parallel. """ - for task_id in (0,1): - station_set = "CORE" if task_id == 0 else "REMOTE" - specification_tree = self.get_specification_tree(task_id) + for mom_id in (0,1): + station_set = "CORE" if mom_id == 0 else "REMOTE" + specification_tree = self.get_specification_tree(mom_id) specification_tree["station_requirements"] = [ (station_set, 2), ] - self.new_task(task_id) - allocation_succesful = self.new_station_scheduler(task_id, specification_tree).allocate_resources() + task_id = self.new_task(mom_id) + scheduler = self.new_station_scheduler(task_id, specification_tree) + allocation_succesful = scheduler.allocate_resources() # Allocation must succeed self.assertTrue(allocation_succesful) - self.assertTrue(len(self.fake_ra_database.claims[task_id]) == 3) # 2 stations + 1 storage claim + self.assertTrue(len(self.radb.getResourceClaims(task_ids=task_id, status='claimed')) == 3) # 2 stations + 1 storage claim def test_2obs_no_fit(self): """ Test whether 2 obs requiring station sets from the same set will conflict. 
""" allocation_succesful = {} # Two observations both requesting 2 core stations - for task_id in (0,1): - specification_tree = self.get_specification_tree(task_id) + for mom_id in (0,1): + specification_tree = self.get_specification_tree(mom_id) specification_tree["station_requirements"] = [ ("CORE", 2), ] - self.new_task(task_id) - allocation_succesful[task_id] = self.new_station_scheduler(task_id, specification_tree).allocate_resources() + task_id = self.new_task(mom_id) + allocation_succesful[mom_id] = self.new_station_scheduler(task_id, specification_tree).allocate_resources() # Second allocation must fail self.assertTrue(allocation_succesful[0]) @@ -606,13 +421,13 @@ class StationSchedulerTest(BasicSchedulerTest): allocation_succesful = {} # Two observations both requesting 2 core stations - for task_id in (0,1,2): - station_name = { 0: "CS001", 1: "CS002", 2: "RS003" }[task_id] - specification_tree = self.get_specification_tree(task_id) + for mom_id in (0,1,2): + station_name = { 0: "CS001", 1: "CS002", 2: "RS106" }[mom_id] + specification_tree = self.get_specification_tree(mom_id) specification_tree["station_requirements"] = [ (station_name, 1), ] - self.new_task(task_id) - allocation_succesful[task_id] = self.new_station_scheduler(task_id, specification_tree).allocate_resources() + task_id = self.new_task(mom_id) + allocation_succesful[mom_id] = self.new_station_scheduler(task_id, specification_tree).allocate_resources() # Second allocation must fail self.assertTrue(allocation_succesful[0]) @@ -643,10 +458,11 @@ class PrioritySchedulerTest(StationSchedulerTest): def mock_datetime(self): datetime_patcher = mock.patch('lofar.sas.resourceassignment.resourceassigner.schedulers.datetime') self.addCleanup(datetime_patcher.stop) - datetime_mock = datetime_patcher.start() + self.datetime_mock = datetime_patcher.start() - datetime_mock.utcnow.return_value = datetime.datetime(2017, 1, 1, 0, 0, 0) - datetime_mock.max = datetime.datetime.max + # utcnow lies before the tasks we are scheduling (the tasks lie in the future) + self.datetime_mock.utcnow.return_value = datetime.datetime(2017, 1, 1, 0, 0, 0) + self.datetime_mock.max = datetime.datetime.max def setUp(self): super(PrioritySchedulerTest, self).setUp() @@ -655,225 +471,404 @@ class PrioritySchedulerTest(StationSchedulerTest): self.mock_obscontrol() self.mock_datetime() - def new_task(self, task_id): - self.fake_ra_database.addTask(task_id, { - "mom_id": 1000 + task_id, - "otdb_id": 2000 + task_id, - "type": "observation", - "starttime": datetime.datetime(2017, 1, 1, 1, 0, 0), - "endtime": datetime.datetime(2017, 1, 1, 2, 0, 0), - }) - - self.fake_ra_database.commit() - self.fake_ra_database.committed = False # dont confuse subsequent checks on whether the scheduler committed - - def new_task_without_momid(self, task_id): - self.fake_ra_database.addTask(task_id, { - "mom_id": None, - "otdb_id": 2000 + task_id, - "type": "observation", - "starttime": datetime.datetime(2017, 1, 1, 1, 0, 0), - "endtime": datetime.datetime(2017, 1, 1, 2, 0, 0), - }) - - self.fake_ra_database.commit() - self.fake_ra_database.committed = False # dont confuse subsequent checks on whether the scheduler committed + def new_task_without_momid(self, otdb_id): + return self.radb.insertSpecificationAndTask(mom_id=None, + otdb_id=otdb_id, + task_status='approved', + task_type='observation', + starttime=datetime.datetime(2017, 1, 1, 1, 0, 0), + endtime=datetime.datetime(2017, 1, 1, 2, 0, 0), + content='', + cluster='CEP4')['task_id'] def new_scheduler(self, task_id, 
resource_estimator):
-        return PriorityScheduler(task_id, self.get_specification_tree(task_id), resource_estimator, self.fake_resource_availability_checker, None)
+        return PriorityScheduler(task_id, self.get_specification_tree(task_id), resource_estimator,
+                                 self.resource_availability_checker, self.radb.dbcreds)
 
     def new_station_scheduler(self, task_id, specification_tree):
-        return PriorityScheduler(task_id, specification_tree, self.fake_resource_estimator, FakeResourceAvailabilityChecker(), None)
+        return PriorityScheduler(task_id, specification_tree, self.fake_resource_estimator,
+                                 self.resource_availability_checker, self.radb.dbcreds)
 
-    def test_kill_lower_priority(self):
+    def test_unschedule_lower_priority_future_task(self):
         """
-        Whether two tasks that fit individually but not together will be accepted by the scheduler by killing the
+        Whether two future tasks that fit individually but not together will be accepted by the scheduler by unscheduling the
         lower-priority task.
         """
 
+        # utcnow lies before the tasks we are scheduling (the tasks lie in the future)
+        self.datetime_mock.utcnow.return_value = datetime.datetime(2017, 1, 1, 0, 0, 0)
+
+        max_cap = self.get_cs001_max_capacity()
+
+        # First task must succeed (for the test the mom_id determines the prio)
+        task_id = self.new_task(0)
+        estimates = [{'resource_types': {'bandwidth': max_cap},
+                      "root_resource_group": "CS001",
+                      "resource_count": 2 } ]
+        allocation_succesful = self.new_scheduler(task_id, lambda _: estimates).allocate_resources()
+        self.assertTrue(allocation_succesful)
+
+        self.assertEqual('approved', self.radb.getTask(task_id)['status'])
+        self.radb.updateTask(task_id, task_status='scheduled')
+        self.assertEqual('scheduled', self.radb.getTask(task_id)['status'])
+
+        # Second task must succeed as it has a higher priority (for the test the mom_id determines the prio)
+        task2_id = self.new_task(1000)
+        estimates = [{'resource_types': {'bandwidth': max_cap},
+                      "root_resource_group": "CS001",
+                      "resource_count": 2 } ]
+        allocation_succesful = self.new_scheduler(task2_id, lambda _: estimates).allocate_resources()
+        self.assertTrue(allocation_succesful)
+        self.assertEqual(2, len(self.radb.getResourceClaims(task_ids=task2_id, status='claimed')))
+
+        # First task must have been unscheduled
+        # as a result, it should not have any claimed claims anymore
+        self.assertEqual(0, len(self.radb.getResourceClaims(task_ids=task_id, status='claimed')))
+        self.assertEqual(2, len(self.radb.getResourceClaims(task_ids=task_id, status='conflict')))
+        # and the low-prio task should now have conflict state (because the high-prio task claimed the resources)
+        self.assertEqual('conflict', self.radb.getTask(task_id)['status'])
+
+
+    def test_kill_lower_priority_running_task(self):
+        """
+        Whether two tasks that fit individually but not together will be accepted by the scheduler by killing the
+        running lower-priority task. 
+        """
+
+        # utcnow lies before the tasks we are scheduling (the tasks lie in the future)
+        self.datetime_mock.utcnow.return_value = datetime.datetime(2017, 1, 1, 0, 0, 0)
+
+        max_cap = self.get_cs001_max_capacity()
+
         # First task must succeed
-        self.new_task(0)
-        estimates = [{'resource_types': {'bandwidth': 512}}]
-        allocation_succesful = self.new_scheduler(0, lambda _: estimates).allocate_resources()
+        task_id = self.new_task(0) #(for the test the mom_id determines the prio)
+        estimates = [{'resource_types': {'bandwidth': max_cap},
+                      "root_resource_group": "CS001",
+                      "resource_count": 2 } ]
+        allocation_succesful = self.new_scheduler(task_id, lambda _: estimates).allocate_resources()
         self.assertTrue(allocation_succesful)
 
+        self.assertEqual('approved', self.radb.getTask(task_id)['status'])
+        self.radb.updateTask(task_id, task_status='scheduled')
+        self.assertEqual('scheduled', self.radb.getTask(task_id)['status'])
+
+        # shift utcnow and fake that the task is running
+        self.datetime_mock.utcnow.return_value = datetime.datetime(2017, 1, 1, 1, 10, 0)
+        self.radb.updateTask(task_id, task_status='active')
+        self.assertEqual('active', self.radb.getTask(task_id)['status'])
+
         # Second task must succeed as it has a higher priority
-        self.new_task(1000)
-        estimates = [{'resource_types': {'bandwidth': 513}}]
-        allocation_succesful = self.new_scheduler(1000, lambda _: estimates).allocate_resources()
+        # start it a minute after now
+        # (or else it would still overlap and conflict with the beginning of the just-aborted running task)
+        # (for the test the mom_id determines the prio)
+        task2_id = self.new_task(1000, starttime=datetime.datetime(2017, 1, 1, 1, 11, 0))
+        estimates = [{'resource_types': {'bandwidth': max_cap},
+                      "root_resource_group": "CS001",
+                      "resource_count": 2 } ]
+        allocation_succesful = self.new_scheduler(task2_id, lambda _: estimates).allocate_resources()
         self.assertTrue(allocation_succesful)
 
         # First task must have been killed
-        otdb_id = self.fake_ra_database.tasks[0]["otdb_id"]
+        otdb_id = self.radb.getTask(task_id)["otdb_id"]
         self.obscontrol_mock.assert_called_with(otdb_id)
 
         # First task must have its endtime cut short to utcnow or starttime
-        my_starttime = self.fake_ra_database.tasks[1000]["starttime"]
-        for c in self.fake_ra_database.claims[0]:
+        my_starttime = self.radb.getTask(task2_id)["starttime"]
+        for c in self.radb.getResourceClaims(task_ids=task_id):
             self.assertLessEqual(c["endtime"], my_starttime)
+            self.assertEqual('claimed', c["status"])
 
-    def test_not_kill_higher_priority(self):
-        """ Whether two tasks that fit individually but not together get rejected priorities do not allow an override. 
""" - # First task must succeed - self.new_task(1000) - estimates = [{ 'resource_types': {'bandwidth': 512} }] - allocation_succesful = self.new_scheduler(1000, lambda _: estimates).allocate_resources() + def test_do_not_unschedule_higher_priority_future_task(self): + # utcnow lies before the tasks we are scheduling (the tasks lie in the future) + self.datetime_mock.utcnow.return_value = datetime.datetime(2017, 1, 1, 0, 0, 0) + + max_cap = self.get_cs001_max_capacity() + + # First task must succeed (for the test the mom_id determines the prio) + task_id = self.new_task(1000) + estimates = [{'resource_types': {'bandwidth': max_cap}, + "root_resource_group": "CS001", + "resource_count": 2 } ] + allocation_succesful = self.new_scheduler(task_id, lambda _: estimates).allocate_resources() self.assertTrue(allocation_succesful) - # Second task must fail as it has a lower priority - self.new_task(0) - estimates = [{ 'resource_types': {'bandwidth': 513} }] - allocation_succesful = self.new_scheduler(0, lambda _: estimates).allocate_resources() + self.assertEqual('approved', self.radb.getTask(task_id)['status']) + self.radb.updateTask(task_id, task_status='scheduled') + self.assertEqual('scheduled', self.radb.getTask(task_id)['status']) + + # Second task must succeed as it has a higher priority (for the test the mom_id determines the prio) + task2_id = self.new_task(0) #(for the test the mom_id determines the prio) + estimates = [{'resource_types': {'bandwidth': max_cap}, + "root_resource_group": "CS001", + "resource_count": 2 } ] + allocation_succesful = self.new_scheduler(task2_id, lambda _: estimates).allocate_resources() self.assertFalse(allocation_succesful) + # the second (low-prio) task could not be scheduled + # as a result there are no claims allocated and the task stays in approved state. + # Thought by JS: I think that's wrong, and does not give the proper feedback to the user. + # I think that the claims and task should go to conflict to make it clear to the user what happened. 
+        self.assertEqual('approved', self.radb.getTask(task2_id)['status'])
+        self.assertEqual(0, len(self.radb.getResourceClaims(task_ids=task2_id)))
+
+        # First task must NOT have been unscheduled
+        self.assertEqual('scheduled', self.radb.getTask(task_id)['status'])
+        self.assertEqual(2, len(self.radb.getResourceClaims(task_ids=task_id, status='claimed')))
+
+
+    def test_do_not_kill_higher_priority_running_task(self):
+
+        # utcnow lies before the tasks we are scheduling (the tasks lie in the future)
+        self.datetime_mock.utcnow.return_value = datetime.datetime(2017, 1, 1, 0, 0, 0)
+
+        max_cap = self.get_cs001_max_capacity()
+
+        # First task must succeed
+        task_id = self.new_task(1000) #(for the test the mom_id determines the prio)
+        estimates = [{'resource_types': {'bandwidth': max_cap},
+                      "root_resource_group": "CS001",
+                      "resource_count": 2 } ]
+        allocation_succesful = self.new_scheduler(task_id, lambda _: estimates).allocate_resources()
+        self.assertTrue(allocation_succesful)
+
+        self.assertEqual('approved', self.radb.getTask(task_id)['status'])
+        self.radb.updateTask(task_id, task_status='scheduled')
+        self.assertEqual('scheduled', self.radb.getTask(task_id)['status'])
+
+        # shift utcnow and fake that the task is running
+        self.datetime_mock.utcnow.return_value = datetime.datetime(2017, 1, 1, 1, 10, 0)
+        self.radb.updateTask(task_id, task_status='active')
+        self.assertEqual('active', self.radb.getTask(task_id)['status'])
+
+        # Second task must fail as it has a lower priority
+        # start it a minute after now
+        # (or else it would still overlap and conflict with the beginning of the running task)
+        # (for the test the mom_id determines the prio)
+        task2_id = self.new_task(0, starttime=datetime.datetime(2017, 1, 1, 1, 11, 0))
+        estimates = [{'resource_types': {'bandwidth': max_cap},
+                      "root_resource_group": "CS001",
+                      "resource_count": 2 } ]
+        allocation_succesful = self.new_scheduler(task2_id, lambda _: estimates).allocate_resources()
+        self.assertFalse(allocation_succesful)
+
+        # the second (low-prio) task could not be scheduled
+        # as a result there are no claims allocated and the task stays in approved state.
+        # Thought by JS: I think that's wrong, and does not give the proper feedback to the user.
+        # I think that the claims and task should go to conflict to make it clear to the user what happened.
+        self.assertEqual('approved', self.radb.getTask(task2_id)['status'])
+        self.assertEqual(0, len(self.radb.getResourceClaims(task_ids=task2_id)))
+
         # First task must NOT have been killed
-        otdb_id = self.fake_ra_database.tasks[1000]["otdb_id"]
-        with self.assertRaises(AssertionError):
-            self.obscontrol_mock.assert_called_with(otdb_id)
+        self.assertEqual('active', self.radb.getTask(task_id)['status'])
+        self.assertEqual(2, len(self.radb.getResourceClaims(task_ids=task_id, status='claimed')))
 
-    def test_not_kill_equal_priority(self):
+    def test_not_unschedule_equal_priority(self):
         """ Whether two tasks that fit individually but not together get rejected priorities do not allow an override. 
""" + max_cap = self.get_cs001_max_capacity() + # First task must succeed - self.new_task(1) - estimates = [{ 'resource_types': {'bandwidth': 512} }] - allocation_succesful = self.new_scheduler(1, lambda _: estimates).allocate_resources() + task1_id = self.new_task(1) #mom_id=1 and mom_id=0 yield equal priorities + estimates = [{'resource_types': {'bandwidth': max_cap}, + "root_resource_group": "CS001", + "resource_count": 2 } ] + allocation_succesful = self.new_scheduler(task1_id, lambda _: estimates).allocate_resources() self.assertTrue(allocation_succesful) + self.assertEqual('approved', self.radb.getTask(task1_id)['status']) + self.radb.updateTask(task1_id, task_status='scheduled') + self.assertEqual('scheduled', self.radb.getTask(task1_id)['status']) + # Second task must fail as it has a lower priority - self.new_task(0) - estimates = [{ 'resource_types': {'bandwidth': 513} }] - allocation_succesful = self.new_scheduler(0, lambda _: estimates).allocate_resources() + task2_id = self.new_task(0) #mom_id=1 and mom_id=0 yield equal priorities + estimates = [{'resource_types': {'bandwidth': max_cap}, + "root_resource_group": "CS001", + "resource_count": 2 } ] + allocation_succesful = self.new_scheduler(task2_id, lambda _: estimates).allocate_resources() self.assertFalse(allocation_succesful) + self.assertEqual('scheduled', self.radb.getTask(task1_id)['status']) + # Thought by JS: I think it's wrong that task2 has approved status, and does not give the proper feedback to the user. + # I think that the claims and task should go to conflict to make it clear to the user what happened. + self.assertEqual('approved', self.radb.getTask(task2_id)['status']) + def test_partial_conflict(self): """ Whether a task gets scheduled correctly if it has a partial conflict after the first fit. 
""" - # First task must succeed - self.new_task(0) - estimates = [{ 'resource_types': {'bandwidth': 512} }, - { 'resource_types': {'bandwidth': 512} }] - allocation_succesful = self.new_scheduler(0, lambda _: estimates).allocate_resources() + + # utcnow lies before the tasks we are scheduling (the tasks lie in the future) + self.datetime_mock.utcnow.return_value = datetime.datetime(2017, 1, 1, 0, 0, 0) + + max_cap = self.get_cs001_max_capacity() + + # First task must succeed (for the test the mom_id determines the prio) + task_id = self.new_task(0) + estimates = [{'resource_types': {'bandwidth': 0.25*max_cap}, + "root_resource_group": "CS001", + "resource_count": 1 }, + {'resource_types': {'bandwidth': 0.25 * max_cap}, + "root_resource_group": "CS001", + "resource_count": 1} + ] + allocation_succesful = self.new_scheduler(task_id, lambda _: estimates).allocate_resources() self.assertTrue(allocation_succesful) - # Second task must succeed as it has a higher priority - self.new_task(1000) - estimates = [{ 'resource_types': {'bandwidth': 512} }, - { 'resource_types': {'bandwidth': 513} }] - allocation_succesful = self.new_scheduler(1000, lambda _: estimates).allocate_resources() + self.assertEqual('approved', self.radb.getTask(task_id)['status']) + self.radb.updateTask(task_id, task_status='scheduled') + self.assertEqual('scheduled', self.radb.getTask(task_id)['status']) + + # Second task must succeed as it has a higher priority (for the test the mom_id determines the prio) + task2_id = self.new_task(1000) + estimates = [{'resource_types': {'bandwidth': 0.25*max_cap}, + "root_resource_group": "CS001", + "resource_count": 1 }, + {'resource_types': {'bandwidth': 0.95 * max_cap}, + "root_resource_group": "CS001", + "resource_count": 1} + ] + allocation_succesful = self.new_scheduler(task2_id, lambda _: estimates).allocate_resources() self.assertTrue(allocation_succesful) + self.assertEqual(2, len(self.radb.getResourceClaims(task_ids=task2_id, status='claimed'))) - # First task must have been killed - otdb_id = self.fake_ra_database.tasks[0]["otdb_id"] - self.obscontrol_mock.assert_called_with(otdb_id) + # First task must have been unscheduled + # as a result, it should not have any claimed claims anymore + self.assertEqual(0, len(self.radb.getResourceClaims(task_ids=task_id, status='claimed'))) + self.assertEqual(1, len(self.radb.getResourceClaims(task_ids=task_id, status='tentative'))) + self.assertEqual(1, len(self.radb.getResourceClaims(task_ids=task_id, status='conflict'))) + # and the low-prio task should now have conflict state (cause the high-prio task claimed the resources) + self.assertEqual('conflict', self.radb.getTask(task_id)['status']) def test_should_not_kill_a_task_without_a_mom_id(self): + max_cap = self.get_cs001_max_capacity() + # First task must succeed - self.new_task_without_momid(0) - estimates = [{'resource_types': {'bandwidth': 512}}] - allocation_succesful = self.new_scheduler(0, lambda _: estimates).allocate_resources() + task_id = self.new_task_without_momid(0) + estimates = [{'resource_types': {'bandwidth': max_cap}, + "root_resource_group": "CS001", + "resource_count": 2 }] + allocation_succesful = self.new_scheduler(task_id, lambda _: estimates).allocate_resources() self.assertTrue(allocation_succesful) - self.new_task(1000) - estimates = [{'resource_types': {'bandwidth': 513}}] - allocation_succesful = self.new_scheduler(1000, lambda _: estimates).allocate_resources() + task2_id = self.new_task(1000) + estimates = [{'resource_types': {'bandwidth': max_cap}, + 
"root_resource_group": "CS001", + "resource_count": 2 }] + allocation_succesful = self.new_scheduler(task2_id, lambda _: estimates).allocate_resources() self.assertFalse(allocation_succesful) - otdb_id = self.fake_ra_database.tasks[0]["otdb_id"] self.obscontrol_mock.assert_not_called() class DwellSchedulerTest(PrioritySchedulerTest): # The DwellScheduler must not regress on the PriorityScheduler, so we inherit all its tests - def new_task(self, task_id): - self.fake_ra_database.addTask(task_id, { - "mom_id": 1000 + task_id, - "otdb_id": 2000 + task_id, - "type": "observation", - "starttime": datetime.datetime(2017, 1, 1, 1, 0, 0), - "endtime": datetime.datetime(2017, 1, 1, 2, 0, 0), - }) + class TestResourceAvailabilityChecker(ResourceAvailabilityChecker): + """Helper class to keep track of arguments in calls to get_is_claimable""" + def get_is_claimable(self, requested_resources, available_resources): + self.last_requested_resources = requested_resources + self.last_available_resources = available_resources + return super(DwellSchedulerTest.TestResourceAvailabilityChecker, self).get_is_claimable(requested_resources, + available_resources) - self.fake_ra_database.commit() - self.fake_ra_database.committed = False # dont confuse subsequent checks on whether the scheduler committed + def setUp(self): + super(DwellSchedulerTest, self).setUp() + self.resource_availability_checker = DwellSchedulerTest.TestResourceAvailabilityChecker(self.radb) def new_scheduler(self, task_id, resource_estimator): return DwellScheduler(task_id, self.get_specification_tree(task_id), resource_estimator, - datetime.datetime(2017, 1, 1, 1, 0, 0), # minstarttime - datetime.datetime(2017, 1, 1, 1, 0, 0), # maxstarttime - datetime.timedelta(hours=1), # duration - self.fake_resource_availability_checker, None) + datetime.datetime(2017, 1, 1, 1, 0, 0), # minstarttime + datetime.datetime(2017, 1, 1, 1, 0, 0), # maxstarttime + datetime.timedelta(hours=1), # duration + self.resource_availability_checker, self.radb.dbcreds) def new_station_scheduler(self, task_id, specification_tree): - return DwellScheduler(task_id, specification_tree, self.fake_resource_estimator, - datetime.datetime(2017, 1, 1, 1, 0, 0), # minstarttime - datetime.datetime(2017, 1, 1, 1, 0, 0), # maxstarttime - datetime.timedelta(hours=1), # duration - FakeResourceAvailabilityChecker(), None) + return DwellScheduler(task_id, specification_tree, self.fake_resource_estimator, + datetime.datetime(2017, 1, 1, 1, 0, 0), # minstarttime + datetime.datetime(2017, 1, 1, 1, 0, 0), # maxstarttime + datetime.timedelta(hours=1), # duration + self.resource_availability_checker, self.radb.dbcreds) def new_dwell_scheduler(self, task_id, resource_estimator): return DwellScheduler(task_id, self.get_specification_tree(task_id), resource_estimator, - datetime.datetime(2017, 1, 1, 1, 0, 0), # minstarttime - datetime.datetime(2017, 1, 2, 1, 0, 0), # maxstarttime - datetime.timedelta(hours=1), # duration - self.fake_resource_availability_checker, None) + datetime.datetime(2017, 1, 1, 1, 0, 0), # minstarttime + datetime.datetime(2017, 1, 2, 1, 0, 0), # maxstarttime + datetime.timedelta(hours=1), # duration + self.resource_availability_checker, self.radb.dbcreds) def test_no_dwell(self): """ Whether a task will not dwell unnecessarily on an empty system. 
""" # Task must succeed - self.new_task(0) - estimates = [{ 'resource_types': {'bandwidth': 512} }] - allocation_succesful = self.new_dwell_scheduler(0, lambda _: estimates).allocate_resources() + task_id = self.new_task(0) + estimates = [{ 'resource_types': {'bandwidth': 512}, + "root_resource_group": "CS001", + "resource_count": 1 } ] + allocation_succesful = self.new_dwell_scheduler(task_id, lambda _: estimates).allocate_resources() self.assertTrue(allocation_succesful) # Task must NOT have been moved - self.assertEqual(self.fake_ra_database.tasks[0]["starttime"], datetime.datetime(2017, 1, 1, 1, 0, 0)) + self.assertEqual(self.radb.getTask(task_id)["starttime"], datetime.datetime(2017, 1, 1, 1, 0, 0)) def test_dwell(self): """ Whether a task will dwell after an existing task. """ + max_cap = self.get_cs001_max_capacity() + # First task must succeed - self.new_task(0) - estimates = [{ 'resource_types': {'bandwidth': 512} }] - allocation_succesful = self.new_dwell_scheduler(0, lambda _: estimates).allocate_resources() + task1_id = self.new_task(0) + estimates = [{ 'resource_types': {'bandwidth': max_cap}, + "root_resource_group": "CS001", + "resource_count": 2 }] + allocation_succesful = self.new_dwell_scheduler(task1_id, lambda _: estimates).allocate_resources() self.assertTrue(allocation_succesful) # Second task must also succeed - self.new_task(1) - estimates = [{ 'resource_types': {'bandwidth': 513} }] - allocation_succesful = self.new_dwell_scheduler(1, lambda _: estimates).allocate_resources() + task2_id = self.new_task(1) + estimates = [{ 'resource_types': {'bandwidth': max_cap}, + "root_resource_group": "CS001", + "resource_count": 2 }] + allocation_succesful = self.new_dwell_scheduler(task2_id, lambda _: estimates).allocate_resources() self.assertTrue(allocation_succesful) # Second task must have been moved, first task not - self.assertEqual(self.fake_ra_database.tasks[0]["starttime"], datetime.datetime(2017, 1, 1, 1, 0, 0)) - self.assertEqual(self.fake_ra_database.tasks[0]["endtime"], datetime.datetime(2017, 1, 1, 2, 0, 0)) - self.assertEqual(self.fake_ra_database.tasks[1]["starttime"], datetime.datetime(2017, 1, 1, 2, 1, 0)) - self.assertEqual(self.fake_ra_database.tasks[1]["endtime"], datetime.datetime(2017, 1, 1, 3, 1, 0)) + self.assertEqual(self.radb.getTask(task1_id)["starttime"], datetime.datetime(2017, 1, 1, 1, 0, 0)) + self.assertEqual(self.radb.getTask(task1_id)["endtime"], datetime.datetime(2017, 1, 1, 2, 0, 0)) + self.assertEqual(self.radb.getTask(task2_id)["starttime"], datetime.datetime(2017, 1, 1, 2, 1, 0)) + self.assertEqual(self.radb.getTask(task2_id)["endtime"], datetime.datetime(2017, 1, 1, 3, 1, 0)) def test_dwell_respect_claim_endtime(self): """ Whether a dwelling task will honour the claim endtimes, instead of the task endtime. 
""" + max_cap = self.get_cs001_max_capacity() + # First task must succeed - self.new_task(0) - estimates = [{ 'resource_types': {'bandwidth': 512} }] - allocation_succesful = self.new_dwell_scheduler(0, lambda _: estimates).allocate_resources() + task1_id = self.new_task(0) + estimates = [{ 'resource_types': {'bandwidth': max_cap}, + "root_resource_group": "CS001", + "resource_count": 2 }] + allocation_succesful = self.new_dwell_scheduler(task1_id, lambda _: estimates).allocate_resources() self.assertTrue(allocation_succesful) + self.assertEqual(2, len(self.radb.getResourceClaims(task_ids=task1_id, status='claimed'))) # Extend claim - self.fake_ra_database.claims[0][0]["endtime"] += datetime.timedelta(hours=1) + task = self.radb.getTask(task1_id) + self.radb.updateResourceClaims(where_task_ids=task1_id, endtime=task["endtime"] + datetime.timedelta(hours=1)) + self.assertEqual(2, len(self.radb.getResourceClaims(task_ids=task1_id, status='claimed'))) # Second task must also succeed - self.new_task(1) - estimates = [{ 'resource_types': {'bandwidth': 513} }] - allocation_succesful = self.new_dwell_scheduler(1, lambda _: estimates).allocate_resources() + task2_id = self.new_task(1) + estimates = [{ 'resource_types': {'bandwidth': max_cap}, + "root_resource_group": "CS001", + "resource_count": 2 }] + allocation_succesful = self.new_dwell_scheduler(task2_id, lambda _: estimates).allocate_resources() self.assertTrue(allocation_succesful) - # Second task must have been moved beyond claim endtime - self.assertEqual(self.fake_ra_database.tasks[1]["starttime"], datetime.datetime(2017, 1, 1, 3, 1, 0)) - self.assertEqual(self.fake_ra_database.tasks[1]["endtime"], datetime.datetime(2017, 1, 1, 4, 1, 0)) + # Second task must have been moved beyond 1st claim endtime, first task not + self.assertEqual(self.radb.getTask(task1_id)["starttime"], datetime.datetime(2017, 1, 1, 1, 0, 0)) + self.assertEqual(self.radb.getTask(task1_id)["endtime"], datetime.datetime(2017, 1, 1, 2, 0, 0)) + self.assertEqual(self.radb.getTask(task2_id)["starttime"], datetime.datetime(2017, 1, 1, 3, 1, 0)) + self.assertEqual(self.radb.getTask(task2_id)["endtime"], datetime.datetime(2017, 1, 1, 4, 1, 0)) def test_dwellScheduler_should_give_all_available_resources_on_second_pass(self): """ @@ -884,23 +879,28 @@ class DwellSchedulerTest(PrioritySchedulerTest): that list get subtracted from the list handed to the resource_availability checker. This test verifies that the complete list should be provided on the second try. 
""" + max_cap = self.get_cs001_max_capacity() # First task must succeed - self.new_task(0) - estimates = [{'resource_types': {'bandwidth': 512}}] - allocation_succesful = self.new_dwell_scheduler(0, lambda _: estimates).allocate_resources() + task1_id = self.new_task(0) + estimates = [{ 'resource_types': {'bandwidth': max_cap}, + "root_resource_group": "CS001", + "resource_count": 2 }] + allocation_succesful = self.new_dwell_scheduler(task1_id, lambda _: estimates).allocate_resources() self.assertTrue(allocation_succesful) # Second task must also succeed - self.new_task(1) - estimates = [{ 'resource_types': {'bandwidth': 513} }] - allocation_succesful = self.new_dwell_scheduler(1, lambda _: estimates).allocate_resources() + task2_id = self.new_task(1) + estimates = [{ 'resource_types': {'bandwidth': max_cap}, + "root_resource_group": "CS001", + "resource_count": 2 }] + allocation_succesful = self.new_dwell_scheduler(task2_id, lambda _: estimates).allocate_resources() self.assertTrue(allocation_succesful) # avialable resources can be limited by tracking unkillable resources. They should be # cleared on the second try like in this test. - self.assertEqual(self.fake_resource_availability_checker.available_resources, - self.fake_ra_database.resources) + self.assertEqual(set(self.resource_availability_checker.last_available_resources), + set(self.radb.getResources(include_availability=True))) if __name__ == '__main__': logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG)