Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
schedulers.py 13.55 KiB
from datetime import datetime

from lofar.common.cache import cache

from lofar.sas.resourceassignment.database import radb

from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME

from lofar.sas.resourceassignment.resourceassigner.resource_availability_checker import CouldNotFindClaimException

from lofar.mac.config import DEFAULT_OBSERVATION_CONTROL_BUS_NAME, DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME
from lofar.mac.observation_control_rpc import ObservationControl


""" Scheduling is the process of looking for a suitable resource slot for a given task.

    The following code allows three levels of scheduling:

    	* The /BasicScheduler/ locates resources for a task between its specified start- and endtime.
    	* The /PriorityScheduler/ enhances the BasicScheduler, and kills lower-priority tasks to make
    	  room for the given task.
    	* The /DwellScheduler/ enhances the PriorityScheduler, by increasing the starttime
    	  until the task can be scheduled.

    Each level contains hooks to support the next.

    All schedulers modify the database directly, and commit() if a solution is found. If not, a rollback()
    is performed. To use:

        sched = <schedulerclass>(...)
        success = sched.allocate_resources(estimates)
"""


class ScheduleException(Exception):
    """ Signals that a scheduling attempt failed.

        The schedulers in this module raise it when claims cannot be found, conflicts cannot be
        resolved, or claims cannot be finalised.
    """

    def __init__(self, message):
        # Hand the human-readable reason straight to the base exception.
        Exception.__init__(self, message)


class BasicScheduler(object):
    """ A Scheduler that allocates resources at a fixed time. Resources are searched for.

        All database changes are made with commit=False on a direct radb connection;
        allocate_resources() commits them when scheduling succeeded and rolls them back otherwise.

        This is a new-style class, because the derived schedulers call super(), which requires that
        on Python 2.
    """

    def __init__(self, task_id, resource_availability_checker, radbcreds=None):
        """
            :param task_id: radb id of the task to schedule.
            :param resource_availability_checker: object providing
                   get_is_claimable(requested_resources, available_resources).
            :param radbcreds: credentials handed to the radb connection.
        """
        self.task_id = task_id
        self.resource_availability_checker = resource_availability_checker

        # We need a DIRECT connection to the database in order to do client-side (that's us) transactions
        self.radb = radb(radbcreds)

        # Cache our task info
        self.task = self.radb.getTasks(task_id=self.task_id)[0]
        self.starttime = self.task["starttime"]
        self.endtime = self.task["endtime"]

        # Any resources that we cannot schedule on for some reason
        self.unusable_resources = []

    def allocate_resources(self, estimates):
        """ Try to turn all estimates into claims for our task.

            :param estimates: the resource estimates to find claims for.
            :returns True if all estimates were claimed (changes are committed).
            :returns False if a ScheduleException occurred (changes are rolled back).
        """

        try:
            # pre process
            self._pre_process_allocation()

            # keep iterating until all estimates are turned into tentative claims
            while estimates:
                estimates = self._try_schedule(estimates)

            # post process
            self._post_process_allocation()
        except ScheduleException:
            self.radb.rollback()
            return False
        except Exception:
            # Unexpected failure: do not leave a half-finished transaction open on our connection.
            self.radb.rollback()
            raise

        self.radb.commit()
        return True

    def _pre_process_allocation(self):
        """ Hook for derived classes, called before any claim is made. """
        pass

    def _post_process_allocation(self):
        """ Hook called after all estimates were turned into tentative claims.

            :raises ScheduleException if the claims could not be put to "claimed".
        """
        # move all claims from tentative -> claimed
        if not self.radb.updateResourceClaims(task_id=self.task_id, claim_status="claimed", commit=False):
            raise ScheduleException("Failed to put tentative claims to claimed")

    def _get_resource_availability(self):
        """ Get the available resources on which we can schedule. """

        # available resources
        resources = self.radb.getResources(include_availability=True, claimable_capacity_lower_bound=self.starttime,
                                           claimable_capacity_upper_bound=self.endtime)

        # resources WE claimed (note that their size is already accounted for in 'resources')
        tentative_claims = self.radb.getResourceClaims(task_ids=self.task_id, status="tentative")

        # disable resources we cannot schedule on (we cannot schedule on resources with tentative
        # claims until we implement merging tentative claims)
        disabled_resources = set(self.unusable_resources)
        disabled_resources.update(c["resource_id"] for c in tentative_claims)

        return [r for r in resources if r["id"] not in disabled_resources]

    def _finalise_claims(self, claims):
        """ Add the static info (status, start- and endtime) to all claims, in place. """
        for c in claims:
            c["status"] = "tentative"  # we MUST insert as "tentative", as radb demands so
            c["starttime"] = self.starttime
            c["endtime"] = self.endtime

    def _try_schedule(self, requested_resources):
        """ Schedule the estimates within the available resources.

            :returns the list of estimates that could not be scheduled (empty if all were scheduled).
            :raises ScheduleException if no claims could be found, or if there are conflicts of which
                    none could be resolved.
        """

        available_resources = self._get_resource_availability()
        try:
            claims = self.resource_availability_checker.get_is_claimable(requested_resources, available_resources)
        except CouldNotFindClaimException:
            raise ScheduleException("Could not schedule")

        # add static info to all claims
        self._finalise_claims(claims)

        # insert all claims to reserve the resources in the next call to findfit and to find the conflicts according to
        # the DB
        # NOTE(review): the original passed two undefined placeholder names here. 'anonymous' / -1 are
        # assumed to be acceptable "no user" values for username / user_id -- confirm against radb.
        claim_ids = self.radb.insertResourceClaims(self.task_id, claims, 'anonymous', -1, commit=False)

        # tie the claim ids to the estimates
        # NOTE(review): assumes the ids are returned in the same order as the inserted claims -- confirm.
        claim_to_estimates = {cid: claim["requested_resources"] for cid, claim in zip(claim_ids, claims)}

        # handle any conflicts. We need NOT resolve ALL conflicts: removing one conflict can free up more resources as
        # a by-product, in which case other conflicts can simply be shifted to those newly freed resources.
        conflict_claims = self.radb.getResourceClaims(task_ids=self.task_id, status="conflict")
        if conflict_claims:
            # Deliberately build the full list first: every conflict gets its resolution attempt.
            resolved = [self._resolve_conflict(c) for c in conflict_claims]
            if not any(resolved):
                # Could not resolve any conflict
                raise ScheduleException("Could not resolve any conflict")

        # remove conflicting claims (allowing the next iteration to propose alternatives). Note that
        # _resolve_conflict could have reduced the number of conflicting claims.
        conflict_claims = self.radb.getResourceClaims(task_ids=self.task_id, status="conflict")
        conflict_claim_ids = [c["id"] for c in conflict_claims]
        if conflict_claim_ids:
            self.radb.deleteResourceClaims(conflict_claim_ids, commit=False)

        # return any estimates that we could not fulfil. Several claims can stem from the same estimate,
        # so deduplicate; compare by equality, which also works if estimates are not hashable.
        remaining_estimates = []
        for cid in conflict_claim_ids:
            estimate = claim_to_estimates[cid]
            if estimate not in remaining_estimates:
                remaining_estimates.append(estimate)
        return remaining_estimates

    def _get_conflicting_claims_and_tasks(self, conflict_claim):
        """ Return all claims that are conflicting with ours, and return the tasks associated with those claims. """

        conflicting_claims = self.radb.get_conflicting_overlapping_claims(conflict_claim)
        conflicting_task_ids = set(c["task_id"] for c in conflicting_claims)
        conflicting_tasks = self.radb.getTasks(task_ids=conflicting_task_ids)

        return conflicting_claims, conflicting_tasks

    def _resolve_conflict(self, conflict_claim):
        """ Resolve one conflict, making it useful to try to schedule again.

            The BasicScheduler cannot resolve conflicts; derived classes override this.

            :returns True if the conflict might have been resolved.
            :returns False if nothing can be done.
        """

        return False

