#!/usr/bin/env python

# Copyright (C) 2017
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.

"""
Unit tests for the resource assigner schedulers (BasicScheduler,
PriorityScheduler and DwellScheduler).

The RA database and the resource availability checker are replaced by the
in-memory fakes below; MoM queries, observation control and the schedulers'
clock are mocked out in PrioritySchedulerTest.
"""

import datetime
import logging
import unittest
from copy import deepcopy

try:
    # Python 3.3+: mock is part of the standard library
    from unittest import mock
except ImportError:
    # Python 2: use the external 'mock' backport
    import mock

from lofar.sas.resourceassignment.resourceassigner.resource_availability_checker import CouldNotFindClaimException

from lofar.sas.resourceassignment.resourceassigner.schedulers import ScheduleException
from lofar.sas.resourceassignment.resourceassigner.schedulers import BasicScheduler
from lofar.sas.resourceassignment.resourceassigner.schedulers import PriorityScheduler
from lofar.sas.resourceassignment.resourceassigner.schedulers import DwellScheduler

logger = logging.getLogger(__name__)


class FakeRADatabase(object):
    """ Mimic an RA Database, assuming claims overlap fully or not at all.

    Only the radb methods used by the schedulers are modelled. Every resource
    has the same capacity (resource_capacity). commit() snapshots the current
    tasks and claims; rollback() restores the last snapshot.
    """

    def __init__(self, resource_capacity):
        # database
        self.tasks = {}
        self.claims = {}  # task id -> list of claim dicts
        self.next_claim_id = 0

        # cache committed state here
        self.committed_tasks = {}
        self.committed_claims = {}

        # whether commit()/rollback() were called; reset by addTask()
        self.committed = False
        self.rolled_back = False

        # maximum capacity of each resource
        self.resource_capacity = resource_capacity

    def addTask(self, id, task):
        """ Register `task` (a dict) under `id`, without any claims.

        Also resets the committed/rolled_back flags, so tests can inspect
        what the scheduler did for this particular task. """
        self.tasks[id] = task
        self.tasks[id]["id"] = id
        self.claims[id] = []

        self.committed = False
        self.rolled_back = False

    def _fits(self, claim):
        """ Return whether `claim` fits on its resource, next to all other
        non-conflict claims on that resource that overlap it in time. """
        resource_id = claim["resource_id"]
        own_id = claim.get("id", None)  # claims being inserted have no id yet

        usage = 0
        for claims in self.claims.values():
            for c in claims:
                overlap_in_time = claim["starttime"] < c["endtime"] and claim["endtime"] > c["starttime"]
                overlap_in_resource = c["resource_id"] == resource_id

                # "conflict" claims do not actually occupy the resource
                if c["status"] != "conflict" and \
                   c["id"] != own_id and \
                   overlap_in_resource and \
                   overlap_in_time:
                    usage += c["claim_size"]

        return usage + claim["claim_size"] <= self.resource_capacity

    # Methods to mock radb.

    def getTask(self, id):
        return self.tasks[id]

    def getTasks(self, task_ids):
        return [t for tid, t in self.tasks.items() if tid in task_ids]

    def getResources(self, *args, **kwargs):
        # we model two resources, can expand if needed
        return [{"id": 0}, {"id": 1}]

    def getResourceClaims(self, task_ids, status):
        return [claim for tid in task_ids for claim in self.claims[tid] if claim["status"] == status]

    def deleteResourceClaims(self, claim_ids, commit):
        logger.info("Deleting claims %s", claim_ids)

        to_delete = set(claim_ids)
        for tid in self.claims:
            self.claims[tid] = [c for c in self.claims[tid] if c["id"] not in to_delete]

    def updateResourceClaims(self, task_id, status, commit):
        """ Set all claims of `task_id` to `status` (only "claimed" is
        supported). Returns False, changing nothing, if any claim of the task
        is not "tentative". """
        # this is what we support
        assert status == "claimed"

        # can't update conflict claims to claimed
        if any(c["status"] != "tentative" for c in self.claims[task_id]):
            return False

        # update statuses
        for c in self.claims[task_id]:
            c["status"] = "claimed"

        return True

    def updateTaskAndResourceClaims(self, task_id, starttime=None, endtime=None, **kwargs):
        """ Move the start and/or end time of a task and of all its claims. """
        if starttime is not None:
            logger.info("Setting starttime of task %s to %s", task_id, starttime)

            self.tasks[task_id]["starttime"] = starttime
            for c in self.claims[task_id]:
                c["starttime"] = starttime

        if endtime is not None:
            logger.info("Setting endtime of task %s to %s", task_id, endtime)

            self.tasks[task_id]["endtime"] = endtime
            for c in self.claims[task_id]:
                c["endtime"] = endtime

    def insertResourceClaims(self, task_id, claims, *args, **kwargs):
        """ Add `claims` to `task_id`, marking each one "tentative" if it fits
        or "conflict" otherwise, and return the newly assigned claim ids. """
        for c in claims:
            # check whether tasks do not get two claims of the same resource
            assert c["resource_id"] not in [d["resource_id"] for d in self.claims[task_id]], "Resource %s claimed twice by task %s" % (c["resource_id"], task_id)

            # derive claim status
            c["status"] = "tentative" if self._fits(c) else "conflict"

            # assign ids
            c["task_id"] = task_id
            c["id"] = self.next_claim_id
            self.next_claim_id += 1

            # add it to our claim list
            self.claims[task_id].append(c)

        claim_ids = [c["id"] for c in claims]
        logger.info("Added claims %s", claim_ids)

        return claim_ids

    def get_conflicting_overlapping_claims(self, claim):
        """ Return all other non-conflict claims on the same resource as
        `claim`. Time is ignored: all claims are assumed to overlap. """
        return [c for tid in self.claims for c in self.claims[tid] if
                # overlap in space
                c["resource_id"] == claim["resource_id"] and
                # "conflict" claims do not actually claim resources
                c["status"] != "conflict" and
                # be antireflexive
                c["id"] != claim["id"]]

    def commit(self):
        logger.info("Commit")

        self.committed = True
        self.committed_claims = deepcopy(self.claims)
        self.committed_tasks = deepcopy(self.tasks)

    def rollback(self):
        logger.info("Rollback")

        self.rolled_back = True
        self.claims = deepcopy(self.committed_claims)
        self.tasks = deepcopy(self.committed_tasks)


