diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py index e61b49cfaaee4b3a7eca35012645c67501c30583..2102e7ffc1e24f72905dd56b298c3594a40411f1 100644 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py @@ -72,9 +72,11 @@ def movePipelineAfterItsPredecessors(task, radbrpc, min_start_timestamp=None): if shift != timedelta(seconds=0): logger.info("Moving %s pipeline radb_id=%s otdb_id=%s by %s from \'%s\' to \'%s\'", task['status'], task['id'], task['otdb_id'], format_timedelta(shift), task['starttime'], newStartTime) - if not radbrpc.updateTaskAndResourceClaims(task['id'], starttime=newStartTime, endtime=newEndTime): - logger.warning("Could not update start/endtime for pipeline radb_id=%s otdb_id=%s", - updated_task['id'], updated_task['otdb_id']) + try: + radbrpc.updateTaskAndResourceClaims(task['id'], starttime=newStartTime, endtime=newEndTime) + except Exception as e: + logger.warning("Could not update start/endtime for pipeline radb_id=%s otdb_id=%s error: %s", + task['id'], task['otdb_id'], e) updated_task = radbrpc.getTask(task['id']) @@ -143,7 +145,12 @@ class ScheduleChecker(): if task['endtime'] <= now: new_endtime=now+timedelta(seconds=PIPELINE_CHECK_INTERVAL) logger.info("Extending endtime to %s for pipeline radb_id=%s otdb_id=%s", new_endtime, task['id'], task['otdb_id']) - self._radbrpc.updateTaskAndResourceClaims(task['id'], endtime=new_endtime) + try: + self._radbrpc.updateTaskAndResourceClaims(task['id'], endtime=new_endtime) + except Exception as e: + logger.error("Could not extend endtime to %s for pipeline radb_id=%s otdb_id=%s error: %s", + new_endtime, task['id'], task['otdb_id'], e) + except Exception as e: logger.error("Error while checking running pipelines: %s", e) diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py index b3c4f238fba1ae987b4f9679bee21d7c1dc154c1..49c5c2cc36364b48b31020c97468bfa136bbc56e 100644 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py @@ -4,7 +4,7 @@ from copy import deepcopy from lofar.common.cache import cache from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME -from lofar.sas.resourceassignment.database.radb import RADatabase +from lofar.sas.resourceassignment.database.radb import RADatabase, PostgresDBQueryExecutionError from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC @@ -153,8 +153,11 @@ class BasicScheduler(object): logger.debug("BasicScheduler: _post_process_allocation for task %s", self.task_id) # move all claims from tentative -> claimed - if not self.radb.updateResourceClaims(where_task_ids=[self.task_id], status="claimed", commit=False): - raise ScheduleException("Failed to put tentative claims to claimed") + try: + if not self.radb.updateResourceClaims(where_task_ids=[self.task_id], status="claimed", commit=False): + raise ScheduleException("Failed to put tentative claims to claimed") + except PostgresDBQueryExecutionError as e: + raise ScheduleException("Failed to put tentative claims to claimed. error: %s" % (e,)) changed_tasks = [] #Not used in the BasicScheduler, but in derived schedulers it is return changed_tasks @@ -234,66 +237,69 @@ class BasicScheduler(object): :raises ScheduleException if it could not schedule the resources due to conflicts """ - logger.debug("Requested resources: %s", requested_resources) - logger.debug("Available resources: %s", available_resources) + try: + logger.debug("Requested resources: %s", requested_resources) + logger.debug("Available resources: %s", available_resources) - assert requested_resources + assert requested_resources - # strip any resources we're not allowed to use - allowed_resources = self._allowed_resources(available_resources) + # strip any resources we're not allowed to use + allowed_resources = self._allowed_resources(available_resources) - try: - tentative_claims = self.resource_availability_checker.get_is_claimable(requested_resources, - allowed_resources) - logger.debug("Proposing tentative claims: %s", tentative_claims) - except CouldNotFindClaimException as e: - logger.exception('_try_schedule_resources CouldNotFindClaimException: %s', e) - raise ScheduleException("Could not schedule: %s" % str(e)) - - assert tentative_claims - - # add static info to all claims - self._finalise_claims(tentative_claims) - - # insert all claims to reserve the resources in the next call to findfit and to find the conflicts according to - # the DB - claim_ids = self.radb.insertResourceClaims(self.task_id, tentative_claims, 'anonymous', -1, commit=False) - logger.debug("Resulting claim IDs: %s", claim_ids) - assert len(claim_ids) == len(tentative_claims), "Could not insert resource claims" - - # tie the claim ids to the estimates. note that each "requested_resources" element is a list of claims - claim_to_estimates = {cid: tentative_claims[idx]["requested_resources"] for idx, cid in enumerate(claim_ids)} - - # try solving as much conflicts as possible. We need NOT resolve ALL conflicts: removing one conflict can free - # up more resources as a by-product, in which case other conflicts can simply be shifted to those newly freed - # resources. - conflict_claims = self.radb.getResourceClaims(task_ids=[self.task_id], status="conflict", extended=True) - logger.info("Resulting claims in conflict before resolution: %s", conflict_claims) - - if conflict_claims and not any([self._resolve_conflict(c) for c in conflict_claims]): - if need_all or len(conflict_claims) == len(tentative_claims): - # Could not resolve any conflict - raise ScheduleException("Could not resolve one or more conflicting claims: #tentative_claims=%s #conflict_claims=%s conflict_claims=%s" % ( - len(tentative_claims), len(conflict_claims), conflict_claims)) - - # remove conflicting claims (allowing the next iteration to propose alternatives). Note that _handle_conflicts - # could have reduced the number of conflicting claims. - conflict_claims = self.radb.getResourceClaims(task_ids=[self.task_id], status="conflict", extended=True) - logger.info("Resulting claims in conflict after resolution: %s", conflict_claims) - - conflict_claim_ids = [c["id"] for c in conflict_claims] - self.radb.deleteResourceClaims(conflict_claim_ids, commit=False) - - # return any estimates that we could not fulfill. Note that requested_resources can contain the same item multiple - # times, so remove only the same number of estimates that were satisfied - satisfied_estimates = sum([claim_to_estimates[cid] for cid in claim_to_estimates if cid not in conflict_claim_ids], []) - remaining_estimates = requested_resources[:] - for e in satisfied_estimates: - if e in remaining_estimates: - remaining_estimates.remove(e) - - logger.info("Remaining estimates: %s", remaining_estimates) - return remaining_estimates + try: + tentative_claims = self.resource_availability_checker.get_is_claimable(requested_resources, + allowed_resources) + logger.debug("Proposing tentative claims: %s", tentative_claims) + except CouldNotFindClaimException as e: + logger.exception('_try_schedule_resources CouldNotFindClaimException: %s', e) + raise ScheduleException("Could not schedule: %s" % str(e)) + + assert tentative_claims + + # add static info to all claims + self._finalise_claims(tentative_claims) + + # insert all claims to reserve the resources in the next call to findfit and to find the conflicts according to + # the DB + claim_ids = self.radb.insertResourceClaims(self.task_id, tentative_claims, 'anonymous', -1, commit=False) + logger.debug("Resulting claim IDs: %s", claim_ids) + assert len(claim_ids) == len(tentative_claims), "Could not insert resource claims" + + # tie the claim ids to the estimates. note that each "requested_resources" element is a list of claims + claim_to_estimates = {cid: tentative_claims[idx]["requested_resources"] for idx, cid in enumerate(claim_ids)} + + # try solving as much conflicts as possible. We need NOT resolve ALL conflicts: removing one conflict can free + # up more resources as a by-product, in which case other conflicts can simply be shifted to those newly freed + # resources. + conflict_claims = self.radb.getResourceClaims(task_ids=[self.task_id], status="conflict", extended=True) + logger.info("Resulting claims in conflict before resolution: %s", conflict_claims) + + if conflict_claims and not any([self._resolve_conflict(c) for c in conflict_claims]): + if need_all or len(conflict_claims) == len(tentative_claims): + # Could not resolve any conflict + raise ScheduleException("Could not resolve one or more conflicting claims: #tentative_claims=%s #conflict_claims=%s conflict_claims=%s" % ( + len(tentative_claims), len(conflict_claims), conflict_claims)) + + # remove conflicting claims (allowing the next iteration to propose alternatives). Note that _handle_conflicts + # could have reduced the number of conflicting claims. + conflict_claims = self.radb.getResourceClaims(task_ids=[self.task_id], status="conflict", extended=True) + logger.info("Resulting claims in conflict after resolution: %s", conflict_claims) + + conflict_claim_ids = [c["id"] for c in conflict_claims] + self.radb.deleteResourceClaims(conflict_claim_ids, commit=False) + + # return any estimates that we could not fulfill. Note that requested_resources can contain the same item multiple + # times, so remove only the same number of estimates that were satisfied + satisfied_estimates = sum([claim_to_estimates[cid] for cid in claim_to_estimates if cid not in conflict_claim_ids], []) + remaining_estimates = requested_resources[:] + for e in satisfied_estimates: + if e in remaining_estimates: + remaining_estimates.remove(e) + + logger.info("Remaining estimates: %s", remaining_estimates) + return remaining_estimates + except PostgresDBQueryExecutionError as e: + raise ScheduleException("Error while scheduling resources: %s" % (e,)) def _resolve_conflict(self, conflict_claim): """ Resolve one conflict, making it is useful to try to schedule again.