class PriorityScheduler(BasicScheduler):
    """ A Scheduler that searches for an allocation with a fixed start time, but flexible resources.
        Conflict resolution is done by killing jobs with lower priority. """

    def __init__(self, task_id, resource_availability_checker,
                 radbcreds=None,
                 mom_busname=DEFAULT_MOMQUERY_BUSNAME,
                 mom_servicename=DEFAULT_MOMQUERY_SERVICENAME,
                 observation_control_busname=DEFAULT_OBSERVATION_CONTROL_BUS_NAME,
                 observation_control_servicename=DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME,
                 broker=None):
        """
            :param task_id: radb id of the task to schedule.
            :param resource_availability_checker: passed on to the BasicScheduler.
            :param radbcreds: passed on to the BasicScheduler.
            :param mom_busname, mom_servicename: where to reach the MoM query service.
            :param observation_control_busname, observation_control_servicename: where to reach
                   the observation control service.
            :param broker: broker used for both RPC connections.
        """

        super(PriorityScheduler, self).__init__(task_id, resource_availability_checker, radbcreds)

        self.momqueryservice = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=broker, timeout=180)
        # NOTE(review): the keyword was spelled "sevicename"; corrected to match the MoMQueryRPC call
        # above -- confirm against the ObservationControl constructor.
        self.obscontrol = ObservationControl(servicename=observation_control_servicename,
                                             busname=observation_control_busname,
                                             broker=broker,
                                             timeout=120)

        # Per-attempt bookkeeping; reset in _pre_process_allocation for every allocation attempt.
        self.earliest_potential_starttime = datetime.max
        self.tasks_to_kill = []

    def open(self):
        """ Open the RPC connections. """
        self.momqueryservice.open()
        self.obscontrol.open()

    def close(self):
        """ Close the RPC connections, in reverse order of opening. """
        self.obscontrol.close()
        self.momqueryservice.close()

    def _pre_process_allocation(self):
        """ Reset the per-attempt bookkeeping before claims are made. """
        super(PriorityScheduler, self)._pre_process_allocation()

        self.earliest_potential_starttime = datetime.max
        self.tasks_to_kill = []

    def _post_process_allocation(self):
        """ Finalise our claims, and actually kill the tasks that were marked to be killed.

            :raises ScheduleException if a task of a type that cannot be killed was marked.
        """
        # Validate first: aborting an observation cannot be rolled back, so refuse BEFORE anything is killed.
        for t in self.tasks_to_kill:
            if t["type"] != "observation":
                # Killing scheduled pipelines only makes sense when they use resources other than storage, which is not
                # the case for current pipelines
                raise ScheduleException("Cannot kill jobs of type %s yet" % t["type"])

        super(PriorityScheduler, self)._post_process_allocation()

        # kill all jobs in self.tasks_to_kill. A task can be marked once per conflicting claim, so
        # deduplicate on otdb_id (task dicts themselves are not hashable).
        aborted_otdb_ids = set()
        for t in self.tasks_to_kill:
            otdb_id = t["otdb_id"]
            if otdb_id in aborted_otdb_ids:
                continue
            aborted_otdb_ids.add(otdb_id)
            self.obscontrol.abort_observation(otdb_id)

    @cache
    def _my_task_priority(self):
        """ Return the project priority of our own task, as reported by the MoM query service. """
        my_momid = self.task["mom_id"]
        priority_dict = self.momqueryservice.get_project_priorities_for_objects([my_momid])
        return priority_dict[my_momid]

    def _add_task_to_kill(self, task):
        """ Mark a task to be killed once the allocation succeeds.

            :param task: the task dict (as returned by radb.getTasks) of the task to kill.
        """
        # add it to the list to actually kill later
        self.tasks_to_kill.append(task)

        # emulate by setting task endtime to now
        # NOTE(review): assumes radb task dicts carry their id under "id" and that radb times are
        # in UTC -- confirm.
        self.radb.updateTaskAndResourceClaims(task_id=task["id"], endtime=datetime.utcnow(), commit=False)

    def _propose_potential_starttime(self, newstarttime):
        """ Propose a new start time at which allocation could succeed. We only collect the earliest one proposed. """
        self.earliest_potential_starttime = min(self.earliest_potential_starttime, newstarttime)

    def _resolve_conflict(self, conflict_claim):
        """ Try to resolve one conflict by killing lower-priority tasks.

            :returns True if any task was marked to be killed.
            :returns False if nothing could be killed; the claimed resource is then marked unusable.
        """
        tasks_to_kill = self._kill_conflict_tasks(conflict_claim)

        if not tasks_to_kill:
            # record which resources cannot be used anymore, because we can't kill anything on it.
            # Resources are filtered by id in _get_resource_availability, so store the resource id.
            self.unusable_resources.append(conflict_claim["resource_id"])
            return False

        # kill all tasks as requested
        for task in tasks_to_kill:
            self._add_task_to_kill(task)

        return True

    def _kill_conflict_tasks(self, conflict_claim):
        """
            Determine which of the tasks that conflict with the given claim can be killed, i.e. have a
            lower priority than our task. If any conflicting task cannot be killed, the earliest endtime
            among those is proposed as a potential new start time.

            :returns the list of tasks to kill (possibly empty).
            :raises ScheduleException if the conflict is on a storage resource.
        """

        if conflict_claim["resource_type_id"] == self.resource_availability_checker.resource_types['storage']:
            raise ScheduleException("Could not resolve conflict on storage resource")

        # find all conflicting claims & which tasks they belong to
        conflicting_claims, conflicting_tasks = self._get_conflicting_claims_and_tasks(conflict_claim)
        conflicting_task_momids = [t["mom_id"] for t in conflicting_tasks]

        # check which tasks we can kill
        task_priorities = self.momqueryservice.get_project_priorities_for_objects(conflicting_task_momids)
        my_priority = self._my_task_priority()

        kill_task_list = []
        blocking_tasks = []
        for t in conflicting_tasks:
            if task_priorities[t["mom_id"]] < my_priority:
                kill_task_list.append(t)
            else:
                blocking_tasks.append(t)

        # update if we're blocked by an earlier task than we know so far. If everything can be
        # killed, nothing blocks us and there is nothing to propose (min() of nothing would raise).
        if blocking_tasks:
            self._propose_potential_starttime(min(t["endtime"] for t in blocking_tasks))

        return kill_task_list


class DwellScheduler(PriorityScheduler):
    """ A Scheduler that searches for an allocation with a flexible start time.

        Example:

        sched = DwellScheduler(task_id, min_starttime, max_starttime, duration,
                               resource_availability_checker=checker)
        success = sched.allocate_resources(estimates)
        starttime = sched.starttime
    """

    def __init__(self, task_id, min_starttime, max_starttime, duration, **kwargs):
        """
            :param task_id: radb id of the task to schedule.
            :param min_starttime: first start time to try.
            :param max_starttime: last start time to try (inclusive).
            :param duration: added to the start time to obtain the end time.
            :param kwargs: passed on to the PriorityScheduler; must include resource_availability_checker.
        """
        super(DwellScheduler, self).__init__(task_id, **kwargs)

        self.min_starttime = min_starttime
        self.max_starttime = max_starttime
        self.duration = duration

    def _new_starttime(self, starttime):
        """ Move the window in which we try to schedule to [starttime, starttime + duration]. """
        self.starttime = starttime
        self.endtime = starttime + self.duration

        # Resources are marked unusable when a conflict in the tried window could not be resolved;
        # that says nothing about a different window, so start with a clean slate.
        self.unusable_resources = []

    def _post_process_allocation(self):
        """ Finalise the allocation, and store the start- and endtime that were found. """
        super(DwellScheduler, self)._post_process_allocation()

        # Update the task specification and resources to the new starttime and endtime
        self.radb.updateTaskAndResourceClaims(task_id=self.task_id,
                                              starttime=self.starttime,
                                              endtime=self.endtime,
                                              commit=False)

    def allocate_resources(self, estimates):
        """ Scan between [min_starttime, max_starttime] to find the first
            possible slot where the task can be scheduled.

            :returns True if a slot was found; self.starttime and self.endtime then describe it.
            :returns False if no slot was found, or if no later start time could be determined.
        """

        self._new_starttime(self.min_starttime)

        while self.starttime <= self.max_starttime:
            # Find a solution
            if super(DwellScheduler, self).allocate_resources(estimates):
                return True

            # Try the next slot
            new_starttime = self.earliest_potential_starttime

            # datetime.max means no start time was proposed during the failed attempt (and adding the
            # duration to it would overflow). A start time that does not advance would make us loop
            # forever. Either way we cannot continue, so report failure instead of asserting (asserts
            # are stripped under -O).
            if new_starttime == datetime.max or new_starttime <= self.starttime:
                return False

            self._new_starttime(new_starttime)

        return False