-
Jan David Mol authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
t_schedulers.py 19.99 KiB
#!/usr/bin/env python
# Copyright (C) 2017
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
import unittest
import mock
import datetime
from copy import deepcopy
from lofar.sas.resourceassignment.resourceassigner.resource_availability_checker import CouldNotFindClaimException
from lofar.sas.resourceassignment.resourceassigner.schedulers import ScheduleException
from lofar.sas.resourceassignment.resourceassigner.schedulers import BasicScheduler
from lofar.sas.resourceassignment.resourceassigner.schedulers import PriorityScheduler
from lofar.sas.resourceassignment.resourceassigner.schedulers import DwellScheduler
import logging
logger = logging.getLogger(__name__)
class FakeRADatabase(object):
    """ Mimic an RA Database, assuming claims overlap fully or not at all.

        Tasks and claims are kept in plain dicts keyed by task id. commit()
        snapshots that state and rollback() restores the last snapshot; both
        raise a flag so tests can check what the scheduler did. """

    def __init__(self, resource_capacity):
        # database
        self.tasks = {}
        self.claims = {}
        self.next_claim_id = 0

        # cache committed state here
        self.committed_tasks = {}
        self.committed_claims = {}

        # transaction flags; also reset by addTask(), but defined here so they
        # can be inspected before any task was added
        self.committed = False
        self.rolled_back = False

        # maximum capacity of our resource
        self.resource_capacity = resource_capacity

    def addTask(self, id, task):
        """ Register `task` (a dict) under `id`, without claims, and reset the
            committed/rolled_back flags. """
        self.tasks[id] = task
        self.tasks[id]["id"] = id
        self.claims[id] = []

        self.committed = False
        self.rolled_back = False

    def _fits(self, claim):
        """ Return whether `claim` fits within the resource capacity, on top of
            all other non-conflict claims that overlap it in time and resource. """
        usage = 0
        resource_id = claim["resource_id"]
        # a claim that is not inserted yet has no id, and thus matches no existing claim
        own_id = claim.get("id", None)

        for claims in self.claims.values():
            for c in claims:
                overlap_in_time = claim["starttime"] < c["endtime"] and claim["endtime"] > c["starttime"]
                overlap_in_resource = c["resource_id"] == resource_id

                if c["status"] != "conflict" and \
                   c["id"] != own_id and \
                   overlap_in_resource and \
                   overlap_in_time:
                    usage += c["claim_size"]

        return usage + claim["claim_size"] <= self.resource_capacity

    """ Methods to mock radb. """

    def getTask(self, id):
        """ Return the task dict stored under `id`. """
        return self.tasks[id]

    def getTasks(self, task_ids):
        """ Return the task dicts of all known tasks listed in `task_ids`. """
        # items() instead of iteritems(): identical result, and it also runs on Python 3
        return [task for tid, task in self.tasks.items() if tid in task_ids]

    def getResources(self, *args, **kwargs):
        """ Return the modelled resources; any arguments are ignored. """
        # we model two resources, can expand if needed
        return [{"id": 0}, {"id": 1}]

    def getResourceClaims(self, task_ids, status):
        """ Return the claims of the given tasks that have the given status. """
        return [claim for tid in task_ids for claim in self.claims[tid] if claim["status"] == status]

    def deleteResourceClaims(self, claim_ids, commit):
        """ Remove the claims with the given ids from all tasks. `commit` is ignored. """
        logger.info("Deleting claims %s", claim_ids)

        for tid in self.claims:
            self.claims[tid] = [c for c in self.claims[tid] if c["id"] not in claim_ids]

    def updateResourceClaims(self, task_id, status, commit):
        """ Set all claims of `task_id` to "claimed".

            Returns False, changing nothing, if any claim of the task is not
            "tentative"; returns True otherwise. `commit` is ignored. """
        # this is what we support
        assert status == "claimed"

        task_claims = self.claims[task_id]

        # can't update conflict claims to claimed
        if any(c["status"] != "tentative" for c in task_claims):
            return False

        # update statusses
        for c in task_claims:
            c["status"] = "claimed"

        return True

    def updateTaskAndResourceClaims(self, task_id, starttime=None, endtime=None, **kwargs):
        """ Move the task and all of its claims to the given starttime and/or
            endtime. Other keyword arguments are ignored. """
        if starttime:
            logger.info("Setting starttime of task %s to %s", task_id, starttime)

            self.tasks[task_id]["starttime"] = starttime

            for c in self.claims[task_id]:
                c["starttime"] = starttime

        if endtime:
            logger.info("Setting endtime of task %s to %s", task_id, endtime)

            self.tasks[task_id]["endtime"] = endtime

            for c in self.claims[task_id]:
                c["endtime"] = endtime

    def insertResourceClaims(self, task_id, claims, *args, **kwargs):
        """ Add `claims` (dicts, modified in place) to `task_id` and return their new ids.

            Each claim becomes "tentative" if it fits next to the claims already
            present, and "conflict" otherwise. """
        for c in claims:
            # check whether tasks do not get two claims of the same resource
            assert c["resource_id"] not in [d["resource_id"] for d in self.claims[task_id]], "Resource %s claimed twice by task %s" % (c["resource_id"], task_id)

            # derive claim status
            c["status"] = "tentative" if self._fits(c) else "conflict"

            # assign ids
            c["task_id"] = task_id
            c["id"] = self.next_claim_id
            self.next_claim_id += 1

            # add it to our claim list
            self.claims[task_id].append(c)

        claim_ids = [c["id"] for c in claims]
        logger.info("Added claims %s", claim_ids)

        return claim_ids

    def get_conflicting_overlapping_claims(self, claim):
        """ Return all other claims on the same resource as `claim` that actually
            hold resources. Time is not considered: all claims are assumed to overlap. """
        # all claims overlap
        return [c for tid in self.claims for c in self.claims[tid] if
                # overlap in space
                c["resource_id"] == claim["resource_id"] and
                # "conflict" claims do not actually claim resources
                c["status"] != "conflict" and
                # be antireflexive
                c["id"] != claim["id"]]

    def commit(self):
        """ Snapshot the current tasks and claims as the committed state. """
        logger.info("Commit")

        self.committed = True
        self.committed_claims = deepcopy(self.claims)
        self.committed_tasks = deepcopy(self.tasks)

    def rollback(self):
        """ Restore the tasks and claims from the last committed snapshot. """
        logger.info("Rollback")

        self.rolled_back = True
        self.claims = deepcopy(self.committed_claims)
        self.tasks = deepcopy(self.committed_tasks)
class FakeResourceAvailabilityChecker(object):
    """ Minimal stand-in for the resource availability checker that is handed
        to the schedulers under test. """

    # resource type name -> id reported as 'resource_type_id' in the produced claims
    resource_types = {
        "storage": 0,
        "bandwidth": 1,
    }

    def get_is_claimable(self, requested_resources, available_resources):
        """ Return a single-element list holding a claim for the first requested resource.

            Only requested_resources[0] is handled, and only the first resource
            type it lists; the claim is put on available_resources[0].

            Raises CouldNotFindClaimException if there are no available resources. """
        if not available_resources:
            raise CouldNotFindClaimException

        # fullfil one request at a time to keep the code simple. We map it on
        # the first available resource
        r = requested_resources[0]
        # list(d)[0] yields the same key as d.keys()[0], but also works on
        # Python 3, where dict views cannot be indexed
        rtype = list(r["resource_types"])[0]

        return [{
            'requested_resources': [r],
            'claim_size': r["resource_types"][rtype],
            'resource_id': available_resources[0]["id"],
            'resource_type_id': self.resource_types[rtype]
        }]
class SchedulerTest(unittest.TestCase):
    """ Common fixture for the scheduler tests: swaps the RADatabase used by the
        schedulers module for a FakeRADatabase, and provides a
        FakeResourceAvailabilityChecker to hand to the scheduler under test. """

    def mock_ra_database(self):
        """ Make every RADatabase() created in the schedulers module return our fake. """
        self.fake_ra_database = FakeRADatabase(resource_capacity=1024)

        patch_target = 'lofar.sas.resourceassignment.resourceassigner.schedulers.RADatabase'
        patcher = mock.patch(patch_target)

        # register the cleanup before starting, so the patch never outlives the test
        self.addCleanup(patcher.stop)
        self.ra_database_mock = patcher.start()
        self.ra_database_mock.return_value = self.fake_ra_database

    def mock_resource_availability_checker(self):
        """ Create the fake checker that tests pass on to the scheduler. """
        self.fake_resource_availability_checker = FakeResourceAvailabilityChecker()

    def setUp(self):
        for install_fake in (self.mock_ra_database, self.mock_resource_availability_checker):
            install_fake()
class BasicSchedulerTest(SchedulerTest):
    """ Tests for the BasicScheduler, run against the fake RA database. """

    def new_task(self, task_id):
        """ Add a one-hour task to the fake database and create a scheduler for it. """
        task = {
            "starttime": datetime.datetime(2017, 1, 1, 1, 0, 0),
            "endtime": datetime.datetime(2017, 1, 1, 2, 0, 0),
        }
        self.fake_ra_database.addTask(task_id, task)

        self.scheduler = BasicScheduler(task_id, self.fake_resource_availability_checker, None)

    def test_schedule_task(self):
        """ Whether a task (that fits) can be scheduled. """

        # Resources we need
        self.new_task(0)
        needed = [{'resource_types': {'bandwidth': 512}}]
        scheduled = self.scheduler.allocate_resources(needed)

        # Allocation must succeed and be committed
        radb = self.fake_ra_database
        self.assertTrue(scheduled)
        self.assertTrue(radb.committed)
        self.assertFalse(radb.rolled_back)

        # Claim must be present in database
        task_claims = radb.claims[0]
        self.assertTrue(task_claims)
        self.assertEqual(len(task_claims), 1)

        # Claim must be valid
        the_claim = task_claims[0]
        the_task = radb.tasks[0]
        bandwidth_type_id = FakeResourceAvailabilityChecker.resource_types["bandwidth"]

        self.assertEqual(the_claim["status"], "claimed")
        self.assertEqual(the_claim["starttime"], the_task["starttime"])
        self.assertEqual(the_claim["endtime"], the_task["endtime"])
        self.assertEqual(the_claim["claim_size"], 512)
        self.assertEqual(the_claim["resource_type_id"], bandwidth_type_id)

    def test_multiple_resources(self):
        """ Whether a task (that fits) needing multiple resources can be scheduled. """

        # Resources we need
        self.new_task(0)
        needed = [
            {'resource_types': {'bandwidth': 512}},
            {'resource_types': {'bandwidth': 512}},
        ]
        scheduled = self.scheduler.allocate_resources(needed)

        # Allocation must succeed
        self.assertTrue(scheduled)

    def test_schedule_too_large_task(self):
        """ Whether a task with too large claims will be rejected by the scheduler. """

        # Resources we need
        self.new_task(0)
        needed = [{'resource_types': {'bandwidth': 2048}}]
        scheduled = self.scheduler.allocate_resources(needed)

        # Allocation must fail, and rollback() called
        self.assertFalse(scheduled)
        self.assertFalse(self.fake_ra_database.committed)
        self.assertTrue(self.fake_ra_database.rolled_back)

    def test_schedule_two_tasks_too_large_task(self):
        """ Whether two tasks that fit individually but not together will be rejected by the scheduler. """

        # First task must succeed
        self.new_task(0)
        first_scheduled = self.scheduler.allocate_resources([{'resource_types': {'bandwidth': 512}}])
        self.assertTrue(first_scheduled)

        # Second task must fail
        self.new_task(1)
        second_scheduled = self.scheduler.allocate_resources([{'resource_types': {'bandwidth': 513}}])
        self.assertFalse(second_scheduled)
class PrioritySchedulerTest(BasicSchedulerTest):
    """ Tests for the PriorityScheduler.

        Task priorities are derived from the task's mom_id by a fake MoM query
        service; the call that aborts observations and the clock of the
        schedulers module are mocked. """
    # The PriorityScheduler must not regress on the BasicScheduler, so we inherit all its tests

    def mock_momrpc(self):
        """ Make the schedulers module obtain project priorities from a fake MoM service. """
        class FakeMoMQueryService(object):
            def get_project_priorities_for_objects(self, mom_ids):
                # priority increments by 1000 ids. Floor division keeps the
                # priorities integral on Python 3 as well, so ids within the
                # same block of 1000 really share a priority
                return {mom_id: mom_id // 1000 for mom_id in mom_ids}

        self.fake_momrpc = FakeMoMQueryService()

        momrpc_patcher = mock.patch('lofar.sas.resourceassignment.resourceassigner.schedulers.MoMQueryRPC')
        self.addCleanup(momrpc_patcher.stop)
        self.momrpc_mock = momrpc_patcher.start()
        self.momrpc_mock.return_value = self.fake_momrpc

    def mock_obscontrol(self):
        """ Replace abort_observation by a mock, so tests can see which tasks got killed. """
        obscontrol_patcher = mock.patch('lofar.sas.resourceassignment.resourceassigner.schedulers.ObservationControlRPCClient.abort_observation')
        self.addCleanup(obscontrol_patcher.stop)
        self.obscontrol_mock = obscontrol_patcher.start()

    def mock_datetime(self):
        """ Freeze utcnow() in the schedulers module to one hour before the test tasks start. """
        datetime_patcher = mock.patch('lofar.sas.resourceassignment.resourceassigner.schedulers.datetime')
        self.addCleanup(datetime_patcher.stop)
        datetime_mock = datetime_patcher.start()

        datetime_mock.utcnow.return_value = datetime.datetime(2017, 1, 1, 0, 0, 0)
        # the mocked module attribute must still offer a real datetime.max
        datetime_mock.max = datetime.datetime.max

    def setUp(self):
        super(PrioritySchedulerTest, self).setUp()

        self.mock_momrpc()
        self.mock_obscontrol()
        self.mock_datetime()

    def new_task(self, task_id):
        """ Add a one-hour observation to the fake database and create a scheduler for it.

            The mom_id is 1000 + task_id, so the fake MoM service yields
            priority 1 for task ids 0..999, 2 for 1000..1999, etc. """
        self.fake_ra_database.addTask(task_id, {
            "mom_id": 1000 + task_id,
            "otdb_id": 2000 + task_id,
            "type": "observation",
            "starttime": datetime.datetime(2017, 1, 1, 1, 0, 0),
            "endtime": datetime.datetime(2017, 1, 1, 2, 0, 0),
        })

        self.scheduler = PriorityScheduler(task_id, self.fake_resource_availability_checker, None)

    def test_kill_lower_priority(self):
        """ Whether two tasks that fit individually but not together will be accepted by the scheduler by killing the lower-priority task. """

        # First task must succeed
        self.new_task(0)
        estimates = [{'resource_types': {'bandwidth': 512}}]
        allocation_succesful = self.scheduler.allocate_resources(estimates)
        self.assertTrue(allocation_succesful)

        # Second task must succeed as it has a higher priority
        self.new_task(1000)
        estimates = [{'resource_types': {'bandwidth': 513}}]
        allocation_succesful = self.scheduler.allocate_resources(estimates)
        self.assertTrue(allocation_succesful)

        # First task must have been killed
        otdb_id = self.fake_ra_database.tasks[0]["otdb_id"]
        self.obscontrol_mock.assert_called_with(otdb_id)

        # First task must have its endtime cut short
        my_starttime = self.fake_ra_database.tasks[1000]["starttime"]
        for c in self.fake_ra_database.claims[0]:
            self.assertLess(c["endtime"], my_starttime)

    def test_not_kill_higher_priority(self):
        """ Whether two tasks that fit individually but not together get rejected if priorities do not allow an override. """

        # First task must succeed
        self.new_task(1000)
        estimates = [{'resource_types': {'bandwidth': 512}}]
        allocation_succesful = self.scheduler.allocate_resources(estimates)
        self.assertTrue(allocation_succesful)

        # Second task must fail as it has a lower priority
        self.new_task(0)
        estimates = [{'resource_types': {'bandwidth': 513}}]
        allocation_succesful = self.scheduler.allocate_resources(estimates)
        self.assertFalse(allocation_succesful)

        # First task must NOT have been killed. Inspect all recorded calls:
        # assert_called_with() would only look at the most recent one.
        otdb_id = self.fake_ra_database.tasks[1000]["otdb_id"]
        self.assertNotIn(mock.call(otdb_id), self.obscontrol_mock.call_args_list)

    def test_not_kill_equal_priority(self):
        """ Whether two tasks that fit individually but not together get rejected if priorities do not allow an override. """

        # First task must succeed
        self.new_task(1)
        estimates = [{'resource_types': {'bandwidth': 512}}]
        allocation_succesful = self.scheduler.allocate_resources(estimates)
        self.assertTrue(allocation_succesful)

        # Second task must fail as it has an equal priority
        self.new_task(0)
        estimates = [{'resource_types': {'bandwidth': 513}}]
        allocation_succesful = self.scheduler.allocate_resources(estimates)
        self.assertFalse(allocation_succesful)

    def test_partial_conflict(self):
        """ Whether a task gets scheduled correctly if it has a partial conflict after the first fit. """

        # First task must succeed
        self.new_task(0)
        estimates = [{'resource_types': {'bandwidth': 512}},
                     {'resource_types': {'bandwidth': 512}}]
        allocation_succesful = self.scheduler.allocate_resources(estimates)
        self.assertTrue(allocation_succesful)

        # Second task must succeed as it has a higher priority
        self.new_task(1000)
        estimates = [{'resource_types': {'bandwidth': 512}},
                     {'resource_types': {'bandwidth': 513}}]
        allocation_succesful = self.scheduler.allocate_resources(estimates)
        self.assertTrue(allocation_succesful)

        # First task must have been killed
        otdb_id = self.fake_ra_database.tasks[0]["otdb_id"]
        self.obscontrol_mock.assert_called_with(otdb_id)
class DwellSchedulerTest(PrioritySchedulerTest):
    """ Tests for the DwellScheduler, which may move a task within a window of start times. """
    # The DwellScheduler must not regress on the PriorityScheduler, so we inherit all its tests

    def new_task(self, task_id):
        """ Add a one-hour observation and create a scheduler that cannot move it
            (minimum and maximum start time are the same). """
        fixed_start = datetime.datetime(2017, 1, 1, 1, 0, 0)
        one_hour = datetime.timedelta(hours=1)

        radb = self.fake_ra_database
        radb.addTask(task_id, {
            "mom_id": 1000 + task_id,
            "otdb_id": 2000 + task_id,
            "type": "observation",
            "starttime": fixed_start,
            "endtime": datetime.datetime(2017, 1, 1, 2, 0, 0),
        })

        radb.commit()
        radb.committed = False  # dont confuse subsequent checks on whether the scheduler committed

        self.scheduler = DwellScheduler(task_id,
                                        fixed_start,  # minstarttime
                                        fixed_start,  # maxstarttime
                                        one_hour,     # duration
                                        self.fake_resource_availability_checker, None)

    def new_dwell_task(self, task_id):
        """ Like new_task(), but with a scheduler that may start the task up to a day later. """
        self.new_task(task_id)

        earliest_start = datetime.datetime(2017, 1, 1, 1, 0, 0)
        latest_start = datetime.datetime(2017, 1, 2, 1, 0, 0)

        self.scheduler = DwellScheduler(task_id,
                                        earliest_start,               # minstarttime
                                        latest_start,                 # maxstarttime
                                        datetime.timedelta(hours=1),  # duration
                                        self.fake_resource_availability_checker, None)

    def test_no_dwell(self):
        """ Whether a task will not dwell unnecessarily on an empty system. """

        # Task must succeed
        self.new_dwell_task(0)
        scheduled = self.scheduler.allocate_resources([{'resource_types': {'bandwidth': 512}}])
        self.assertTrue(scheduled)

        # Task must NOT have been moved
        self.assertEqual(self.fake_ra_database.tasks[0]["starttime"], datetime.datetime(2017, 1, 1, 1, 0, 0))

    def test_dwell(self):
        """ Whether a task will dwell after an existing task. """

        # First task must succeed
        self.new_dwell_task(0)
        first_scheduled = self.scheduler.allocate_resources([{'resource_types': {'bandwidth': 512}}])
        self.assertTrue(first_scheduled)

        # Second task must also succeed
        self.new_dwell_task(1)
        second_scheduled = self.scheduler.allocate_resources([{'resource_types': {'bandwidth': 513}}])
        self.assertTrue(second_scheduled)

        # Second task must have been moved, first task not
        tasks = self.fake_ra_database.tasks
        self.assertEqual(tasks[0]["starttime"], datetime.datetime(2017, 1, 1, 1, 0, 0))
        self.assertEqual(tasks[0]["endtime"], datetime.datetime(2017, 1, 1, 2, 0, 0))
        self.assertEqual(tasks[1]["starttime"], datetime.datetime(2017, 1, 1, 2, 0, 0))
        self.assertEqual(tasks[1]["endtime"], datetime.datetime(2017, 1, 1, 3, 0, 0))

    def test_dwell_respect_claim_endtime(self):
        """ Whether a dwelling task will honour the claim endtimes, instead of the task endtime. """

        # First task must succeed
        self.new_dwell_task(0)
        first_scheduled = self.scheduler.allocate_resources([{'resource_types': {'bandwidth': 512}}])
        self.assertTrue(first_scheduled)

        # Extend claim
        first_claim = self.fake_ra_database.claims[0][0]
        first_claim["endtime"] += datetime.timedelta(hours=1)

        # Second task must also succeed
        self.new_dwell_task(1)
        second_scheduled = self.scheduler.allocate_resources([{'resource_types': {'bandwidth': 513}}])
        self.assertTrue(second_scheduled)

        # Second task must have been moved beyond claim endtime
        moved_task = self.fake_ra_database.tasks[1]
        self.assertEqual(moved_task["starttime"], datetime.datetime(2017, 1, 1, 3, 0, 0))
        self.assertEqual(moved_task["endtime"], datetime.datetime(2017, 1, 1, 4, 0, 0))
if __name__ == '__main__':
    # Run the tests with timestamped debug logging when executed as a script.
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s %(levelname)s %(message)s',
    )

    unittest.main()