diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py index c0bc8c84041f40e84142940409b6d77f491a58d8..be26b5a63938da0921c922d8dc059468f4590522 100644 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py @@ -2,7 +2,7 @@ from datetime import datetime from lofar.common.cache import cache -from lofar.sas.resourceassignment.database import radb +from lofar.sas.resourceassignment.database.radb import RADatabase from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME @@ -42,14 +42,28 @@ class BasicScheduler: """ A Scheduler that allocates resources at a fixed time. Resources are searched for. """ def __init__(self, task_id, resource_availability_checker, radbcreds=None): + """ + Creates a BasicScheduler instance + + :param task_id: the ID of the task to be scheduled + :param resource_availability_checker: the ResourceAvailabilityScheduler to be used by the BasicScheduler + :param radbcreds: ResourceAssigner database credentials. If None, the default credentials will be used + + :raises AssertionError if task_id is a negative number or is None + """ + + self.task_id = task_id self.resource_availability_checker = resource_availability_checker # We need a DIRECT connection to the database in order to do client-side (that's us) transactions - self.radb = radb(radbcreds) + self.radb = RADatabase(dbcreds=radbcreds, log_queries=False) + + # Ensure a valid task_id is given, since radb.getTasks() will not raise if task_id equals None + assert task_id >= 0 # Cache our task info - self.task = self.radb.getTasks(task_id=self.task_id)[0] + self.task = self.radb.getTask(id=task_id)[0] self.starttime = self.task["starttime"] self.endtime = self.task["endtime"] @@ -57,9 +71,18 @@ class BasicScheduler: self.unusable_resources = [] def allocate_resources(self, estimates): + """ + Tries to allocate resources for the given estimates. + + :param estimates: a list of resources requested for a task + + :returns: True if allocation was successful or False if not + """ # any resources we cannot schedule on, either because we've already # tried, or because there's already a claim of us there + allocation_successful = False + try: # pre process self._pre_process_allocation() @@ -70,17 +93,35 @@ class BasicScheduler: # post process self._post_process_allocation() + + # Make changes effective + self.radb.commit() + + allocation_successful = True except ScheduleException: self.radb.rollback() - return False - self.radb.commit() - return True + return allocation_successful def _pre_process_allocation(self): + """ + Placeholder for derived classes, available to be able perform for any processing prior to the actual allocation + of resources done by allocate_resources(). + + Does nothing in this base class. + """ + pass def _post_process_allocation(self): + """ + Placeholder for derived classes, available to be able perform for any processing after the actual allocation + of resources done by allocate_resources(). + + If not overridden, or at least called by the derived class(es), tries to move all claims from "tentative" to a + "claimed" status. + """ + # move all claims from tentative -> claimed if not self.radb.updateResourceClaims(task_id=self.task_id, claim_status="claimed", commit=False): raise ScheduleException("Failed to put tentative claims to claimed") @@ -104,15 +145,26 @@ class BasicScheduler: return [r for r in resources if r["id"] not in disabled_resources] def _finalise_claims(self, claims): + """ + Finalize the given claims + + :param claims: the claims to finalize + """ + for c in claims: c["status"] = "tentative" # we MUST insert as "tentative", as radb demands so c["starttime"] = self.starttime c["endtime"] = self.endtime def _try_schedule(self, requested_resources): - """ Schedule the estimates within the available resources. + """ + Schedule the estimates within the available resources. + + :param requested_resources: the requested resources to schedule - Returns the set of estimates that could not be scheduled. + :returns the set of estimates that could not be scheduled. + + :raises ScheduleException if it could not schedule the resources due to conflicts """ available_resources = self._get_resource_availability() @@ -127,7 +179,7 @@ class BasicScheduler: # insert all claims to reserve the resources in the next call to findfit and to find the conflicts according to # the DB - claim_ids = self.radb.insertResourceClaims(self.task_id, tentative_claims, _, _, commit=False) + claim_ids = self.radb.insertResourceClaims(self.task_id, tentative_claims, 'anonymous', -1, commit=False) # tie the claim ids to the estimates claim_to_estimates = {cid: tentative_claims[cid]["requested_resources"] for cid in claim_ids} @@ -146,12 +198,17 @@ class BasicScheduler: conflict_claim_ids = [c["id"] for c in conflict_claims] self.radb.deleteResourceClaims(conflict_claim_ids, commit=False) - # return any estimates that we could not fullfil + # return any estimates that we could not fulfill remaining_estimates = set().union([claim_to_estimates[cid] for cid in conflict_claim_ids]) return remaining_estimates + # TODO: work out this function a bit better def _get_conflicting_claims_and_tasks(self, conflict_claim): - """ Return all claims that are conflicting with ours, and return the tasks associated with those claims. """ + """ + Return all claims that are conflicting with ours, and return the tasks associated with those claims. + + :param conflict_claim: + """ conflicting_claims = self.radb.get_conflicting_overlapping_claims(c) conflicting_task_ids = set([c["task_id"] for c in conflicting_claims])