diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py index 3c4c4cf5ec686edf04f1895bff5b13223fabcf10..001cbf2fcc6a7261ba52ea0c75435b4c60234816 100644 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py @@ -61,9 +61,6 @@ class BasicScheduler(object): self.resource_estimator = resource_estimator self.resource_availability_checker = resource_availability_checker - # For observations without a fixed station list, we need to derive one. TODO: this likely isnt the condition we want to decide on - self.must_derive_station_list = specification_tree["task_type"] == "observation" and specification_tree["specification"]["Observation.VirtualInstrument.stationList"] == [] - # We need a DIRECT connection to the database in order to do client-side (that's us) transactions self.radb = RADatabase(dbcreds=radbcreds, log_queries=False) @@ -96,8 +93,11 @@ class BasicScheduler(object): # pre process self._pre_process_allocation() + # fetch what is available + available_resources = self._get_resource_availability() + # perform the actual scheduling - self._schedule_task() + self._schedule_task(available_resources) # post process self._post_process_allocation() @@ -144,9 +144,11 @@ class BasicScheduler(object): Get the resources available for scheduling. """ - # available resources - resources = self.radb.getResources(include_availability=True, claimable_capacity_lower_bound=self.starttime, - claimable_capacity_upper_bound=self.endtime) + return self.radb.getResources(include_availability=True, claimable_capacity_lower_bound=self.starttime, + claimable_capacity_upper_bound=self.endtime) + + def _allowed_resources(self, resources): + """ Return only the resources we're allowed to use for further claims. """ # resources WE claimed (note that their size is already accounted for in 'resources') tentative_claims = self.radb.getResourceClaims(task_ids=[self.task_id], status="tentative") @@ -171,30 +173,6 @@ class BasicScheduler(object): c["starttime"] = self.starttime c["endtime"] = self.endtime - @cache - def _expand_station_list(self, resource_group): - """ Given a resource group name, return a list of station names below it. """ - - # Query the full resource group structure - resources = self.radb.getResourceGroupMemberships() - groups = resources["groups"] - - # collect subgroup ids recursively, start with the provided group name - groups_to_scan = [g for g in groups.itervalues() - if g["resource_group_name"] == resource_group] - subgroup_ids = [] - while groups_to_scan: - g = groups_to_scan.pop() - subgroup_ids.extend(g["child_ids"]) - groups_to_scan.extend([groups[cid] for cid in g["child_ids"]]) - - # collect child resource groups that are stations - stations = [group["resource_group_name"] - for group in set(subgroup_ids) - if group["resource_group_type"] == "station"] - - return stations - def _get_estimates(self, specs): """ Return the estimates for a given specification. """ @@ -207,81 +185,9 @@ class BasicScheduler(object): estimates = result["estimates"] return estimates - def _get_station_estimates(self): - """ Return the station estimates only. """ - - estimates = self._get_estimates() - return [e for e in estimates if "station" in e] - - @staticmethod - def _requirements_satisfied_without(requirements, unclaimable): - """ Return whether the given (station) requirements are satisfied if the given list - cannot be claimed. - - :param requirements A list if (stations, minimum) pairs. - :param unclaimables A set of unclaimable stations. - """ - - for wanted_list, minimum in requirements: - num_missing = len([1 for s in unclaimable if s in wanted_list]) - - if len(wanted_list) - num_missing < minimum: - logger.warning("Could not allocate %s stations out of %s (need %d stations)." % (missing, wanted_list, minimum)) - return False - - return True - - def _find_stations(self, available_resources): - """ Find out which subset of the stations we can allocate that satisfy the station requirements, and return that station set. Rolls back the database. """ - - # Station requirements are (root_resource_group, minimum) pairs - - # Construct a list of (stations, minimum) pairs we require - expanded_requirements = [(self._expand_station_list(group), minimum) for (group, minimum) in specs.station_requirements] - - # Accumulate all the stations we want from all pairs - wanted_stations = list(sum([set(stations) for stations, _ in expanded_requirements], set())) - - # Convert station lists into resource requirements - self._set_stations(wanted_stations) - wanted_estimates = self.get_station_estimates() - - # Try to schedule all of the stations. - remaining_estimates = self._schedule_resources(wanted_estimates, available_resources, need_all=False) - - # See if our allocation meets the minimal criteria. Note that some stations may be partially allocated, - # we do not count those as claimable. - unclaimable_stations = set([e["station"] for e in remaining_estimates]) - if not self._requirements_satisfied_without(expanded_requirements, unclaimable_stations): - raise ScheduleException("Could not allocate enough stations") - - allocated_stations = set([e["station"] for e in wanted_estimates if e not in remaining_estimates]) - - # Note that each station generates multiple estimates, which could be partially fulfilled. - # We thus need to explicitly remove claims for stations we are not going to use. - # For now, we just roll back and reallocate everything (stations and non stations) later on - self.roll_back() - - return allocated_stations - - def _set_stations(self, stations): - self.specification_tree["Observation.VirtualInstrument.stationList"] = "[%s]" % (",".join(stations),) - - def get_stations(self): - """ Return the list of stations we've used for scheduling the task (if any). """ - return self.specification_tree.get("Observation.VirtualInstrument.stationList",[]) - - def _schedule_task(self): + def _schedule_task(self, available_resources): """ Schedule all required resources into the available resources. """ - # Fetch available resources - available_resources = self._get_resource_availability() - - # Determine and set the station list, if not given - if self.must_derive_station_list: - stations = self._find_stations(available_resources) - self._set_stations(stations) - # Determine new estimations based on the new station list estimates = self.get_estimates() @@ -319,9 +225,12 @@ class BasicScheduler(object): logger.debug("Requested resources: %s", requested_resources) logger.debug("Available resources: %s", available_resources) + # strip any resources we're not allowed to use + allowed_resources = self._allowed_resources(available_resources) + try: tentative_claims = self.resource_availability_checker.get_is_claimable(requested_resources, - available_resources) + allowed_resources) logger.debug("Proposing tentative claims: %s", tentative_claims) except CouldNotFindClaimException as e: logger.error('_try_schedule CouldNotFindClaimException: %s', e) @@ -374,8 +283,140 @@ class BasicScheduler(object): return False +class DynamicStationsScheduler(BasicScheduler): + """ A scheduler that honours station requirements (root_resource_group, minimum) pairs, trying + to find the largest set of stations fulfilling the requirements. If an observation has a fixed + station list already, no special action is taken. + + After scheduling, the get_stations() function returns a list of the allocated stations. """ + + def __init__(self, task_id, specification_tree, resource_estimator, resource_availability_checker, + radbcreds=None, + mom_busname=DEFAULT_MOMQUERY_BUSNAME, + mom_servicename=DEFAULT_MOMQUERY_SERVICENAME, + observation_control_busname=DEFAULT_OBSERVATION_CONTROL_BUS_NAME, + observation_control_servicename=DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME, + broker=None): + """ + Creates a DynamicStationsScheduler instance + + :param task_id: the ID of the task at hand + :param specification_tree: the full specification; will be modified where needed with respect to resources + :param resource_estimator: the ResourceEstimator function that turns a specification into resource estimates + :param resource_availability_checker: the ResourceAvailabilityChecker instance to use + :param radbcreds: the RADB credentials to use + :param mom_busname: the MoM Query service bus name (default: 'lofar.ra.command') + :param mom_servicename: the MoM Query service name (default: 'momqueryservice') + :param observation_control_busname: the ObservationControl bus name (default: 'lofar.mac.command') + :param observation_control_servicename: the ObservationControl service name (default: 'ObservationControl2') + :param broker: the message broker to use for send messages/RPC calls/etc. + """ + + super(DynamicStationsScheduler, self).__init__(task_id, specification_tree, resource_estimator, resource_availability_checker, radbcreds) + + # For observations without a fixed station list, we need to derive one. TODO: this likely isnt the condition we want to decide on + self.must_derive_station_list = specification_tree["task_type"] == "observation" and specification_tree["specification"]["Observation.VirtualInstrument.stationList"] == [] + + @cache + def _expand_station_list(self, resource_group): + """ Given a resource group name, return a list of station names below it. """ + + # Query the full resource group structure + resources = self.radb.getResourceGroupMemberships() + groups = resources["groups"] + + # collect subgroup ids recursively, start with the provided group name + groups_to_scan = [g for g in groups.itervalues() + if g["resource_group_name"] == resource_group] + subgroup_ids = [] + while groups_to_scan: + g = groups_to_scan.pop() + subgroup_ids.extend(g["child_ids"]) + groups_to_scan.extend([groups[cid] for cid in g["child_ids"]]) + + # collect child resource groups that are stations + stations = [group["resource_group_name"] + for group in set(subgroup_ids) + if group["resource_group_type"] == "station"] + + return stations + + def _get_station_estimates(self): + """ Return the station estimates only. """ + + estimates = self._get_estimates() + return [e for e in estimates if "station" in e] + + @staticmethod + def _requirements_satisfied_without(requirements, unclaimable): + """ Return whether the given (station) requirements are satisfied if the given list + cannot be claimed. + + :param requirements A list if (stations, minimum) pairs. + :param unclaimables A set of unclaimable stations. + """ + + for wanted_list, minimum in requirements: + num_missing = len([1 for s in unclaimable if s in wanted_list]) + + if len(wanted_list) - num_missing < minimum: + logger.warning("Could not allocate %s stations out of %s (need %d stations)." % (missing, wanted_list, minimum)) + return False + + return True + + def _find_stations(self, available_resources): + """ Find out which subset of the stations we can allocate that satisfy the station requirements, and return that station set. Rolls back the database. """ + + # Station requirements are (root_resource_group, minimum) pairs + + # Construct a list of (stations, minimum) pairs we require + expanded_requirements = [(self._expand_station_list(group), minimum) for (group, minimum) in specs.station_requirements] + + # Accumulate all the stations we want from all pairs + wanted_stations = list(sum([set(stations) for stations, _ in expanded_requirements], set())) + + # Convert station lists into resource requirements + self._set_stations(wanted_stations) + wanted_estimates = self.get_station_estimates() + + # Try to schedule all of the stations. + remaining_estimates = self._schedule_resources(wanted_estimates, available_resources, need_all=False) + + # See if our allocation meets the minimal criteria. Note that some stations may be partially allocated, + # we do not count those as claimable. + unclaimable_stations = set([e["station"] for e in remaining_estimates]) + if not self._requirements_satisfied_without(expanded_requirements, unclaimable_stations): + raise ScheduleException("Could not allocate enough stations") + + allocated_stations = set([e["station"] for e in wanted_estimates if e not in remaining_estimates]) + + # Note that each station generates multiple estimates, which could be partially fulfilled. + # We thus need to explicitly remove claims for stations we are not going to use. + # For now, we just roll back and reallocate everything (stations and non stations) later on + self.roll_back() + + return allocated_stations + + def _set_stations(self, stations): + self.specification_tree["Observation.VirtualInstrument.stationList"] = "[%s]" % (",".join(stations),) + + def get_stations(self): + """ Return the list of stations we've used for scheduling the task (if any). """ + return self.specification_tree.get("Observation.VirtualInstrument.stationList",[]) + + def _schedule_task(self, available_resources): + """ Schedule all required resources into the available resources. """ + + # Determine and set the station list, if not given + if self.must_derive_station_list: + stations = self._find_stations(available_resources) + self._set_stations(stations) + + # Schedule all resources + super(DynamicStationsScheduler, self)._schedule_task(available_resources) -class PriorityScheduler(BasicScheduler): +class PriorityScheduler(DynamicStationsScheduler): """ A Scheduler that searches for an allocation with a fixed start time, but flexible resources. Conflict resolution is done by killing jobs with lower priority. """