diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py index 08cefa15df0fc910a9fff9bb3f51fbbd818ef8b0..9974e7e6ce30890cc430587f1f31a48f1727d6da 100644 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py @@ -64,10 +64,11 @@ class BasicScheduler(object): self.resource_availability_checker = resource_availability_checker # We need a DIRECT connection to the database in order to do client-side (that's us) transactions - self.radb = RADatabase(dbcreds=radbcreds, log_queries=False) + self.radb = RADatabase(dbcreds=radbcreds, log_queries=True) # Ensure a valid task_id is given, since radb.getTasks() will not raise if task_id equals None - assert task_id >= 0 + if task_id < 0: + raise ValueError('BasicScheduler, task_id=%s should be >= 0', task_id) # Cache our task info self.task = self.radb.getTask(id=task_id) @@ -78,7 +79,8 @@ class BasicScheduler(object): self.unusable_resources = [] # Duration must be non-negative or weird stuff will happen - assert self.starttime <= self.endtime + if self.starttime > self.endtime: + raise ValueError('BasicScheduler, starttime=%s should be >= endtime=%s', starttime, endtime) def allocate_resources(self): """ @@ -109,11 +111,19 @@ class BasicScheduler(object): allocation_successful = True except ScheduleException, e: - logger.debug("BasicScheduler: scheduling threw exception: %s", e) - self.radb.rollback() + logger.exception("BasicScheduler: scheduling threw exception: %s", e) + self._handle_schedule_exception() return allocation_successful + def _handle_schedule_exception(self): + """ + The BasicScheduler does not care about ScheduleException's, no rollback is needed or wanted. + We just want everything in the radb, and let the user decide how to act. + """ + logger.info("BasicScheduler: ignoring ScheduleException, accepting current solution into radb") + self.radb.commit() + def _pre_process_allocation(self): """ Placeholder for derived classes, available to be able perform for any processing prior to the actual allocation @@ -229,8 +239,8 @@ class BasicScheduler(object): allowed_resources) logger.debug("Proposing tentative claims: %s", tentative_claims) except CouldNotFindClaimException as e: - logger.error('_try_schedule_resources CouldNotFindClaimException: %s', e) - raise ScheduleException("Could not schedule") + logger.exception('_try_schedule_resources CouldNotFindClaimException: %s', e) + raise ScheduleException("Could not schedule: %s" % str(e)) assert tentative_claims @@ -250,16 +260,18 @@ class BasicScheduler(object): # up more resources as a by-product, in which case other conflicts can simply be shifted to those newly freed # resources. conflict_claims = self.radb.getResourceClaims(task_ids=[self.task_id], status="conflict", extended=True) - logger.debug("Resulting claims in conflict before resolution: %s", conflict_claims) + logger.info("Resulting claims in conflict before resolution: %s", conflict_claims) + if conflict_claims and not any([self._resolve_conflict(c) for c in conflict_claims]): if need_all or len(conflict_claims) == len(tentative_claims): # Could not resolve any conflict - raise ScheduleException("Could not resolve any conflict") + raise ScheduleException("Could not resolve one or more conflicting claims: #tentative_claims=%s #conflict_claims=%s conflict_claims=%s" % ( + len(tentative_claims), len(conflict_claims), conflict_claims)) # remove conflicting claims (allowing the next iteration to propose alternatives). Note that _handle_conflicts # could have reduced the number of conflicting claims. conflict_claims = self.radb.getResourceClaims(task_ids=[self.task_id], status="conflict", extended=True) - logger.debug("Resulting claims in conflict after resolution: %s", conflict_claims) + logger.info("Resulting claims in conflict after resolution: %s", conflict_claims) conflict_claim_ids = [c["id"] for c in conflict_claims] self.radb.deleteResourceClaims(conflict_claim_ids, commit=False) @@ -272,7 +284,7 @@ class BasicScheduler(object): if e in remaining_estimates: remaining_estimates.remove(e) - logger.debug("Remaining estimates: %s", remaining_estimates) + logger.info("Remaining estimates: %s", remaining_estimates) return remaining_estimates def _resolve_conflict(self, conflict_claim): @@ -318,6 +330,13 @@ class StationScheduler(BasicScheduler): # For observations without a fixed station list, we need to derive one. TODO: this likely isnt the condition we want to decide on self.must_derive_station_list = specification_tree["task_type"] == "observation" and specification_tree["specification"]["Observation.VirtualInstrument.stationList"] == [] and specification_tree["station_requirements"] + def _handle_schedule_exception(self): + """ + All derived classes of the BasicScheduler do care about ScheduleException's, and we do want to rollback. + """ + logger.info("%s: handling ScheduleException by doing rollback of current solution", self.__class__.__name__) + self.radb.rollback() + @cache def _expand_station_list(self, resource_group): """ Given a resource group name, return a list of station names below it. """ @@ -547,19 +566,19 @@ class PriorityScheduler(StationScheduler): :return: True if conflict resolution was effective, or False if not """ - logger.debug("resolve_conflicts: conflicting_claims: %s", conflict_claim) - # try to resolve the conflict, and mark any resources unavailable if resolution fails tasks_to_kill = self._get_resolvable_conflicting_tasks(conflict_claim) for t in tasks_to_kill: + logger.info("_resolve_conflict: found task %s to kill for conflict_claim: %s", t, conflict_claim) + # add it to the list to actually kill later self.tasks_to_kill.append(t) self._kill_task_in_radb(t) if not tasks_to_kill: - logger.debug("resolve_conflicts: no tasks to kill") + logger.info("_resolve_conflict: no tasks to kill for conflict_claim %s", conflict_claim) # record which resources cannot be used anymore, because we can't kill anything on it self.unusable_resources.append(conflict_claim["resource_id"]) @@ -696,18 +715,19 @@ class DwellScheduler(PriorityScheduler): mom_servicename=mom_servicename, observation_control_busname=observation_control_busname, observation_control_servicename=observation_control_servicename, - broker=broker - ) + broker=broker) self.min_starttime = min_starttime self.max_starttime = max_starttime self.duration = duration # Duration must be non-negative or weird stuff will happen - assert self.duration >= timedelta(0, 0, 0) + if self.duration < timedelta(0, 0, 0): + raise ValueError('DwellScheduler, radb_id=%s duration=%s should be >= 0' % (self.task_id, duration)) # Time span for starttime must be sane - assert self.min_starttime <= self.max_starttime + if self.min_starttime > self.max_starttime: + raise ValueError('DwellScheduler, radb_id=%s min_starttime=%s should be <= max_starttime=%s' % (self.task_id, min_starttime, max_starttime)) def _new_starttime(self, starttime): """ @@ -718,8 +738,6 @@ class DwellScheduler(PriorityScheduler): self.starttime = starttime self.endtime = starttime + self.duration - logger.debug("DwellScheduler: Trying %s - %s", self.starttime, self.endtime) - def _post_process_allocation(self): """ After resource scheduling, update the start and end times of the task at hand in the RADB. @@ -746,18 +764,19 @@ class DwellScheduler(PriorityScheduler): self._new_starttime(self.min_starttime) while True: + logger.info("DwellScheduler: Trying to schedule radb_id=%s with starttime=%s and endtime=%s", self.task_id, self.starttime, self.endtime) + # Find a solution - success = super(DwellScheduler, self).allocate_resources() - if success: + if super(DwellScheduler, self).allocate_resources(): return True # Try the next slot new_starttime = self.earliest_potential_starttime if new_starttime <= self.starttime: - raise ScheduleException("DwellScheduler cannot advance") + raise ScheduleException("DwellScheduler radb_id=%s Cannot advance because new_starttime=%s <= self.starttime=%s", self.task_id, new_starttime, self.starttime) if new_starttime > self.max_starttime: - logger.debug("DwellScheduler: Dwelled until the end. Earliest start time is %s but cannot go beyond %s.", new_starttime, self.max_starttime) + logger.info("DwellScheduler: radb_id=%s Dwelled until the end. Earliest start time is %s but cannot go beyond %s.", self.task_id, new_starttime, self.max_starttime) return False self._new_starttime(new_starttime)