Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
t_schedulers.py 19.99 KiB
#!/usr/bin/env python

# Copyright (C) 2017
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.    See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.


import unittest
import mock

import datetime
from copy import deepcopy

from lofar.sas.resourceassignment.resourceassigner.resource_availability_checker import CouldNotFindClaimException

from lofar.sas.resourceassignment.resourceassigner.schedulers import ScheduleException
from lofar.sas.resourceassignment.resourceassigner.schedulers import BasicScheduler
from lofar.sas.resourceassignment.resourceassigner.schedulers import PriorityScheduler
from lofar.sas.resourceassignment.resourceassigner.schedulers import DwellScheduler

import logging
# module-level logger, used by the fake database below to trace what the schedulers do to it
logger = logging.getLogger(__name__)

class FakeRADatabase(object):
    """ Mimic an RA Database, assuming claims overlap fully or not at all.

        The state is kept in plain dicts:
          tasks:  task id -> task dict
          claims: task id -> list of claim dicts
        commit() takes a snapshot of both, rollback() restores the last snapshot. """

    def __init__(self, resource_capacity):
        # database
        self.tasks = {}
        self.claims = {}
        self.next_claim_id = 0

        # cache committed state here
        self.committed_tasks = {}
        self.committed_claims = {}

        # whether commit()/rollback() were called since the last addTask().
        # initialised here as well, so they can be read before any task was added.
        self.committed = False
        self.rolled_back = False

        # maximum capacity of our resource
        self.resource_capacity = resource_capacity

    def addTask(self, id, task):
        """ Register `task` (a dict) under `id`, without any claims, and reset the
            committed/rolled_back flags. The task dict itself is stored and gets
            an "id" key. """
        self.tasks[id] = task
        self.tasks[id]["id"] = id
        self.claims[id] = []

        self.committed = False
        self.rolled_back = False

    def _fits(self, claim):
        """ Return whether `claim` fits within the resource capacity, on top of all other
            non-"conflict" claims on the same resource that overlap with it in time. """
        usage = 0
        resource_id = claim["resource_id"]

        for claims in self.claims.values():
            for c in claims:
                overlap_in_time = claim["starttime"] < c["endtime"] and claim["endtime"] > c["starttime"]
                overlap_in_resource = c["resource_id"] == resource_id

                # "conflict" claims do not use any capacity, and a claim does not compete with itself
                if c["status"] != "conflict" and \
                   c["id"] != claim.get("id", None) and \
                   overlap_in_resource and \
                   overlap_in_time:
                    usage += c["claim_size"]

        return usage + claim["claim_size"] <= self.resource_capacity

    # Methods to mock radb.

    def getTask(self, id):
        """ Return the task dict registered under `id`. Raises KeyError for unknown ids. """
        return self.tasks[id]

    def getTasks(self, task_ids):
        """ Return the task dicts of all known tasks whose id is in `task_ids`. """
        # items() instead of iteritems(), which only exists on python 2
        return [t for id, t in self.tasks.items() if id in task_ids]

    def getResources(self, *args, **kwargs):
        """ Return the list of resources. All arguments are ignored. """
        # we model two resources, can expand if needed
        return [ { "id": 0 }, { "id": 1 } ]

    def getResourceClaims(self, task_ids, status):
        """ Return the claims of the given tasks that have the given status. """
        return [claim for tid in task_ids for claim in self.claims[tid] if claim["status"] == status]

    def deleteResourceClaims(self, claim_ids, commit):
        """ Remove the claims with the given ids from all tasks. `commit` is ignored. """
        logger.info("Deleting claims %s", claim_ids)

        for tid in self.claims:
            self.claims[tid] = [c for c in self.claims[tid] if c["id"] not in claim_ids]

    def updateResourceClaims(self, task_id, status, commit):
        """ Set all claims of `task_id` to "claimed" (the only supported `status`).

            Returns False, without changing anything, if any claim of the task is not
            "tentative". Returns True otherwise. `commit` is ignored. """
        # this is what we support
        assert status == "claimed"

        # can't update conflict claims to claimed
        for c in self.claims[task_id]:
            if c["status"] != "tentative":
                return False

        # update statuses
        for c in self.claims[task_id]:
            c["status"] = "claimed"

        return True

    def updateTaskAndResourceClaims(self, task_id, starttime=None, endtime=None, **kwargs):
        """ Move the task and all of its claims to the given starttime and/or endtime.
            A falsy starttime/endtime is left untouched. Other keyword arguments are ignored. """
        if starttime:
            logger.info("Setting starttime of task %s to %s", task_id, starttime)

            self.tasks[task_id]["starttime"] = starttime

            for c in self.claims[task_id]:
                c["starttime"] = starttime

        if endtime:
            logger.info("Setting endtime of task %s to %s", task_id, endtime)

            self.tasks[task_id]["endtime"] = endtime

            for c in self.claims[task_id]:
                c["endtime"] = endtime

    def insertResourceClaims(self, task_id, claims, *args, **kwargs):
        """ Add `claims` (a list of claim dicts) to task `task_id`.

            Each claim dict is modified in place: it gets a "status" ("tentative" if it
            fits, "conflict" if not), a "task_id" and a fresh "id". Returns the list of
            assigned claim ids. Additional arguments are ignored. """
        for c in claims:
            # check whether tasks do not get two claims of the same resource
            assert c["resource_id"] not in [d["resource_id"] for d in self.claims[task_id]], "Resource %s claimed twice by task %s" % (c["resource_id"], task_id)

            # derive claim status
            c["status"] = "tentative" if self._fits(c) else "conflict"

            # assign ids
            c["task_id"] = task_id
            c["id"] = self.next_claim_id
            self.next_claim_id += 1

            # add it to our claim list
            self.claims[task_id].append(c)

        claim_ids = [c["id"] for c in claims]
        logger.info("Added claims %s", claim_ids)

        return claim_ids

    def get_conflicting_overlapping_claims(self, claim):
        """ Return all other claims that occupy the same resource as `claim`.
            Overlap in time is not checked: all claims are assumed to overlap. """
        # all claims overlap
        return [c for tid in self.claims for c in self.claims[tid] if
          # overlap in space
          c["resource_id"] == claim["resource_id"] and
          # "conflict" claims do not actually claim resources
          c["status"] != "conflict" and
          # be antireflexive
          c["id"] != claim["id"]]

    def commit(self):
        """ Snapshot the current tasks and claims as the committed state. """
        logger.info("Commit")

        self.committed = True
        self.committed_claims = deepcopy(self.claims)
        self.committed_tasks  = deepcopy(self.tasks)

    def rollback(self):
        """ Restore the tasks and claims from the last committed snapshot. """
        logger.info("Rollback")

        self.rolled_back = True
        self.claims = deepcopy(self.committed_claims)
        self.tasks = deepcopy(self.committed_tasks)

class FakeResourceAvailabilityChecker(object):
    """ Mimic a resource availability checker that maps a request onto the first available resource. """

    # resource type name -> resource type id
    resource_types = {
        "storage": 0,
        "bandwidth": 1,
    }

    def get_is_claimable(self, requested_resources, available_resources):
        """ Return a list with a single claim for the first of `requested_resources`,
            put on the first of `available_resources`.

            requested_resources: list of dicts like { 'resource_types': { <type name>: <size> } }
            available_resources: list of dicts with at least an "id"

            Raises CouldNotFindClaimException if there are no available resources. """
        if not available_resources:
            raise CouldNotFindClaimException

        # fulfil one request at a time to keep the code simple. We map it on
        # the first available resource
        r = requested_resources[0]
        # take the first resource type. next(iter(...)) works on both python 2 and 3,
        # whereas dict.keys() cannot be indexed on python 3.
        rtype = next(iter(r["resource_types"]))
        return [{
            'requested_resources': [r],
            'claim_size': r["resource_types"][rtype],
            'resource_id': available_resources[0]["id"],
            'resource_type_id': self.resource_types[rtype]
        }]

class SchedulerTest(unittest.TestCase):
    """ Shared fixture for the scheduler tests: each test runs against a FakeRADatabase, which is
        patched in for the RADatabase used by the schedulers module, and a FakeResourceAvailabilityChecker. """

    def mock_ra_database(self):
        """ Make the schedulers module's RADatabase(...) return a fresh FakeRADatabase
            for the duration of the test. """
        fake_database = FakeRADatabase(resource_capacity=1024)
        self.fake_ra_database = fake_database

        patcher = mock.patch('lofar.sas.resourceassignment.resourceassigner.schedulers.RADatabase')
        # register the undo before starting the patch
        self.addCleanup(patcher.stop)
        patched_class = patcher.start()
        patched_class.return_value = fake_database
        self.ra_database_mock = patched_class

    def mock_resource_availability_checker(self):
        """ Create the fake availability checker that gets handed to the schedulers. """
        self.fake_resource_availability_checker = FakeResourceAvailabilityChecker()

    def setUp(self):
        """ Install the fake database and the fake availability checker. """
        self.mock_ra_database()
        self.mock_resource_availability_checker()

class BasicSchedulerTest(SchedulerTest):
    """ Tests for the BasicScheduler, run against the fake database of SchedulerTest. """

    def new_task(self, task_id):
        """ Add a task running on 2017-01-01 from 01:00 to 02:00 to the fake database,
            and make self.scheduler a BasicScheduler for it. """
        task = {
            "starttime": datetime.datetime(2017, 1, 1, 1, 0, 0),
            "endtime":   datetime.datetime(2017, 1, 1, 2, 0, 0),
        }
        self.fake_ra_database.addTask(task_id, task)

        self.scheduler = BasicScheduler(task_id, self.fake_resource_availability_checker, None)

    def test_schedule_task(self):
        """ Whether a task (that fits) can be scheduled. """

        # Ask for half of the capacity
        self.new_task(0)
        succeeded = self.scheduler.allocate_resources([{ 'resource_types': {'bandwidth': 512} }])

        # The scheduler must report success, and must have committed without rolling back
        database = self.fake_ra_database
        self.assertTrue(succeeded)
        self.assertTrue(database.committed)
        self.assertFalse(database.rolled_back)

        # Exactly one claim must have ended up in the database
        task_claims = database.claims[0]
        self.assertTrue(task_claims)
        self.assertEqual(len(task_claims), 1)

        # That claim must match both the task and the request
        the_claim = task_claims[0]
        the_task = database.tasks[0]
        bandwidth_type_id = FakeResourceAvailabilityChecker.resource_types["bandwidth"]

        self.assertEqual(the_claim["status"],           "claimed")
        self.assertEqual(the_claim["starttime"],        the_task["starttime"])
        self.assertEqual(the_claim["endtime"],          the_task["endtime"])
        self.assertEqual(the_claim["claim_size"],       512)
        self.assertEqual(the_claim["resource_type_id"], bandwidth_type_id)

    def test_multiple_resources(self):
        """ Whether a task with two resource estimates (that fit) can be scheduled. """

        # Two requests of half the capacity each
        self.new_task(0)
        requests = [{ 'resource_types': {'bandwidth': 512} },
                    { 'resource_types': {'bandwidth': 512} }]
        succeeded = self.scheduler.allocate_resources(requests)

        # The scheduler must report success
        self.assertTrue(succeeded)

    def test_schedule_too_large_task(self):
        """ Whether a task with too large claims will be rejected by the scheduler. """

        # Ask for twice the capacity
        self.new_task(0)
        succeeded = self.scheduler.allocate_resources([{ 'resource_types': {'bandwidth': 2048} }])

        # The scheduler must report failure, and must have rolled back without committing
        self.assertFalse(succeeded)
        self.assertFalse(self.fake_ra_database.committed)
        self.assertTrue(self.fake_ra_database.rolled_back)

    def test_schedule_two_tasks_too_large_task(self):
        """ Whether two tasks that fit individually but not together will be rejected by the scheduler. """

        # Half of the capacity is still available to the first task
        self.new_task(0)
        first_succeeded = self.scheduler.allocate_resources([{ 'resource_types': {'bandwidth': 512} }])
        self.assertTrue(first_succeeded)

        # The second task needs just more than what is left
        self.new_task(1)
        second_succeeded = self.scheduler.allocate_resources([{ 'resource_types': {'bandwidth': 513} }])
        self.assertFalse(second_succeeded)

class PrioritySchedulerTest(BasicSchedulerTest):
    """ Tests for the PriorityScheduler.

        Task priorities are derived from the task id: new_task() gives task N the
        mom_id 1000 + N, and the fake MoM service turns a mom_id into priority mom_id // 1000.
        The tests treat a higher number as a higher priority. """
    # The PriorityScheduler must not regress on the BasicScheduler, so we inherit all its tests

    def mock_momrpc(self):
        """ Make the schedulers module's MoMQueryRPC(...) return a fake that derives
            project priorities from the mom ids. """
        class FakeMoMQueryService(object):
            def get_project_priorities_for_objects(self, mom_ids):
                # priority increments by 1000 ids. Floor division keeps the priorities
                # integral on python 3 as well, so ids within the same 1000 get equal priority.
                return {mom_id: mom_id // 1000 for mom_id in mom_ids}

        self.fake_momrpc = FakeMoMQueryService()

        momrpc_patcher = mock.patch('lofar.sas.resourceassignment.resourceassigner.schedulers.MoMQueryRPC')
        self.addCleanup(momrpc_patcher.stop)
        self.momrpc_mock = momrpc_patcher.start()
        self.momrpc_mock.return_value = self.fake_momrpc

    def mock_obscontrol(self):
        """ Replace ObservationControlRPCClient.abort_observation by a mock, so tests can
            inspect which observations the scheduler tried to abort. """
        obscontrol_patcher = mock.patch('lofar.sas.resourceassignment.resourceassigner.schedulers.ObservationControlRPCClient.abort_observation')
        self.addCleanup(obscontrol_patcher.stop)
        self.obscontrol_mock = obscontrol_patcher.start()

    def mock_datetime(self):
        """ Replace the `datetime` name in the schedulers module by a mock, whose utcnow()
            returns 2017-01-01 00:00:00 (an hour before the test tasks start) and whose
            max is the real datetime.max. """
        datetime_patcher = mock.patch('lofar.sas.resourceassignment.resourceassigner.schedulers.datetime')
        self.addCleanup(datetime_patcher.stop)
        datetime_mock = datetime_patcher.start()

        datetime_mock.utcnow.return_value = datetime.datetime(2017, 1, 1, 0, 0, 0)
        datetime_mock.max = datetime.datetime.max

    def setUp(self):
        """ Set up the fakes of the base class, plus those for MoM, observation control and the clock. """
        super(PrioritySchedulerTest, self).setUp()

        self.mock_momrpc()
        self.mock_obscontrol()
        self.mock_datetime()

    def new_task(self, task_id):
        """ Add an observation running on 2017-01-01 from 01:00 to 02:00 to the fake database,
            and make self.scheduler a PriorityScheduler for it. """
        self.fake_ra_database.addTask(task_id, {
               "mom_id":    1000 + task_id,
               "otdb_id":   2000 + task_id,
               "type":      "observation",
               "starttime": datetime.datetime(2017, 1, 1, 1, 0, 0),
               "endtime":   datetime.datetime(2017, 1, 1, 2, 0, 0),
             })

        self.scheduler = PriorityScheduler(task_id, self.fake_resource_availability_checker, None)

    def test_kill_lower_priority(self):
        """ Whether two tasks that fit individually but not together will be accepted by the scheduler by killing the lower-priority task. """

        # First task must succeed
        self.new_task(0)
        estimates = [{ 'resource_types': {'bandwidth': 512} }]
        allocation_succesful = self.scheduler.allocate_resources(estimates)
        self.assertTrue(allocation_succesful)

        # Second task must succeed as it has a higher priority
        self.new_task(1000)
        estimates = [{ 'resource_types': {'bandwidth': 513} }]
        allocation_succesful = self.scheduler.allocate_resources(estimates)
        self.assertTrue(allocation_succesful)

        # First task must have been killed
        otdb_id = self.fake_ra_database.tasks[0]["otdb_id"]
        self.obscontrol_mock.assert_called_with(otdb_id)

        # First task must have its endtime cut short
        my_starttime = self.fake_ra_database.tasks[1000]["starttime"]
        for c in self.fake_ra_database.claims[0]:
            self.assertLess(c["endtime"], my_starttime)

    def test_not_kill_higher_priority(self):
        """ Whether two tasks that fit individually but not together get rejected if the priorities do not allow an override. """

        # First task must succeed
        self.new_task(1000)
        estimates = [{ 'resource_types': {'bandwidth': 512} }]
        allocation_succesful = self.scheduler.allocate_resources(estimates)
        self.assertTrue(allocation_succesful)

        # Second task must fail as it has a lower priority
        self.new_task(0)
        estimates = [{ 'resource_types': {'bandwidth': 513} }]
        allocation_succesful = self.scheduler.allocate_resources(estimates)
        self.assertFalse(allocation_succesful)

        # First task must NOT have been killed. Check all recorded abort calls: wrapping
        # assert_called_with in assertRaises would only inspect the most recent call.
        otdb_id = self.fake_ra_database.tasks[1000]["otdb_id"]
        self.assertNotIn(mock.call(otdb_id), self.obscontrol_mock.call_args_list)

    def test_not_kill_equal_priority(self):
        """ Whether two tasks that fit individually but not together get rejected if their priorities are equal. """

        # First task must succeed
        self.new_task(1)
        estimates = [{ 'resource_types': {'bandwidth': 512} }]
        allocation_succesful = self.scheduler.allocate_resources(estimates)
        self.assertTrue(allocation_succesful)

        # Second task must fail as it has an equal priority
        self.new_task(0)
        estimates = [{ 'resource_types': {'bandwidth': 513} }]
        allocation_succesful = self.scheduler.allocate_resources(estimates)
        self.assertFalse(allocation_succesful)

    def test_partial_conflict(self):
        """ Whether a task gets scheduled correctly if it has a partial conflict after the first fit. """

        # First task must succeed
        self.new_task(0)
        estimates = [{ 'resource_types': {'bandwidth': 512} },
                     { 'resource_types': {'bandwidth': 512} }]
        allocation_succesful = self.scheduler.allocate_resources(estimates)
        self.assertTrue(allocation_succesful)

        # Second task must succeed as it has a higher priority
        self.new_task(1000)
        estimates = [{ 'resource_types': {'bandwidth': 512} },
                     { 'resource_types': {'bandwidth': 513} }]
        allocation_succesful = self.scheduler.allocate_resources(estimates)
        self.assertTrue(allocation_succesful)

        # First task must have been killed
        otdb_id = self.fake_ra_database.tasks[0]["otdb_id"]
        self.obscontrol_mock.assert_called_with(otdb_id)

class DwellSchedulerTest(PrioritySchedulerTest):
    """ Tests for the DwellScheduler. """
    # All PriorityScheduler tests are inherited: the DwellScheduler must not regress on them

    def new_task(self, task_id):
        """ Add an observation running on 2017-01-01 from 01:00 to 02:00 to the fake database,
            commit it, and make self.scheduler a DwellScheduler whose earliest and latest
            starttime are both 01:00. """
        database = self.fake_ra_database

        task = {
            "mom_id":    1000 + task_id,
            "otdb_id":   2000 + task_id,
            "type":      "observation",
            "starttime": datetime.datetime(2017, 1, 1, 1, 0, 0),
            "endtime":   datetime.datetime(2017, 1, 1, 2, 0, 0),
        }
        database.addTask(task_id, task)

        database.commit()
        # the tests check whether the *scheduler* committed, so hide our own commit
        database.committed = False

        min_starttime = datetime.datetime(2017, 1, 1, 1, 0, 0)
        max_starttime = datetime.datetime(2017, 1, 1, 1, 0, 0)
        duration = datetime.timedelta(hours=1)

        self.scheduler = DwellScheduler(task_id, min_starttime, max_starttime, duration,
                                        self.fake_resource_availability_checker, None)

    def new_dwell_task(self, task_id):
        """ Same as new_task(), but self.scheduler ends up as a DwellScheduler that may
            start the task anywhere between 2017-01-01 01:00 and 2017-01-02 01:00. """
        self.new_task(task_id)

        min_starttime = datetime.datetime(2017, 1, 1, 1, 0, 0)
        max_starttime = datetime.datetime(2017, 1, 2, 1, 0, 0)
        duration = datetime.timedelta(hours=1)

        self.scheduler = DwellScheduler(task_id, min_starttime, max_starttime, duration,
                                        self.fake_resource_availability_checker, None)

    def test_no_dwell(self):
        """ Whether a task will not dwell unnecessarily on an empty system. """

        # Nothing else is scheduled, so the allocation must succeed
        self.new_dwell_task(0)
        succeeded = self.scheduler.allocate_resources([{ 'resource_types': {'bandwidth': 512} }])
        self.assertTrue(succeeded)

        # The task must still start at its original starttime
        tasks = self.fake_ra_database.tasks
        self.assertEqual(tasks[0]["starttime"], datetime.datetime(2017, 1, 1, 1, 0, 0))

    def test_dwell(self):
        """ Whether a task will dwell after an existing task. """

        # The first task has the system to itself
        self.new_dwell_task(0)
        first_succeeded = self.scheduler.allocate_resources([{ 'resource_types': {'bandwidth': 512} }])
        self.assertTrue(first_succeeded)

        # The second task does not fit next to the first, but must succeed nonetheless
        self.new_dwell_task(1)
        second_succeeded = self.scheduler.allocate_resources([{ 'resource_types': {'bandwidth': 513} }])
        self.assertTrue(second_succeeded)

        # The first task must be where it was, the second must directly follow it
        tasks = self.fake_ra_database.tasks
        self.assertEqual(tasks[0]["starttime"], datetime.datetime(2017, 1, 1, 1, 0, 0))
        self.assertEqual(tasks[0]["endtime"],   datetime.datetime(2017, 1, 1, 2, 0, 0))
        self.assertEqual(tasks[1]["starttime"], datetime.datetime(2017, 1, 1, 2, 0, 0))
        self.assertEqual(tasks[1]["endtime"],   datetime.datetime(2017, 1, 1, 3, 0, 0))

    def test_dwell_respect_claim_endtime(self):
        """ Whether a dwelling task will honour the claim endtimes, instead of the task endtime. """

        # The first task has the system to itself
        self.new_dwell_task(0)
        first_succeeded = self.scheduler.allocate_resources([{ 'resource_types': {'bandwidth': 512} }])
        self.assertTrue(first_succeeded)

        # Let the claim of the first task outlast the task itself by an hour
        self.fake_ra_database.claims[0][0]["endtime"] += datetime.timedelta(hours=1)

        # The second task does not fit next to the first, but must succeed nonetheless
        self.new_dwell_task(1)
        second_succeeded = self.scheduler.allocate_resources([{ 'resource_types': {'bandwidth': 513} }])
        self.assertTrue(second_succeeded)

        # The second task must start where the extended claim ends, not where the first task ends
        tasks = self.fake_ra_database.tasks
        self.assertEqual(tasks[1]["starttime"], datetime.datetime(2017, 1, 1, 3, 0, 0))
        self.assertEqual(tasks[1]["endtime"],   datetime.datetime(2017, 1, 1, 4, 0, 0))

if __name__ == '__main__':
    # log everything from DEBUG upwards, with a timestamp, while the tests run
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s %(levelname)s %(message)s')

    unittest.main()