Skip to content
Snippets Groups Projects
Commit e19d1682 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-190: added documentation and minor fixes

parent 33536bdb
No related branches found
No related tags found
1 merge request!252Resolve TMSS-190
#!/usr/bin/env python3
# dynamic_scheduling.py
#
# Copyright (C) 2020
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
......@@ -20,9 +18,20 @@
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: $
"""
This __init__ module for this constraints python package defines the 'API' to:
- filter a list of schedulable scheduling_units by checking their constraints: see method filter_scheduling_units_using_constraints
- sort a (possibly filtered) list of schedulable scheduling_units evaluating their constraints and computing a 'finess' score: see method get_sorted_scheduling_units_scored_by_constraints
These main methods are used in the dynamic_scheduler to pick the next best scheduling unit, and compute the midterm schedule.
Currently we have only one SchedulingConstraintsTemplate in TMSS, named 'constraints', version 1.
But, it is envisioned that we we get more templates.
So, based on the template the actual filter and score methods are selected from a specific module.
By convention we use one module per template. Currently, we have and use only module template_constraints_v1.py
If/When we add a new SchedulingConstraintsTemplate, then we should add a new module with the specific filter and score methods,
and add a extra 'if' in the strategy pattern used here. (see below for implementation)
"""
import logging
......@@ -33,6 +42,17 @@ from typing import NamedTuple
from lofar.sas.tmss.tmss.tmssapp import models
from lofar.sas.tmss.tmss.exceptions import *
################## main data struct and methods ##################
class ScoredSchedulingUnit(NamedTuple):
'''struct for collecting scores per constraint and a weighted_score for a scheduling_unit at the given start_time
'''
scheduling_unit: models.SchedulingUnitBlueprint
scores: dict
start_time: datetime
weighted_score: float
def filter_scheduling_units_using_constraints(scheduling_units:[models.SchedulingUnitBlueprint], lower_bound: datetime, upper_bound: datetime) -> [models.SchedulingUnitBlueprint]:
'''return the schedulable scheduling_units which for which the constraints are 'go' within the given timewindow'''
runnable_scheduling_units = []
......@@ -41,7 +61,7 @@ def filter_scheduling_units_using_constraints(scheduling_units:[models.Schedulin
if can_run_within_timewindow(scheduling_unit, lower_bound, upper_bound):
runnable_scheduling_units.append(scheduling_unit)
except UnknownTemplateException as e:
# TODO: how dow we notify the user that we cannot dynamically schedule this sub due to an unknown template?
# TODO: how do we notify the user that we cannot dynamically schedule this sub due to an unknown template?
# current pragmatic solution: log warning, and set sub state to error via its schedulable subtasks
logger.warning(e)
for subtask in models.Subtask.independent_subtasks().filter(task_blueprint__scheduling_unit_blueprint_id=scheduling_unit.id).all():
......@@ -50,6 +70,20 @@ def filter_scheduling_units_using_constraints(scheduling_units:[models.Schedulin
return runnable_scheduling_units
def get_sorted_scheduling_units_scored_by_constraints(scheduling_units:[models.SchedulingUnitBlueprint], lower_bound:datetime, upper_bound:datetime) -> [ScoredSchedulingUnit]:
scored_scheduling_units = [compute_scores(scheduling_unit, lower_bound, upper_bound)
for scheduling_unit in scheduling_units]
return sorted(scored_scheduling_units, key=lambda x: x.weighted_score, reverse=True)
################## helper methods #################################################################
# #
# these helper methods are selected by a strategy pattern based on the template name and version #
# The actual implementation can be found in the other module(s) in this package #
# Currently we only have one template with one implementation in template_constraints_v1.py #
# #
###################################################################################################
def can_run_within_timewindow(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound: datetime, upper_bound: datetime) -> bool:
'''Check if the given scheduling_unit can run somewhere within the given time window depending on the sub's constrains-template/doc.'''
......@@ -67,14 +101,6 @@ def can_run_within_timewindow(scheduling_unit: models.SchedulingUnitBlueprint, l
scheduling_unit.id, lower_bound, upper_bound, constraints_template.name, constraints_template.version))
class ScoredSchedulingUnit(NamedTuple):
'''struct for collecting scores per constraint and a weighted_score for a scheduling_unit at the given start_time
'''
scheduling_unit: models.SchedulingUnitBlueprint
scores: dict
start_time: datetime
weighted_score: float
def compute_scores(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound:datetime, upper_bound:datetime) -> ScoredSchedulingUnit:
'''Compute the "fitness" scores per constraint for the given scheduling_unit at the given starttime depending on the sub's constrains-template/doc.'''
......@@ -92,11 +118,6 @@ def compute_scores(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound:
scheduling_unit.id, constraints_template.name, constraints_template.version))
def get_sorted_scheduling_units_scored_by_constraints(scheduling_units:[models.SchedulingUnitBlueprint], lower_bound:datetime, upper_bound:datetime) -> [ScoredSchedulingUnit]:
scored_scheduling_units = [compute_scores(scheduling_unit, lower_bound, upper_bound)
for scheduling_unit in scheduling_units]
return sorted(scored_scheduling_units, key=lambda x: x.weighted_score, reverse=True)
def get_earliest_possible_start_time(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound: datetime=None) -> datetime:
'''determine the earliest possible starttime for the given scheduling unit, taking into account all its constraints'''
......
......@@ -34,8 +34,7 @@ from lofar.sas.tmss.tmss.tmssapp import models
from lofar.sas.tmss.tmss.tmssapp.tasks import schedule_independent_subtasks_in_scheduling_unit_blueprint, unschedule_subtasks_in_scheduling_unit_blueprint
from lofar.sas.tmss.tmss.tmssapp.subtasks import update_subtasks_start_times_for_scheduling_unit, clear_defined_subtasks_start_stop_times_for_scheduling_unit
from lofar.sas.tmss.client.tmssbuslistener import *
from lofar.sas.tmss.tmss.exceptions import *
from lofar.common.datetimeutils import round_to_minute_precision, round_to_second_precision
from lofar.common.datetimeutils import round_to_second_precision
from threading import Thread, Event
from lofar.sas.tmss.services.scheduling.constraints import *
......@@ -167,20 +166,22 @@ def schedule_next_scheduling_unit() -> models.SchedulingUnitBlueprint:
lower_bound_start_time = get_min_earliest_possible_start_time(schedulable_units, lower_bound_start_time + timedelta(minutes=10))
def assign_start_stop_times_to_schedulable_scheduling_units():
def assign_start_stop_times_to_schedulable_scheduling_units(lower_bound_start_time: datetime=None):
''''''
logger.info("Estimating mid-term schedule...")
if lower_bound_start_time is None:
try:
last_scheduling_unit_stop_time = max(su.stop_time for su in get_scheduled_scheduling_units(lower=datetime.utcnow()) if su.stop_time is not None)
lower_bound_start_time = max(su.stop_time for su in get_scheduled_scheduling_units(lower=datetime.utcnow()) if su.stop_time is not None)
except:
last_scheduling_unit_stop_time = datetime.utcnow()
lower_bound_start_time = datetime.utcnow()
scheduling_units = get_schedulable_scheduling_units()
# update the start_times of the remaining ones (so they form queue, and can be visualized in a timeline)
while scheduling_units:
best_scored_scheduling_unit = find_best_next_schedulable_unit(lower_bound_start_time=last_scheduling_unit_stop_time, scheduling_units=scheduling_units)
best_scored_scheduling_unit = find_best_next_schedulable_unit(lower_bound_start_time=lower_bound_start_time, scheduling_units=scheduling_units)
if best_scored_scheduling_unit:
scheduling_unit = best_scored_scheduling_unit.scheduling_unit
......@@ -189,14 +190,14 @@ def assign_start_stop_times_to_schedulable_scheduling_units():
update_subtasks_start_times_for_scheduling_unit(scheduling_unit, start_time)
# keep track of the previous
last_scheduling_unit_stop_time = scheduling_unit.stop_time
lower_bound_start_time = scheduling_unit.stop_time
scheduling_units.remove(scheduling_unit)
else:
# search again in a later timeslot
min_earliest_possible_start_time = get_min_earliest_possible_start_time(scheduling_units, last_scheduling_unit_stop_time+timedelta(minutes=10))
if min_earliest_possible_start_time > last_scheduling_unit_stop_time:
last_scheduling_unit_stop_time = min_earliest_possible_start_time
min_earliest_possible_start_time = get_min_earliest_possible_start_time(scheduling_units, lower_bound_start_time+timedelta(minutes=10))
if min_earliest_possible_start_time > lower_bound_start_time:
lower_bound_start_time = min_earliest_possible_start_time
else:
# cannot advance anymore to find more
logger.warning("Cannot assign start/stop times to remaining scheduling units for mid-term schedule...")
......@@ -235,7 +236,7 @@ class TMSSSchedulingUnitBlueprintDynamicSchedulingMessageHandler(TMSSEventMessag
super().stop_handling()
def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str):
if status in ["schedulable", "observed", "finished", "cancelled"]:
logger.info("onSchedulingUnitBlueprintStatusChanged(id=%s, status=%s): triggering update of dynamic schedule...", id, status)
# scheduling takes a long time, longer then creating many scheduling units in bulk
# so, we do not create a complete new schedule for each new unit,
......@@ -249,8 +250,8 @@ class TMSSSchedulingUnitBlueprintDynamicSchedulingMessageHandler(TMSSEventMessag
self._do_schedule_event.clear()
try:
logger.info("Updating dynamic schedule....")
schedule_next_scheduling_unit()
assign_start_stop_times_to_schedulable_scheduling_units()
scheduled_unit = schedule_next_scheduling_unit()
assign_start_stop_times_to_schedulable_scheduling_units(lower_bound_start_time=scheduled_unit.stop_time if scheduled_unit else None)
logger.info("Updating dynamic schedule.... finished")
except Exception as e:
logger.exception(str(e))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment