diff --git a/SAS/TMSS/services/scheduling/lib/constraints/__init__.py b/SAS/TMSS/services/scheduling/lib/constraints/__init__.py index 30fff6c121309cfcde8646afe674444d69a5a5b9..170960bac4c4cd10679e9a198e15cc5e497fdd4a 100644 --- a/SAS/TMSS/services/scheduling/lib/constraints/__init__.py +++ b/SAS/TMSS/services/scheduling/lib/constraints/__init__.py @@ -28,6 +28,7 @@ import logging logger = logging.getLogger(__name__) from datetime import datetime +from typing import NamedTuple from lofar.sas.tmss.tmss.tmssapp import models from lofar.sas.tmss.tmss.exceptions import * @@ -51,10 +52,10 @@ def filter_scheduling_units_using_constraints(scheduling_units:[models.Schedulin def can_run_within_timewindow(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound: datetime, upper_bound: datetime) -> bool: - '''Check if the given scheduling_unit can run somewhere within the given time window depending on the sub's constrainstemplate/doc.''' + '''Check if the given scheduling_unit can run somewhere within the given time window depending on the sub's constraints-template/doc.''' constraints_template = scheduling_unit.draft.scheduling_constraints_template - # chose appropriate method based on template, or raise + # choose appropriate method based on template (strategy pattern), or raise if constraints_template.name == 'constraints' and constraints_template.version == 1: # import here to prevent circular imports. Do not worry about performance loss, cause python only imports once and then uses a cache. from . 
import template_constraints_v1 @@ -65,5 +66,63 @@ def can_run_within_timewindow(scheduling_unit: models.SchedulingUnitBlueprint, l raise UnknownTemplateException("Cannot check if scheduling_unit id=%s can run between '%s' and '%s', because we have no constraint checker for scheduling constraints template '%s' version=%s" % ( scheduling_unit.id, lower_bound, upper_bound, constraints_template.name, constraints_template.version)) -# expose only these public methods for this package, and 'hide' the template-specific imported methods, which are only used internally here. -__all__ = ["filter_scheduling_units_using_constraints", "can_run_within_timewindow"] + +class ScoredSchedulingUnit(NamedTuple): + '''struct for collecting scores per constraint and a weighted_score for a scheduling_unit at the given start_time + ''' + scheduling_unit: models.SchedulingUnitBlueprint + scores: dict + start_time: datetime + weighted_score: float + + +def compute_scores(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound:datetime, upper_bound:datetime) -> ScoredSchedulingUnit: + '''Compute the "fitness" scores per constraint for the given scheduling_unit at the given starttime depending on the sub's constraints-template/doc.''' + constraints_template = scheduling_unit.draft.scheduling_constraints_template + + # choose appropriate method based on template (strategy pattern), or raise + if constraints_template.name == 'constraints' and constraints_template.version == 1: + # import here to prevent circular imports. Do not worry about performance loss, cause python only imports once and then uses a cache. + from . import template_constraints_v1 + return template_constraints_v1.compute_scores(scheduling_unit, lower_bound, upper_bound) + + # TODO: if we get more constraint templates or versions, then add a check here and import and use the new module with the constraint methods for that specific template. 
(strategy pattern) + + raise UnknownTemplateException("Cannot compute scores for scheduling_unit id=%s, because we have no score computation method for scheduling constraints template '%s' version=%s" % ( + scheduling_unit.id, constraints_template.name, constraints_template.version)) + + +def get_sorted_scheduling_units_scored_by_constraints(scheduling_units:[models.SchedulingUnitBlueprint], lower_bound:datetime, upper_bound:datetime) -> [ScoredSchedulingUnit]: + scored_scheduling_units = [compute_scores(scheduling_unit, lower_bound, upper_bound) + for scheduling_unit in scheduling_units] + return sorted(scored_scheduling_units, key=lambda x: x.weighted_score, reverse=True) + + +def get_earliest_possible_start_time(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound: datetime=None) -> datetime: + '''determine the earliest possible starttime for the given scheduling unit, taking into account all its constraints''' + constraints_template = scheduling_unit.draft.scheduling_constraints_template + + # choose appropriate method based on template (strategy pattern), or raise + if constraints_template.name == 'constraints' and constraints_template.version == 1: + # import here to prevent circular imports. Do not worry about performance loss, cause python only imports once and then uses a cache. + from . import template_constraints_v1 + return template_constraints_v1.get_earliest_possible_start_time(scheduling_unit, lower_bound) + + # TODO: if we get more constraint templates or versions, then add a check here and import and use the new module with the constraint methods for that specific template. 
(strategy pattern) + + raise UnknownTemplateException("Cannot compute first possible starttime for scheduling_unit id=%s, because we have no constraint checker for scheduling constraints template '%s' version=%s" % ( + scheduling_unit.id, constraints_template.name, constraints_template.version)) + + +def get_min_earliest_possible_start_time(scheduling_units: [models.SchedulingUnitBlueprint], lower_bound: datetime=None) -> datetime: + '''determine the earliest possible starttime over all given scheduling units, taking into account all their constraints''' + if lower_bound is None: + lower_bound = datetime.utcnow() + try: + return min(get_earliest_possible_start_time(scheduling_unit, lower_bound) for scheduling_unit in scheduling_units) + except ValueError: + return lower_bound + + + + diff --git a/SAS/TMSS/services/scheduling/lib/constraints/template_constraints_v1.py b/SAS/TMSS/services/scheduling/lib/constraints/template_constraints_v1.py index ba665f50da545d294f4bf36695e7990e68b1b902..b6486489364ac6d5d6da26a100983ecd49f37469 100644 --- a/SAS/TMSS/services/scheduling/lib/constraints/template_constraints_v1.py +++ b/SAS/TMSS/services/scheduling/lib/constraints/template_constraints_v1.py @@ -30,8 +30,9 @@ logger = logging.getLogger(__name__) from datetime import datetime, timedelta from lofar.sas.tmss.tmss.tmssapp import models -from lofar.sas.tmss.services.scheduling.dynamic_scheduling import get_first_possible_start_time -from lofar.sas.tmss.tmss.tmssapp.conversions import LOFAR_CENTER_OBSERVER +from lofar.sas.tmss.tmss.tmssapp.conversions import LOFAR_CENTER_OBSERVER, sun_rise_and_set_at_lofar_center, Time + +from . 
import ScoredSchedulingUnit def can_run_within_timewindow(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound: datetime, upper_bound: datetime) -> bool: '''determine if the given scheduling_unit can run withing the given timewindow evaluating all constraints from the "constraints" version 1 template''' @@ -57,7 +58,9 @@ def can_run_within_timewindow_with_daily_constraints(scheduling_unit: models.Sch # Ugly code. Should be improved. Works for demo. # create a series of timestamps in the window of opportunity, and evaluate of there are all during day or night - possible_start_time = get_first_possible_start_time(scheduling_unit, lower_bound) + possible_start_time = get_earliest_possible_start_time(scheduling_unit, lower_bound) + + # ToDo: use specified total observation duration, and ignore pipelines who don't care about day/night possible_stop_time = possible_start_time + scheduling_unit.duration timestamps = [possible_start_time] while timestamps[-1] < possible_stop_time - timedelta(hours=8): @@ -87,3 +90,73 @@ def can_run_within_timewindow_with_sky_constraints(scheduling_unit: models.Sched # maybe even split this method into sub methods for the very distinct sky constraints: min_calibrator_elevation, min_target_elevation, transit_offset & min_distance return True # for now, ignore sky contraints. 
+ +def get_earliest_possible_start_time(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound: datetime=None) -> datetime: + now = datetime.utcnow() + + if lower_bound is None: + lower_bound = now + + constraints = scheduling_unit.draft.scheduling_constraints_doc + + try: + if constraints['daily']['require_day'] or constraints['daily']['require_night']: + sun_rise, sun_set = sun_rise_and_set_at_lofar_center(lower_bound) + + # TODO: take avoid_twilight into account + if constraints['daily']['require_day']: + if lower_bound+scheduling_unit.duration > sun_set: + return LOFAR_CENTER_OBSERVER.sun_rise_time(time=Time(sun_set), which='next').to_datetime() + if lower_bound >= sun_rise: + return lower_bound + return sun_rise + + if constraints['daily']['require_night']: + if lower_bound+scheduling_unit.duration < sun_rise: + return lower_bound + if lower_bound >= sun_set: + return lower_bound + return sun_set + + # if constraints['daily']['require_night']: + # # for now, just assume next_sun_set is fine (leaving lots of open gaps) + # return next_sun_set + except Exception as e: + logger.exception(str(e)) + + # no constraints dictating starttime? make a guesstimate. + return max(lower_bound, now) + + +def compute_scores(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound:datetime, upper_bound:datetime) -> ScoredSchedulingUnit: + '''Compute the "fitness" scores per constraint for the given scheduling_unit at the given starttime depending on the sub's constraints-template/doc.''' + constraints = scheduling_unit.draft.scheduling_constraints_doc + + # TODO: add compute_scores methods for each type of constraint + # TODO: take start_time into account. For example, an LST constraint yields a better score when the starttime is such that the center of the obs is at LST. + # TODO: TMSS-??? 
(and more?), compute score using the constraints in constraints['daily'] + # TODO: TMSS-244 (and more?), compute score using the constraints in constraints['time'] + # TODO: TMSS-245 TMSS-250 (and more?), compute score using the constraints in constraints['sky'] + + # for now (as a proof of concept and sort of example), just return 1's + scores = {'daily': 1.0, + 'time': 1.0, + 'sky': 1.0 } + + # add "common" scores which do not depend on constraints, such as project rank and creation date + # TODO: should be normalized! + scores['project_rank'] = scheduling_unit.draft.scheduling_set.project.priority_rank + scores['age'] = (datetime.utcnow() - scheduling_unit.created_at).total_seconds() + + try: + # TODO: apply weights. Needs some new weight model in django, probably linked to constraints_template. + # for now, just average the scores + weighted_score = sum(scores.values())/len(scores) + except: + weighted_score = 1 + + return ScoredSchedulingUnit(scheduling_unit=scheduling_unit, + scores=scores, + weighted_score=weighted_score, + start_time=lower_bound) + diff --git a/SAS/TMSS/services/scheduling/lib/dynamic_scheduling.py b/SAS/TMSS/services/scheduling/lib/dynamic_scheduling.py index 564b00ffda75b5c62d13430b8e48888430799e4c..e75bf7cc3d3a6c752613e0767b64475d2c73b3af 100644 --- a/SAS/TMSS/services/scheduling/lib/dynamic_scheduling.py +++ b/SAS/TMSS/services/scheduling/lib/dynamic_scheduling.py @@ -38,7 +38,7 @@ from lofar.sas.tmss.tmss.exceptions import * from lofar.common.datetimeutils import round_to_minute_precision, round_to_second_precision from threading import Thread, Event -from lofar.sas.tmss.services.scheduling.constraints import filter_scheduling_units_using_constraints +from lofar.sas.tmss.services.scheduling.constraints import * def get_schedulable_scheduling_units() -> [models.SchedulingUnitBlueprint]: '''get a list of all schedulable scheduling_units''' @@ -58,10 +58,10 @@ def get_scheduled_scheduling_units(lower:datetime=None, upper:datetime=None) -> 
return list(models.SchedulingUnitBlueprint.objects.filter(id__in=scheduled_subtasks.values('task_blueprint__scheduling_unit_blueprint_id').distinct()).all()) -def find_best_next_schedulable_unit(lower_bound_start_time: datetime=None, upper_bound_stop_time: datetime=None, scheduling_units:[models.SchedulingUnitBlueprint]=None) -> (models.SchedulingUnitBlueprint, datetime): +def find_best_next_schedulable_unit(lower_bound_start_time: datetime=None, upper_bound_stop_time: datetime=None, scheduling_units:[models.SchedulingUnitBlueprint]=None) -> ScoredSchedulingUnit: """ find the best schedulable scheduling_unit which can run withing the given time window, for the given scheduling_units. - Returns a tuple of the best next schedulable scheduling unit and its proposed starttime where it best fits its contraints. + Returns a ScoredSchedulingUnit struct with, among others, the best next schedulable scheduling unit and its proposed starttime where it best fits its constraints. """ if lower_bound_start_time is None: lower_bound_start_time = datetime.utcnow() @@ -79,22 +79,23 @@ def find_best_next_schedulable_unit(lower_bound_start_time: datetime=None, upper filtered_scheduling_units = filter_scheduling_units_using_constraints(scheduling_units, lower_bound_start_time, upper_bound_stop_time) if filtered_scheduling_units: - filtered_scheduling_units = sorted(filtered_scheduling_units, key=get_score, reverse=True) - best_next_schedulable_unit = filtered_scheduling_units[0] + sorted_scored_scheduling_units = get_sorted_scheduling_units_scored_by_constraints(filtered_scheduling_units, lower_bound_start_time, upper_bound_stop_time) + best_scored_scheduling_unit = sorted_scored_scheduling_units[0] - # TODO: compute best start_time so the constrainst yields the max score. 
- best_start_time = get_first_possible_start_time(best_next_schedulable_unit, lower_bound_start_time) + best_next_schedulable_unit = best_scored_scheduling_unit.scheduling_unit + best_start_time = best_scored_scheduling_unit.start_time if best_start_time >= lower_bound_start_time and best_start_time+best_next_schedulable_unit.duration < upper_bound_stop_time: - return best_next_schedulable_unit, best_start_time + return best_scored_scheduling_unit else: # try again a bit further into the future. There might be a spot there... + # ToDo: apply smarter guessed step size lower_bound_start_time += timedelta(minutes=10) else: # no scheduling units... - return None, None + return None - return None, None + return None def schedule_next_scheduling_unit() -> models.SchedulingUnitBlueprint: @@ -104,7 +105,7 @@ def schedule_next_scheduling_unit() -> models.SchedulingUnitBlueprint: schedulable_units = get_schedulable_scheduling_units() # estimate the lower_bound_start_time - lower_bound_start_time = get_min_first_possible_start_time(schedulable_units, datetime.utcnow() + timedelta(seconds=90)) + lower_bound_start_time = get_min_earliest_possible_start_time(schedulable_units, datetime.utcnow() + timedelta(seconds=90)) # estimate the upper_bound_stop_time, which may give us a small timewindow before any next scheduled unit, or a default window of a day try: @@ -120,20 +121,22 @@ def schedule_next_scheduling_unit() -> models.SchedulingUnitBlueprint: try: # try to find the best next scheduling_unit logger.info("schedule_next_scheduling_unit: searching for best scheduling unit to schedule starting between %s and %s", lower_bound_start_time, upper_bound_stop_time) - best_scheduling_unit, best_start_time = find_best_next_schedulable_unit(lower_bound_start_time, upper_bound_stop_time, schedulable_units) - if best_scheduling_unit: - best_scheduling_unit_score = get_score(best_scheduling_unit) + best_scored_scheduling_unit = find_best_next_schedulable_unit(lower_bound_start_time, 
upper_bound_stop_time, schedulable_units) + if best_scored_scheduling_unit: + best_scheduling_unit = best_scored_scheduling_unit.scheduling_unit + best_scheduling_unit_score = best_scored_scheduling_unit.weighted_score + best_start_time = best_scored_scheduling_unit.start_time # make start_time "look nice" for us humans - best_start_time = round_to_minute_precision(best_start_time) + best_start_time = round_to_second_precision(best_start_time) - logger.info("schedule_next_scheduling_unit: found best candidate id=%s '%s' score=%s start_time=%s", + logger.info("schedule_next_scheduling_unit: found best candidate id=%s '%s' weighted_score=%s start_time=%s", best_scheduling_unit.id, best_scheduling_unit.name, best_scheduling_unit_score, best_start_time) # check any previously scheduled units, and unschedule if needed/allowed scheduled_scheduling_units = get_scheduled_scheduling_units(lower=best_start_time, upper=best_start_time+best_scheduling_unit.duration) for scheduled_scheduling_unit in scheduled_scheduling_units: - if best_scheduling_unit_score > get_score(scheduled_scheduling_unit): + if best_scheduling_unit_score > compute_scores(scheduled_scheduling_unit, lower_bound_start_time, upper_bound_stop_time).weighted_score: logger.info("schedule_next_scheduling_unit: unscheduling id=%s '%s' because it is in the way and has a lower score than the best candidate id=%s '%s' score=%s start_time=%s", scheduled_scheduling_unit.id, scheduled_scheduling_unit.name, best_scheduling_unit.id, best_scheduling_unit.name, best_scheduling_unit_score, best_start_time) @@ -144,7 +147,7 @@ def schedule_next_scheduling_unit() -> models.SchedulingUnitBlueprint: if scheduled_scheduling_units: # accept current solution with current scheduled_scheduling_units logger.info("schedule_next_scheduling_unit: keeping current scheduled unit(s) which have a better (or equal) score: %s", - "; ".join("id=%s '%s' score=%s start_time=%s" % (su.id, su.name, get_score(su), su.start_time) for su in 
scheduled_scheduling_units)) + "; ".join("id=%s '%s' start_time='%s'" % (su.id, su.name, su.start_time) for su in scheduled_scheduling_units)) return None else: # no (old) scheduled scheduling_units in the way, so @@ -161,7 +164,7 @@ def schedule_next_scheduling_unit() -> models.SchedulingUnitBlueprint: # nothing was found, or an error occurred. # seach again... (loop) with the remaining schedulable_units and new lower_bound_start_time schedulable_units = get_schedulable_scheduling_units() - lower_bound_start_time = get_min_first_possible_start_time(schedulable_units, lower_bound_start_time + timedelta(minutes=10)) + lower_bound_start_time = get_min_earliest_possible_start_time(schedulable_units, lower_bound_start_time + timedelta(minutes=10)) def assign_start_stop_times_to_schedulable_scheduling_units(): @@ -177,10 +180,11 @@ def assign_start_stop_times_to_schedulable_scheduling_units(): # update the start_times of the remaining ones (so they form queue, and can be visualized in a timeline) while scheduling_units: - scheduling_unit, start_time = find_best_next_schedulable_unit(lower_bound_start_time=last_scheduling_unit_stop_time, scheduling_units=scheduling_units) + best_scored_scheduling_unit = find_best_next_schedulable_unit(lower_bound_start_time=last_scheduling_unit_stop_time, scheduling_units=scheduling_units) - if scheduling_unit: - start_time = round_to_second_precision(start_time) + if best_scored_scheduling_unit: + scheduling_unit = best_scored_scheduling_unit.scheduling_unit + start_time = round_to_second_precision(best_scored_scheduling_unit.start_time) logger.info("mid-term schedule: next scheduling unit id=%s '%s' start_time=%s", scheduling_unit.id, scheduling_unit.name, start_time) update_subtasks_start_times_for_scheduling_unit(scheduling_unit, start_time) @@ -190,9 +194,9 @@ def assign_start_stop_times_to_schedulable_scheduling_units(): scheduling_units.remove(scheduling_unit) else: # search again in a later timeslot - 
min_first_possible_start_time = get_min_first_possible_start_time(scheduling_units, last_scheduling_unit_stop_time+timedelta(minutes=10)) - if min_first_possible_start_time > last_scheduling_unit_stop_time: - last_scheduling_unit_stop_time = min_first_possible_start_time + min_earliest_possible_start_time = get_min_earliest_possible_start_time(scheduling_units, last_scheduling_unit_stop_time+timedelta(minutes=10)) + if min_earliest_possible_start_time > last_scheduling_unit_stop_time: + last_scheduling_unit_stop_time = min_earliest_possible_start_time else: # cannot advance anymore to find more logger.warning("Cannot assign start/stop times to remaining scheduling units for mid-term schedule...") @@ -206,62 +210,6 @@ def assign_start_stop_times_to_schedulable_scheduling_units(): logger.info("Estimating mid-term schedule... finished") -def get_first_possible_start_time(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound: datetime=None) -> datetime: - now = datetime.utcnow() - - if lower_bound is None: - lower_bound = now - - constraints = scheduling_unit.draft.scheduling_constraints_doc - - # TODO: use factory to get function based on scheduling_constraints_template and/or constraint name - # for now, assume there is only one template, allowing straightforward filtering. 
- - try: - if constraints['daily']['require_day'] or constraints['daily']['require_night']: - sun_rise, sun_set = sun_rise_and_set_at_lofar_center(lower_bound) - - # TODO: take avoid_twilight into account - if constraints['daily']['require_day']: - if lower_bound+scheduling_unit.duration > sun_set: - return LOFAR_CENTER_OBSERVER.sun_rise_time(time=Time(sun_set), which='next').to_datetime() - if lower_bound >= sun_rise: - return lower_bound - return sun_rise - - if constraints['daily']['require_night']: - if lower_bound+scheduling_unit.duration < sun_rise: - return lower_bound - if lower_bound >= sun_set: - return lower_bound - return sun_set - - # if constraints['daily']['require_night']: - # # for now, just assume next_sun_set is fine (leaving lots of open gaps) - # return next_sun_set - except Exception as e: - logger.exception(str(e)) - - # no constraints dictating starttime? make a guesstimate. - return max(lower_bound, now) - - -def get_min_first_possible_start_time(scheduling_units: [models.SchedulingUnitBlueprint], lower_bound: datetime=None) -> datetime: - if lower_bound is None: - lower_bound = datetime.utcnow() - try: - return min(get_first_possible_start_time(scheduling_unit, lower_bound) for scheduling_unit in scheduling_units) - except ValueError: - return lower_bound - - -def get_score(scheduling_unit: models.SchedulingUnitBlueprint) -> float: - # TODO: sort by constraints, lst, etc - - # for now (as a proof of concept), order the scheduling units by project priority_rank - # and a bit by how close it is to 'now' - now = datetime.utcnow() - return 5*scheduling_unit.draft.scheduling_set.project.priority_rank - (get_first_possible_start_time(scheduling_unit, now)-now).total_seconds()/7200 class TMSSSchedulingUnitBlueprintDynamicSchedulingMessageHandler(TMSSEventMessageHandler): '''