-
Ruud Beukema authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
schedulers.py 13.55 KiB
from datetime import datetime
from lofar.common.cache import cache
from lofar.sas.resourceassignment.database import radb
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
from lofar.sas.resourceassignment.resourceassigner.resource_availability_checker import CouldNotFindClaimException
from lofar.mac.config import DEFAULT_OBSERVATION_CONTROL_BUS_NAME, DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME
from lofar.mac.observation_control_rpc import ObservationControl
""" Scheduling is the process of looking for a suitable resource slot for a given task.
The following code allows three levels of scheduling:
* The /BasicScheduler/ locates resources for a task between its specified start- and endtime.
* The /PriorityScheduler/ enhances the BasicScheduler, and kills lower-priority tasks to make
room for the given task.
* The /DwellScheduler/ enhances the PriorityScheduler, by increasing the starttime
until the task can be scheduled.
Each level contains hooks to support the next.
All schedulers modify the database directly, and commit() if a solution is found. If not, a rollback()
is performed. To use:
sched = <schedulerclass>(...)
success = sched.allocate_resources(estimates)
"""
class ScheduleException(Exception):
    """ Signals that a scheduling attempt failed.

    allocate_resources() catches this exception, rolls back the radb transaction and reports failure.
    """

    def __init__(self, message):
        # a single human-readable message is the only payload
        Exception.__init__(self, message)
