Skip to content
Snippets Groups Projects
Commit 1a8ff69b authored by Ruud Beukema's avatar Ruud Beukema
Browse files

Task #11020: fixed some wrong implementation details in the scheduler and...

Task #11020: fixed some wrong implementation details in the scheduler and added some docstrings (not complete yet!)
parent 79bc3f77
No related branches found
No related tags found
No related merge requests found
......@@ -2,7 +2,7 @@ from datetime import datetime
from lofar.common.cache import cache
from lofar.sas.resourceassignment.database import radb
from lofar.sas.resourceassignment.database.radb import RADatabase
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
......@@ -42,14 +42,28 @@ class BasicScheduler:
""" A Scheduler that allocates resources at a fixed time. Resources are searched for. """
def __init__(self, task_id, resource_availability_checker, radbcreds=None):
"""
Creates a BasicScheduler instance
:param task_id: the ID of the task to be scheduled
:param resource_availability_checker: the ResourceAvailabilityScheduler to be used by the BasicScheduler
:param radbcreds: ResourceAssigner database credentials. If None, the default credentials will be used
:raises AssertionError if task_id is a negative number or is None
"""
self.task_id = task_id
self.resource_availability_checker = resource_availability_checker
# We need a DIRECT connection to the database in order to do client-side (that's us) transactions
self.radb = radb(radbcreds)
self.radb = RADatabase(dbcreds=radbcreds, log_queries=False)
# Ensure a valid task_id is given, since radb.getTasks() will not raise if task_id equals None
assert task_id >= 0
# Cache our task info
self.task = self.radb.getTasks(task_id=self.task_id)[0]
self.task = self.radb.getTask(id=task_id)[0]
self.starttime = self.task["starttime"]
self.endtime = self.task["endtime"]
......@@ -57,9 +71,18 @@ class BasicScheduler:
self.unusable_resources = []
def allocate_resources(self, estimates):
"""
Tries to allocate resources for the given estimates.
:param estimates: a list of resources requested for a task
:returns: True if allocation was successful or False if not
"""
# any resources we cannot schedule on, either because we've already
# tried, or because there's already a claim of us there
allocation_successful = False
try:
# pre process
self._pre_process_allocation()
......@@ -70,17 +93,35 @@ class BasicScheduler:
# post process
self._post_process_allocation()
# Make changes effective
self.radb.commit()
allocation_successful = True
except ScheduleException:
self.radb.rollback()
return False
self.radb.commit()
return True
return allocation_successful
def _pre_process_allocation(self):
"""
Placeholder for derived classes, available to be able perform for any processing prior to the actual allocation
of resources done by allocate_resources().
Does nothing in this base class.
"""
pass
def _post_process_allocation(self):
"""
Placeholder for derived classes, available to be able perform for any processing after the actual allocation
of resources done by allocate_resources().
If not overridden, or at least called by the derived class(es), tries to move all claims from "tentative" to a
"claimed" status.
"""
# move all claims from tentative -> claimed
if not self.radb.updateResourceClaims(task_id=self.task_id, claim_status="claimed", commit=False):
raise ScheduleException("Failed to put tentative claims to claimed")
......@@ -104,15 +145,26 @@ class BasicScheduler:
return [r for r in resources if r["id"] not in disabled_resources]
def _finalise_claims(self, claims):
"""
Finalize the given claims
:param claims: the claims to finalize
"""
for c in claims:
c["status"] = "tentative" # we MUST insert as "tentative", as radb demands so
c["starttime"] = self.starttime
c["endtime"] = self.endtime
def _try_schedule(self, requested_resources):
""" Schedule the estimates within the available resources.
"""
Schedule the estimates within the available resources.
:param requested_resources: the requested resources to schedule
Returns the set of estimates that could not be scheduled.
:returns the set of estimates that could not be scheduled.
:raises ScheduleException if it could not schedule the resources due to conflicts
"""
available_resources = self._get_resource_availability()
......@@ -127,7 +179,7 @@ class BasicScheduler:
# insert all claims to reserve the resources in the next call to findfit and to find the conflicts according to
# the DB
claim_ids = self.radb.insertResourceClaims(self.task_id, tentative_claims, _, _, commit=False)
claim_ids = self.radb.insertResourceClaims(self.task_id, tentative_claims, 'anonymous', -1, commit=False)
# tie the claim ids to the estimates
claim_to_estimates = {cid: tentative_claims[cid]["requested_resources"] for cid in claim_ids}
......@@ -146,12 +198,17 @@ class BasicScheduler:
conflict_claim_ids = [c["id"] for c in conflict_claims]
self.radb.deleteResourceClaims(conflict_claim_ids, commit=False)
# return any estimates that we could not fullfil
# return any estimates that we could not fulfill
remaining_estimates = set().union([claim_to_estimates[cid] for cid in conflict_claim_ids])
return remaining_estimates
# TODO: work out this function a bit better
def _get_conflicting_claims_and_tasks(self, conflict_claim):
""" Return all claims that are conflicting with ours, and return the tasks associated with those claims. """
"""
Return all claims that are conflicting with ours, and return the tasks associated with those claims.
:param conflict_claim:
"""
conflicting_claims = self.radb.get_conflicting_overlapping_claims(c)
conflicting_task_ids = set([c["task_id"] for c in conflicting_claims])
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment