diff --git a/SAS/TMSS/services/scheduling/lib/constraints/__init__.py b/SAS/TMSS/services/scheduling/lib/constraints/__init__.py index 170960bac4c4cd10679e9a198e15cc5e497fdd4a..b0e932e74ffeca511db786ebccf8def6e2eef61e 100644 --- a/SAS/TMSS/services/scheduling/lib/constraints/__init__.py +++ b/SAS/TMSS/services/scheduling/lib/constraints/__init__.py @@ -1,7 +1,5 @@ #!/usr/bin/env python3 -# dynamic_scheduling.py -# # Copyright (C) 2020 # ASTRON (Netherlands Institute for Radio Astronomy) # P.O.Box 2, 7990 AA Dwingeloo, The Netherlands @@ -20,9 +18,20 @@ # You should have received a copy of the GNU General Public License along # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. # -# $Id: $ """ +This __init__ module for this constraints python package defines the 'API' to: + - filter a list of schedulable scheduling_units by checking their constraints: see method filter_scheduling_units_using_constraints + - sort a (possibly filtered) list of schedulable scheduling_units evaluating their constraints and computing a 'fitness' score: see method get_sorted_scheduling_units_scored_by_constraints +These main methods are used in the dynamic_scheduler to pick the next best scheduling unit, and compute the midterm schedule. + +Currently we have only one SchedulingConstraintsTemplate in TMSS, named 'constraints', version 1. +But, it is envisioned that we will get more templates. +So, based on the template the actual filter and score methods are selected from a specific module. +By convention we use one module per template. Currently, we have and use only module template_constraints_v1.py + +If/When we add a new SchedulingConstraintsTemplate, then we should add a new module with the specific filter and score methods, +and add an extra 'if' in the strategy pattern used here. 
(see below for implementation) """ import logging @@ -33,6 +42,17 @@ from typing import NamedTuple from lofar.sas.tmss.tmss.tmssapp import models from lofar.sas.tmss.tmss.exceptions import * +################## main data struct and methods ################## + +class ScoredSchedulingUnit(NamedTuple): + '''struct for collecting scores per constraint and a weighted_score for a scheduling_unit at the given start_time + ''' + scheduling_unit: models.SchedulingUnitBlueprint + scores: dict + start_time: datetime + weighted_score: float + + def filter_scheduling_units_using_constraints(scheduling_units:[models.SchedulingUnitBlueprint], lower_bound: datetime, upper_bound: datetime) -> [models.SchedulingUnitBlueprint]: '''return the schedulable scheduling_units which for which the constraints are 'go' within the given timewindow''' runnable_scheduling_units = [] @@ -41,7 +61,7 @@ def filter_scheduling_units_using_constraints(scheduling_units:[models.Schedulin if can_run_within_timewindow(scheduling_unit, lower_bound, upper_bound): runnable_scheduling_units.append(scheduling_unit) except UnknownTemplateException as e: - # TODO: how dow we notify the user that we cannot dynamically schedule this sub due to an unknown template? + # TODO: how do we notify the user that we cannot dynamically schedule this sub due to an unknown template? 
# current pragmatic solution: log warning, and set sub state to error via its schedulable subtasks logger.warning(e) for subtask in models.Subtask.independent_subtasks().filter(task_blueprint__scheduling_unit_blueprint_id=scheduling_unit.id).all(): @@ -50,6 +70,20 @@ def filter_scheduling_units_using_constraints(scheduling_units:[models.Schedulin return runnable_scheduling_units +def get_sorted_scheduling_units_scored_by_constraints(scheduling_units:[models.SchedulingUnitBlueprint], lower_bound:datetime, upper_bound:datetime) -> [ScoredSchedulingUnit]: + scored_scheduling_units = [compute_scores(scheduling_unit, lower_bound, upper_bound) + for scheduling_unit in scheduling_units] + return sorted(scored_scheduling_units, key=lambda x: x.weighted_score, reverse=True) + + + +################## helper methods ################################################################# +# # +# these helper methods are selected by a strategy pattern based on the template name and version # +# The actual implementation can be found in the other module(s) in this package # +# Currently we only have one template with one implementation in template_constraints_v1.py # +# # +################################################################################################### def can_run_within_timewindow(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound: datetime, upper_bound: datetime) -> bool: '''Check if the given scheduling_unit can run somewhere within the given time window depending on the sub's constrains-template/doc.''' @@ -67,14 +101,6 @@ def can_run_within_timewindow(scheduling_unit: models.SchedulingUnitBlueprint, l scheduling_unit.id, lower_bound, upper_bound, constraints_template.name, constraints_template.version)) -class ScoredSchedulingUnit(NamedTuple): - '''struct for collecting scores per constraint and a weighted_score for a scheduling_unit at the given start_time - ''' - scheduling_unit: models.SchedulingUnitBlueprint - scores: dict - start_time: datetime - 
weighted_score: float - def compute_scores(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound:datetime, upper_bound:datetime) -> ScoredSchedulingUnit: '''Compute the "fitness" scores per constraint for the given scheduling_unit at the given starttime depending on the sub's constrains-template/doc.''' @@ -92,11 +118,6 @@ def compute_scores(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound: scheduling_unit.id, constraints_template.name, constraints_template.version)) -def get_sorted_scheduling_units_scored_by_constraints(scheduling_units:[models.SchedulingUnitBlueprint], lower_bound:datetime, upper_bound:datetime) -> [ScoredSchedulingUnit]: - scored_scheduling_units = [compute_scores(scheduling_unit, lower_bound, upper_bound) - for scheduling_unit in scheduling_units] - return sorted(scored_scheduling_units, key=lambda x: x.weighted_score, reverse=True) - def get_earliest_possible_start_time(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound: datetime=None) -> datetime: '''determine the earliest possible starttime for the given scheduling unit, taking into account all its constraints''' diff --git a/SAS/TMSS/services/scheduling/lib/dynamic_scheduling.py b/SAS/TMSS/services/scheduling/lib/dynamic_scheduling.py index e75bf7cc3d3a6c752613e0767b64475d2c73b3af..51b03d386fcc4f662f57994b4c6872fee2f15d7e 100644 --- a/SAS/TMSS/services/scheduling/lib/dynamic_scheduling.py +++ b/SAS/TMSS/services/scheduling/lib/dynamic_scheduling.py @@ -34,8 +34,7 @@ from lofar.sas.tmss.tmss.tmssapp import models from lofar.sas.tmss.tmss.tmssapp.tasks import schedule_independent_subtasks_in_scheduling_unit_blueprint, unschedule_subtasks_in_scheduling_unit_blueprint from lofar.sas.tmss.tmss.tmssapp.subtasks import update_subtasks_start_times_for_scheduling_unit, clear_defined_subtasks_start_stop_times_for_scheduling_unit from lofar.sas.tmss.client.tmssbuslistener import * -from lofar.sas.tmss.tmss.exceptions import * -from lofar.common.datetimeutils import 
round_to_minute_precision, round_to_second_precision +from lofar.common.datetimeutils import round_to_second_precision from threading import Thread, Event from lofar.sas.tmss.services.scheduling.constraints import * @@ -167,20 +166,22 @@ def schedule_next_scheduling_unit() -> models.SchedulingUnitBlueprint: lower_bound_start_time = get_min_earliest_possible_start_time(schedulable_units, lower_bound_start_time + timedelta(minutes=10)) -def assign_start_stop_times_to_schedulable_scheduling_units(): +def assign_start_stop_times_to_schedulable_scheduling_units(lower_bound_start_time: datetime=None): + '''Estimate the mid-term schedule: assign start/stop times to the schedulable scheduling units, one after the other, starting at lower_bound_start_time. If lower_bound_start_time is None, then the latest stop_time of the already scheduled scheduling units is used, or utcnow when that cannot be determined.''' logger.info("Estimating mid-term schedule...") - try: - last_scheduling_unit_stop_time = max(su.stop_time for su in get_scheduled_scheduling_units(lower=datetime.utcnow()) if su.stop_time is not None) - except: - last_scheduling_unit_stop_time = datetime.utcnow() + if lower_bound_start_time is None: + try: + lower_bound_start_time = max(su.stop_time for su in get_scheduled_scheduling_units(lower=datetime.utcnow()) if su.stop_time is not None) + except: + lower_bound_start_time = datetime.utcnow() scheduling_units = get_schedulable_scheduling_units() # update the start_times of the remaining ones (so they form queue, and can be visualized in a timeline) while scheduling_units: - best_scored_scheduling_unit = find_best_next_schedulable_unit(lower_bound_start_time=last_scheduling_unit_stop_time, scheduling_units=scheduling_units) + best_scored_scheduling_unit = find_best_next_schedulable_unit(lower_bound_start_time=lower_bound_start_time, scheduling_units=scheduling_units) if best_scored_scheduling_unit: scheduling_unit = best_scored_scheduling_unit.scheduling_unit @@ -189,14 +190,14 @@ def assign_start_stop_times_to_schedulable_scheduling_units(): update_subtasks_start_times_for_scheduling_unit(scheduling_unit, start_time) # keep track of the previous - last_scheduling_unit_stop_time = scheduling_unit.stop_time + lower_bound_start_time = scheduling_unit.stop_time 
scheduling_units.remove(scheduling_unit) else: # search again in a later timeslot - min_earliest_possible_start_time = get_min_earliest_possible_start_time(scheduling_units, last_scheduling_unit_stop_time+timedelta(minutes=10)) - if min_earliest_possible_start_time > last_scheduling_unit_stop_time: - last_scheduling_unit_stop_time = min_earliest_possible_start_time + min_earliest_possible_start_time = get_min_earliest_possible_start_time(scheduling_units, lower_bound_start_time+timedelta(minutes=10)) + if min_earliest_possible_start_time > lower_bound_start_time: + lower_bound_start_time = min_earliest_possible_start_time else: # cannot advance anymore to find more logger.warning("Cannot assign start/stop times to remaining scheduling units for mid-term schedule...") @@ -235,13 +236,13 @@ class TMSSSchedulingUnitBlueprintDynamicSchedulingMessageHandler(TMSSEventMessag super().stop_handling() def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str): - - logger.info("onSchedulingUnitBlueprintStatusChanged(id=%s, status=%s): triggering update of dynamic schedule...", id, status) - # scheduling takes a long time, longer then creating many scheduling units in bulk - # so, we do not create a complete new schedule for each new unit, - # but we only trigger a new schedule update. - # This way we are sure that the latest units are always taken into account while scheduling, but we do not waste cpu cylces. - self._do_schedule_event.set() + if status in ["schedulable", "observed", "finished", "cancelled"]: + logger.info("onSchedulingUnitBlueprintStatusChanged(id=%s, status=%s): triggering update of dynamic schedule...", id, status) + # scheduling takes a long time, longer than creating many scheduling units in bulk + # so, we do not create a complete new schedule for each new unit, + # but we only trigger a new schedule update. + # This way we are sure that the latest units are always taken into account while scheduling, but we do not waste cpu cycles. 
+ self._do_schedule_event.set() def _scheduling_loop(self): while self._scheduling_thread_running: @@ -249,8 +250,8 @@ class TMSSSchedulingUnitBlueprintDynamicSchedulingMessageHandler(TMSSEventMessag self._do_schedule_event.clear() try: logger.info("Updating dynamic schedule....") - schedule_next_scheduling_unit() - assign_start_stop_times_to_schedulable_scheduling_units() + scheduled_unit = schedule_next_scheduling_unit() + assign_start_stop_times_to_schedulable_scheduling_units(lower_bound_start_time=scheduled_unit.stop_time if scheduled_unit else None) logger.info("Updating dynamic schedule.... finished") except Exception as e: logger.exception(str(e))