class BasicScheduler(object):
    """ A Scheduler that allocates resources at a fixed time. Resources are searched for.

    All claims are written through a direct radb connection with commit=False, so that
    allocate_resources() can either commit() the whole allocation or rollback() all of it.
    """

    # NOTE(review): user information passed to radb.insertResourceClaims(). The original call passed
    # the undefined name `_` for both; confirm these values (and the argument order) against the radb API.
    CLAIM_USERNAME = 'anonymous'
    CLAIM_USER_ID = -1

    def __init__(self, task_id, resource_availability_checker, radbcreds=None):
        """
        :param task_id: radb id of the task to allocate resources for.
        :param resource_availability_checker: object whose get_is_claimable(requested, available) turns
            estimates into claim dicts, raising CouldNotFindClaimException when it cannot.
        :param radbcreds: credentials, passed straight to radb().
        """
        self.task_id = task_id
        self.resource_availability_checker = resource_availability_checker

        # We need a DIRECT connection to the database in order to do client-side (that's us) transactions
        self.radb = radb(radbcreds)

        # Cache our task info (same `task_ids=` keyword as used for getTasks elsewhere in this class)
        self.task = self.radb.getTasks(task_ids=[self.task_id])[0]
        self.starttime = self.task["starttime"]
        self.endtime = self.task["endtime"]

        # Any resources that we cannot schedule on for some reason
        self.unusable_resources = []

    def allocate_resources(self, estimates):
        """ Turn all estimates into claims for our task within [self.starttime, self.endtime].

        :param estimates: the requested resources, as understood by the resource availability checker.
        :returns True if everything was claimed (the transaction is committed),
                 False if scheduling failed (the transaction is rolled back).
        :raises any non-ScheduleException error, after rolling back the transaction.
        """
        try:
            self._pre_process_allocation()

            # keep iterating until all estimates are turned into tentative claims
            while estimates:
                estimates = self._try_schedule(estimates)

            self._post_process_allocation()
        except ScheduleException:
            self.radb.rollback()
            return False
        except Exception:
            # Never leave a half-finished allocation in an open transaction on unexpected errors
            self.radb.rollback()
            raise

        self.radb.commit()
        return True

    def _pre_process_allocation(self):
        """ Hook called once before scheduling starts. """
        pass

    def _post_process_allocation(self):
        """ Hook called once all estimates are claimed: moves all our claims from tentative -> claimed. """
        if not self.radb.updateResourceClaims(task_id=self.task_id, claim_status="claimed", commit=False):
            raise ScheduleException("Failed to put tentative claims to claimed")

    def _get_resource_availability(self):
        """ Get the available resources on which we can schedule.

        Returns the radb resources for [starttime, endtime], minus the resources in
        self.unusable_resources and the resources that already hold one of our tentative claims.
        """
        resources = self.radb.getResources(include_availability=True, claimable_capacity_lower_bound=self.starttime,
                                           claimable_capacity_upper_bound=self.endtime)

        # resources WE claimed (note that their size is already accounted for in 'resources')
        tentative_claims = self.radb.getResourceClaims(task_ids=self.task_id, status="tentative")

        # we cannot schedule on resources with tentative claims until we implement merging tentative claims
        disabled_resources = set(self.unusable_resources)
        disabled_resources.update(c["resource_id"] for c in tentative_claims)

        return [r for r in resources if r["id"] not in disabled_resources]

    def _finalise_claims(self, claims):
        """ Add the static status/time fields to each claim dict (in place). """
        for c in claims:
            c["status"] = "tentative"  # we MUST insert as "tentative", as radb demands so
            c["starttime"] = self.starttime
            c["endtime"] = self.endtime

    def _try_schedule(self, requested_resources):
        """ Schedule the estimates within the available resources.

        :returns the list of estimates that could not be scheduled (empty when all were claimed).
        :raises ScheduleException if no claims can be proposed or no conflict can be resolved.
        """
        available_resources = self._get_resource_availability()

        try:
            claims = self.resource_availability_checker.get_is_claimable(requested_resources, available_resources)
        except CouldNotFindClaimException:
            raise ScheduleException("Could not schedule")

        # add static info to all claims
        self._finalise_claims(claims)

        # insert all claims to reserve the resources in the next call to findfit and to find the conflicts
        # according to the DB
        claim_ids = self.radb.insertResourceClaims(self.task_id, claims,
                                                   self.CLAIM_USERNAME, self.CLAIM_USER_ID,
                                                   commit=False)

        # tie the claim ids to the estimates; assumes radb returns the ids in the order of `claims`
        claim_to_estimates = {cid: claim["requested_resources"] for cid, claim in zip(claim_ids, claims)}

        conflict_claims = self.radb.getResourceClaims(task_ids=self.task_id, status="conflict")
        if not conflict_claims:
            # everything fitted: nothing left to schedule
            return []

        # handle any conflicts. We need NOT resolve ALL conflicts: removing one conflict can free up more resources
        # as a by-product, in which case other conflicts can simply be shifted to those newly freed resources.
        # A list (not a generator) is used on purpose, so every conflict gets a resolution attempt.
        if not any([self._resolve_conflict(c) for c in conflict_claims]):
            raise ScheduleException("Could not resolve any conflict")

        # remove conflicting claims (allowing the next iteration to propose alternatives). Note that
        # _resolve_conflict could have reduced the number of conflicting claims.
        conflict_claims = self.radb.getResourceClaims(task_ids=self.task_id, status="conflict")
        conflict_claim_ids = [c["id"] for c in conflict_claims]
        self.radb.deleteResourceClaims(conflict_claim_ids, commit=False)

        # return any estimates that we could not fulfil. Estimates may be unhashable (e.g. dicts), so
        # de-duplicate on object identity: one estimate spread over several claims is requested only once.
        remaining_estimates = []
        seen = set()
        for cid in conflict_claim_ids:
            estimate = claim_to_estimates[cid]
            if id(estimate) not in seen:
                seen.add(id(estimate))
                remaining_estimates.append(estimate)
        return remaining_estimates

    def _get_conflicting_claims_and_tasks(self, conflict_claim):
        """ Return all claims that are conflicting with ours, and return the tasks associated with those claims. """
        conflicting_claims = self.radb.get_conflicting_overlapping_claims(conflict_claim)
        if not conflicting_claims:
            # don't query tasks with an empty id list
            return [], []

        conflicting_task_ids = list({c["task_id"] for c in conflicting_claims})
        conflicting_tasks = self.radb.getTasks(task_ids=conflicting_task_ids)

        return conflicting_claims, conflicting_tasks

    def _resolve_conflict(self, conflict_claim):
        """ Resolve one conflict, making it useful to try to schedule again.

        The basic scheduler cannot resolve anything; subclasses override this.

        :returns True if the conflict might have been resolved.
        :returns False if nothing can be done.
        """
        return False
class PriorityScheduler(BasicScheduler):
    """ A Scheduler that searches for an allocation with a fixed start time, but flexible resources.
    Conflict resolution is done by killing jobs with lower priority.

    open() the RPC connections before use and close() them afterwards (or use it as a context manager).
    """

    def __init__(self, task_id, resource_availability_checker,
                 radbcreds=None,
                 mom_busname=DEFAULT_MOMQUERY_BUSNAME,
                 mom_servicename=DEFAULT_MOMQUERY_SERVICENAME,
                 observation_control_busname=DEFAULT_OBSERVATION_CONTROL_BUS_NAME,
                 observation_control_servicename=DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME,
                 broker=None):
        super(PriorityScheduler, self).__init__(task_id, resource_availability_checker, radbcreds)

        self.momqueryservice = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=broker,
                                           timeout=180)
        self.obscontrol = ObservationControl(servicename=observation_control_servicename,
                                             busname=observation_control_busname,
                                             broker=broker,
                                             timeout=120)

    def open(self):
        """ Open the MoM query and observation control RPC connections. """
        self.momqueryservice.open()
        self.obscontrol.open()

    def close(self):
        """ Close the RPC connections, in reverse order of opening. """
        self.obscontrol.close()
        self.momqueryservice.close()

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def _pre_process_allocation(self):
        super(PriorityScheduler, self)._pre_process_allocation()

        # reset per allocation attempt
        self.earliest_potential_starttime = datetime.max
        self.tasks_to_kill = []

    def _post_process_allocation(self):
        """ Claim our resources, then abort the tasks selected for killing. """
        super(PriorityScheduler, self)._post_process_allocation()

        # the same task can be selected for several conflicts; task dicts are unhashable, so key on their id
        tasks_to_kill = list({t["id"]: t for t in self.tasks_to_kill}.values())

        # Check all tasks BEFORE aborting anything, so we never abort observations and then roll back.
        for t in tasks_to_kill:
            if t["type"] != "observation":
                # Killing scheduled pipelines only makes sense when they use resources other than storage, which
                # is not the case for current pipelines
                raise ScheduleException("Cannot kill jobs of type %s yet" % t["type"])

        for t in tasks_to_kill:
            self.obscontrol.abort_observation(t["otdb_id"])

    @cache
    def _my_task_priority(self):
        """ Return the MoM project priority of our own task. """
        my_momid = self.task["mom_id"]
        priority_dict = self.momqueryservice.get_project_priorities_for_objects([my_momid])
        return priority_dict[my_momid]

    def _add_task_to_kill(self, task):
        """ Mark a (radb task dict) task for killing, and release its resources in this transaction. """
        # add it to the list to actually kill later
        self.tasks_to_kill.append(task)

        # emulate the kill by setting the task endtime to now.
        # NOTE(review): assumes radb stores naive UTC datetimes — confirm.
        self.radb.updateTaskAndResourceClaims(task_id=task["id"], endtime=datetime.utcnow(), commit=False)

    def _propose_potential_starttime(self, newstarttime):
        """ Propose a new start time at which allocation could succeed. We only collect the earliest one proposed. """
        self.earliest_potential_starttime = min(self.earliest_potential_starttime, newstarttime)

    def _resolve_conflict(self, conflict_claim):
        """ Try to resolve the conflict by killing lower-priority tasks.

        :returns True if any task was selected for killing, False otherwise (the claim's resource is then
                 marked unusable).
        """
        tasks_to_kill = self._kill_conflict_tasks(conflict_claim)

        if not tasks_to_kill:
            # record which resources cannot be used anymore, because we can't kill anything on it
            self.unusable_resources.append(conflict_claim["resource_id"])
            return False

        for t in tasks_to_kill:
            self._add_task_to_kill(t)

        return True

    def _kill_conflict_tasks(self, conflict_claim):
        """
        Determine which tasks must be killed to resolve one conflicting claim, and propose the earliest
        end time of the remaining (unkillable) conflicting tasks as a potential new start time.

        :returns the list of conflicting task dicts with a lower priority than ours (possibly empty).
        :raises ScheduleException if the conflict is on a storage resource.
        """
        if conflict_claim["resource_type_id"] == self.resource_availability_checker.resource_types['storage']:
            raise ScheduleException("Could not resolve conflict on storage resource")

        # find all conflicting claims & which tasks they belong to
        _, conflicting_tasks = self._get_conflicting_claims_and_tasks(conflict_claim)
        if not conflicting_tasks:
            return []

        conflicting_task_momids = [t["mom_id"] for t in conflicting_tasks]

        # check which tasks we can kill
        task_priorities = self.momqueryservice.get_project_priorities_for_objects(conflicting_task_momids)
        my_priority = self._my_task_priority()
        kill_task_list = [t for t in conflicting_tasks if task_priorities[t["mom_id"]] < my_priority]

        # update if we're blocked by an earlier task than we know so far (only if anything still blocks us)
        kill_task_ids = set(t["id"] for t in kill_task_list)
        blocking_endtimes = [t["endtime"] for t in conflicting_tasks if t["id"] not in kill_task_ids]
        if blocking_endtimes:
            self._propose_potential_starttime(min(blocking_endtimes))

        return kill_task_list
class DwellScheduler(PriorityScheduler):
    """ A Scheduler that searches for an allocation with a flexible start time.

    Example:

    sched = DwellScheduler(task_id, min_starttime, max_starttime, duration,
                           resource_availability_checker=checker)
    success = sched.allocate_resources(estimates)
    starttime = sched.starttime
    """

    def __init__(self, task_id, min_starttime, max_starttime, duration, **kwargs):
        """
        :param min_starttime: earliest allowed start time.
        :param max_starttime: latest allowed start time (inclusive).
        :param duration: task duration; endtime = starttime + duration.
        :param kwargs: passed on to PriorityScheduler, and must include resource_availability_checker.
        """
        super(DwellScheduler, self).__init__(task_id, **kwargs)

        self.min_starttime = min_starttime
        self.max_starttime = max_starttime
        self.duration = duration

    def _new_starttime(self, starttime):
        """ Move the candidate slot to [starttime, starttime + duration]. """
        self.starttime = starttime
        self.endtime = starttime + self.duration

    def _post_process_allocation(self):
        super(DwellScheduler, self)._post_process_allocation()

        # Update the task specification and resources to the new starttime and endtime
        self.radb.updateTaskAndResourceClaims(task_id=self.task_id,
                                              starttime=self.starttime,
                                              endtime=self.endtime,
                                              commit=False)

    def allocate_resources(self, estimates):
        """ Scan [min_starttime, max_starttime] (both inclusive) for the first possible slot where the task
        can be scheduled.

        After a failed attempt, the next candidate is the earliest start time proposed during that attempt.

        :returns True on success (self.starttime/self.endtime hold the chosen slot). Returns False if no slot
                 was found, including when no later start time was proposed.
        """
        self._new_starttime(self.min_starttime)

        while self.starttime <= self.max_starttime:
            # each attempt is rolled back on failure, so resources found unusable at the previous start
            # time must be reconsidered at this one
            self.unusable_resources = []

            if super(DwellScheduler, self).allocate_resources(estimates):
                return True

            # Try the next slot. datetime.max means nothing proposed a later start time; a proposal that does
            # not advance would loop forever (explicit check instead of an assert, which -O strips).
            new_starttime = self.earliest_potential_starttime
            if new_starttime == datetime.max or new_starttime <= self.starttime:
                return False

            self._new_starttime(new_starttime)

        return False