"""
Scheduling is the process of looking for a suitable resource slot for a given task.

The following code allows three levels of scheduling:

* The /BasicScheduler/ locates resources for a task between its specified start- and endtime.
* The /PriorityScheduler/ enhances the BasicScheduler, and kills lower-priority tasks to make room for the
  given task.
* The /DwellScheduler/ enhances the PriorityScheduler, by increasing the starttime until the task can be
  scheduled.

Each level contains hooks to support the next.

All schedulers modify the database directly, and commit() if a solution is found. If not, a rollback() is
performed.

To use:

    sched = <schedulerclass>(...)
    success = sched.allocate_resources(estimates)
"""

from datetime import datetime

from lofar.common.cache import cache

from lofar.sas.resourceassignment.database import radb

from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME

from lofar.sas.resourceassignment.resourceassigner.resource_availability_checker import CouldNotFindClaimException

from lofar.mac.config import DEFAULT_OBSERVATION_CONTROL_BUS_NAME, DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME
from lofar.mac.observation_control_rpc import ObservationControl


class ScheduleException(Exception):
    """ Raised when scheduling fails; allocate_resources() answers it with a rollback. """

    def __init__(self, message):
        super(ScheduleException, self).__init__(message)


# Derive from object explicitly: the subclasses call super(<class>, self), which on Python 2 only works
# for new-style classes.
class BasicScheduler(object):
    """ A Scheduler that allocates resources at a fixed time. Resources are searched for. """

    def __init__(self, task_id, resource_availability_checker, radbcreds=None):
        """
        :param task_id: id of the RADB task to schedule.
        :param resource_availability_checker: object whose get_is_claimable(requested, available) proposes
                                              claims (PriorityScheduler also reads its resource_types).
        :param radbcreds: credentials handed to radb().
        """
        self.task_id = task_id
        self.resource_availability_checker = resource_availability_checker

        # We need a DIRECT connection to the database in order to do client-side (that's us) transactions
        self.radb = radb(radbcreds)

        # Cache our task info
        self.task = self.radb.getTasks(task_id=self.task_id)[0]
        self.starttime = self.task["starttime"]
        self.endtime = self.task["endtime"]

        # Any resources that we cannot schedule on for some reason
        self.unusable_resources = []

    def allocate_resources(self, estimates):
        """
        Try to claim resources for all estimates.

        :param estimates: the resource estimates to schedule.
        :returns True if all estimates were claimed and the transaction was committed.
        :returns False if scheduling failed; the transaction is rolled back.
        """
        try:
            # pre process
            self._pre_process_allocation()

            # keep iterating until all estimates are turned into tentative claims
            while estimates:
                estimates = self._try_schedule(estimates)

            # post process
            self._post_process_allocation()
        except ScheduleException:
            self.radb.rollback()
            return False

        self.radb.commit()
        return True

    def _pre_process_allocation(self):
        """ Hook that runs before each allocation attempt. """
        # Resources were marked unusable against the database state and time window of a previous
        # attempt; after a rollback (or a new start time) that verdict is stale, so start clean.
        self.unusable_resources = []

    def _post_process_allocation(self):
        """
        Hook that runs after all estimates have been turned into tentative claims.

        :raises ScheduleException if the claims could not be moved to "claimed".
        """
        # move all claims from tentative -> claimed
        if not self.radb.updateResourceClaims(task_id=self.task_id, claim_status="claimed", commit=False):
            raise ScheduleException("Failed to put tentative claims to claimed")

    def _get_resource_availability(self):
        """ Get the available resources on which we can schedule. """
        # available resources
        resources = self.radb.getResources(include_availability=True,
                                           claimable_capacity_lower_bound=self.starttime,
                                           claimable_capacity_upper_bound=self.endtime)

        # resources WE claimed (note that their size is already accounted for in 'resources')
        tentative_claims = self.radb.getResourceClaims(task_ids=self.task_id, status="tentative")
        tentative_resources = [c["resource_id"] for c in tentative_claims]

        # disable resources we cannot schedule on (we cannot schedule on resources with tentative
        # claims until we implement merging tentative claims)
        disabled_resources = set(self.unusable_resources)
        disabled_resources.update(tentative_resources)

        return [r for r in resources if r["id"] not in disabled_resources]

    def _finalise_claims(self, claims):
        """ Add the static info (status and time window) to all claims, in place. """
        for c in claims:
            c["status"] = "tentative"  # we MUST insert as "tentative", as radb demands so
            c["starttime"] = self.starttime
            c["endtime"] = self.endtime

    def _try_schedule(self, requested_resources):
        """
        Schedule the estimates within the available resources.

        :param requested_resources: the estimates that still need to be scheduled.
        :returns the list of estimates that could not be scheduled (empty if everything fit).
        :raises ScheduleException if no claims could be found, or if none of the conflicts could be resolved.
        """
        available_resources = self._get_resource_availability()

        try:
            claims = self.resource_availability_checker.get_is_claimable(requested_resources, available_resources)
        except CouldNotFindClaimException:
            raise ScheduleException("Could not schedule")

        # add static info to all claims
        self._finalise_claims(claims)

        # insert all claims to reserve the resources in the next call to findfit and to find the conflicts
        # according to the DB
        # NOTE(review): the 3rd and 4th argument were an undefined placeholder name; None is passed as a
        # "don't care" value -- confirm against the radb.insertResourceClaims signature.
        claim_ids = self.radb.insertResourceClaims(self.task_id, claims, None, None, commit=False)

        # tie the claim ids to the estimates
        # (assumes the ids are returned in the same order as the inserted claims -- TODO confirm)
        claim_to_estimates = {cid: claim["requested_resources"] for cid, claim in zip(claim_ids, claims)}

        conflict_claims = self.radb.getResourceClaims(task_ids=self.task_id, status="conflict")
        if not conflict_claims:
            # everything fit: nothing to resolve, nothing left to schedule
            return []

        # handle any conflicts. We need NOT resolve ALL conflicts: removing one conflict can free up more
        # resources as a by-product, in which case other conflicts can simply be shifted to those newly freed
        # resources. Every conflict is visited though (hence the list, not a generator), as visiting has
        # side effects in subclasses.
        if not any([self._resolve_conflict(c) for c in conflict_claims]):
            # Could not resolve any conflict
            raise ScheduleException("Could not resolve any conflict")

        # remove conflicting claims (allowing the next iteration to propose alternatives). Note that
        # _resolve_conflict could have reduced the number of conflicting claims.
        conflict_claims = self.radb.getResourceClaims(task_ids=self.task_id, status="conflict")
        conflict_claim_ids = [c["id"] for c in conflict_claims]
        self.radb.deleteResourceClaims(conflict_claim_ids, commit=False)

        # return any estimates that we could not fulfil. De-duplicate by equality instead of hashing, as
        # estimates need not be hashable.
        remaining_estimates = []
        for cid in conflict_claim_ids:
            estimate = claim_to_estimates[cid]
            if estimate not in remaining_estimates:
                remaining_estimates.append(estimate)

        return remaining_estimates

    def _get_conflicting_claims_and_tasks(self, conflict_claim):
        """
        Return all claims that are conflicting with ours, and return the tasks associated with those claims.

        :returns (conflicting_claims, conflicting_tasks)
        """
        # NOTE(review): the claim itself is passed on; confirm that radb does not expect the claim id here.
        conflicting_claims = self.radb.get_conflicting_overlapping_claims(conflict_claim)
        conflicting_task_ids = set([c["task_id"] for c in conflicting_claims])
        conflicting_tasks = self.radb.getTasks(task_ids=conflicting_task_ids)

        return conflicting_claims, conflicting_tasks

    def _resolve_conflict(self, conflict_claim):
        """
        Resolve one conflict, making it useful to try to schedule again.

        :returns True if the conflict might have been resolved.
        :returns False if nothing can be done.
        """
        return False


class PriorityScheduler(BasicScheduler):
    """
    A Scheduler that searches for an allocation with a fixed start time, but flexible resources.
    Conflict resolution is done by killing jobs with lower priority.
    """

    def __init__(self, task_id, resource_availability_checker, radbcreds=None,
                 mom_busname=DEFAULT_MOMQUERY_BUSNAME,
                 mom_servicename=DEFAULT_MOMQUERY_SERVICENAME,
                 observation_control_busname=DEFAULT_OBSERVATION_CONTROL_BUS_NAME,
                 observation_control_servicename=DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME,
                 broker=None):
        """
        :param task_id, resource_availability_checker, radbcreds: see BasicScheduler.
        :param mom_busname, mom_servicename: where to reach the MoM query service.
        :param observation_control_busname, observation_control_servicename: where to reach observation control.
        :param broker: broker handed to both RPC clients.
        """
        super(PriorityScheduler, self).__init__(task_id, resource_availability_checker, radbcreds)

        self.momqueryservice = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=broker,
                                           timeout=180)
        self.obscontrol = ObservationControl(servicename=observation_control_servicename,
                                             busname=observation_control_busname, broker=broker, timeout=120)

    def open(self):
        """ Open the connections to the MoM query and observation control services. """
        self.momqueryservice.open()
        self.obscontrol.open()

    def close(self):
        """ Close the service connections, in reverse order of opening. """
        self.obscontrol.close()
        self.momqueryservice.close()

    def _pre_process_allocation(self):
        """ Reset the bookkeeping of this allocation attempt. """
        super(PriorityScheduler, self)._pre_process_allocation()

        self.earliest_potential_starttime = datetime.max
        self.tasks_to_kill = []

    def _post_process_allocation(self):
        """
        Finalise the claims, and actually kill the tasks that were marked to be killed.

        :raises ScheduleException if a task that is to be killed is not an observation.
        """
        super(PriorityScheduler, self)._post_process_allocation()

        if not self.tasks_to_kill:
            return

        # tasks_to_kill holds task ids (possibly duplicated); fetch each task record once
        tasks = self.radb.getTasks(task_ids=set(self.tasks_to_kill))

        # Validate first: aborting an observation cannot be rolled back, so do not abort anything if we
        # are going to fail on another task anyway.
        for t in tasks:
            if t["type"] != "observation":
                # Killing scheduled pipelines only makes sense when they use resources other than storage,
                # which is not the case for current pipelines
                raise ScheduleException("Cannot kill jobs of type %s yet" % t["type"])

        # kill all jobs in self.tasks_to_kill
        for t in tasks:
            self.obscontrol.abort_observation(t["otdb_id"])

    @cache
    def _my_task_priority(self):
        """ Return the project priority of our own task, as reported by the MoM query service. """
        my_momid = self.task["mom_id"]
        priority_dict = self.momqueryservice.get_project_priorities_for_objects([my_momid])
        return priority_dict[my_momid]

    def _add_task_to_kill(self, task_id):
        """ Mark the task with the given id to be killed, and emulate its death in the database. """
        # add it to the list to actually kill later
        self.tasks_to_kill.append(task_id)

        # emulate by setting task endtime to now
        # NOTE(review): assumes RADB timestamps are naive UTC -- confirm.
        self.radb.updateTaskAndResourceClaims(task_id=task_id, endtime=datetime.utcnow(), commit=False)

    def _propose_potential_starttime(self, newstarttime):
        """
        Propose a new start time at which allocation could succeed. We only collect the earliest one proposed.
        """
        self.earliest_potential_starttime = min(self.earliest_potential_starttime, newstarttime)

    def _resolve_conflict(self, conflict_claim):
        """
        Try to resolve the conflict by killing tasks, and mark the resource unusable if that is not possible.

        :returns True if we killed anything.
        :returns False if nothing can be done.
        """
        tasks_to_kill = self._kill_conflict_tasks(conflict_claim)

        if not tasks_to_kill:
            # record which resources cannot be used anymore, because we can't kill anything on it
            self.unusable_resources.append(conflict_claim["resource_id"])
            return False

        # kill all tasks as requested
        # (assumes task records carry their id under "id", like resources and claims do -- TODO confirm)
        for task in tasks_to_kill:
            self._add_task_to_kill(task["id"])

        return True

    def _kill_conflict_tasks(self, conflict_claim):
        """
        Determine which tasks to kill in order to resolve one conflicting claim. Also proposes the earliest
        end time of the conflicting tasks we cannot kill as a potential start time.

        :returns the list of tasks to kill (empty if no conflicting task has a lower priority than ours).
        :raises ScheduleException if the conflict is on a storage resource.
        """
        if conflict_claim["resource_type_id"] == self.resource_availability_checker.resource_types['storage']:
            raise ScheduleException("Could not resolve conflict on storage resource")

        # find all conflicting claims & which tasks they belong to
        conflicting_claims, conflicting_tasks = self._get_conflicting_claims_and_tasks(conflict_claim)
        conflicting_task_momids = [t["mom_id"] for t in conflicting_tasks]

        # check which tasks we can kill
        task_priorities = self.momqueryservice.get_project_priorities_for_objects(conflicting_task_momids)
        my_priority = self._my_task_priority()
        kill_task_list = [t for t in conflicting_tasks if task_priorities[t["mom_id"]] < my_priority]

        # update if we're blocked by an earlier task than we know so far. There is nothing to propose if
        # every conflicting task can be killed.
        blocking_endtimes = [t["endtime"] for t in conflicting_tasks if t not in kill_task_list]
        if blocking_endtimes:
            self._propose_potential_starttime(min(blocking_endtimes))

        return kill_task_list


class DwellScheduler(PriorityScheduler):
    """
    A Scheduler that searches for an allocation with a flexible start time.

    Example:

        sched = DwellScheduler(task_id, min_starttime, max_starttime, duration,
                               resource_availability_checker=checker)
        success = sched.allocate_resources(estimates)
        starttime = sched.starttime
    """

    def __init__(self, task_id, min_starttime, max_starttime, duration, **kwargs):
        """
        :param task_id: id of the RADB task to schedule.
        :param min_starttime: first start time to try.
        :param max_starttime: last start time to try (inclusive).
        :param duration: added to the start time to obtain the end time.
        :param kwargs: passed on to PriorityScheduler; must contain resource_availability_checker.
        """
        super(DwellScheduler, self).__init__(task_id, **kwargs)

        self.min_starttime = min_starttime
        self.max_starttime = max_starttime
        self.duration = duration

    def _new_starttime(self, starttime):
        """ Move the window to schedule in to the given start time. """
        self.starttime = starttime
        self.endtime = starttime + self.duration

    def _post_process_allocation(self):
        """ Finalise the allocation, and move the task itself to the slot that was found. """
        super(DwellScheduler, self)._post_process_allocation()

        # Update the task specification and resources to the new starttime and endtime
        self.radb.updateTaskAndResourceClaims(task_id=self.task_id, starttime=self.starttime,
                                              endtime=self.endtime, commit=False)

    def allocate_resources(self, estimates):
        """
        Scan between [min_starttime, max_starttime] to find the first possible slot where the task can be
        scheduled.

        :param estimates: the resource estimates to schedule.
        :returns True if a slot was found (see self.starttime and self.endtime), False otherwise.
        :raises AssertionError if the proposed start time does not lie beyond the one just tried.
        """
        self._new_starttime(self.min_starttime)

        while self.starttime <= self.max_starttime:
            # Find a solution
            if super(DwellScheduler, self).allocate_resources(estimates):
                return True

            # Try the next slot
            new_starttime = self.earliest_potential_starttime

            if new_starttime == datetime.max:
                # Nobody proposed a later start time, so waiting will not help (and datetime.max + duration
                # would overflow).
                return False

            # Raised explicitly instead of through an assert statement: the latter is stripped when running
            # with -O, which would turn this into an endless loop.
            if new_starttime <= self.starttime:
                raise AssertionError("DwellScheduler cannot advance")

            self._new_starttime(new_starttime)

        return False