class FakeResourceAvailabilityChecker(object):
    """ Minimal stand-in for the resource availability checker. """

    resource_types = {
        "storage": 0,
        "bandwidth": 1,
    }

    def get_is_claimable(self, requested_resources, available_resources):
        """ Map the first requested resource onto the first available resource.

        Raises CouldNotFindClaimException if no resources are available. """
        if not available_resources:
            raise CouldNotFindClaimException

        # Fulfil one request at a time to keep the code simple. We map it on
        # the first available resource.
        r = requested_resources[0]
        rtype = next(iter(r["resource_types"]))

        return [{
            'requested_resources': [r],
            'claim_size': r["resource_types"][rtype],
            'resource_id': available_resources[0]["id"],
            'resource_type_id': self.resource_types[rtype]
        }]


class SchedulerTest(unittest.TestCase):
    """ Setup mechanics to use a FakeRADatabase and FakeResourceAvailabilityChecker
        to simulate a system with a small, fixed set of resources (see
        FakeRADatabase.getResources), each with a capacity of 1024. """

    def mock_ra_database(self):
        self.fake_ra_database = FakeRADatabase(resource_capacity=1024)

        ra_database_patcher = mock.patch('lofar.sas.resourceassignment.resourceassigner.schedulers.RADatabase')
        self.addCleanup(ra_database_patcher.stop)
        self.ra_database_mock = ra_database_patcher.start()
        self.ra_database_mock.return_value = self.fake_ra_database

    def mock_resource_availability_checker(self):
        self.fake_resource_availability_checker = FakeResourceAvailabilityChecker()

    def setUp(self):
        self.mock_ra_database()
        self.mock_resource_availability_checker()


