Skip to content
Snippets Groups Projects
Commit 1e71997c authored by Jan David Mol
Browse files

Task #11049: Split off station search into separate scheduler class, some...

Task #11049: Split off station search into separate scheduler class, some minor cleanup as a result.
parent f323ea3c
No related branches found
No related tags found
No related merge requests found
...@@ -61,9 +61,6 @@ class BasicScheduler(object): ...@@ -61,9 +61,6 @@ class BasicScheduler(object):
self.resource_estimator = resource_estimator self.resource_estimator = resource_estimator
self.resource_availability_checker = resource_availability_checker self.resource_availability_checker = resource_availability_checker
# For observations without a fixed station list, we need to derive one. TODO: this likely isnt the condition we want to decide on
self.must_derive_station_list = specification_tree["task_type"] == "observation" and specification_tree["specification"]["Observation.VirtualInstrument.stationList"] == []
# We need a DIRECT connection to the database in order to do client-side (that's us) transactions # We need a DIRECT connection to the database in order to do client-side (that's us) transactions
self.radb = RADatabase(dbcreds=radbcreds, log_queries=False) self.radb = RADatabase(dbcreds=radbcreds, log_queries=False)
...@@ -96,8 +93,11 @@ class BasicScheduler(object): ...@@ -96,8 +93,11 @@ class BasicScheduler(object):
# pre process # pre process
self._pre_process_allocation() self._pre_process_allocation()
# fetch what is available
available_resources = self._get_resource_availability()
# perform the actual scheduling # perform the actual scheduling
self._schedule_task() self._schedule_task(available_resources)
# post process # post process
self._post_process_allocation() self._post_process_allocation()
...@@ -144,10 +144,12 @@ class BasicScheduler(object): ...@@ -144,10 +144,12 @@ class BasicScheduler(object):
Get the resources available for scheduling. Get the resources available for scheduling.
""" """
# available resources return self.radb.getResources(include_availability=True, claimable_capacity_lower_bound=self.starttime,
resources = self.radb.getResources(include_availability=True, claimable_capacity_lower_bound=self.starttime,
claimable_capacity_upper_bound=self.endtime) claimable_capacity_upper_bound=self.endtime)
def _allowed_resources(self, resources):
""" Return only the resources we're allowed to use for further claims. """
# resources WE claimed (note that their size is already accounted for in 'resources') # resources WE claimed (note that their size is already accounted for in 'resources')
tentative_claims = self.radb.getResourceClaims(task_ids=[self.task_id], status="tentative") tentative_claims = self.radb.getResourceClaims(task_ids=[self.task_id], status="tentative")
tentative_resources = [c["resource_id"] for c in tentative_claims] tentative_resources = [c["resource_id"] for c in tentative_claims]
...@@ -171,30 +173,6 @@ class BasicScheduler(object): ...@@ -171,30 +173,6 @@ class BasicScheduler(object):
c["starttime"] = self.starttime c["starttime"] = self.starttime
c["endtime"] = self.endtime c["endtime"] = self.endtime
@cache
def _expand_station_list(self, resource_group):
""" Given a resource group name, return a list of station names below it. """
# Query the full resource group structure
resources = self.radb.getResourceGroupMemberships()
groups = resources["groups"]
# collect subgroup ids recursively, start with the provided group name
groups_to_scan = [g for g in groups.itervalues()
if g["resource_group_name"] == resource_group]
subgroup_ids = []
while groups_to_scan:
g = groups_to_scan.pop()
subgroup_ids.extend(g["child_ids"])
groups_to_scan.extend([groups[cid] for cid in g["child_ids"]])
# collect child resource groups that are stations
stations = [group["resource_group_name"]
for group in set(subgroup_ids)
if group["resource_group_type"] == "station"]
return stations
def _get_estimates(self, specs): def _get_estimates(self, specs):
""" Return the estimates for a given specification. """ """ Return the estimates for a given specification. """
...@@ -207,81 +185,9 @@ class BasicScheduler(object): ...@@ -207,81 +185,9 @@ class BasicScheduler(object):
estimates = result["estimates"] estimates = result["estimates"]
return estimates return estimates
def _get_station_estimates(self): def _schedule_task(self, available_resources):
""" Return the station estimates only. """
estimates = self._get_estimates()
return [e for e in estimates if "station" in e]
@staticmethod
def _requirements_satisfied_without(requirements, unclaimable):
""" Return whether the given (station) requirements are satisfied if the given list
cannot be claimed.
:param requirements A list if (stations, minimum) pairs.
:param unclaimables A set of unclaimable stations.
"""
for wanted_list, minimum in requirements:
num_missing = len([1 for s in unclaimable if s in wanted_list])
if len(wanted_list) - num_missing < minimum:
logger.warning("Could not allocate %s stations out of %s (need %d stations)." % (missing, wanted_list, minimum))
return False
return True
def _find_stations(self, available_resources):
""" Find out which subset of the stations we can allocate that satisfy the station requirements, and return that station set. Rolls back the database. """
# Station requirements are (root_resource_group, minimum) pairs
# Construct a list of (stations, minimum) pairs we require
expanded_requirements = [(self._expand_station_list(group), minimum) for (group, minimum) in specs.station_requirements]
# Accumulate all the stations we want from all pairs
wanted_stations = list(sum([set(stations) for stations, _ in expanded_requirements], set()))
# Convert station lists into resource requirements
self._set_stations(wanted_stations)
wanted_estimates = self.get_station_estimates()
# Try to schedule all of the stations.
remaining_estimates = self._schedule_resources(wanted_estimates, available_resources, need_all=False)
# See if our allocation meets the minimal criteria. Note that some stations may be partially allocated,
# we do not count those as claimable.
unclaimable_stations = set([e["station"] for e in remaining_estimates])
if not self._requirements_satisfied_without(expanded_requirements, unclaimable_stations):
raise ScheduleException("Could not allocate enough stations")
allocated_stations = set([e["station"] for e in wanted_estimates if e not in remaining_estimates])
# Note that each station generates multiple estimates, which could be partially fulfilled.
# We thus need to explicitly remove claims for stations we are not going to use.
# For now, we just roll back and reallocate everything (stations and non stations) later on
self.roll_back()
return allocated_stations
def _set_stations(self, stations):
self.specification_tree["Observation.VirtualInstrument.stationList"] = "[%s]" % (",".join(stations),)
def get_stations(self):
""" Return the list of stations we've used for scheduling the task (if any). """
return self.specification_tree.get("Observation.VirtualInstrument.stationList",[])
def _schedule_task(self):
""" Schedule all required resources into the available resources. """ """ Schedule all required resources into the available resources. """
# Fetch available resources
available_resources = self._get_resource_availability()
# Determine and set the station list, if not given
if self.must_derive_station_list:
stations = self._find_stations(available_resources)
self._set_stations(stations)
# Determine new estimations based on the new station list # Determine new estimations based on the new station list
estimates = self.get_estimates() estimates = self.get_estimates()
...@@ -319,9 +225,12 @@ class BasicScheduler(object): ...@@ -319,9 +225,12 @@ class BasicScheduler(object):
logger.debug("Requested resources: %s", requested_resources) logger.debug("Requested resources: %s", requested_resources)
logger.debug("Available resources: %s", available_resources) logger.debug("Available resources: %s", available_resources)
# strip any resources we're not allowed to use
allowed_resources = self._allowed_resources(available_resources)
try: try:
tentative_claims = self.resource_availability_checker.get_is_claimable(requested_resources, tentative_claims = self.resource_availability_checker.get_is_claimable(requested_resources,
available_resources) allowed_resources)
logger.debug("Proposing tentative claims: %s", tentative_claims) logger.debug("Proposing tentative claims: %s", tentative_claims)
except CouldNotFindClaimException as e: except CouldNotFindClaimException as e:
logger.error('_try_schedule CouldNotFindClaimException: %s', e) logger.error('_try_schedule CouldNotFindClaimException: %s', e)
...@@ -374,8 +283,140 @@ class BasicScheduler(object): ...@@ -374,8 +283,140 @@ class BasicScheduler(object):
return False return False
class DynamicStationsScheduler(BasicScheduler):
    """ A scheduler that honours station requirements, given as (root_resource_group, minimum) pairs,
        trying to find the largest set of stations fulfilling the requirements. If an observation has
        a fixed station list already, no special action is taken.

        After scheduling, the get_stations() function returns the allocated stations. """

    # Key of the station list inside specification_tree["specification"]
    _STATION_LIST_KEY = "Observation.VirtualInstrument.stationList"

    def __init__(self, task_id, specification_tree, resource_estimator, resource_availability_checker,
                 radbcreds=None,
                 mom_busname=DEFAULT_MOMQUERY_BUSNAME,
                 mom_servicename=DEFAULT_MOMQUERY_SERVICENAME,
                 observation_control_busname=DEFAULT_OBSERVATION_CONTROL_BUS_NAME,
                 observation_control_servicename=DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME,
                 broker=None):
        """
        Creates a DynamicStationsScheduler instance

        Only task_id, specification_tree, resource_estimator, resource_availability_checker and
        radbcreds are forwarded to BasicScheduler; the remaining parameters are accepted so that
        subclasses and callers can share one signature, but are not used by this class itself.

        :param task_id: the ID of the task at hand
        :param specification_tree: the full specification; will be modified where needed with respect to resources
        :param resource_estimator: the ResourceEstimator function that turns a specification into resource estimates
        :param resource_availability_checker: the ResourceAvailabilityChecker instance to use
        :param radbcreds: the RADB credentials to use
        :param mom_busname: the MoM Query service bus name (default: 'lofar.ra.command')
        :param mom_servicename: the MoM Query service name (default: 'momqueryservice')
        :param observation_control_busname: the ObservationControl bus name (default: 'lofar.mac.command')
        :param observation_control_servicename: the ObservationControl service name (default: 'ObservationControl2')
        :param broker: the message broker to use for send messages/RPC calls/etc.
        """

        super(DynamicStationsScheduler, self).__init__(task_id, specification_tree, resource_estimator, resource_availability_checker, radbcreds)

        # For observations without a fixed station list, we need to derive one. TODO: this likely isnt the condition we want to decide on
        self.must_derive_station_list = specification_tree["task_type"] == "observation" and specification_tree["specification"][self._STATION_LIST_KEY] == []

    @cache
    def _expand_station_list(self, resource_group):
        """ Given a resource group name, return a list of station names below it.

            :param resource_group: name of the resource group to start the search from
            :return: names of all (indirect) child resource groups of type "station";
                     empty if the group name is unknown or has no stations below it
        """

        # Query the full resource group structure
        resources = self.radb.getResourceGroupMemberships()
        groups = resources["groups"]

        # collect subgroup ids recursively, start with the provided group name
        groups_to_scan = [g for g in groups.values()
                          if g["resource_group_name"] == resource_group]

        subgroup_ids = []
        while groups_to_scan:
            g = groups_to_scan.pop()
            subgroup_ids.extend(g["child_ids"])
            groups_to_scan.extend([groups[cid] for cid in g["child_ids"]])

        # collect child resource groups that are stations. subgroup_ids holds ids, not
        # group records, so each id has to be resolved through 'groups' first.
        stations = [groups[gid]["resource_group_name"]
                    for gid in set(subgroup_ids)
                    if groups[gid]["resource_group_type"] == "station"]

        return stations

    def _get_station_estimates(self):
        """ Return the station estimates only, i.e. those estimates that carry a "station" key. """

        # NOTE(review): _get_estimates requires a specification argument; passing our own
        # specification tree is assumed to be what was intended -- confirm.
        estimates = self._get_estimates(self.specification_tree)
        return [e for e in estimates if "station" in e]

    @staticmethod
    def _requirements_satisfied_without(requirements, unclaimable):
        """ Return whether the given (station) requirements are satisfied if the given list
            cannot be claimed.

            :param requirements A list of (stations, minimum) pairs.
            :param unclaimable  A set of unclaimable stations.
            :return: True if every pair still has at least `minimum` claimable stations.
        """

        for wanted_list, minimum in requirements:
            num_missing = len([1 for s in unclaimable if s in wanted_list])

            if len(wanted_list) - num_missing < minimum:
                logger.warning("Could not allocate %s stations out of %s (need %d stations).", num_missing, wanted_list, minimum)
                return False

        return True

    def _find_stations(self, available_resources):
        """ Find out which subset of the stations we can allocate that satisfy the station requirements,
            and return that station set. Rolls back the database.

            :param available_resources: the resources available for scheduling
            :return: the set of station names that could be fully allocated
            :raises ScheduleException: if the allocation does not meet the station requirements
        """

        # Station requirements are (root_resource_group, minimum) pairs
        # NOTE(review): the requirements are assumed to live in the specification tree under
        # "station_requirements" -- confirm against the code that builds the specification.
        station_requirements = self.specification_tree["station_requirements"]

        # Construct a list of (stations, minimum) pairs we require
        expanded_requirements = [(self._expand_station_list(group), minimum) for (group, minimum) in station_requirements]

        # Accumulate all the stations we want from all pairs. Sorted to keep the resulting
        # station list deterministic.
        wanted_stations = sorted(set().union(*[stations for stations, _ in expanded_requirements]))

        # Convert station lists into resource requirements
        self._set_stations(wanted_stations)
        wanted_estimates = self._get_station_estimates()

        # Try to schedule all of the stations.
        remaining_estimates = self._schedule_resources(wanted_estimates, available_resources, need_all=False)

        # See if our allocation meets the minimal criteria. Note that some stations may be partially allocated,
        # we do not count those as claimable.
        unclaimable_stations = set([e["station"] for e in remaining_estimates])

        if not self._requirements_satisfied_without(expanded_requirements, unclaimable_stations):
            raise ScheduleException("Could not allocate enough stations")

        # Each station generates multiple estimates, so a station only counts as allocated
        # if none of its estimates remained unfulfilled.
        allocated_stations = set([e["station"] for e in wanted_estimates]) - unclaimable_stations

        # Note that each station generates multiple estimates, which could be partially fulfilled.
        # We thus need to explicitly remove claims for stations we are not going to use.
        # For now, we just roll back and reallocate everything (stations and non stations) later on
        self.roll_back()

        return allocated_stations

    def _set_stations(self, stations):
        """ Store the given station names as the station list of the specification, formatted as
            the string "[name1,name2,...]" with the names in sorted order. """

        # Written to the same place __init__ reads the station list from.
        self.specification_tree["specification"][self._STATION_LIST_KEY] = "[%s]" % (",".join(sorted(stations)),)

    def get_stations(self):
        """ Return the list of stations we've used for scheduling the task (if any).

            This is the station list as stored in the specification: the "[name1,name2,...]"
            string written by _set_stations if the list was derived, or [] if no list is present. """

        return self.specification_tree["specification"].get(self._STATION_LIST_KEY, [])

    def _schedule_task(self, available_resources):
        """ Schedule all required resources into the available resources.

            :param available_resources: the resources available for scheduling
        """

        # Determine and set the station list, if not given
        if self.must_derive_station_list:
            stations = self._find_stations(available_resources)
            self._set_stations(stations)

        # Schedule all resources
        super(DynamicStationsScheduler, self)._schedule_task(available_resources)
class PriorityScheduler(BasicScheduler): class PriorityScheduler(DynamicStationsScheduler):
""" A Scheduler that searches for an allocation with a fixed start time, but flexible resources. """ A Scheduler that searches for an allocation with a fixed start time, but flexible resources.
Conflict resolution is done by killing jobs with lower priority. """ Conflict resolution is done by killing jobs with lower priority. """
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment