#!/usr/bin/env python3

# dynamic_scheduling.py
#
# Copyright (C) 2020
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: $

"""
Dynamic scheduling of TMSS scheduling units.

The full update (do_dynamic_schedule) first schedules the best next dynamically schedulable
scheduling unit, unscheduling lower-scored scheduled units which are in its way. It then assigns
tentative start times to the remaining schedulable units, forming a mid-term schedule.
The concrete filtering/scoring of units is delegated to the constraints package.

The TMSSDynamicSchedulingMessageHandler runs such a full update on a background thread whenever
relevant TMSS EventMessages are received. create_dynamic_scheduling_service wraps that handler
in a TMSSBusListener.
"""

import os
import logging

from datetime import datetime, timedelta, time

from lofar.sas.tmss.tmss.tmssapp import models
from lofar.sas.tmss.tmss.tmssapp.tasks import schedule_independent_subtasks_in_scheduling_unit_blueprint, unschedule_subtasks_in_scheduling_unit_blueprint
from lofar.sas.tmss.tmss.tmssapp.subtasks import update_subtasks_start_times_for_scheduling_unit, clear_defined_subtasks_start_stop_times_for_scheduling_unit
from lofar.sas.tmss.client.tmssbuslistener import *
from lofar.common.datetimeutils import round_to_second_precision

from threading import Thread, Event

from lofar.sas.tmss.services.scheduling.constraints import *

# Bind this module's logger *after* the star-imports above. A star-import (of a module without __all__)
# also imports that module's public 'logger' name, which would otherwise replace ours.
logger = logging.getLogger(__name__)

# LOFAR needs to have a gap in between observations to (re)initialize hardware.
DEFAULT_INTER_OBSERVATION_GAP = timedelta(seconds=60)

# Margin added to 'now' to obtain the lower bound for the start time of the next unit to be scheduled.
DEFAULT_NEXT_STARTTIME_GAP = timedelta(seconds=180)

################## core dynamic scheduling methods ################################################
#
# This module starts with the core dynamic scheduling methods which are used in the dynamic
# scheduling service.
# These high level methods only filter/score/sort in a generic way.
# The detailed concrete filter/score/sort methods are picked by a strategy pattern in the
# constraints package based on each scheduling unit's scheduling_constraints template.
#
###################################################################################################

def find_best_next_schedulable_unit(scheduling_units:[models.SchedulingUnitBlueprint], lower_bound_start_time: datetime, upper_bound_stop_time: datetime) -> ScoredSchedulingUnit:
    """
    find the best schedulable scheduling_unit which can run within the given time window from the given scheduling_units.
    :param scheduling_units: evaluate these scheduling_units.
    :param lower_bound_start_time: evaluate the constraints at and after lower_bound_start_time. The returned unit has a start_time guaranteed at or after lower_bound_start_time.
    :param upper_bound_stop_time: evaluate the constraints before upper_bound_stop_time. The returned unit has a stop_time guaranteed before upper_bound_stop_time.
    Returns a ScoredSchedulingUnit struct with the best next schedulable scheduling unit and its proposed start_time where it best fits its constraints,
    or None when none of the given scheduling_units passes the constraint filter for this time window.
    """
    # ensure upper is greater than or equal to lower
    upper_bound_stop_time = max(lower_bound_start_time, upper_bound_stop_time)

    filtered_scheduling_units = filter_scheduling_units_using_constraints(scheduling_units, lower_bound_start_time, upper_bound_stop_time)

    if filtered_scheduling_units:
        return get_best_scored_scheduling_unit_scored_by_constraints(filtered_scheduling_units, lower_bound_start_time, upper_bound_stop_time)

    # no filtered scheduling units found...
    logger.debug("No schedulable scheduling units found which meet the requirements between '%s' and '%s'", lower_bound_start_time, upper_bound_stop_time)
    return None


def schedule_next_scheduling_unit() -> models.SchedulingUnitBlueprint:
    '''find the best next schedulable scheduling unit and try to schedule it.
    Overlapping existing scheduled units are unscheduled if their score is lower.
    :return: the scheduled scheduling unit, or None if no unit could be scheduled.'''

    # --- setup of needed variables ---
    schedulable_units = get_dynamically_schedulable_scheduling_units()

    if not schedulable_units:
        logger.info("No scheduling units found...")
        return None

    # estimate the lower_bound_start_time
    lower_bound_start_time = get_min_earliest_possible_start_time(schedulable_units, datetime.utcnow()+DEFAULT_NEXT_STARTTIME_GAP)

    # estimate the upper_bound_stop_time, which may give us a small timewindow before any next scheduled unit, or a default window of a day
    try:
        upper_bound_stop_time = max(su.start_time for su in get_scheduled_scheduling_units(lower=lower_bound_start_time, upper=lower_bound_start_time + timedelta(days=1)))
    except ValueError:
        # max() of an empty sequence: no scheduled units in the coming day
        upper_bound_stop_time = lower_bound_start_time + timedelta(days=1)

    # no need to irritate user in log files with subsecond scheduling precision
    lower_bound_start_time = round_to_second_precision(lower_bound_start_time)
    upper_bound_stop_time = max(round_to_second_precision(upper_bound_stop_time), lower_bound_start_time)

    # --- core routine ---
    while lower_bound_start_time < upper_bound_stop_time:
        # reset per iteration, so an error is never attributed to a unit from a previous iteration
        best_scheduling_unit = None
        try:
            # try to find the best next scheduling_unit
            logger.info("schedule_next_scheduling_unit: searching for best scheduling unit to schedule between '%s' and '%s'", lower_bound_start_time, upper_bound_stop_time)
            best_scored_scheduling_unit = find_best_next_schedulable_unit(schedulable_units, lower_bound_start_time, upper_bound_stop_time)
            if best_scored_scheduling_unit:
                best_scheduling_unit = best_scored_scheduling_unit.scheduling_unit
                best_scheduling_unit_score = best_scored_scheduling_unit.weighted_score
                best_start_time = best_scored_scheduling_unit.start_time

                # make start_time "look nice" for us humans
                best_start_time = round_to_second_precision(best_start_time)

                logger.info("schedule_next_scheduling_unit: found best candidate id=%s '%s' weighted_score=%s start_time=%s",
                            best_scheduling_unit.id, best_scheduling_unit.name, best_scheduling_unit_score, best_start_time)

                if unschededule_blocking_scheduled_units_if_needed_and_possible(best_scored_scheduling_unit):
                    # no (old) scheduled scheduling_units in the way, so schedule our candidate!
                    scheduled_scheduling_unit = schedule_independent_subtasks_in_scheduling_unit_blueprint(best_scheduling_unit, start_time=best_start_time)

                    logger.info("schedule_next_scheduling_unit: scheduled best candidate id=%s '%s' score=%s start_time=%s",
                                best_scheduling_unit.id, best_scheduling_unit.name, best_scheduling_unit_score, best_start_time)
                    return scheduled_scheduling_unit

        except SubtaskSchedulingException as e:
            if best_scheduling_unit is not None:
                logger.error("Could not schedule scheduling_unit id=%s name='%s'. Error: %s", best_scheduling_unit.id, best_scheduling_unit.name, e)
            else:
                logger.error("Could not schedule any scheduling_unit. Error: %s", e)

        # nothing was found, or an error occurred.
        # search again... (loop) with the remaining schedulable_units and new lower_bound_start_time
        schedulable_units = get_dynamically_schedulable_scheduling_units()
        if not schedulable_units:
            logger.info("No scheduling units found...")
            return None

        next_lower_bound_start_time = get_min_earliest_possible_start_time(schedulable_units, lower_bound_start_time + timedelta(hours=1))
        if next_lower_bound_start_time <= lower_bound_start_time:
            # The search window did not move forward, so the next iteration would just repeat the same search.
            # Stop here, like assign_start_stop_times_to_schedulable_scheduling_units does in the same situation.
            logger.warning("schedule_next_scheduling_unit: cannot advance lower_bound_start_time beyond '%s', giving up", lower_bound_start_time)
            return None
        lower_bound_start_time = next_lower_bound_start_time

    return None


def assign_start_stop_times_to_schedulable_scheduling_units(lower_bound_start_time: datetime):
    '''Estimate a mid-term schedule for all dynamically schedulable scheduling units.

    Starting at lower_bound_start_time, repeatedly pick the best next unit and update its subtasks' start times.
    Then move the lower bound to that unit's stop_time plus DEFAULT_INTER_OBSERVATION_GAP. This way the units form
    a queue which can be shown in a timeline. Units which cannot be placed get the start/stop times of their
    defined subtasks cleared.
    :param lower_bound_start_time: search for start times at or after this moment (within a window of 365 days).'''
    logger.info("Estimating mid-term schedule with lower_bound_start_time=%s ...", lower_bound_start_time)

    scheduling_units = get_dynamically_schedulable_scheduling_units()

    if not scheduling_units:
        logger.info("No scheduling units found...")
        return

    upper_bound_stop_time = lower_bound_start_time + timedelta(days=365)

    # update the start_times of the remaining ones (so they form queue, and can be visualized in a timeline)
    while scheduling_units and lower_bound_start_time < upper_bound_stop_time:
        best_scored_scheduling_unit = find_best_next_schedulable_unit(scheduling_units, lower_bound_start_time, upper_bound_stop_time)

        if best_scored_scheduling_unit:
            scheduling_unit = best_scored_scheduling_unit.scheduling_unit
            start_time = round_to_second_precision(best_scored_scheduling_unit.start_time)
            logger.info("mid-term schedule: next scheduling unit id=%s '%s' start_time=%s", scheduling_unit.id, scheduling_unit.name, start_time)
            update_subtasks_start_times_for_scheduling_unit(scheduling_unit, start_time)

            # TODO check this?
            # If the start_time of the subtasks are updated, should the start_time (and stop_time) of the
            # scheduling_unit also be updated? Currently it's a cached property

            # keep track of the lower_bound_start_time based on last sub.stoptime and gap
            lower_bound_start_time = scheduling_unit.stop_time + DEFAULT_INTER_OBSERVATION_GAP

            scheduling_units.remove(scheduling_unit)
        else:
            # search again in a later timeslot
            min_earliest_possible_start_time = get_min_earliest_possible_start_time(scheduling_units, lower_bound_start_time+timedelta(minutes=10))
            logger.info("lower_bound_start_time='%s', min_earliest_possible_start_time='%s'", lower_bound_start_time, min_earliest_possible_start_time)
            if min_earliest_possible_start_time > lower_bound_start_time:
                lower_bound_start_time = min_earliest_possible_start_time
            else:
                # cannot advance anymore to find more
                logger.warning("Cannot assign start/stop times to remaining scheduling units for mid-term schedule...")
                for su in scheduling_units:
                    logger.warning("Remaining scheduling unit: id=%s '%s'", su.id, su.name)

                    # clear start/stop times, so they don't show up in the timeline,
                    # and we can filter/show them in a separate list which the user can tinker on the constraints
                    clear_defined_subtasks_start_stop_times_for_scheduling_unit(su)
                break

    logger.info("Estimating mid-term schedule... finished")


def do_dynamic_schedule() -> models.SchedulingUnitBlueprint:
    '''do a full update of the schedule: schedule next scheduling unit and assign start stop times to remaining schedulable scheduling units
    :return: the scheduling unit which was scheduled, or None if none was scheduled.'''
    logger.info("Updating dynamic schedule....")

    scheduled_unit = schedule_next_scheduling_unit()

    # determine next possible start time for remaining scheduling_units
    if scheduled_unit:
        lower_bound_start_time = scheduled_unit.stop_time + DEFAULT_INTER_OBSERVATION_GAP
    else:
        now = datetime.utcnow()
        try:
            scheduled_units = get_scheduled_scheduling_units(now, now)
            stop_times = [s.stop_time for s in scheduled_units if s.stop_time is not None]
        except Exception as e:
            # best effort: if the currently running/scheduled units cannot be determined, just start from 'now'
            logger.warning("Could not determine currently scheduled units, using now as lower bound. Error: %s", e)
            stop_times = []

        if stop_times:
            lower_bound_start_time = max(stop_times) + DEFAULT_INTER_OBSERVATION_GAP
        else:
            lower_bound_start_time = now

    # round up to next nearest second (a timestamp already on a whole second is left as is)
    if lower_bound_start_time.microsecond:
        lower_bound_start_time += timedelta(microseconds=1000000-lower_bound_start_time.microsecond)

    # determine mid-term schedule by assigning start/stop times to remaining schedulable units using the same search strategy
    assign_start_stop_times_to_schedulable_scheduling_units(lower_bound_start_time)
    logger.info("Finished updating dynamic schedule")

    return scheduled_unit


################## service/messagebus handler class ###############################################

class TMSSDynamicSchedulingMessageHandler(TMSSEventMessageHandler):
    '''
    The TMSSDynamicSchedulingMessageHandler reacts to TMSS EventMessages by triggering a new full update of the dynamic
    schedule.
    The actual schedule-update method runs on a background thread, and can take some time to complete ranging from a
    few seconds to several minutes. In the meantime new EventMessages may be received. These are handled by raising a flag
    that signals the schedule-update-thread that a new full update is needed. This way, a burst of Events results in a
    single update, and it also ensures that we always compute the schedule with the latest data.
    '''

    def __init__(self):
        super().__init__(log_event_messages=True)
        self._scheduling_thread = None
        self._scheduling_thread_running = False
        # set by the event callbacks; waited upon (and cleared) by the background scheduling loop
        self._do_schedule_event = Event()

    def start_handling(self):
        '''start the background scheduling thread, then start handling bus messages.'''
        # start the background thread which waits until the _do_schedule_event event is set upon receiving the correct TMSS EventMessages.
        self._scheduling_thread = Thread(target=self._scheduling_loop)
        self._scheduling_thread.daemon = True
        self._scheduling_thread_running = True
        self._scheduling_thread.start()
        super().start_handling()

    def stop_handling(self):
        '''signal the background scheduling thread to stop, wait for it, then stop handling bus messages.'''
        self._scheduling_thread_running = False
        # guard against stop_handling being called without a preceding start_handling
        if self._scheduling_thread is not None:
            # the loop re-checks the running flag at least every 10 seconds (see _scheduling_loop)
            self._scheduling_thread.join()
            self._scheduling_thread = None
        super().stop_handling()

    def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str):
        '''trigger a schedule update when a blueprint becomes schedulable, or when it is observed/finished/cancelled.'''
        if status in ["schedulable", "observed", "finished", "cancelled"]:
            logger.info("onSchedulingUnitBlueprintStatusChanged(id=%s, status=%s): triggering update of dynamic schedule...", id, status)
            # scheduling takes a long time, longer than creating many scheduling units in bulk
            # so, we do not create a complete new schedule for each new unit,
            # but we only trigger a new schedule update.
            # This way we are sure that the latest units are always taken into account while scheduling, but we do not waste cpu cycles.
            self._do_schedule_event.set()

    def onSchedulingUnitDraftConstraintsUpdated(self, id: int, scheduling_constraints_doc: dict):
        '''unschedule the scheduled blueprints of the draft whose constraints changed, and trigger a schedule update.'''
        affected_scheduling_units = models.SchedulingUnitBlueprint.objects.filter(draft__id=id).all()
        for scheduling_unit in affected_scheduling_units:
            if scheduling_unit.status == 'scheduled':
                unschedule_subtasks_in_scheduling_unit_blueprint(scheduling_unit)

        self._do_schedule_event.set()

    def onSettingUpdated(self, name: str, value: bool):
        '''trigger a schedule update when dynamic scheduling gets enabled.'''
        if name == models.Flag.Choices.DYNAMIC_SCHEDULING_ENABLED.value and value:
            logger.info("%s was set to %s: triggering update of dynamic schedule...", name, value)
            self._do_schedule_event.set()

    def _scheduling_loop(self):
        '''body of the background thread: run do_dynamic_schedule each time _do_schedule_event is set,
        but only when the DYNAMIC_SCHEDULING_ENABLED setting is on. Waits at most 10 seconds at a time,
        so that a cleared running flag is noticed.'''
        while self._scheduling_thread_running:
            if self._do_schedule_event.wait(timeout=10):
                self._do_schedule_event.clear()
                try:
                    setting_name = models.Flag.Choices.DYNAMIC_SCHEDULING_ENABLED.value
                    dynamic_scheduling_enabled = models.Setting.objects.get(name=setting_name).value
                    if dynamic_scheduling_enabled:
                        do_dynamic_schedule()
                    else:
                        logger.warning("Skipping update of dynamic schedule because the setting %s=%s",
                                       setting_name, dynamic_scheduling_enabled)
                except Exception as e:
                    logger.exception(str(e))
                    # just continue processing events. better luck next time...
def create_dynamic_scheduling_service(exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER):
    '''create a TMSSBusListener which handles TMSS EventMessages with a TMSSDynamicSchedulingMessageHandler.'''
    return TMSSBusListener(handler_type=TMSSDynamicSchedulingMessageHandler,
                           handler_kwargs=None,
                           exchange=exchange,
                           broker=broker)


################## helper methods #################################################################

def get_dynamically_schedulable_scheduling_units() -> [models.SchedulingUnitBlueprint]:
    '''get a list of all dynamically schedulable scheduling_units:
    units with at least one 'defined' independent subtask, whose draft has a scheduling_constraints_template,
    and whose status is 'schedulable'.'''
    defined_independent_subtasks = models.Subtask.independent_subtasks().filter(state__value='defined')
    # note: these are the ids of the scheduling unit blueprints owning the defined independent subtasks
    scheduling_unit_ids = defined_independent_subtasks.values('task_blueprints__scheduling_unit_blueprint_id').distinct().all()
    scheduling_units = models.SchedulingUnitBlueprint.objects.filter(id__in=scheduling_unit_ids) \
                                                             .filter(draft__scheduling_constraints_template__isnull=False) \
                                                             .select_related('draft', 'draft__scheduling_constraints_template').all()
    return [su for su in scheduling_units if su.status == 'schedulable']


def get_scheduled_scheduling_units(lower:datetime=None, upper:datetime=None) -> [models.SchedulingUnitBlueprint]:
    '''get a list of all scheduling_units which have at least one 'scheduled' subtask, optionally limited to a time window.
    :param lower: if given, only consider subtasks with a stop_time at or after lower.
    :param upper: if given, only consider subtasks with a start_time at or before upper.'''
    scheduled_subtasks = models.Subtask.objects.filter(state__value='scheduled')
    if lower is not None:
        scheduled_subtasks = scheduled_subtasks.filter(stop_time__gte=lower)
    if upper is not None:
        scheduled_subtasks = scheduled_subtasks.filter(start_time__lte=upper)
    return list(models.SchedulingUnitBlueprint.objects.filter(id__in=scheduled_subtasks.values('task_blueprints__scheduling_unit_blueprint_id').distinct()).all())


def unschededule_blocking_scheduled_units_if_needed_and_possible(candidate: ScoredSchedulingUnit) -> bool:
    '''check if there are any already scheduled units in the way of the candidate, and unschedule them if allowed.
    A scheduled unit in the way is unscheduled when the candidate's weighted_score is higher than the scheduled unit's
    weighted_score, as computed for the candidate's time window.
    :param candidate: the scored candidate, with its proposed start_time.
    :return: True if nothing is blocking anymore, False if any scheduled unit remains in the candidate's time window.'''
    # the time window the candidate would occupy if scheduled at its proposed start_time
    candidate_start_time = candidate.start_time
    candidate_stop_time = candidate.start_time + candidate.scheduling_unit.duration

    # check any previously scheduled units, and unschedule if needed/allowed
    scheduled_scheduling_units = get_scheduled_scheduling_units(lower=candidate_start_time, upper=candidate_stop_time)

    # check if we can and need to unschedule the blocking units
    for scheduled_scheduling_unit in scheduled_scheduling_units:
        scheduled_score = compute_scores(scheduled_scheduling_unit, candidate_start_time, candidate_stop_time)

        if candidate.weighted_score > scheduled_score.weighted_score:
            # ToDo: also check if the scheduled_scheduling_unit is manually/dynamically scheduled
            # log the candidate's *proposed* start_time (not the scheduling unit's current/cached start_time)
            logger.info("unscheduling id=%s '%s' because it is in the way and has a lower score than the best candidate id=%s '%s' score=%s start_time=%s",
                        scheduled_scheduling_unit.id, scheduled_scheduling_unit.name,
                        candidate.scheduling_unit.id, candidate.scheduling_unit.name, candidate.weighted_score, candidate_start_time)

            unschedule_subtasks_in_scheduling_unit_blueprint(scheduled_scheduling_unit)

    # check again... are there still any scheduled_scheduling_units in the way?
    scheduled_scheduling_units = get_scheduled_scheduling_units(lower=candidate_start_time, upper=candidate_stop_time)
    if scheduled_scheduling_units:
        # accept current solution with current scheduled_scheduling_units
        logger.info("keeping current scheduled unit(s) which have a better (or equal) score: %s", "; ".join(
            "id=%s '%s' start_time='%s'" % (su.id, su.name, su.start_time) for su in scheduled_scheduling_units))

        # indicate there are still blocking units
        return False

    # all clear, nothing is blocking anymore
    return True