#!/usr/bin/env python3 # dynamic_scheduling.py # # Copyright (C) 2020 # ASTRON (Netherlands Institute for Radio Astronomy) # P.O.Box 2, 7990 AA Dwingeloo, The Netherlands # # This file is part of the LOFAR software suite. # The LOFAR software suite is free software: you can redistribute it # and/or modify it under the terms of the GNU General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # The LOFAR software suite is distributed in the hope that it will be # useful, but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. # # $Id: $ """ This dynamic_scheduling module contains all code for dynamic scheduling many scheduling_units in "the most optimal way". We have three goals when computing an obsering schedule for the telescope: 1. Observe each target under the best conditions, resulting in the best scientific output. 2. Use the precious telescope time to the fullest, thus minimizing the gaps in between observations. 3. Be able to quickly act when conditions change or when a new (triggered) scheduling_unit is added. The core scheduling methods are defined in the tasks.py and subtasks.py modules. These just schedule/unschedule/cancel individual scheduling_units/tasks/subtasks. This module uses these core methods, but should be seen as a layer on top which creates the complex behaviour of scheduling a big list of candidate scheduling_units to create a filled schedule meeting the 3 goals above. To achieve goal 1, each scheduling_unit has a set of constraints, which encode what are the best conditions to observe that scheduling_unit. Users are responsible to specify a proper balance between reasonable and tight enough constraints. To tweak the order of scheduling_units in the schedule even further, users can set project- and/or individual scheduling_unit priorities. Goals 1 & 2 are met in the Scheduler class which: - schedules all 'fixed_time' scheduling_units (if possible, else they are annotated as unschedulable) - then finds the "best" scheduling_unit to be scheduled next ("best" means: the best weighted score given each scheduling_unit's constraints, unit and project priority and gap) - then fills the rest of the schedule by moving forward in time, considering all remaining candidate scheduling_units, picking the "best" and positioning it there. That is repeated until no candidates remain, resuling in a long term schedule that is likely to happen if and only if no conditions/constraints/projects/etc change. Each change in any condition/constraint/priority/etc can result in a totally different schedule! That is ok, and exactly the goal. Goal 3 is met using a TMSSBuslistener and TMSSDynamicSchedulingMessageHandler which reacts to the relevant created/updated/deleted events by updating (un)schedulable statuses of the affected scheduling_units, and then triggering a new computation of a full schedule in the Scheduler. """ import os import logging logger = logging.getLogger(__name__) from datetime import datetime, timedelta, time from lofar.sas.tmss.tmss.tmssapp import models from lofar.sas.tmss.tmss.tmssapp.tasks import schedule_independent_subtasks_in_scheduling_unit_blueprint, unschedule_subtasks_in_scheduling_unit_blueprint, mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable, mark_independent_subtasks_in_scheduling_unit_blueprint_as_schedulable, mark_independent_subtasks_in_scheduling_unit_blueprint_as_schedulable, set_scheduling_unit_blueprint_start_times, reschedule_subtasks_in_scheduling_unit_blueprint from lofar.sas.tmss.tmss.tmssapp.subtasks import update_subtasks_start_times_for_scheduling_unit, clear_defined_subtasks_start_stop_times_for_scheduling_unit, cancel_subtask, mark_subtasks_and_successors_as_defined from lofar.sas.tmss.client.tmssbuslistener import * from lofar.common.datetimeutils import round_to_second_precision from threading import Thread, Event from django.db import transaction from django.db.models import QuerySet from lofar.sas.tmss.tmss.exceptions import SchedulerInterruptedException from lofar.sas.tmss.services.scheduling.constraints import * # LOFAR needs to have a gap in between observations to (re)initialize hardware. DEFAULT_NEXT_STARTTIME_GAP = timedelta(seconds=180) from lofar.sas.tmss.tmss.tmssapp.subtasks import DEFAULT_INTER_OBSERVATION_GAP class Scheduler: """ The Scheduler class considers candidate schedulable scheduling_units, and tries to schedule them or mark them as unschedulable. See also the goals describe above in the module docstring. The core algorithm is as follows: - The is a continuous scheduling loop that waits in idle state for something to schedule - When some external code calls the `trigger` method, the loop exits the waiting idle state and starts a scheduling run. Running scheduling-runs are interrupted when some external code calls the `trigger` method, leaving a half-finished schedule which is then completed/overwritten be the next full scheduling run which has just been triggered. - In a single full scheduling run `do_full_schedule_computation` we: - schedule all fixed_time scheduling units - schedule the best next dynamic scheduling unit - position the rest of the dynamic scheduling units at their most likely timestamp where they will be observed. """ def __init__(self) -> None: self._scheduling_thread = None self._scheduling_thread_running = False self._do_schedule_event = Event() super().__init__() # make sure initial status is idle models.Subsystem.Activator('scheduler').deactivate() def start_scheduling(self): '''Prepares the scheduling_units, performs one full schedule computation, and start the scheduling loop and wait for a trigger.''' # check and update if needed all scheduling_units statuses for schedulability. try: self._initialize_statuses() except Exception as e: # catch all errors and carry on, so we do end up with a running scheduling loop logger.error("Could not initialize (un)schedulable statuses at startup: %s", e) # start with at least one scheduling round, wait until done try: self.do_full_schedule_computation() except Exception as e: # catch all errors and carry on, so we do end up with a running scheduling loop logger.error("Could not do full schedule computation at startup: %s", e) # start the background thread which waits until the _do_schedule_event event is set # upon receiving to the correct TMSS EVentMessages. logger.debug("Scheduler starting scheduling thread...") self._scheduling_thread_running = True self._scheduling_thread = Thread(target=self._run_scheduling_loop) self._scheduling_thread.daemon = True self._scheduling_thread.start() logger.debug("Scheduler scheduling thread was fully started and initialized") def stop_scheduling(self): '''Stop the scheduling loop and stop processing scheduling triggers.''' self._scheduling_thread_running = False self._scheduling_thread.join() self._scheduling_thread = None def __enter__(self): '''Start scheduling in a 'with' context''' self.start_scheduling() return self def __exit__(self, exc_type, exc_val, exc_tb): '''Stop scheduling leaving the 'with' context''' self.stop_scheduling() def trigger(self): '''Interrupt idle waiting, or the current full schedule computation, and start a new full computation.''' logger.info("Scheduler triggering a new scheduling round...") self._do_schedule_event.set() @property def is_triggered(self) -> bool: return self._do_schedule_event.is_set() @property def fixed_time_scheduling_enabled(self) -> bool: return models.Setting.objects.get(name=models.SystemSettingFlag.Choices.FIXED_TIME_SCHEDULING_ENABLED.value).value @property def dynamic_scheduling_enabled(self) -> bool: return models.Setting.objects.get(name=models.SystemSettingFlag.Choices.DYNAMIC_SCHEDULING_ENABLED.value).value def _raise_if_triggered(self): if self.is_triggered: raise SchedulerInterruptedException() def _initialize_statuses(self): '''upon startup, do one round over all scheduling units for all projects and reservations marking them as (un)schedulable so they can be picked up in the _scheduling_loop later''' logger.debug("preparing (un)schedulable scheduling units for all (in)active projects...") mark_unschedulable_scheduling_units_for_active_projects_schedulable() mark_scheduling_units_for_inactive_projects_unschedulable() logger.info("prepared (un)schedulable scheduling units for all (in)active projects") logger.debug("preparing (un)schedulable scheduling units for all reservations...") mark_scheduling_units_blocked_by_reservations_unschedulable() logger.info("prepared (un)schedulable scheduling units for all reservations") def _run_scheduling_loop(self): '''Continuously loop waiting for incoming triggers to start a full schedule computation''' logger.info("Scheduler running scheduling loop. Waiting for events...") while self._scheduling_thread_running: logger.debug("Scheduler waiting for trigger to compute new schedule...") if self._do_schedule_event.wait(timeout=1): logger.info("Scheduler was triggered to compute new schedule...") self._do_schedule_event.clear() try: self.do_full_schedule_computation() except SchedulerInterruptedException: # log the interruption, and continue with the next loop, cause _do_schedule_event was set logger.info("Scheduler was interrupted while computing a new full schedule") except Exception as e: # log and just continue processing events. better luck next time... logger.exception(str(e)) def do_full_schedule_computation(self): '''computes a full schedule for fixed_time, dynamic best next unit, and best estimate of rest of schedule given current conditions/constraints/priorities/etc''' logger.info("Scheduler starting full schedule computation...") if self.fixed_time_scheduling_enabled: self.schedule_fixed_time_scheduling_units() else: logger.info("Scheduler skipping update of fixed_time schedule because it is not enabled in the settings") if self.dynamic_scheduling_enabled: self.do_dynamic_schedule() else: logger.info("Scheduler skipping update of dynamic schedule because it is not enabled in the settings") logger.info("Scheduler full schedule computation finished.") def schedule_fixed_time_scheduling_units(self): ''' Schedule all schedulable fixed_time scheduling units.''' with models.Subsystem.Activator('scheduler'): # exclude units for inactive projects mark_scheduling_units_for_inactive_projects_unschedulable() # get the fixed_timely schedulable scheduling_units in most-recently-updated order. schedulable_units = get_fixed_time_schedulable_scheduling_units() # only consider active projects schedulable_units = schedulable_units.filter(draft__scheduling_set__project__project_state__value=models.ProjectState.Choices.ACTIVE.value) if schedulable_units.exists(): logger.info("scheduling %s unit(s) with fixed_time at constraint for active projects", schedulable_units.count()) for i, schedulable_unit in enumerate(schedulable_units): self._raise_if_triggered() #interrupts the scheduling loop for a next round try: start_time = get_earliest_possible_start_time(schedulable_unit) if start_time is None: logger.info("Cannot determine earliest possible start_time for scheduling unit [%s/%s] id=%d, marking it unschedulable", i, len(schedulable_units), schedulable_unit.id) mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(schedulable_unit, "cannot determine start_time") continue stop_time = start_time + schedulable_unit.specified_main_observation_duration set_scheduling_unit_blueprint_start_times(schedulable_unit, first_start_time=start_time) logger.info("Scheduler checking if fixed_time scheduled scheduling unit [%s/%s] id=%d can be scheduled at '%s'", i, len(schedulable_units), schedulable_unit.id, start_time) if filter_scheduling_units_using_constraints([schedulable_unit], lower_bound=start_time, upper_bound=stop_time): try: logger.info("Scheduling fixed_time scheduled scheduling unit id=%d at '%s'", schedulable_unit.id, start_time) schedule_independent_subtasks_in_scheduling_unit_blueprint(schedulable_unit, start_time) except Exception as e: logger.error(e) mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(schedulable_unit, reason=str(e)) else: msg = "fixed_time-scheduled scheduling unit id=%d cannot be scheduled at '%s'" % (schedulable_unit.id, start_time) logger.warning(msg) mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(schedulable_unit, reason=msg) except Exception as e: logger.exception("Could not schedule fixed_time-scheduled scheduling unit id=%d: %s", schedulable_unit.id, e) else: logger.info("there are no schedulable scheduling units with fixed_time at constraint for active projects to schedule") def do_dynamic_schedule(self) -> models.SchedulingUnitBlueprint: '''do a full update of the schedule: schedule next scheduling unit and assign start stop times to remaining schedulable scheduling units''' with models.Subsystem.Activator('scheduler'): logger.info("Updating (dynamic) schedule....") # find and schedule the next best unit scheduled_unit = self.schedule_next_scheduling_unit() # determine next possible start time for remaining scheduling_units if scheduled_unit: lower_bound_start_time = scheduled_unit.on_sky_stop_time + DEFAULT_INTER_OBSERVATION_GAP else: lower_bound_start_time = datetime.utcnow()+DEFAULT_NEXT_STARTTIME_GAP # round up to next nearest minute lower_bound_start_time += timedelta(seconds=60-lower_bound_start_time.second, microseconds=1000000-lower_bound_start_time.microsecond) # estimate mid-term schedule by assigning start/stop times to remaining schedulable units using the same search strategy (without scheduling them!) self.assign_start_stop_times_to_schedulable_scheduling_units(lower_bound_start_time) logger.info("Finished updating dynamic schedule") return scheduled_unit def find_best_next_schedulable_unit(self, scheduling_units:[models.SchedulingUnitBlueprint], lower_bound_start_time: datetime, upper_bound_stop_time: datetime) -> ScoredSchedulingUnit: """ from the given scheduling_units, find the best schedulable scheduling_unit which can run withing the given time window. - all constraints of the scheduling_units are taken into account - if one or more scheduling_units can run exclusively in this window and not later, then only those are considered. :param scheduling_units: evaluate these scheduling_units. :param lower_bound_start_time: evaluate the constraints at and after lower_bound_start_time. The returned unit has a start_time guaranteed at or after lower_bound_start_time. :param upper_bound_stop_time: evaluate the constraints before upper_bound_stop_time. The returned unit has a stop_time guaranteed before upper_bound_stop_time. Returns a ScoredSchedulingUnit struct with the best next schedulable scheduling unit and its proposed start_time where it best fits its contraints. """ _start_search_timestamp = datetime.utcnow() # ensure upper is greater than or equal to lower upper_bound_stop_time = max(lower_bound_start_time, upper_bound_stop_time) # first, from all given scheduling_units, filter and consider only those that meet their constraints. filtered_scheduling_units = filter_scheduling_units_using_constraints(scheduling_units, lower_bound_start_time, upper_bound_stop_time, self._raise_if_triggered) logger.debug("find_best_next_schedulable_unit: units meeting constraints between '%s' and '%s': %s", lower_bound_start_time, upper_bound_stop_time, ','.join([str(su.id) for su in sorted(filtered_scheduling_units, key=lambda x: x.id)]) or 'None') # then, check if there is a subset that can only run exclusively in this window and not later. exclusive_in_this_window_scheduling_units = filter_scheduling_units_which_can_only_run_in_this_window(filtered_scheduling_units, lower_bound_start_time, upper_bound_stop_time, self._raise_if_triggered) logger.debug("find_best_next_schedulable_unit: units meeting constraints exclusively between '%s' and '%s': %s", lower_bound_start_time, upper_bound_stop_time, ','.join([str(su.id) for su in sorted(exclusive_in_this_window_scheduling_units, key=lambda x: x.id)]) or 'None') # if there are some units that can only be scheduled exclusively in this window, # then consider only those. Else, just use all. units_to_score = exclusive_in_this_window_scheduling_units if exclusive_in_this_window_scheduling_units else filtered_scheduling_units logger.debug("find_best_next_schedulable_unit: units to score between '%s' and '%s': %s", lower_bound_start_time, upper_bound_stop_time, ','.join([str(su.id) for su in sorted(units_to_score, key=lambda x: x.id)]) or 'None') # from the filtered down list of units, compute the (weighted) scores, and return the best scoring one. best_scored_scheduling_unit = get_best_scored_scheduling_unit_scored_by_constraints(units_to_score, lower_bound_start_time, upper_bound_stop_time) _elapsed = datetime.utcnow() - _start_search_timestamp if best_scored_scheduling_unit: logger.info("find_best_next_schedulable_unit: best_scored_scheduling_unit id=%s name='%s' start='%s' between '%s' and '%s' (took %.2fs)", best_scored_scheduling_unit.scheduling_unit.id, best_scored_scheduling_unit.scheduling_unit.name, best_scored_scheduling_unit.start_time, lower_bound_start_time, upper_bound_stop_time, _elapsed.total_seconds()) else: logger.info("find_best_next_schedulable_unit: could NOT find a best_scored_scheduling_unit between '%s' and '%s' (took %.2fs)", lower_bound_start_time, upper_bound_stop_time, _elapsed.total_seconds()) return best_scored_scheduling_unit def schedule_next_scheduling_unit(self) -> models.SchedulingUnitBlueprint: '''find the best next schedulable scheduling unit and try to schedule it. Overlapping existing scheduled units are unscheduled if their score is lower. :return: the scheduled scheduling unit.''' # prepare queries for subsets of schedulable_units (django uses lazy evaluation, so don't worry about wasted queries) schedulable_units_triggered = get_triggered_schedulable_scheduling_units() schedulable_units_queue_A = get_dynamically_schedulable_scheduling_units(priority_queue=models.PriorityQueueType.objects.get(value=models.PriorityQueueType.Choices.A.value)) schedulable_units_queue_B = get_dynamically_schedulable_scheduling_units(priority_queue=models.PriorityQueueType.objects.get(value=models.PriorityQueueType.Choices.B.value)) # We schedule in an absolute order: first triggered, then dynamic queue A, then queue B # if a scheduling unit can be scheduled, then we exit early. for schedulable_units in (schedulable_units_triggered, schedulable_units_queue_A, schedulable_units_queue_B): if not schedulable_units.exists(): continue # continue with the next in order schedulable_units # convert queryset to list (which fetches them from db) schedulable_units = list(schedulable_units) # --- core routine --- # search in a forward sliding window for the best scheduling_unit that can be scheduled # once found and scheduled, exit. Otherwise slide window forward and try again. # When scheduling 'just in time' we need to allow the other services/controllers/stations/boards some startup time: DEFAULT_NEXT_STARTTIME_GAP lower_bound_start_time = max(datetime.utcnow() + DEFAULT_NEXT_STARTTIME_GAP, min([su.earliest_possible_start_time for su in schedulable_units])) upper_bound_stop_time = max(lower_bound_start_time + timedelta(days=7), max([su.latest_possible_start_time for su in schedulable_units])) while lower_bound_start_time < upper_bound_stop_time: self._raise_if_triggered() # interrupts the scheduling loop for a next round try: # no need to irritate user in log files with sub-second scheduling precision lower_bound_start_time = round_to_second_precision(lower_bound_start_time) # our sliding window only looks 12 hours ahead window_upper_bound_stop_time = round_to_second_precision(lower_bound_start_time + timedelta(hours=12)) # try to find the best next scheduling_unit logger.info("schedule_next_scheduling_unit: searching for best scheduling unit to schedule between '%s' and '%s'", lower_bound_start_time, window_upper_bound_stop_time) best_scored_scheduling_unit = self.find_best_next_schedulable_unit(schedulable_units, lower_bound_start_time, window_upper_bound_stop_time) if best_scored_scheduling_unit: best_scheduling_unit = best_scored_scheduling_unit.scheduling_unit best_scheduling_unit_score = best_scored_scheduling_unit.weighted_score best_start_time = best_scored_scheduling_unit.start_time # make sure we don't start earlier than allowed assert(best_start_time >= lower_bound_start_time) # make start_time "look nice" for us humans best_start_time = round_to_second_precision(best_start_time) logger.info("schedule_next_scheduling_unit: found best candidate id=%s '%s' weighted_score=%s start_time=%s interrupts_telescope=%s", best_scheduling_unit.id, best_scheduling_unit.name, best_scheduling_unit_score, best_start_time, best_scheduling_unit.interrupts_telescope) # cancel and/or unschedule current units-in-the-way... # (only if possible, depending on priorities and other rules...) cancel_running_observation_if_needed_and_possible(best_scored_scheduling_unit) unschededule_blocking_scheduled_units_if_needed_and_possible(best_scored_scheduling_unit) # are there any blocking scheduled scheduling_units still in the way? blocking_scheduling_units = get_scheduled_scheduling_units(lower_bound=best_scored_scheduling_unit.start_time - DEFAULT_INTER_OBSERVATION_GAP, upper_bound=best_scored_scheduling_unit.start_time + best_scored_scheduling_unit.scheduling_unit.specified_main_observation_duration + DEFAULT_INTER_OBSERVATION_GAP) if blocking_scheduling_units.exists(): logger.info("schedule_next_scheduling_unit: cannot schedule best candidate id=%s '%s' score=%s start_time=%s interrupts_telescope=%s because there are %d other units blocking it", best_scheduling_unit.id, best_scheduling_unit.name, best_scheduling_unit_score, best_start_time, best_scheduling_unit.interrupts_telescope, blocking_scheduling_units.count()) # remove this best_scheduling_unit for this round of dynamic scheduling. # better luck next time. schedulable_units.remove(best_scheduling_unit) else: with transaction.atomic(): # we made room for our candidate, now try schedule it # there may still be uncancelled-running observations in the way -> SubtaskSchedulingException scheduled_scheduling_unit = schedule_independent_subtasks_in_scheduling_unit_blueprint(best_scheduling_unit, start_time=best_start_time) logger.info("schedule_next_scheduling_unit: scheduled best candidate id=%s '%s' score=%s start_time=%s interrupts_telescope=%s", best_scheduling_unit.id, best_scheduling_unit.name, best_scheduling_unit_score, best_start_time, best_scheduling_unit.interrupts_telescope) # we want the dynamic scheduler to schedule at most 1 unit at the time # so, unschedule any lingering dynamically scheduled unit other than the just scheduled_scheduling_unit for scheduled_scheduling_unit in get_scheduled_scheduling_units().filter(scheduling_constraints_doc__scheduler='dynamic').exclude(id=scheduled_scheduling_unit.id).all(): unschedule_subtasks_in_scheduling_unit_blueprint(scheduled_scheduling_unit) # return the scheduled scheduling_unit, early exit out of looping over priority queues. return scheduled_scheduling_unit except SubtaskSchedulingException as e: logger.error("Could not schedule scheduling_unit id=%s name='%s'. Error: %s", best_scheduling_unit.id, best_scheduling_unit.name, e) # prevent that it keeps trying to schedule this failed unit in this scheduler-round schedulable_units.remove(best_scheduling_unit) # check if the best_scheduling_unit is unschedulable in this window, but can run later... best_scheduling_unit.refresh_from_db() if best_scheduling_unit.interrupts_telescope: # triggered observations cannot run later. mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(best_scheduling_unit, str(e)) return None elif best_scheduling_unit.status.value == models.SchedulingUnitStatus.Choices.UNSCHEDULABLE.value: if can_run_after(best_scheduling_unit, best_start_time) and not best_scheduling_unit.interrupts_telescope: logger.info("Unschedulable scheduling_unit id=%s can run later than '%s'. Marking it as schedulable again...", best_scheduling_unit.id, best_start_time) # yep, can run later, so mark it as schedulable again, and let it be handled in a new scheduler-round mark_independent_subtasks_in_scheduling_unit_blueprint_as_schedulable(best_scheduling_unit) # nothing was found, or an error occurred. # it may be that in the mean time some scheduling_units are not (dynamically) schedulable anymore, filter those out. for su in schedulable_units: # filter out units which cannot run at all after the (progressing/advancing) lower_bound_start_time # mark them unschedulable if not can_run_after(su, lower_bound_start_time): mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(su, "cannot run after %s" % (lower_bound_start_time,)) su.refresh_from_db() # all units are refreshed and either schedulable or unschedulable. # refresh list of schedulable_units to be considered in next round (only schedulable) schedulable_units = [su for su in schedulable_units if su.status.value==models.SchedulingUnitStatus.Choices.SCHEDULABLE.value] lower_bound_start_time = get_min_earliest_possible_start_time(schedulable_units, lower_bound_start_time+timedelta(hours=1), self._raise_if_triggered) if lower_bound_start_time is None: break # search again... (while loop) with the remaining schedulable_units and new lower_bound_start_time return None def assign_start_stop_times_to_schedulable_scheduling_units(self, lower_bound_start_time: datetime): '''''' logger.info("Estimating mid-term schedule with lower_bound_start_time=%s ..." % lower_bound_start_time) for priority_queue in models.PriorityQueueType.objects.order_by('value').all(): scheduling_units = list(get_dynamically_schedulable_scheduling_units(priority_queue=priority_queue)) if len(scheduling_units) == 0: logger.info("No scheduling units found...") return upper_bound_stop_time = max(lower_bound_start_time + timedelta(days=31), max([su.latest_possible_start_time for su in scheduling_units])) # update the start_times of the remaining ones (so they form queue, and can be visualized in a timeline) while scheduling_units and lower_bound_start_time < upper_bound_stop_time: # our sliding window only looks 12 hours ahead window_upper_bound_stop_time = round_to_second_precision(lower_bound_start_time + timedelta(hours=12)) best_scored_scheduling_unit = self.find_best_next_schedulable_unit(scheduling_units, lower_bound_start_time, window_upper_bound_stop_time) if best_scored_scheduling_unit: scheduling_unit = best_scored_scheduling_unit.scheduling_unit start_time = round_to_second_precision(best_scored_scheduling_unit.start_time) logger.info("mid-term schedule: next scheduling unit id=%s '%s' start_time=%s", scheduling_unit.id, scheduling_unit.name, start_time) update_subtasks_start_times_for_scheduling_unit(scheduling_unit, start_time) # keep track of the lower_bound_start_time based on last sub.on_sky_stop_time and gap lower_bound_start_time = scheduling_unit.on_sky_stop_time + DEFAULT_INTER_OBSERVATION_GAP scheduling_units.remove(scheduling_unit) else: # search again in a later timeslot min_earliest_possible_start_time = get_min_earliest_possible_start_time(scheduling_units, lower_bound_start_time+timedelta(minutes=60)) logger.info("lower_bound_start_time='%s', min_earliest_possible_start_time='%s'", lower_bound_start_time, min_earliest_possible_start_time) if min_earliest_possible_start_time is not None and min_earliest_possible_start_time > lower_bound_start_time: lower_bound_start_time = min_earliest_possible_start_time else: # cannot advance anymore to find more logger.warning("Cannot assign start/stop times to remaining scheduling units for mid-term schedule...") for su in scheduling_units: logger.warning("Remaining scheduling unit: id=%s '%s'", su.id, su.name) mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(su, "cannot find any timeslot where all constraints are met") break logger.info("Estimating mid-term schedule... finished") ################## core dynamic scheduling methods ################################################ # # # This module starts with the core dynamic scheduling methods which are used in the dynamic # # scheduling service. These high level methods only filter/score/sort in a generic way. # # The detailed concrete filter/score/sort methods are pick by a strategy pattern in the # # constraints package based on each scheduling unit's scheduling_constraints template. # # # ################################################################################################### def mark_scheduling_units_for_inactive_projects_unschedulable(projects: [str]=None): scheduling_units = models.SchedulingUnitBlueprint.objects.filter(status__value__in=(models.SchedulingUnitStatus.Choices.SCHEDULABLE.value, models.SchedulingUnitStatus.Choices.SCHEDULED.value)) maybe_unschedulable_units = scheduling_units.exclude(draft__scheduling_set__project__project_state__value=models.ProjectState.Choices.ACTIVE.value) if projects: maybe_unschedulable_units = maybe_unschedulable_units.filter(draft__scheduling_set__project__name__in=projects) if maybe_unschedulable_units.exists(): logger.info("marking %s scheduled unit(s) as unschedulable for inactive project(s)%s", maybe_unschedulable_units.count(), (' ' + ', '.join(p for p in projects)) if projects else '') for scheduling_unit in maybe_unschedulable_units: logger.info("marking unit id=%s status= %s project=%s pstate=%s", scheduling_unit.id, scheduling_unit.status, scheduling_unit.project.name, scheduling_unit.project.project_state.value) with transaction.atomic(): mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(scheduling_unit, reason="project is not active") start_time = get_earliest_possible_start_time(scheduling_unit) set_scheduling_unit_blueprint_start_times(scheduling_unit, first_start_time=start_time) def mark_unschedulable_scheduling_units_for_active_projects_schedulable(projects: [str]=None): ''' mark all scheduling_units which are currently unschedulable (for the given project(s), or all projects if None) as schedulable again. This means that these scheduling_units become schedulable again, and can thus be re-evaluated, resulting in either a scheduled or again unschedulable unit. ''' unschedulable_units = models.SchedulingUnitBlueprint.objects.filter(status__value=models.SchedulingUnitStatus.Choices.UNSCHEDULABLE.value) maybe_schedulable_units = unschedulable_units.filter(draft__scheduling_set__project__project_state__value=models.ProjectState.Choices.ACTIVE.value) if projects: maybe_schedulable_units = maybe_schedulable_units.filter(draft__scheduling_set__project__name__in=projects) if maybe_schedulable_units.exists(): logger.info("marking %s unschedulable units schedulable for active project(s)%s if stations available...", maybe_schedulable_units.count(), (' ' + ', '.join(p for p in projects)) if projects else '') for scheduling_unit in maybe_schedulable_units.all(): try: if can_run_within_station_reservations(scheduling_unit): logger.info("marking scheduling unit id=%s as schedulable", scheduling_unit.id) mark_independent_subtasks_in_scheduling_unit_blueprint_as_schedulable(scheduling_unit) else: logger.info("scheduling unit id=%s cannot run with current reservations and is still unschedulable", scheduling_unit.id) except Exception as e: logger.exception(e) def mark_scheduling_units_blocked_by_reservations_unschedulable(): schedulable_units = get_fixed_time_schedulable_scheduling_units().all() schedulable_units = [su for su in schedulable_units if not can_run_within_station_reservations(su)] if schedulable_units: logger.info("marking %s schedulable units unschedulable because they are blocked by a reservation", len(schedulable_units)) for scheduling_unit in schedulable_units: try: mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(scheduling_unit, reason="blocked by reservation(s)") except Exception as e: logger.exception(e) ################## service/messagebug handler class ############################################### class TMSSDynamicSchedulingMessageHandler(TMSSEventMessageHandler): ''' The TMSSDynamicSchedulingMessageHandler reacts to TMSS EventMessages by triggering a new full update of the dynamic schedule. The actual schedule-update method runs on a backround thread, and can take some time to complete ranging from a few seconds to several minutes. In the mean time new EventMessages may be received. These are handled by raising a flag that signals the schedule-update-thread that a new full update is needed. This way, a burst of Events results in a single update, and it also ensures that we always compute the schedule with the latest data. ''' def __init__(self): super().__init__(log_event_messages=True) self.scheduler = Scheduler() def start_handling(self): self.scheduler.start_scheduling() super().start_handling() def stop_handling(self): self.scheduler.stop_scheduling() super().stop_handling() def onSchedulingUnitBlueprintCreated(self, id: int): '''prepare the new scheduling_unit for scheduling. Set unschedulable if project not active.''' scheduling_unit = models.SchedulingUnitBlueprint.objects.get(id=id) # mark unschedulable if project not active if scheduling_unit.project.project_state.value != models.ProjectState.Choices.ACTIVE.value: mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(scheduling_unit, reason="project is not active") # trigger next schedule computation self.scheduler.trigger() def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str): if status in [models.SchedulingUnitStatus.Choices.SCHEDULABLE.value, models.SchedulingUnitStatus.Choices.UNSCHEDULABLE.value, models.SchedulingUnitStatus.Choices.OBSERVED, models.SchedulingUnitStatus.Choices.FINISHED.value, models.SchedulingUnitStatus.Choices.CANCELLED.value]: logger.info("onSchedulingUnitBlueprintStatusChanged(id=%s, status=%s): triggering update of dynamic & fixed_time schedule...", id, status) # scheduling takes a long time, longer then creating many scheduling units in bulk # so, we do not create a complete new schedule for each new unit, # but we only trigger a new schedule update. # This way we are sure that the latest units are always taken into account while scheduling, but we do not waste cpu cylces. self.scheduler.trigger() def onSchedulingUnitBlueprintConstraintsUpdated(self, id: int, scheduling_constraints_doc: dict): self.onSchedulingUnitBlueprintConstraintsRankOrQueueUpdated(id) def onSchedulingUnitBlueprintRankUpdated(self, id: int, rank: float): self.onSchedulingUnitBlueprintConstraintsRankOrQueueUpdated(id) def onSchedulingUnitBlueprintPriorityQueueUpdated(self, id: int, priority_queue: str): self.onSchedulingUnitBlueprintConstraintsRankOrQueueUpdated(id) def onSchedulingUnitBlueprintConstraintsRankOrQueueUpdated(self, id: int): scheduling_unit_blueprint = models.SchedulingUnitBlueprint.objects.get(id=id) if scheduling_unit_blueprint.status.value == models.SchedulingUnitStatus.Choices.UNSCHEDULABLE.value: logger.info("constraints/queue/priority for unschedulable scheduling unit id=%s changed: setting status to schedulable which will triggering a dynamic scheduling update...", id) mark_independent_subtasks_in_scheduling_unit_blueprint_as_schedulable(scheduling_unit_blueprint) elif scheduling_unit_blueprint.status.value == models.SchedulingUnitStatus.Choices.SCHEDULED.value: logger.info("constraints/queue/priority for scheduled scheduling unit id=%s changed: unscheduling it, which will triggering a dynamic scheduling update...", id) unschedule_subtasks_in_scheduling_unit_blueprint(scheduling_unit_blueprint) elif scheduling_unit_blueprint.status.value == models.SchedulingUnitStatus.Choices.SCHEDULABLE.value: logger.info("constraints/queue/priority for schedulable scheduling unit id=%s changed: triggering a dynamic scheduling update...", id) self.scheduler.trigger() def onSubTaskStatusChanged(self, id: int, status: str): if status in (models.SubtaskState.Choices.DEFINED.value, models.SubtaskState.Choices.SCHEDULED.value): # this subtask is either: # - new, # - or is now defined after being scheduled/unschedulable, # - or is now scheduled (at a new time windown) after being defined # check if there are any overlapping unschedulable subtasks, and mark these as defined. # This will result in a status update event, on which the fixed_time scheduling will be triggered. # Summary: this subtask may have moved out of the way, as a result consider unschedulable overlapping units. subtask = models.Subtask.objects.get(id=id) affected_other_subtasks = models.Subtask.independent_subtasks() \ .filter(state__value=models.SubtaskState.Choices.UNSCHEDULABLE.value) \ .filter(scheduled_start_time__lte=subtask.scheduled_stop_time) \ .filter(scheduled_stop_time__gte=subtask.scheduled_start_time) \ .filter(task_blueprint__draft__scheduling_unit_draft__scheduling_set__project__project_state__value=models.ProjectState.Choices.ACTIVE.value) \ .exclude(id=subtask.id).all() for affected_subtask in affected_other_subtasks: mark_subtasks_and_successors_as_defined(affected_subtask) def onSettingUpdated(self, name: str, value: bool): if name in (models.SystemSettingFlag.Choices.DYNAMIC_SCHEDULING_ENABLED.value, models.SystemSettingFlag.Choices.FIXED_TIME_SCHEDULING_ENABLED.value) and value: logger.info("%s was set to %s: triggering update of schedule...", name, value) self.scheduler.trigger() def onSchedulingConstraintsWeightFactorUpdated(self, id: int): weight_factor = models.SchedulingConstraintsWeightFactor.objects.get(id=id) logger.info("weight_factor %s for template %s version %s changed to %s: triggering update of dynamic schedule...", weight_factor.constraint_name, weight_factor.scheduling_constraints_template.name, weight_factor.scheduling_constraints_template.version, weight_factor.weight) self.scheduler.trigger() def onProjectStatusUpdated(self, name: str, status: str): logger.info("project '%s' status changed to %s", name, status) if status == models.ProjectState.Choices.ACTIVE.value: mark_unschedulable_scheduling_units_for_active_projects_schedulable(projects=[name]) else: mark_scheduling_units_for_inactive_projects_unschedulable(projects=[name]) self.scheduler.trigger() def onProjectRankUpdated(self, name: str, rank: float): logger.info("project '%s' rank changed to %s", name, rank) self.scheduler.trigger() def onReservationCreated(self, id: int): self._onReservationCreatedOrUpdated(id) def onReservationUpdated(self, id: int): self._onReservationCreatedOrUpdated(id) def _onReservationCreatedOrUpdated(self, id: int): reservation = models.Reservation.objects.get(id=id) logger.info("reservation id=%s '%s' was created/updated. start_time='%s' stop_time='%s'. checking/updating (un)schedulablity... as a result a new scheduling round might be triggered if needed.", id, reservation.name or '<no_name>', reservation.start_time, reservation.stop_time) # check all unschedulable units, and see if they are not blocked (anymore) unschedulable_units = models.SchedulingUnitBlueprint.objects.filter(status__value=models.SchedulingUnitStatus.Choices.UNSCHEDULABLE.value) for scheduling_unit in unschedulable_units.all(): try: if can_run_within_station_reservations(scheduling_unit): logger.info("scheduling_unit id=%s can run with the current reservations, marking it schedulable.", scheduling_unit.id) mark_independent_subtasks_in_scheduling_unit_blueprint_as_schedulable(scheduling_unit) except Exception as e: logger.exception(e) # check all scheduled units, and see if they are effected by this new reservation(window) scheduled_units = models.SchedulingUnitBlueprint.objects.filter(status__value=models.SchedulingUnitStatus.Choices.SCHEDULED.value) scheduled_units = scheduled_units.filter(scheduled_stop_time__gte=reservation.start_time) if reservation.stop_time is not None: scheduled_units = scheduled_units.filter(scheduled_start_time__lt=reservation.stop_time) for scheduling_unit in scheduled_units: try: if not can_run_within_station_reservations(scheduling_unit): logger.info("scheduled scheduling_unit id=%s can not run with the current reservations, marking it unschedulable.", scheduling_unit.id) # if the unit is scheduled, the method below unschedules it first mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(scheduling_unit, reason="blocked by reservation(s)") else: # re-schedule the scheduling unit so that the now reserved stations are not included anymore logger.info("re-scheduling scheduling_unit id=%s so station reservations are taken into account", scheduling_unit.id) reschedule_subtasks_in_scheduling_unit_blueprint(scheduling_unit) except Exception as e: logger.exception(e) def onReservationDeleted(self, id: int): # maybe some unschedulable/blocked units can use the spot that was used by the reservation # mark them all schedulable, and do a scheduling round to see which ones can be scheduled logger.info("reservation id=%s was deleted. checking/updating (un)schedulablity... as a result a new scheduling round might be triggered if needed.", id) mark_unschedulable_scheduling_units_for_active_projects_schedulable() def create_dynamic_scheduling_service(exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER): return TMSSBusListener(handler_type=TMSSDynamicSchedulingMessageHandler, handler_kwargs=None, num_threads=1, exchange=exchange, broker=broker) ################## helper methods ################################################################# def get_dynamically_schedulable_scheduling_units(priority_queue: models.PriorityQueueType=None) -> QuerySet: '''get a list of all dynamically schedulable scheduling_units for the given priority_queue (or all priority_queues if None)''' scheduling_units = models.SchedulingUnitBlueprint.objects.filter(status__value=models.SchedulingUnitStatus.Choices.SCHEDULABLE.value) scheduling_units = scheduling_units.filter(scheduling_constraints_doc__scheduler='dynamic') if priority_queue is not None: scheduling_units = scheduling_units.filter(priority_queue=priority_queue) return scheduling_units def get_fixed_time_schedulable_scheduling_units() -> QuerySet: '''get a result QuerySet of all fixed_time schedulable scheduling_units''' scheduling_units = models.SchedulingUnitBlueprint.objects.filter(status__value=models.SchedulingUnitStatus.Choices.SCHEDULABLE.value) scheduling_units = scheduling_units.filter(scheduling_constraints_doc__scheduler='fixed_time') scheduling_units = scheduling_units.order_by('-updated_at') return scheduling_units def get_scheduled_scheduling_units(lower_bound: datetime=None, upper_bound: datetime=None) -> QuerySet: '''get a queryset of all scheduled scheduling_units which fall within the given [lower_bound, upper_bound) window (if not None)''' scheduled_units = models.SchedulingUnitBlueprint.objects.filter(status__value=models.SchedulingUnitStatus.Choices.SCHEDULED.value) if lower_bound is not None: scheduled_units = scheduled_units.filter(scheduled_stop_time__gte=lower_bound) if upper_bound is not None: scheduled_units = scheduled_units.filter(scheduled_start_time__lt=upper_bound) return scheduled_units def get_triggered_schedulable_scheduling_units() -> QuerySet: '''get a list of all trigger dynamically and fixed_time schedulable scheduling_units''' scheduling_units = models.SchedulingUnitBlueprint.objects.filter(status__value=models.SchedulingUnitStatus.Choices.SCHEDULABLE.value) scheduling_units = scheduling_units.filter(interrupts_telescope=True) return scheduling_units def get_running_observation_subtasks(stopping_after:datetime=None) -> [models.Subtask]: '''get a list of all starting/started subtasks, optionally filter for those finishing after the provided time''' running_obs_subtasks = models.Subtask.objects.filter(state__value__in=[models.SubtaskState.Choices.STARTING.value, models.SubtaskState.Choices.STARTED.value], specifications_template__type__value=models.SubtaskType.Choices.OBSERVATION.value) if stopping_after is not None: running_obs_subtasks = running_obs_subtasks.filter(scheduled_stop_time__gte=stopping_after) return list(running_obs_subtasks.all()) def unschededule_blocking_scheduled_units_if_needed_and_possible(candidate: ScoredSchedulingUnit): '''check if there are any already scheduled units in the way, and unschedule them if allowed. ''' # check any previously scheduled units, and unschedule if needed/allowed scheduled_scheduling_units = list(get_scheduled_scheduling_units(lower_bound=candidate.start_time - DEFAULT_INTER_OBSERVATION_GAP, upper_bound=candidate.start_time + candidate.scheduling_unit.specified_main_observation_duration + DEFAULT_INTER_OBSERVATION_GAP)) if not scheduled_scheduling_units: logger.debug('no scheduled scheduling_units are blocking candidate scheduling_unit id=%s name=%s, nothing needs to be unscheduled', candidate.scheduling_unit.id, candidate.scheduling_unit.name) return # compute weighted scores for all scheduled- and the candidate scheduling units # because the weighted scores are also normalized over the given list of units. # So, to make a fair comparison if the candidate is 'better', it has to be scored in the same set. scored_scheduling_units = compute_individual_and_weighted_scores(scheduled_scheduling_units + [candidate.scheduling_unit], candidate.start_time, candidate.start_time + candidate.scheduling_unit.specified_main_observation_duration) # seperate the re-scored candidate again from the re-scored scheduled scheduling_units rescored_candidate = [x for x in scored_scheduling_units if x.scheduling_unit.id == candidate.scheduling_unit.id][0] rescored_scheduled_units = [x for x in scored_scheduling_units if x.scheduling_unit.id != candidate.scheduling_unit.id] # check if we can and need to unschedule the blocking units for rescored_scheduled_unit in rescored_scheduled_units: scheduled_scheduling_unit = rescored_scheduled_unit.scheduling_unit logger.info("checking if scheduled scheduling_unit id=%s '%s' start_time='%s' weighted_score=%s which is blocking candidate id=%s '%s' start_time='%s' weighted_score=%s can be unscheduled", scheduled_scheduling_unit.id, scheduled_scheduling_unit.name, scheduled_scheduling_unit.scheduled_start_time, rescored_scheduled_unit.weighted_score, rescored_candidate.scheduling_unit.id, rescored_candidate.scheduling_unit.name, rescored_candidate.start_time, rescored_candidate.weighted_score) # in case of a triggered candidate with higher trigger priority than the scheduled unit, we don't care about # scores, but only check trigger priority. if candidate.scheduling_unit.interrupts_telescope: if (not scheduled_scheduling_unit.interrupts_telescope) or candidate.scheduling_unit.project.trigger_priority > scheduled_scheduling_unit.project.trigger_priority: logger.info("unscheduling id=%s '%s' because it is in the way and has a lower trigger_priority=%s than the best candidate id=%s '%s' trigger_priority=%s start_time=%s", scheduled_scheduling_unit.id, scheduled_scheduling_unit.name, scheduled_scheduling_unit.project.trigger_priority, candidate.scheduling_unit.id, candidate.scheduling_unit.name, candidate.scheduling_unit.project.trigger_priority, candidate.scheduling_unit.scheduled_start_time) unschedule_subtasks_in_scheduling_unit_blueprint(scheduled_scheduling_unit) # compare the re-scored weighted_scores elif rescored_candidate.weighted_score > rescored_scheduled_unit.weighted_score: logger.info("unscheduling id=%s '%s' because it is in the way and has a lower score than the best candidate id=%s '%s' score=%s start_time=%s", scheduled_scheduling_unit.id, scheduled_scheduling_unit.name, candidate.scheduling_unit.id, candidate.scheduling_unit.name, candidate.weighted_score, candidate.scheduling_unit.scheduled_start_time) unschedule_subtasks_in_scheduling_unit_blueprint(scheduled_scheduling_unit) else: logger.info("scheduling_unit id=%s '%s' start_time='%s' will not be unscheduled and keep blocking candidate id=%s '%s' start_time='%s'", scheduled_scheduling_unit.id, scheduled_scheduling_unit.name, scheduled_scheduling_unit.scheduled_start_time, candidate.scheduling_unit.id, candidate.scheduling_unit.name, candidate.start_time) def cancel_running_observation_if_needed_and_possible(candidate: ScoredSchedulingUnit): '''check if there are starting/started observation subtasks that block the candidate. Only triggered scheduling_units can cancel running observations and only if they belong to a project with a higher trigger_priority than the projects of the subtasks to cancel''' # todo: is it sufficient to cancel the subtasks, or do we cancel the whole scheduling unit? if candidate.scheduling_unit.interrupts_telescope: running_obs_subtasks = get_running_observation_subtasks(candidate.start_time) for obs in running_obs_subtasks: if obs.project is None: logger.warning('cannot cancel running subtask pk=%s for triggered scheduling_unit pk=%s because it does belong to a project and hence has unknown priority' % (obs.pk, candidate.scheduling_unit.name)) continue if candidate.scheduling_unit.project.trigger_priority > obs.project.trigger_priority: logger.info('cancelling observation subtask pk=%s trigger_priority=%s because it blocks the triggered scheduling_unit pk=%s trigger_priority=%s' % (obs.pk, obs.project.trigger_priority, candidate.scheduling_unit.pk, candidate.scheduling_unit.project.trigger_priority)) # todo: check if cancellation is really necessary or the trigger can be scheduled afterwards # I guess we could just do can_run_after(candidate, obs.scheduled_stop_time) here for that? # We could also only do this, of there is a 'before' constraint on each trigger. # -> Clarify and implemented with TMSS-704. cancel_subtask(obs) else: logger.info('NOT cancelling subtask pk=%s trigger_priority=%s for triggered scheduling_unit pk=%s trigger_priority=%s because its priority is too low' % (obs.pk, obs.project.trigger_priority, candidate.scheduling_unit.pk, candidate.scheduling_unit.project.trigger_priority))