class BasicSchedulerTest(SchedulerTest):
    def new_task(self, task_id):
        """ Add a task running 01:00-02:00 and create a scheduler for it. """
        self.fake_ra_database.addTask(task_id, {
            "starttime": datetime.datetime(2017, 1, 1, 1, 0, 0),
            "endtime": datetime.datetime(2017, 1, 1, 2, 0, 0),
        })

        self.scheduler = BasicScheduler(task_id, self.fake_resource_availability_checker, None)

    def test_schedule_task(self):
        """ Whether a task (that fits) can be scheduled. """

        # Resources we need
        self.new_task(0)
        estimates = [{'resource_types': {'bandwidth': 512}}]
        allocation_successful = self.scheduler.allocate_resources(estimates)

        # Allocation must succeed and be committed
        self.assertTrue(allocation_successful)
        self.assertTrue(self.fake_ra_database.committed)
        self.assertFalse(self.fake_ra_database.rolled_back)

        # Claim must be present in database
        claims = self.fake_ra_database.claims[0]
        self.assertTrue(claims)
        self.assertEqual(len(claims), 1)

        # Claim must be valid
        claim = claims[0]
        task = self.fake_ra_database.tasks[0]

        self.assertEqual(claim["status"], "claimed")
        self.assertEqual(claim["starttime"], task["starttime"])
        self.assertEqual(claim["endtime"], task["endtime"])
        self.assertEqual(claim["claim_size"], 512)
        self.assertEqual(claim["resource_type_id"], FakeResourceAvailabilityChecker.resource_types["bandwidth"])

    def test_multiple_resources(self):
        """ Whether a task (that fits) requesting multiple resources can be scheduled. """

        # Resources we need
        self.new_task(0)
        estimates = [{'resource_types': {'bandwidth': 512}},
                     {'resource_types': {'bandwidth': 512}}]
        allocation_successful = self.scheduler.allocate_resources(estimates)

        # Allocation must succeed
        self.assertTrue(allocation_successful)

    def test_schedule_too_large_task(self):
        """ Whether a task with too large claims will be rejected by the scheduler. """

        # Resources we need
        self.new_task(0)
        estimates = [{'resource_types': {'bandwidth': 2048}}]
        allocation_successful = self.scheduler.allocate_resources(estimates)

        # Allocation must fail, and rollback() called
        self.assertFalse(allocation_successful)
        self.assertFalse(self.fake_ra_database.committed)
        self.assertTrue(self.fake_ra_database.rolled_back)

    def test_schedule_two_tasks_too_large_task(self):
        """ Whether two tasks that fit individually but not together will be rejected by the scheduler. """

        # First task must succeed
        self.new_task(0)
        estimates = [{'resource_types': {'bandwidth': 512}}]
        allocation_successful = self.scheduler.allocate_resources(estimates)
        self.assertTrue(allocation_successful)

        # Second task must fail
        self.new_task(1)
        estimates = [{'resource_types': {'bandwidth': 513}}]
        allocation_successful = self.scheduler.allocate_resources(estimates)
        self.assertFalse(allocation_successful)


class PrioritySchedulerTest(BasicSchedulerTest):
    # The PriorityScheduler must not regress on the BasicScheduler, so we inherit all its tests

    def mock_momrpc(self):
        class FakeMoMQueryService(object):
            def get_project_priorities_for_objects(self, mom_ids):
                # priority increments by one every 1000 mom ids (integer division,
                # so e.g. mom ids 1000 and 1001 share the same priority)
                return {mom_id: mom_id // 1000 for mom_id in mom_ids}

        self.fake_momrpc = FakeMoMQueryService()

        momrpc_patcher = mock.patch('lofar.sas.resourceassignment.resourceassigner.schedulers.MoMQueryRPC')
        self.addCleanup(momrpc_patcher.stop)
        self.momrpc_mock = momrpc_patcher.start()
        self.momrpc_mock.return_value = self.fake_momrpc

    def mock_obscontrol(self):
        obscontrol_patcher = mock.patch('lofar.sas.resourceassignment.resourceassigner.schedulers.ObservationControlRPCClient.abort_observation')
        self.addCleanup(obscontrol_patcher.stop)
        self.obscontrol_mock = obscontrol_patcher.start()

    def mock_datetime(self):
        datetime_patcher = mock.patch('lofar.sas.resourceassignment.resourceassigner.schedulers.datetime')
        self.addCleanup(datetime_patcher.stop)
        datetime_mock = datetime_patcher.start()

        # "now" is before the 01:00 start of the test tasks
        datetime_mock.utcnow.return_value = datetime.datetime(2017, 1, 1, 0, 0, 0)
        datetime_mock.max = datetime.datetime.max

    def setUp(self):
        super(PrioritySchedulerTest, self).setUp()

        self.mock_momrpc()
        self.mock_obscontrol()
        self.mock_datetime()

    def new_task(self, task_id):
        """ Add an observation running 01:00-02:00 (mom_id 1000 + task_id,
        otdb_id 2000 + task_id) and create a scheduler for it. """
        self.fake_ra_database.addTask(task_id, {
            "mom_id": 1000 + task_id,
            "otdb_id": 2000 + task_id,
            "type": "observation",
            "starttime": datetime.datetime(2017, 1, 1, 1, 0, 0),
            "endtime": datetime.datetime(2017, 1, 1, 2, 0, 0),
        })

        self.scheduler = PriorityScheduler(task_id, self.fake_resource_availability_checker, None)

    def test_kill_lower_priority(self):
        """ Whether two tasks that fit individually but not together will be accepted by the scheduler by killing the
            lower-priority task. """

        # First task must succeed
        self.new_task(0)
        estimates = [{'resource_types': {'bandwidth': 512}}]
        allocation_successful = self.scheduler.allocate_resources(estimates)
        self.assertTrue(allocation_successful)

        # Second task must succeed as it has a higher priority
        self.new_task(1000)
        estimates = [{'resource_types': {'bandwidth': 513}}]
        allocation_successful = self.scheduler.allocate_resources(estimates)
        self.assertTrue(allocation_successful)

        # First task must have been killed
        otdb_id = self.fake_ra_database.tasks[0]["otdb_id"]
        self.obscontrol_mock.assert_called_with(otdb_id)

        # First task must have its endtime cut short
        my_starttime = self.fake_ra_database.tasks[1000]["starttime"]
        for c in self.fake_ra_database.claims[0]:
            self.assertLess(c["endtime"], my_starttime)

    def test_not_kill_higher_priority(self):
        """ Whether two tasks that fit individually but not together get rejected when priorities do not allow an
            override: a lower-priority task must not kill a higher-priority one. """

        # First task must succeed
        self.new_task(1000)
        estimates = [{'resource_types': {'bandwidth': 512}}]
        allocation_successful = self.scheduler.allocate_resources(estimates)
        self.assertTrue(allocation_successful)

        # Second task must fail as it has a lower priority
        self.new_task(0)
        estimates = [{'resource_types': {'bandwidth': 513}}]
        allocation_successful = self.scheduler.allocate_resources(estimates)
        self.assertFalse(allocation_successful)

        # First task must NOT have been killed (checks every call, not just the last one)
        otdb_id = self.fake_ra_database.tasks[1000]["otdb_id"]
        self.assertNotIn(mock.call(otdb_id), self.obscontrol_mock.call_args_list)

    def test_not_kill_equal_priority(self):
        """ Whether two tasks that fit individually but not together get rejected when priorities do not allow an
            override: a task must not kill another task of equal priority. """

        # First task must succeed
        self.new_task(1)
        estimates = [{'resource_types': {'bandwidth': 512}}]
        allocation_successful = self.scheduler.allocate_resources(estimates)
        self.assertTrue(allocation_successful)

        # Second task must fail as it does not have a higher priority
        self.new_task(0)
        estimates = [{'resource_types': {'bandwidth': 513}}]
        allocation_successful = self.scheduler.allocate_resources(estimates)
        self.assertFalse(allocation_successful)

    def test_partial_conflict(self):
        """ Whether a task gets scheduled correctly if it has a partial conflict after the first fit. """

        # First task must succeed
        self.new_task(0)
        estimates = [{'resource_types': {'bandwidth': 512}},
                     {'resource_types': {'bandwidth': 512}}]
        allocation_successful = self.scheduler.allocate_resources(estimates)
        self.assertTrue(allocation_successful)

        # Second task must succeed as it has a higher priority
        self.new_task(1000)
        estimates = [{'resource_types': {'bandwidth': 512}},
                     {'resource_types': {'bandwidth': 513}}]
        allocation_successful = self.scheduler.allocate_resources(estimates)
        self.assertTrue(allocation_successful)

        # First task must have been killed
        otdb_id = self.fake_ra_database.tasks[0]["otdb_id"]
        self.obscontrol_mock.assert_called_with(otdb_id)


class DwellSchedulerTest(PrioritySchedulerTest):
    # The DwellScheduler must not regress on the PriorityScheduler, so we inherit all its tests

    def new_task(self, task_id):
        """ Add an observation running 01:00-02:00 and create a DwellScheduler
        for it that may not move it (min starttime == max starttime). """
        self.fake_ra_database.addTask(task_id, {
            "mom_id": 1000 + task_id,
            "otdb_id": 2000 + task_id,
            "type": "observation",
            "starttime": datetime.datetime(2017, 1, 1, 1, 0, 0),
            "endtime": datetime.datetime(2017, 1, 1, 2, 0, 0),
        })

        # commit the new task, so a rollback by the scheduler does not remove it
        self.fake_ra_database.commit()
        self.fake_ra_database.committed = False  # don't confuse subsequent checks on whether the scheduler committed

        self.scheduler = DwellScheduler(task_id,
                                        datetime.datetime(2017, 1, 1, 1, 0, 0),  # minstarttime
                                        datetime.datetime(2017, 1, 1, 1, 0, 0),  # maxstarttime
                                        datetime.timedelta(hours=1),             # duration
                                        self.fake_resource_availability_checker, None)

    def new_dwell_task(self, task_id):
        """ Like new_task, but the scheduler may move the task's start up to one day later. """
        self.new_task(task_id)

        self.scheduler = DwellScheduler(task_id,
                                        datetime.datetime(2017, 1, 1, 1, 0, 0),  # minstarttime
                                        datetime.datetime(2017, 1, 2, 1, 0, 0),  # maxstarttime
                                        datetime.timedelta(hours=1),             # duration
                                        self.fake_resource_availability_checker, None)

    def test_no_dwell(self):
        """ Whether a task will not dwell unnecessarily on an empty system. """

        # Task must succeed
        self.new_dwell_task(0)
        estimates = [{'resource_types': {'bandwidth': 512}}]
        allocation_successful = self.scheduler.allocate_resources(estimates)
        self.assertTrue(allocation_successful)

        # Task must NOT have been moved
        self.assertEqual(self.fake_ra_database.tasks[0]["starttime"], datetime.datetime(2017, 1, 1, 1, 0, 0))

    def test_dwell(self):
        """ Whether a task will dwell after an existing task. """

        # First task must succeed
        self.new_dwell_task(0)
        estimates = [{'resource_types': {'bandwidth': 512}}]
        allocation_successful = self.scheduler.allocate_resources(estimates)
        self.assertTrue(allocation_successful)

        # Second task must also succeed
        self.new_dwell_task(1)
        estimates = [{'resource_types': {'bandwidth': 513}}]
        allocation_successful = self.scheduler.allocate_resources(estimates)
        self.assertTrue(allocation_successful)

        # Second task must have been moved, first task not
        self.assertEqual(self.fake_ra_database.tasks[0]["starttime"], datetime.datetime(2017, 1, 1, 1, 0, 0))
        self.assertEqual(self.fake_ra_database.tasks[0]["endtime"], datetime.datetime(2017, 1, 1, 2, 0, 0))
        self.assertEqual(self.fake_ra_database.tasks[1]["starttime"], datetime.datetime(2017, 1, 1, 2, 0, 0))
        self.assertEqual(self.fake_ra_database.tasks[1]["endtime"], datetime.datetime(2017, 1, 1, 3, 0, 0))

    def test_dwell_respect_claim_endtime(self):
        """ Whether a dwelling task will honour the claim endtimes, instead of the task endtime. """

        # First task must succeed
        self.new_dwell_task(0)
        estimates = [{'resource_types': {'bandwidth': 512}}]
        allocation_successful = self.scheduler.allocate_resources(estimates)
        self.assertTrue(allocation_successful)

        # Extend the claim one hour beyond the task endtime
        self.fake_ra_database.claims[0][0]["endtime"] += datetime.timedelta(hours=1)

        # Second task must also succeed
        self.new_dwell_task(1)
        estimates = [{'resource_types': {'bandwidth': 513}}]
        allocation_successful = self.scheduler.allocate_resources(estimates)
        self.assertTrue(allocation_successful)

        # Second task must have been moved beyond claim endtime
        self.assertEqual(self.fake_ra_database.tasks[1]["starttime"], datetime.datetime(2017, 1, 1, 3, 0, 0))
        self.assertEqual(self.fake_ra_database.tasks[1]["endtime"], datetime.datetime(2017, 1, 1, 4, 0, 0))


if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG)

    unittest.main()