#!/usr/bin/env python3

# dynamic_scheduling.py
#
# Copyright (C) 2020
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: $

"""
This dynamic_scheduling module contains all code for dynamically scheduling many scheduling_units in "the most optimal way".

We have three goals when computing an observing schedule for the telescope:
 1. Observe each target under the best conditions, resulting in the best scientific output.
 2. Use the precious telescope time to the fullest, thus minimizing the gaps in between observations.
 3. Be able to act quickly when conditions change, or when a new (triggered) scheduling_unit is added.

The core scheduling methods are defined in the tasks.py and subtasks.py modules. These just schedule/unschedule/cancel
individual scheduling_units/tasks/subtasks. This module uses these core methods, but should be seen as a layer on top
which creates the complex behaviour of scheduling a big list of candidate scheduling_units to create a filled schedule
meeting the 3 goals above.

To achieve goal 1, each scheduling_unit has a set of constraints, which encode what the best conditions are to observe
that scheduling_unit. Users are responsible for specifying constraints that strike a proper balance between reasonable
and tight enough. To tweak the order of scheduling_units in the schedule even further, users can set project- and/or
individual scheduling_unit priorities.

Goals 1 & 2 are met in the Scheduler class which:
 - schedules all 'fixed_time' scheduling_units (if possible, else they are annotated as unschedulable)
 - then finds the "best" scheduling_unit to be scheduled next ("best" means: the best weighted score given each
   scheduling_unit's constraints, unit and project priority, and gap)
 - then fills the rest of the schedule by moving forward in time, considering all remaining candidate scheduling_units,
   picking the "best" and positioning it there. That is repeated until no candidates remain, resulting in a long-term
   schedule that is likely to happen if and only if no conditions/constraints/projects/etc change.
Each change in any condition/constraint/priority/etc can result in a totally different schedule! That is ok, and exactly the goal.

Goal 3 is met using a TMSSBusListener and TMSSDynamicSchedulingMessageHandler which react to the relevant
created/updated/deleted events by updating the (un)schedulable statuses of the affected scheduling_units, and then
triggering a new computation of a full schedule in the Scheduler.
""" import astropy.coordinates from datetime import datetime, timedelta, time from lofar.sas.tmss.tmss.tmssapp import models from lofar.sas.tmss.tmss.tmssapp.tasks import schedule_independent_subtasks_in_scheduling_unit_blueprint, unschedule_subtasks_in_scheduling_unit_blueprint, mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable, mark_subtasks_in_scheduling_unit_blueprint_as_schedulable, mark_subtasks_in_scheduling_unit_blueprint_as_schedulable, set_scheduling_unit_blueprint_start_times, reschedule_subtasks_in_scheduling_unit_blueprint, get_gaps_to_previous_and_next_observations_in_scheduling_unit, cancel_scheduling_unit_blueprint from lofar.sas.tmss.tmss.tmssapp.subtasks import update_subtasks_start_times_for_scheduling_unit, mark_subtasks_and_successors_as_defined, get_gaps_between_scheduled_units_in_window, get_used_stations_in_timewindow from lofar.sas.tmss.client.tmssbuslistener import * from lofar.common.datetimeutils import round_to_second_precision from threading import Thread, Event from django.db import transaction from django.db.models import Max from lofar.sas.tmss.tmss.exceptions import SchedulerInterruptedException from lofar.sas.tmss.services.scheduling.constraints import * import logging logger = logging.getLogger(__name__) from operator import attrgetter # LOFAR needs to have a gap in between observations to (re)initialize hardware. DEFAULT_NEXT_STARTTIME_GAP = timedelta(seconds=180) from lofar.sas.tmss.tmss.tmssapp.subtasks import DEFAULT_INTER_OBSERVATION_GAP class Scheduler: """ The Scheduler class considers candidate schedulable scheduling_units, and tries to schedule them or mark them as unschedulable. See also the goals describe above in the module docstring. The core algorithm is as follows: - The is a continuous scheduling loop that waits in idle state for something to schedule - When some external code calls the `trigger` method, the loop exits the waiting idle state and starts a scheduling run. Running scheduling-runs are interrupted when some external code calls the `trigger` method, leaving a half-finished schedule which is then completed/overwritten be the next full scheduling run which has just been triggered. - In a single full scheduling run `do_full_schedule_computation` we: - schedule all fixed_time scheduling units - schedule the best next dynamic scheduling unit - position the rest of the dynamic scheduling units at their most likely timestamp where they will be observed. """ def __init__(self) -> None: self._scheduling_thread = None self._scheduling_thread_running = False self._do_schedule_event = Event() self.search_gridder = Gridder(grid_minutes=1*60) self.fine_gridder = Gridder(grid_minutes=1) super().__init__() # make sure initial status is idle models.Subsystem.Activator('scheduler').deactivate() def start_scheduling(self): '''Prepares the scheduling_units, performs one full schedule computation, and start the scheduling loop and wait for a trigger.''' # check and update if needed all scheduling_units statuses for schedulability. try: self._initialize_statuses() except Exception as e: # catch all errors and carry on, so we do end up with a running scheduling loop logger.error("Could not initialize (un)schedulable statuses at startup: %s", e) # start the background thread which waits until the _do_schedule_event event is set # upon receiving to the correct TMSS EVentMessages. 
        logger.debug("Scheduler starting scheduling thread...")
        self._scheduling_thread_running = True
        self._scheduling_thread = Thread(target=self._run_scheduling_loop)
        self._scheduling_thread.daemon = True
        self._scheduling_thread.start()
        logger.debug("Scheduler scheduling thread was fully started and initialized")

        # start with at least one scheduling round: trigger it, and let the just started scheduling_loop do its thing
        self.trigger()

    def stop_scheduling(self):
        '''Stop the scheduling loop and stop processing scheduling triggers.'''
        self._scheduling_thread_running = False
        self._scheduling_thread.join()
        self._scheduling_thread = None

    def __enter__(self):
        '''Start scheduling in a 'with' context'''
        self.start_scheduling()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        '''Stop scheduling leaving the 'with' context'''
        self.stop_scheduling()

    def trigger(self):
        '''Interrupt idle waiting, or the current full schedule computation, and start a new full computation.'''
        logger.info("Scheduler triggering a new scheduling round...")
        self._do_schedule_event.set()

    @property
    def is_triggered(self) -> bool:
        return self._do_schedule_event.is_set()

    @property
    def is_scheduling(self) -> bool:
        '''is the scheduler active?'''
        return models.Subsystem.objects.get(name='scheduler').status.value == models.SubsystemStatus.Choices.ACTIVE.value

    @property
    def fixed_time_scheduling_enabled(self) -> bool:
        return models.Setting.objects.get(name=models.SystemSettingFlag.Choices.FIXED_TIME_SCHEDULING_ENABLED.value).value

    @property
    def dynamic_scheduling_enabled(self) -> bool:
        return models.Setting.objects.get(name=models.SystemSettingFlag.Choices.DYNAMIC_SCHEDULING_ENABLED.value).value

    def _raise_if_triggered(self):
        if self.is_triggered:
            raise SchedulerInterruptedException()

    def _initialize_statuses(self):
        '''upon startup, do one round over all scheduling units for all projects and reservations,
        marking them as (un)schedulable so they can be picked up in the _scheduling_loop later'''
        logger.debug("preparing (un)schedulable scheduling units for all (in)active projects...")
        mark_unschedulable_scheduling_units_for_active_projects_schedulable()
        mark_scheduling_units_for_inactive_projects_unschedulable()
        logger.info("prepared (un)schedulable scheduling units for all (in)active projects")

        logger.debug("preparing (un)schedulable scheduling units for all reservations...")
        mark_scheduling_units_blocked_by_reservations_unschedulable()
        logger.info("prepared (un)schedulable scheduling units for all reservations")

    def _run_scheduling_loop(self):
        '''Continuously loop waiting for incoming triggers to start a full schedule computation'''
        logger.info("Scheduler running scheduling loop. Waiting for events...")

        while self._scheduling_thread_running:
            logger.debug("Scheduler waiting for trigger to compute new schedule...")
            if self._do_schedule_event.wait(timeout=10):
                logger.info("Scheduler was triggered to compute new schedule...")
                self._do_schedule_event.clear()

                try:
                    self.do_full_schedule_computation()
                except SchedulerInterruptedException:
                    # log the interruption, and continue with the next loop, because _do_schedule_event was set
                    logger.info("Scheduler was interrupted while computing a new full schedule")
                except Exception as e:
                    # log and just continue processing events. better luck next time...
                    logger.exception(str(e))

    def do_full_schedule_computation(self) -> [models.SchedulingUnitBlueprint]:
        '''computes a full schedule for fixed_time, dynamic best next unit, and best estimate of the rest of the schedule
        given current conditions/constraints/priorities/etc'''
        logger.info("Scheduler starting full schedule computation...")

        scheduled_units = []

        if self.fixed_time_scheduling_enabled:
            scheduled_units.extend(self.schedule_fixed_time_scheduling_units())
        else:
            logger.info("Scheduler skipping update of fixed_time schedule because it is not enabled in the settings")

        if self.dynamic_scheduling_enabled:
            scheduled_units.extend(self.do_dynamic_schedule())
        else:
            logger.info("Scheduler skipping update of dynamic schedule because it is not enabled in the settings")

        logger.info("Scheduler full schedule computation finished.")
        self.log_schedule(log_level=logging.INFO)
        return scheduled_units

    def schedule_fixed_time_scheduling_units(self) -> [models.SchedulingUnitBlueprint]:
        '''Schedule all schedulable fixed_time scheduling units.'''
        scheduled_units = []

        with models.Subsystem.Activator('scheduler'):
            # exclude units for inactive projects
            mark_scheduling_units_for_inactive_projects_unschedulable()

            # get the schedulable fixed_time scheduling_units in most-recently-updated order.
            schedulable_units = get_fixed_time_schedulable_scheduling_units()

            # only consider active projects
            schedulable_units = schedulable_units.filter(draft__scheduling_set__project__project_state__value=models.ProjectState.Choices.ACTIVE.value)

            if schedulable_units.exists():
                logger.info("scheduling %s unit(s) with fixed_time at constraint for active projects", schedulable_units.count())

                for i, schedulable_unit in enumerate(schedulable_units, 1):
                    self._raise_if_triggered()  # interrupts the scheduling loop for a next round

                    try:
                        # first put the unit at its requested 'at' timestamp.
at_timestamp = get_at_constraint_timestamp(schedulable_unit) set_scheduling_unit_blueprint_start_times(schedulable_unit, first_start_time=at_timestamp) if not can_run_at(schedulable_unit, at_timestamp, self.fine_gridder): unschedulable_unit = determine_unschedulable_reason_and_mark_unschedulable_if_needed(schedulable_unit, at_timestamp, at_timestamp + schedulable_unit.specified_observation_duration, proposed_start_time=at_timestamp, gridder=self.search_gridder, raise_if_interruped=self._raise_if_triggered) logger.warning("Cannot schedule fixed_time unit [%s/%s] id=%d at '%s': %s", i, len(schedulable_units), unschedulable_unit.id, at_timestamp, unschedulable_unit.unschedulable_reason) continue scheduled_unit = self.try_schedule_unit(schedulable_unit, at_timestamp) if scheduled_unit: assert (scheduled_unit.status.value == models.SchedulingUnitStatus.Choices.SCHEDULED.value) logger.info("Scheduled fixed_time unit [%s/%s] id=%d at '%s'", i, len(schedulable_units), schedulable_unit.id, at_timestamp) scheduled_units.append(scheduled_unit) else: unschedulable_unit = determine_unschedulable_reason_and_mark_unschedulable_if_needed(schedulable_unit, at_timestamp, at_timestamp + schedulable_unit.specified_observation_duration, proposed_start_time=at_timestamp, gridder=self.search_gridder, raise_if_interruped=self._raise_if_triggered) logger.warning("Could not schedule fixed_time unit [%s/%s] id=%d at '%s': %s", i, len(schedulable_units), unschedulable_unit.id, at_timestamp, unschedulable_unit.unschedulable_reason) self.log_schedule(log_level=logging.DEBUG) except Exception as e: if isinstance(e, SchedulerInterruptedException): # Scheduler was interrupted, re-raise and let the scheduling loop handle it raise elif isinstance(e, SubtaskSchedulingException): logger.warning("Could not schedule fixed_time-scheduled scheduling unit id=%d: %s", schedulable_unit.id, e) else: logger.exception("Could not schedule fixed_time-scheduled scheduling unit id=%d: %s", schedulable_unit.id, e) unschedulable_unit = determine_unschedulable_reason_and_mark_unschedulable_if_needed(schedulable_unit, at_timestamp, at_timestamp + schedulable_unit.specified_observation_duration, proposed_start_time=at_timestamp, gridder=self.search_gridder, raise_if_interruped=self._raise_if_triggered) if unschedulable_unit.status.value != models.SchedulingUnitStatus.Choices.UNSCHEDULABLE.value: # final bail out... we could not determine_unschedulable_reason_and_mark_unschedulable_if_needed, # so just mark it unschedulable with the exception as reason mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(schedulable_unit, reason=str(e)) else: logger.info("there are no schedulable scheduling units with fixed_time at constraint for active projects to schedule") # A unit that has been scheduled in this run can subsequently get unscheduled again, in case it blocked a higher # priority unit. Mark those unschedulable and only return those that are in scheduled state after the whole run. for scheduled_unit in scheduled_units: scheduled_unit.refresh_from_db() if scheduled_unit.status.value != models.SchedulingUnitStatus.Choices.SCHEDULED.value: logger.warning("Fixed_time-scheduled scheduling unit id=%d has subsequently been unscheduled again. 
Marking it unschedulable.", scheduled_unit.id) unschedulable_unit = determine_unschedulable_reason_and_mark_unschedulable_if_needed(scheduled_unit, at_timestamp, at_timestamp + scheduled_unit.specified_observation_duration, proposed_start_time=at_timestamp, gridder=self.search_gridder, raise_if_interruped=self._raise_if_triggered) assert(unschedulable_unit.status.value == models.SchedulingUnitStatus.Choices.UNSCHEDULABLE.value) # what are the truly scheduled fixed time units? scheduled_units = [scheduled_unit for scheduled_unit in scheduled_units if scheduled_unit.status.value == models.SchedulingUnitStatus.Choices.SCHEDULED.value] # now try to place B-prio units in between the scheduled fixed-time units scheduled_units = sorted(scheduled_units, key=lambda su: su.scheduled_start_time) for scheduled_unit in list(scheduled_units): scheduled_B_units = self.schedule_B_priority_units_in_gaps_around_scheduling_unit(scheduled_unit) scheduled_units.extend(scheduled_B_units) # return all scheduled (fixedtime and B-prio) units return scheduled_units def do_dynamic_schedule(self) -> [models.SchedulingUnitBlueprint]: '''do a full update of the schedule: schedule next scheduling unit (and if possible, squeeze in B-prio units between now and the upcoming scheduled unit). Assign start stop times to remaining schedulable scheduling units. Returns a list of the scheduled one A-prio and 0-many B-prio units''' with models.Subsystem.Activator('scheduler'): logger.info("Updating (dynamic) schedule....") scheduled_units = [] lower_bound = round_to_second_precision(datetime.utcnow() + DEFAULT_NEXT_STARTTIME_GAP) upper_bound = lower_bound + timedelta(days=1) if get_dynamically_schedulable_scheduling_units().exists(): lower_bound = max(lower_bound, round_to_second_precision(min([su.earliest_possible_cycle_start_time for su in get_dynamically_schedulable_scheduling_units()]))) upper_bound = round_to_second_precision(max(upper_bound, max([su.latest_possible_cycle_start_time for su in get_dynamically_schedulable_scheduling_units()]))) while lower_bound < upper_bound: self._raise_if_triggered() # find and schedule the next best unit # ignore/exclude as candidates the unit(s) which are already scheduled in this round. # when new candidates are overlapping with already scheduled units, they are re-evaluated to see who wins. to_be_excluded_units = set(scheduled_units) - set(get_dynamically_schedulable_scheduling_units().all()) scheduled_unit = self.schedule_next_scheduling_unit(lower_bound, lower_bound + timedelta(hours=24), exclude_units=to_be_excluded_units) if scheduled_unit: scheduled_units.append(scheduled_unit) # see if we can fit any B-prio units in the new gap(s) in the schedule? scheduled_B_units = self.schedule_B_priority_units_in_gaps_around_scheduling_unit(scheduled_unit) scheduled_units.extend(scheduled_B_units) else: # nothing was scheduled in the window lower_bound+24h, so advance and see if anything fits # check for any overlapping units... 
                    blocking_units = models.SchedulingUnitBlueprint.objects.filter(
                        obsolete_since__isnull=True).filter(
                        status__value__in=(models.SchedulingUnitStatus.Choices.SCHEDULED.value,
                                           models.SchedulingUnitStatus.Choices.OBSERVING.value)).filter(
                        on_sky_stop_time__gte=lower_bound).filter(
                        on_sky_stop_time__lte=lower_bound + timedelta(hours=24))

                    if blocking_units.exists():
                        # advance beyond the blocking unit(s)
                        max_blocking_stop_time = blocking_units.aggregate(Max('on_sky_stop_time'))['on_sky_stop_time__max']
                        lower_bound = max(max_blocking_stop_time + DEFAULT_INTER_OBSERVATION_GAP, lower_bound + timedelta(hours=1))
                    else:
                        # just advance
                        lower_bound += timedelta(hours=3)

                # for nice "visual" feedback to the user, move each "old" schedulable unit to the lower_bound.
                # this also indicates to the user that the unit has been considered for times < lower_bound, and they could not be scheduled there.
                for unit in get_dynamically_schedulable_scheduling_units().filter(scheduled_start_time__lt=lower_bound).all():
                    update_subtasks_start_times_for_scheduling_unit(unit, lower_bound)

                # any units left to be scheduled? If so, loop again, else break out of the while loop
                if not get_dynamically_schedulable_scheduling_units().exists():
                    # there are no more schedulable units. check to-be-re-evaluated scheduled future units.
                    if not get_scheduled_scheduling_units(lower_bound, upper_bound, 'dynamic').exists():
                        # there are no more to-be-re-evaluated scheduled future units.
                        break

            logger.info("Finished updating dynamic schedule")

            # loop over all remaining non-scheduled schedulable units, and mark them unschedulable for the big search window (if they are indeed unschedulable)
            # so they are ignored next time.
            # It's up to the user/operator to tweak their constraints, which makes them schedulable again for a next try.
            for su in get_dynamically_schedulable_scheduling_units().all():
                determine_unschedulable_reason_and_mark_unschedulable_if_needed(su, datetime.utcnow(), su.latest_possible_cycle_start_time,
                                                                                proposed_start_time=None,
                                                                                gridder=self.search_gridder,
                                                                                raise_if_interruped=self._raise_if_triggered)

        return scheduled_units

    def find_best_next_schedulable_unit(self, scheduling_units: [models.SchedulingUnitBlueprint], lower_bound_start_time: datetime, upper_bound_stop_time: datetime) -> ScoredSchedulingUnit:
        """
        from the given scheduling_units, find the best schedulable scheduling_unit which can run within the given time window.
        - all constraints of the scheduling_units are taken into account
        - if one or more scheduling_units can run exclusively in this window and not later, then only those are considered.
        :param scheduling_units: evaluate these scheduling_units.
        :param lower_bound_start_time: evaluate the constraints at and after lower_bound_start_time. The returned unit has a start_time guaranteed at or after lower_bound_start_time.
        :param upper_bound_stop_time: evaluate the constraints before upper_bound_stop_time. The returned unit has a stop_time guaranteed before upper_bound_stop_time.
        Returns a ScoredSchedulingUnit struct with the best next schedulable scheduling unit and its proposed start_time where it best fits its constraints.
        """
        if not scheduling_units:
            return None

        _start_search_timestamp = datetime.utcnow()

        # ensure upper is greater than or equal to lower
        upper_bound_stop_time = max(lower_bound_start_time, upper_bound_stop_time)

        # first, define a helper method...
        def _do_find_best_next_schedulable_unit(_units, _lower_bound, _upper_bound):
            '''inner helper method doing the actual find on a list of units of either A- or B-prio,
            with a window that cannot be divided further into smaller windows/gaps'''
            # check gap size: minimal obs duration of 1 minute, plus an inter_observation_gap at both sides
            if (_upper_bound - _lower_bound) <= 2 * DEFAULT_INTER_OBSERVATION_GAP + timedelta(minutes=1):
                # gap is too small
                logger.debug("find_best_next_schedulable_unit: skipping too small %dmin window where nothing fits ['%s', '%s']",
                             (_upper_bound - _lower_bound).total_seconds() / 60.0, _lower_bound, _upper_bound)
                return None

            if len(_units) == 0:
                logger.debug("find_best_next_schedulable_unit: no units to evaluate in window ['%s', '%s']", _lower_bound, _upper_bound)
                return None

            logger.info("find_best_next_schedulable_unit: evaluating constraints for %d %s-queue units in %dmin window ['%s', '%s']: %s",
                        len(_units), _units[0].priority_queue.value, (_upper_bound-_lower_bound).total_seconds()/60.0, _lower_bound, _upper_bound,
                        ','.join([str(su.id) for su in sorted(_units, key=lambda x: x.id)]) or 'None')

            # first, from all given scheduling_units, filter and consider only those that meet their time-constraints.
            filtered_scheduling_units = filter_scheduling_units_using_time_constraints(_units, _lower_bound, _upper_bound, self._raise_if_triggered)

            logger.info("find_best_next_schedulable_unit: %d units meeting time-constraints in %dmin window ['%s', '%s']: %s",
                        len(filtered_scheduling_units), (_upper_bound-_lower_bound).total_seconds()/60.0, _lower_bound, _upper_bound,
                        ','.join([str(su.id) for su in sorted(filtered_scheduling_units, key=lambda x: x.id)]) or 'None')

            # then, filter and consider only those that meet the rest of the constraints.
            filtered_scheduling_units = filter_scheduling_units_using_constraints(filtered_scheduling_units, _lower_bound, _upper_bound, self._raise_if_triggered, self.search_gridder)

            logger.info("find_best_next_schedulable_unit: %d units meeting constraints in %dmin window ['%s', '%s']: %s",
                        len(filtered_scheduling_units), (_upper_bound-_lower_bound).total_seconds()/60.0, _lower_bound, _upper_bound,
                        ','.join([str(su.id) for su in sorted(filtered_scheduling_units, key=lambda x: x.id)]) or 'None')

            if not filtered_scheduling_units:
                return None

            # then, check if there is a subset that can only run exclusively in this window and not later.
            exclusive_in_this_window_scheduling_units = filter_scheduling_units_which_can_only_run_in_this_window(filtered_scheduling_units, _lower_bound, _upper_bound, self._raise_if_triggered, gridder=self.search_gridder)
            if exclusive_in_this_window_scheduling_units:
                logger.info("find_best_next_schedulable_unit: units meeting constraints exclusively in window ['%s', '%s']: %s",
                            _lower_bound, _upper_bound,
                            ','.join([str(su.id) for su in sorted(exclusive_in_this_window_scheduling_units, key=lambda x: x.id)]) or 'None')

            # if there are some units that can only be scheduled exclusively in this window,
            # then consider only those. Else, just use all.
            units_to_score = exclusive_in_this_window_scheduling_units if exclusive_in_this_window_scheduling_units else filtered_scheduling_units

            if not units_to_score:
                return None

            logger.info("find_best_next_schedulable_unit: %d units to score in %dmin window ['%s', '%s']: %s",
                        len(units_to_score), (_upper_bound-_lower_bound).total_seconds()/60.0, _lower_bound, _upper_bound,
                        ','.join([str(su.id) for su in sorted(units_to_score, key=lambda x: x.id)]) or 'None')

            # from the filtered-down list of units, compute the (weighted) scores, and return the best scoring one.
            return get_best_scored_scheduling_unit_scored_by_constraints(units_to_score, _lower_bound, _upper_bound,
                                                                         check_reservations=True,
                                                                         coarse_gridder=self.search_gridder,
                                                                         fine_gridder=self.fine_gridder,
                                                                         raise_if_interruped=self._raise_if_triggered)

        # split the list of units in B-prio and non_B-prio.
        # try to find the best unit in the non_B units first.
        scheduling_units_non_B = sorted([su for su in scheduling_units if su.priority_queue.value != models.PriorityQueueType.Choices.B.value], key=attrgetter('id'))

        if scheduling_units_non_B:
            best_scored_scheduling_unit = _do_find_best_next_schedulable_unit(scheduling_units_non_B, lower_bound_start_time, upper_bound_stop_time)
            if best_scored_scheduling_unit:
                _elapsed = datetime.utcnow() - _start_search_timestamp
                logger.info("find_best_next_schedulable_unit: best_scored_scheduling_unit id=%s name='%s' start='%s' in %dmin window ['%s', '%s'] (took %.2fs)",
                            best_scored_scheduling_unit.scheduling_unit.id, best_scored_scheduling_unit.scheduling_unit.name,
                            best_scored_scheduling_unit.start_time,
                            (upper_bound_stop_time-lower_bound_start_time).total_seconds()/60.0,
                            lower_bound_start_time, upper_bound_stop_time, _elapsed.total_seconds())
                return best_scored_scheduling_unit

        # ok, no best non-B unit was found. Try the B-units now.
        scheduling_units_B = sorted([su for su in scheduling_units if su.priority_queue.value == models.PriorityQueueType.Choices.B.value], key=attrgetter('id'))

        # because B-prio units cannot unschedule overlapping/blocking A/triggered units, we need to search for gaps between already scheduled units,
        # and determine the scheduling search windows, taking into account the gaps between A/triggered units for B, or just the full window for A/triggered.
        # group the B-units by stations, because the (non-)overlapping stations determine the gap to existing scheduled units.
used_stations = set(get_used_stations_in_timewindow(lower_bound_start_time, upper_bound_stop_time)) grouped_by_stations_scheduling_units_B = {} for unit_B in scheduling_units_B: stations = tuple(sorted(list(set(unit_B.main_observation_specified_stations)-used_stations))) if stations not in grouped_by_stations_scheduling_units_B: grouped_by_stations_scheduling_units_B[stations] = [unit_B] else: grouped_by_stations_scheduling_units_B[stations].append(unit_B) for stations, scheduling_units_B_group in grouped_by_stations_scheduling_units_B.items(): scheduling_windows_for_B = get_gaps_between_scheduled_units_in_window(lower_bound_start_time, upper_bound_stop_time, DEFAULT_INTER_OBSERVATION_GAP, stations) logger.info("find_best_next_schedulable_unit: gaps for %d B-prio units-with-stations=%s in window ['%s', '%s']: %s", len(scheduling_units_B_group), ','.join(stations), lower_bound_start_time, upper_bound_stop_time, ', '.join("['%s', '%s']"%(gap[0],gap[1]) for gap in scheduling_windows_for_B)) for window in scheduling_windows_for_B: best_scored_scheduling_unit = _do_find_best_next_schedulable_unit(scheduling_units_B_group, window[0], window[1]) if best_scored_scheduling_unit: _elapsed = datetime.utcnow() - _start_search_timestamp logger.info("find_best_next_schedulable_unit: best_scored_scheduling_unit id=%s name='%s' B start='%s' in subwindow ['%s', '%s'] of window ['%s', '%s'] (took %.2fs)", best_scored_scheduling_unit.scheduling_unit.id, best_scored_scheduling_unit.scheduling_unit.name, best_scored_scheduling_unit.start_time, window[0], window[1], lower_bound_start_time, upper_bound_stop_time, _elapsed.total_seconds()) return best_scored_scheduling_unit _elapsed = datetime.utcnow() - _start_search_timestamp logger.info("find_best_next_schedulable_unit: could NOT find a best_scored_scheduling_unit in %dmin window ['%s', '%s'] (took %.2fs)", (upper_bound_stop_time-lower_bound_start_time).total_seconds()/60.0, lower_bound_start_time, upper_bound_stop_time, _elapsed.total_seconds()) return None def schedule_next_scheduling_unit(self, lower_bound: datetime=None, upper_bound: datetime=None, exclude_units: []=None) -> models.SchedulingUnitBlueprint: '''find the best next upcoming schedulable scheduling unit and try to schedule it. Overlapping existing scheduled units are unscheduled if their score is lower. :return: the scheduled scheduling unit.''' queue_A = models.PriorityQueueType.objects.get(value=models.PriorityQueueType.Choices.A.value) queue_B = models.PriorityQueueType.objects.get(value=models.PriorityQueueType.Choices.B.value) # prepare queries for subsets of schedulable_units (django uses lazy evaluation, so don't worry about wasted queries) schedulable_units_triggered = get_triggered_schedulable_scheduling_units() schedulable_units_queue_A = get_dynamically_schedulable_scheduling_units(priority_queue=queue_A, include_triggered=False) schedulable_units_queue_B = get_dynamically_schedulable_scheduling_units(priority_queue=queue_B, include_triggered=False) # We schedule in an absolute order: first triggered, then dynamic queue A (and only when these are depleted, then queue B) # if a scheduling unit can be scheduled, then we exit early. 
for group_cntr, schedulable_units in enumerate([schedulable_units_triggered, schedulable_units_queue_A, schedulable_units_queue_B]): if group_cntr == 0: # just evaluate the triggered units candidate_units = list(schedulable_units) else: # make union over all schedulable and scheduled unit(s), # because the scheduled unit needs to be scored and evaluated as well priority_queue = schedulable_units[0].priority_queue if schedulable_units.first() else models.PriorityQueueType.objects.get(value=models.PriorityQueueType.Choices.A.value) # take the already scheduled scheduling_units along while seeking the best next unit for a fair new comparison. dynamicly_scheduled_scheduling_units = get_scheduled_scheduling_units(lower_bound, scheduler='dynamic', priority_queue=priority_queue) candidate_units = list(schedulable_units | dynamicly_scheduled_scheduling_units) # candidates do need to be in a cycle (for cycle start/stop bounds) candidate_units = [su for su in candidate_units if su.draft.scheduling_set.project.cycles.exists()] # remove to-be-excluded units exclude_units_ids = set([su.id for su in exclude_units or []]) candidate_units = [su for su in candidate_units if su.id not in exclude_units_ids] if len(candidate_units) == 0: continue # continue with the next in order schedulable_units candidate_queue = candidate_units[0].priority_queue logger.info("schedule_next_scheduling_unit: %s candidate_units %s-priority: %s", len(candidate_units), candidate_queue.value, ','.join([str(su.id) for su in sorted(candidate_units, key=lambda x: x.id)]) or 'None') # search in a forward sliding window for the best scheduling_unit that can be scheduled if lower_bound is None: lower_bound = datetime.utcnow() + DEFAULT_NEXT_STARTTIME_GAP if upper_bound is None: upper_bound = lower_bound + timedelta(hours=24) lower_bound_start_time = round_to_second_precision(lower_bound) upper_bound_stop_time = round_to_second_precision(upper_bound) window_lower_bound_start_time = lower_bound_start_time window_upper_bound_stop_time = min(window_lower_bound_start_time + timedelta(hours=12), upper_bound_stop_time-timedelta(seconds=1)) while window_upper_bound_stop_time < upper_bound_stop_time: self._raise_if_triggered() # interrupts the scheduling loop for a next round try: # the search window is mostly advanced/shifted at the upper_bound, see end of while loop # limit the window size to max 24h, because all observations are (way) less than 24h long window_lower_bound_start_time = max(window_lower_bound_start_time, window_upper_bound_stop_time - timedelta(hours=24)) # no need to irritate user in log files with sub-second scheduling precision window_lower_bound_start_time = round_to_second_precision(window_lower_bound_start_time) window_upper_bound_stop_time = round_to_second_precision(window_upper_bound_stop_time) # try to find the best next scheduling_unit logger.info("schedule_next_scheduling_unit: searching for best %s-priority scheduling unit to schedule in window ['%s', '%s']", candidate_queue.value, window_lower_bound_start_time, window_upper_bound_stop_time) best_scored_scheduling_unit = self.find_best_next_schedulable_unit(candidate_units, window_lower_bound_start_time, window_upper_bound_stop_time) if best_scored_scheduling_unit: best_scheduling_unit = best_scored_scheduling_unit.scheduling_unit best_scheduling_unit_score = best_scored_scheduling_unit.weighted_score best_start_time = best_scored_scheduling_unit.start_time if best_scheduling_unit.status.value == models.SchedulingUnitStatus.Choices.SCHEDULABLE.value: # make sure 
we don't start earlier than allowed assert best_start_time >= window_lower_bound_start_time, "The computed start_time='%s' should be larger than the search_window's lower_bound='%s', but it's not."%(best_start_time, lower_bound_start_time) # make start_time "look nice" for us humans best_start_time = round_to_second_precision(best_start_time) logger.info("schedule_next_scheduling_unit: found best candidate id=%s '%s' weighted_score=%.3f start_time=%s interrupts_telescope=%s queue=%s", best_scheduling_unit.id, best_scheduling_unit.name, best_scheduling_unit_score, best_start_time, best_scheduling_unit.interrupts_telescope, best_scheduling_unit.priority_queue.value) scheduled_unit = self.try_schedule_unit(best_scheduling_unit, best_start_time) if scheduled_unit is None: # we had a best_scored_scheduling_unit, but it could not be scheduled here. # remove it from the candidates, and do not evaluate it again in this window. candidate_units.remove(best_scheduling_unit) continue else: self.log_schedule(log_level=logging.INFO, lower_bound=scheduled_unit.scheduled_start_time - timedelta(hours=12), upper_bound=scheduled_unit.scheduled_start_time + timedelta(hours=12)) # done, just return the scheduled_unit return scheduled_unit else: logger.info("schedule_next_scheduling_unit: no %s-prio scheduling unit found which could be scheduled in window ['%s', '%s']", candidate_queue.value, window_lower_bound_start_time, window_upper_bound_stop_time) except SubtaskSchedulingException as e: logger.error("%s: Could not schedule scheduling_unit id=%s name='%s'. Error: %s", best_scheduling_unit.id, best_scheduling_unit.name, e) # prevent that it keeps trying to schedule this failed unit in this scheduler-round candidate_units.remove(best_scheduling_unit) # check if the best_scheduling_unit is unschedulable in this window, but can run later... best_scheduling_unit.refresh_from_db() if best_scheduling_unit.interrupts_telescope: # triggered observations cannot run later. mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(best_scheduling_unit, str(e)) return None else: if can_run_after(best_scheduling_unit, best_start_time, self.search_gridder) and not best_scheduling_unit.interrupts_telescope: logger.info("schedule_next_scheduling_unit: Unschedulable scheduling_unit id=%s can run later than '%s'. Marking it as schedulable again...", best_scheduling_unit.id, best_start_time) # yep, can run later, so mark it as schedulable again, and let it be handled in a new scheduler-round mark_subtasks_in_scheduling_unit_blueprint_as_schedulable(best_scheduling_unit) else: mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(best_scheduling_unit, str(e)) if not candidate_units: logger.debug("%s: no more %s-prio candidate units...", candidate_queue.value) break # break out of window scanning while loop, continue with next priority-queue units # advance the window at the upper side only so more candidates fit in # our sliding window only looks 12 hours ahead window_upper_bound_stop_time += timedelta(hours=6) # search again... (while loop) with the remaining units and new window_upper_bound_stop_time # no candidate fits. return None def try_schedule_unit(self, scheduling_unit: models.SchedulingUnitBlueprint, start_time: datetime) -> models.SchedulingUnitBlueprint: '''Try scheduling the given scheduling_unit (at its scheduled_start_time). 
        Cancel/unschedule any blocking/overlapping units depending on priorities and scores.'''
        logger.info("try_schedule_unit id=%s queue=%s '%s' start_time=%s central_lst='%s' interrupts_telescope=%s",
                    scheduling_unit.id, scheduling_unit.priority_queue.value, scheduling_unit.name, start_time,
                    scheduling_unit.main_observation_scheduled_central_lst, scheduling_unit.interrupts_telescope)

        # check if the given unit was already scheduled at the given time. If so, skip.
        scheduling_unit_from_db = models.SchedulingUnitBlueprint.objects.get(id=scheduling_unit.id)
        if scheduling_unit_from_db.status.value == models.SchedulingUnitStatus.Choices.SCHEDULED.value:
            if abs((scheduling_unit_from_db.scheduled_start_time - start_time).total_seconds()) < 1:
                logger.info("scheduling_unit id=%s '%s' was already scheduled at start_time='%s', skipping...",
                            scheduling_unit.id, scheduling_unit.name, start_time)
                return scheduling_unit_from_db

        # cancel and/or unschedule current units-in-the-way...
        # (only if possible, depending on priorities and other rules...)
        cancel_overlapping_running_observation_if_needed_and_possible(scheduling_unit, start_time)
        unschededule_blocking_scheduled_units_if_needed_and_possible(scheduling_unit, start_time, self.fine_gridder)
        unschededule_previously_scheduled_unit_if_needed_and_possible(scheduling_unit, start_time)

        # are there any blocking scheduled or observing scheduling_units still in the way?
        blocking_scheduling_units = get_blocking_scheduled_or_observing_units(scheduling_unit, start_time)
        if blocking_scheduling_units.exists():
            logger.warning("cannot schedule scheduling_unit id=%s '%s' at start_time=%s interrupts_telescope=%s because there are %d other units blocking it",
                           scheduling_unit.id, scheduling_unit.name, start_time, scheduling_unit.interrupts_telescope,
                           blocking_scheduling_units.count())
            return None

        with transaction.atomic():
            # we made room for our candidate, now try to schedule it.
            # there may still be uncancelled-running observations in the way -> SubtaskSchedulingException
            scheduled_scheduling_unit = schedule_independent_subtasks_in_scheduling_unit_blueprint(scheduling_unit, start_time=start_time)

            logger.info("scheduled scheduling_unit id=%s queue=%s '%s' start_time=%s central_lst='%s' interrupts_telescope=%s\nspecified stations: %s\n used stations: %s",
                        scheduling_unit.id, scheduling_unit.priority_queue.value, scheduling_unit.name, start_time,
                        scheduling_unit.main_observation_scheduled_central_lst, scheduling_unit.interrupts_telescope,
                        ','.join(scheduling_unit.main_observation_specified_stations),
                        ','.join(scheduling_unit.main_observation_used_stations))
            return scheduled_scheduling_unit

    def schedule_B_priority_units_in_gaps_around_scheduling_unit(self, scheduling_unit: models.SchedulingUnitBlueprint) -> [models.SchedulingUnitBlueprint]:
        '''try to schedule one or more scheduling units from queue B in the gap between the given scheduled_unit and its previous observed+ unit'''
        scheduled_units = []

        if not self.dynamic_scheduling_enabled:
            logger.debug("skipping schedule_B_priority_units_in_gaps because dynamic_scheduling is not enabled")
            return scheduled_units

        logger.debug("schedule_B_priority_units_in_gaps: looking for B-queue units to be scheduled in gap(s) around unit id=%s", scheduling_unit.id)

        # evaluate all schedulable and already-scheduled B_queue units
        schedulable_units_queue_B = get_dynamically_schedulable_scheduling_units(priority_queue=models.PriorityQueueType.objects.get(value=models.PriorityQueueType.Choices.B.value),
                                                                                 include_scheduled=True).exclude(id=scheduling_unit.id)
        if
not schedulable_units_queue_B.exists(): return scheduled_units gaps = get_gaps_to_previous_and_next_observations_in_scheduling_unit(scheduling_unit, include_schedulable_unschedulable=False) gaps = [gap for gap in gaps if gap[1] != datetime.max] # skip open-ended future gaps gaps = sorted(gaps, key=lambda gap: gap[0]) # sorted ascending in time for gap in gaps: lower_bound_start_time = round_to_second_precision(max(datetime.utcnow()+DEFAULT_NEXT_STARTTIME_GAP, gap[0]+DEFAULT_INTER_OBSERVATION_GAP)) upper_bound_stop_time = round_to_second_precision(min(gap[1] - DEFAULT_INTER_OBSERVATION_GAP, scheduling_unit.latest_possible_cycle_stop_time)) # check gap size: minimal obs duration of 1 minute, plus an inter_observation_gap at both sides if (upper_bound_stop_time - lower_bound_start_time) <= 2*DEFAULT_INTER_OBSERVATION_GAP + timedelta(minutes=1): continue # gap is too small logger.info("schedule_B_priority_units_in_gaps: evaluating %s B-queue units in %d[min]-wide gap ['%s', '%s') %s unit id=%s", schedulable_units_queue_B.count(), (upper_bound_stop_time-lower_bound_start_time).total_seconds()/60, lower_bound_start_time, upper_bound_stop_time, "before" if gap[1] <= scheduling_unit.scheduled_start_time else "after", scheduling_unit.id) best_B_candidate_for_gap = self.find_best_next_schedulable_unit(schedulable_units_queue_B, lower_bound_start_time=lower_bound_start_time, upper_bound_stop_time=upper_bound_stop_time) if best_B_candidate_for_gap is not None and best_B_candidate_for_gap.scheduling_unit is not None: assert(best_B_candidate_for_gap.start_time >= lower_bound_start_time) assert(best_B_candidate_for_gap.start_time + best_B_candidate_for_gap.scheduling_unit.specified_observation_duration < upper_bound_stop_time) try: logger.info("schedule_B_priority_units_in_gaps: trying to schedule B-queue unit id=%s at start_time='%s' in gap ['%s', '%s')", best_B_candidate_for_gap.scheduling_unit.id, best_B_candidate_for_gap.start_time, lower_bound_start_time, upper_bound_stop_time) maybe_scheduled_unit = self.try_schedule_unit(best_B_candidate_for_gap.scheduling_unit, best_B_candidate_for_gap.start_time) if maybe_scheduled_unit is not None and maybe_scheduled_unit.status.value==models.SchedulingUnitStatus.Choices.SCHEDULED.value: scheduled_units.append(best_B_candidate_for_gap.scheduling_unit) except Exception as e: logger.exception("schedule_B_priority_units_in_gaps: Could not schedule B-queue unit id=%s in gap( ['%s', '%s'). %s", best_B_candidate_for_gap.scheduling_unit.id, lower_bound_start_time, upper_bound_stop_time, str(e)) try: if best_B_candidate_for_gap.scheduling_unit in scheduled_units: # unit has successfully been scheduled. # Recurse. There may be a new gap, so let's try to squeeze in more. best_B_candidate_for_gap.scheduling_unit.refresh_from_db() logger.debug("schedule_B_priority_units_in_gaps: recursing to schedule more B-queue units next to just scheduled unit id=%s", best_B_candidate_for_gap.scheduling_unit.id) scheduled_units.extend(self.schedule_B_priority_units_in_gaps_around_scheduling_unit(best_B_candidate_for_gap.scheduling_unit)) except RecursionError as e: logger.error("Max recursion depth reached. 
Skipping further scheduling of B-queue units in %d[min]-wide gap( ['%s', '%s')", (upper_bound_stop_time-lower_bound_start_time).total_seconds()/60, lower_bound_start_time, upper_bound_stop_time) else: logger.info("schedule_B_priority_units_in_gaps: could not find any B-queue unit out of the %d candidates which fits in %dmin gap ['%s', '%s') next to unit id=%s", schedulable_units_queue_B.count(), (upper_bound_stop_time-lower_bound_start_time).total_seconds()/60.0, lower_bound_start_time, upper_bound_stop_time, scheduling_unit.id) return scheduled_units def log_schedule(self, log_level: int=logging.INFO, lower_bound: datetime=None, upper_bound: datetime=None): '''Log the upcoming schedule in a table like format (as per lower_bound and later. If lower_bound is not given, then as per 'now')''' try: if not logger.isEnabledFor(log_level): return units_in_schedule = models.SchedulingUnitBlueprint.objects.filter( status__value__in=models.SchedulingUnitStatus.SCHEDULABLE_ACTIVE_OR_FINISHED_STATUS_VALUES).filter( scheduled_start_time__isnull=False).filter( obsolete_since__isnull=True) if lower_bound is None: lower_bound = datetime.utcnow()-timedelta(minutes=10) if lower_bound is not None: units_in_schedule = units_in_schedule.filter(scheduled_start_time__gte=lower_bound) if upper_bound is not None: units_in_schedule = units_in_schedule.filter(scheduled_start_time__lte=upper_bound) if units_in_schedule.exists(): logger.log(log_level, "-----------------------------------------------------------------") logger.log(log_level, "Schedule: #scheduled=%d #schedulable=%d #unschedulable=%d %s", units_in_schedule.filter(status__value=models.SchedulingUnitStatus.Choices.SCHEDULED.value).count(), units_in_schedule.filter(status__value=models.SchedulingUnitStatus.Choices.SCHEDULABLE.value).count(), units_in_schedule.filter(status__value=models.SchedulingUnitStatus.Choices.UNSCHEDULABLE.value).count(), "between '%s' and '%s'" % (lower_bound, upper_bound) if upper_bound else "") for unit in units_in_schedule.order_by('scheduled_start_time').all(): try: if unit.status.value==models.SchedulingUnitStatus.Choices.SCHEDULABLE.value: # skip schedulable units continue task_center_time, transit_time, offset_to_transit, lowest_elevation, elevation_at_center, elevation_at_transit = get_timestamps_elevations_and_offset_to_transit(unit, unit.scheduled_start_time) logger.log(log_level, " id=% 4d %s %s %s start_time='%s'[UTC] dur=%4d[min] %s name=%s midLST='%s' transit_offset=% 5d[min] elv@tr=% 3.1f[deg] elv@mid=% 3.1f[deg] elv_min=% 3.1f[deg] C/R/I=%s", unit.id, ("'%s'" % (unit.project.name[:8],)).ljust(10), unit.priority_queue.value, 'D' if unit.is_dynamically_scheduled else 'F', round_to_second_precision(unit.scheduled_observation_start_time), round(unit.specified_main_observation_duration.total_seconds()/60.0), unit.status.value.ljust(14), ("'%s'" % (unit.name[:32],)).ljust(34), unit.main_observation_scheduled_central_lst, offset_to_transit / 60.0 if offset_to_transit else None, Angle(elevation_at_transit, astropy.units.rad).degree if elevation_at_transit else None, Angle(elevation_at_center, astropy.units.rad).degree if elevation_at_center else None, Angle(lowest_elevation, astropy.units.rad).degree if lowest_elevation else None, '/'.join(str(q) for q in unit.main_observation_task.used_station_counts)) except Exception as e: logger.warning(e) logger.log(log_level, "-----------------------------------------------------------------") # TODO: report on schedule density except Exception as e: logger.warning(e) 
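# --------------------------------------------------------------------------------------------------
# Illustrative usage sketch (not part of the original module): how the Scheduler above is intended
# to be driven. The 'with' context starts the background scheduling loop (start_scheduling) and
# stops it again on exit (stop_scheduling); trigger() requests a new full schedule computation,
# which the loop picks up asynchronously. Running this requires a configured TMSS Django database;
# the helper name below is hypothetical and only serves as an example.
def _example_run_scheduler_standalone():  # pragma: no cover
    '''sketch: run the scheduler without a message bus, e.g. for local experiments.'''
    with Scheduler() as scheduler:
        # ask for a (new) full schedule computation; the scheduling loop handles it in the background
        scheduler.trigger()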
################## core dynamic scheduling methods ################################################
#                                                                                                 #
# This module starts with the core dynamic scheduling methods which are used in the dynamic      #
# scheduling service. These high level methods only filter/score/sort in a generic way.          #
# The detailed concrete filter/score/sort methods are picked by a strategy pattern in the        #
# constraints package based on each scheduling unit's scheduling_constraints template.           #
#                                                                                                 #
###################################################################################################

def mark_scheduling_units_for_inactive_projects_unschedulable(projects: [str]=None):
    scheduling_units = models.SchedulingUnitBlueprint.objects.filter(status__value__in=(models.SchedulingUnitStatus.Choices.SCHEDULABLE.value,
                                                                                        models.SchedulingUnitStatus.Choices.SCHEDULED.value)).filter(obsolete_since__isnull=True)
    maybe_unschedulable_units = scheduling_units.exclude(draft__scheduling_set__project__project_state__value=models.ProjectState.Choices.ACTIVE.value)
    if projects:
        maybe_unschedulable_units = maybe_unschedulable_units.filter(draft__scheduling_set__project__name__in=projects)

    if maybe_unschedulable_units.exists():
        logger.info("marking %s scheduled unit(s) as unschedulable for inactive project(s)%s",
                    maybe_unschedulable_units.count(), (' ' + ', '.join(p for p in projects)) if projects else '')

        for scheduling_unit in maybe_unschedulable_units:
            logger.info("marking unit id=%s status=%s project=%s pstate=%s",
                        scheduling_unit.id, scheduling_unit.status, scheduling_unit.project.name, scheduling_unit.project.project_state.value)
            with transaction.atomic():
                mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(scheduling_unit, reason="project %s is not active"%(scheduling_unit.project.name,))
                set_scheduling_unit_blueprint_start_times(scheduling_unit, first_start_time=round_to_second_precision(get_at_constraint_timestamp(scheduling_unit) or datetime.utcnow()))


def unschedule_scheduled_units_for_cycleless_projects():
    scheduled_units = models.SchedulingUnitBlueprint.objects.filter(status__value=models.SchedulingUnitStatus.Choices.SCHEDULED.value).filter(obsolete_since__isnull=True)
    for scheduling_unit in scheduled_units:
        if not scheduling_unit.draft.scheduling_set.project.cycles.exists():
            logger.info("unscheduling unit id=%s project=%s because the project has no cycle(s) (anymore)", scheduling_unit.id, scheduling_unit.project.name)
            unschedule_subtasks_in_scheduling_unit_blueprint(scheduling_unit)


def mark_unschedulable_scheduling_units_for_active_projects_schedulable(projects: [str]=None):
    '''mark all scheduling_units which are currently unschedulable (for the given project(s), or all projects if None) as schedulable again.
    This means that these scheduling_units become schedulable again, and can thus be re-evaluated, resulting in either a scheduled or again unschedulable unit.
    '''
    unschedulable_units = models.SchedulingUnitBlueprint.objects.filter(status__value=models.SchedulingUnitStatus.Choices.UNSCHEDULABLE.value).filter(obsolete_since__isnull=True)
    maybe_schedulable_units = unschedulable_units.filter(draft__scheduling_set__project__project_state__value=models.ProjectState.Choices.ACTIVE.value)
    if projects:
        maybe_schedulable_units = maybe_schedulable_units.filter(draft__scheduling_set__project__name__in=projects)

    if maybe_schedulable_units.exists():
        logger.info("marking %s unschedulable units schedulable for active project(s)%s if stations available...",
                    maybe_schedulable_units.count(), (' ' + ', '.join(p for p in projects)) if projects else '')

        for scheduling_unit in maybe_schedulable_units.all():
            try:
                if can_run_within_station_reservations(scheduling_unit):
                    logger.info("marking scheduling unit id=%s as schedulable", scheduling_unit.id)
                    mark_subtasks_in_scheduling_unit_blueprint_as_schedulable(scheduling_unit)
                else:
                    logger.info("scheduling unit id=%s cannot run with current reservations and is still unschedulable", scheduling_unit.id)
            except Exception as e:
                logger.exception(e)


def mark_scheduling_units_blocked_by_reservations_unschedulable():
    schedulable_units = get_fixed_time_schedulable_scheduling_units().all()
    schedulable_units = [su for su in schedulable_units if not can_run_within_station_reservations(su)]
    if schedulable_units:
        logger.info("marking %s schedulable units unschedulable because they are blocked by a reservation", len(schedulable_units))
        for scheduling_unit in schedulable_units:
            try:
                mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(scheduling_unit, reason="too many stations are unavailable because of reservation(s)")
            except Exception as e:
                logger.exception(e)


################## service/messagebus handler class ###############################################

class TMSSDynamicSchedulingMessageHandler(TMSSEventMessageHandler):
    '''
    The TMSSDynamicSchedulingMessageHandler reacts to TMSS EventMessages by triggering a new full update of the
    dynamic schedule. The actual schedule-update method runs on a background thread, and can take some time to
    complete, ranging from a few seconds to several minutes. In the meantime new EventMessages may be received.
    These are handled by raising a flag that signals the schedule-update-thread that a new full update is needed.
    This way, a burst of Events results in a single update, and it also ensures that we always compute the schedule
    with the latest data. (An illustrative sketch of driving this handler directly is given at the end of this module.)
    '''

    def __init__(self):
        super().__init__(log_event_messages=logger.level==logging.DEBUG)
        self.scheduler = Scheduler()

    def start_handling(self):
        self.scheduler.start_scheduling()
        super().start_handling()

    def stop_handling(self):
        self.scheduler.stop_scheduling()
        super().stop_handling()

    def onSchedulingUnitBlueprintCreated(self, id: int):
        '''prepare the new scheduling_unit for scheduling.
Set unschedulable if project not active.''' logger.info("onSchedulingUnitBlueprintCreated(id=%s)",id) scheduling_unit = models.SchedulingUnitBlueprint.objects.get(id=id) # mark unschedulable if project not active if scheduling_unit.project.project_state.value != models.ProjectState.Choices.ACTIVE.value: mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(scheduling_unit, reason="project is not active") # trigger next schedule computation self.scheduler.trigger() def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str): logger.info("onSchedulingUnitBlueprintStatusChanged(id=%s, status=%s)",id, status) scheduling_unit = models.SchedulingUnitBlueprint.objects.get(id=id) # trigger scheduler if needed if scheduling_unit.is_fixed_time_scheduled and self.scheduler.fixed_time_scheduling_enabled: if status == models.SchedulingUnitStatus.Choices.SCHEDULABLE.value: logger.info("triggering scheduler for fixed_time unit id=%s status=%s", id, status) self.scheduler.trigger() elif scheduling_unit.is_dynamically_scheduled and self.scheduler.dynamic_scheduling_enabled and not self.scheduler.is_scheduling: if status in [models.SchedulingUnitStatus.Choices.SCHEDULABLE.value, models.SchedulingUnitStatus.Choices.OBSERVING.value, models.SchedulingUnitStatus.Choices.CANCELLED.value, models.SchedulingUnitStatus.Choices.OBSERVED.value]: logger.info("triggering scheduler for dynamic unit id=%s status=%s", id, status) self.scheduler.trigger() def onSchedulingUnitBlueprintConstraintsUpdated(self, id: int, scheduling_constraints_doc: dict): try: # if there is an at constraint, try to position the unit directly at the 'at'-timestamp for nice visual feedback scheduling_unit_blueprint = models.SchedulingUnitBlueprint.objects.get(id=id) at = get_at_constraint_timestamp(scheduling_unit_blueprint) if at is not None: update_subtasks_start_times_for_scheduling_unit(scheduling_unit_blueprint, at) except: pass self.onSchedulingUnitBlueprintConstraintsRankOrQueueUpdated(id) def onSchedulingUnitBlueprintRankUpdated(self, id: int, rank: float): self.onSchedulingUnitBlueprintConstraintsRankOrQueueUpdated(id) def onSchedulingUnitBlueprintPriorityQueueUpdated(self, id: int, priority_queue: str): self.onSchedulingUnitBlueprintConstraintsRankOrQueueUpdated(id) def onSchedulingUnitBlueprintConstraintsRankOrQueueUpdated(self, id: int): # TODO: in TMSS-1980 the function calls are/shouldbe replaced with SU id and SU constraints as arguments. Then the cache does not need to be wiped anymore. 
        wipe_evaluate_constraints_caches()

        scheduling_unit_blueprint = models.SchedulingUnitBlueprint.objects.get(id=id)

        if scheduling_unit_blueprint.status.value == models.SchedulingUnitStatus.Choices.UNSCHEDULABLE.value:
            logger.info("constraints/queue/priority for unschedulable scheduling unit id=%s changed: setting status to schedulable, which will trigger a dynamic scheduling update...", id)
            mark_subtasks_in_scheduling_unit_blueprint_as_schedulable(scheduling_unit_blueprint)
        elif scheduling_unit_blueprint.status.value == models.SchedulingUnitStatus.Choices.SCHEDULED.value:
            logger.info("constraints/queue/priority for scheduled scheduling unit id=%s changed: unscheduling it, which will trigger a dynamic scheduling update...", id)
            unschedule_subtasks_in_scheduling_unit_blueprint(scheduling_unit_blueprint)
        elif scheduling_unit_blueprint.status.value == models.SchedulingUnitStatus.Choices.SCHEDULABLE.value and not self.scheduler.is_scheduling:
            logger.info("constraints/queue/priority for schedulable scheduling unit id=%s changed: triggering a dynamic scheduling update...", id)
            self.scheduler.trigger()

    def onSubTaskStatusChanged(self, id: int, status: str):
        if status in (models.SubtaskState.Choices.DEFINED.value, models.SubtaskState.Choices.SCHEDULED.value):
            # this subtask is either:
            #  - new,
            #  - or is now defined after being scheduled/unschedulable,
            #  - or is now scheduled (at a new time window) after being defined.
            # check if there are any overlapping unschedulable subtasks, and mark these as defined.
            # This will result in a status update event, on which the fixed_time scheduling will be triggered.
            # Summary: this subtask may have moved out of the way, so as a result consider unschedulable overlapping units.
            try:
                subtask = models.Subtask.objects.get(id=id)
                affected_other_subtasks = models.Subtask.independent_subtasks() \
                                                        .filter(state__value=models.SubtaskState.Choices.UNSCHEDULABLE.value) \
                                                        .filter(task_blueprint__draft__scheduling_unit_draft__scheduling_set__project__project_state__value=models.ProjectState.Choices.ACTIVE.value) \
                                                        .exclude(id=subtask.id)

                if subtask.scheduled_start_time is not None:
                    affected_other_subtasks = affected_other_subtasks.filter(scheduled_stop_time__gte=subtask.scheduled_start_time)
                if subtask.scheduled_stop_time is not None:
                    affected_other_subtasks = affected_other_subtasks.filter(scheduled_start_time__lte=subtask.scheduled_stop_time)

                for affected_subtask in affected_other_subtasks.all():
                    mark_subtasks_and_successors_as_defined(affected_subtask)
            except models.Subtask.DoesNotExist as e:
                # an older message for a not-existing-anymore subtask was received (usually during development). log and ignore.
                logger.debug(e)

    def onSettingUpdated(self, name: str, value: bool):
        if name in (models.SystemSettingFlag.Choices.DYNAMIC_SCHEDULING_ENABLED.value, models.SystemSettingFlag.Choices.FIXED_TIME_SCHEDULING_ENABLED.value) and value:
            logger.info("%s was set to %s: triggering update of schedule...", name, value)
            self.scheduler.trigger()

    def onSchedulingConstraintsWeightFactorUpdated(self, id: int):
        weight_factor = models.SchedulingConstraintsWeightFactor.objects.get(id=id)
        logger.info("weight_factor %s changed to %s: triggering update of dynamic schedule...", weight_factor.constraint_name, weight_factor.weight)
        wipe_evaluate_constraints_caches()
        for scheduled_unit in get_scheduled_scheduling_units(datetime.utcnow(), scheduler='dynamic'):
            unschedule_subtasks_in_scheduling_unit_blueprint(scheduled_unit)
        mark_unschedulable_scheduling_units_for_active_projects_schedulable()
        self.scheduler.trigger()

    def onProjectStatusUpdated(self, name: str, status: str):
        logger.info("project '%s' status changed to %s", name, status)
        if status == models.ProjectState.Choices.ACTIVE.value:
            mark_unschedulable_scheduling_units_for_active_projects_schedulable(projects=[name])
        else:
            mark_scheduling_units_for_inactive_projects_unschedulable(projects=[name])
        self.scheduler.trigger()

    def onProjectRankUpdated(self, name: str, rank: float):
        logger.info("project '%s' rank changed to %s", name, rank)
        self.scheduler.trigger()

    def _onProjectCyclesCreatedUpdatedDeleted(self, id: int):
        logger.info("a project was added to or removed from a cycle. triggering new scheduling round")
        mark_unschedulable_scheduling_units_for_active_projects_schedulable()
        unschedule_scheduled_units_for_cycleless_projects()
        self.scheduler.trigger()

    def onProjectCyclesCreated(self, id: int):
        self._onProjectCyclesCreatedUpdatedDeleted(id)

    def onProjectCyclesUpdated(self, id: int):
        self._onProjectCyclesCreatedUpdatedDeleted(id)

    def onProjectCyclesDeleted(self, id: int):
        self._onProjectCyclesCreatedUpdatedDeleted(id)

    def onReservationCreated(self, id: int):
        self._onReservationCreatedOrUpdated(id)

    def onReservationUpdated(self, id: int):
        self._onReservationCreatedOrUpdated(id)

    def _onReservationCreatedOrUpdated(self, id: int):
        reservation = models.Reservation.objects.get(id=id)
        logger.info("reservation id=%s '%s' was created/updated. start_time='%s' stop_time='%s'. checking/updating (un)schedulability... as a result a new scheduling round might be triggered if needed.",
                    id, reservation.name or '<no_name>', reservation.start_time, reservation.stop_time)

        # check all unschedulable units, and see if they are not blocked (anymore)
        unschedulable_units = models.SchedulingUnitBlueprint.objects.filter(status__value=models.SchedulingUnitStatus.Choices.UNSCHEDULABLE.value).filter(obsolete_since__isnull=True)
        for scheduling_unit in unschedulable_units.all():
            try:
                if can_run_within_station_reservations(scheduling_unit):
                    logger.info("scheduling_unit id=%s can run with the current reservations, marking it schedulable.", scheduling_unit.id)
                    mark_subtasks_in_scheduling_unit_blueprint_as_schedulable(scheduling_unit)
            except Exception as e:
                logger.exception(e)

        # check all scheduled units, and see if they are affected by this new reservation (window)
        scheduled_units = models.SchedulingUnitBlueprint.objects.filter(status__value=models.SchedulingUnitStatus.Choices.SCHEDULED.value).filter(obsolete_since__isnull=True)
        scheduled_units = scheduled_units.filter(scheduled_stop_time__gte=reservation.start_time)
        if reservation.stop_time is not None:
            scheduled_units = scheduled_units.filter(scheduled_start_time__lt=reservation.stop_time)

        for scheduling_unit in scheduled_units:
            try:
                if not can_run_within_station_reservations(scheduling_unit):
                    logger.info("scheduled scheduling_unit id=%s cannot run with the current reservations, marking it unschedulable.", scheduling_unit.id)
                    # if the unit is scheduled, the method below unschedules it first
                    mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(scheduling_unit, reason="blocked by reservation(s)")
                else:
                    # re-schedule the scheduling unit so that the now reserved stations are not included anymore
                    logger.info("re-scheduling scheduling_unit id=%s so station reservations are taken into account", scheduling_unit.id)
                    reschedule_subtasks_in_scheduling_unit_blueprint(scheduling_unit)
            except Exception as e:
                logger.exception(e)

    def onReservationDeleted(self, id: int, start_time: datetime, stop_time: datetime):
        logger.info("reservation id=%s was deleted. unscheduling units in the old reservation window: ['%s', '%s'].", id, start_time, stop_time)
        scheduled_units_under_deleted_reservation = get_scheduled_scheduling_units(start_time, stop_time)
        for unit in scheduled_units_under_deleted_reservation.all():
            try:
                unschedule_subtasks_in_scheduling_unit_blueprint(unit)
            except Exception as e:
                logger.error(e)

        # also mark the unschedulable units as schedulable again...
        mark_unschedulable_scheduling_units_for_active_projects_schedulable()
        # ... and let the scheduler run
        self.scheduler.trigger()
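
# Illustrative sketch (not executed here) of how the factory below might be used from a service main();
# 'waitForInterrupt()' stands in for whatever keeps the main thread alive and is only an example:
#
#   with create_dynamic_scheduling_service(exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
#       waitForInterrupt()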


def create_dynamic_scheduling_service(exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER):
    return TMSSBusListener(handler_type=TMSSDynamicSchedulingMessageHandler,
                           handler_kwargs=None,
                           num_threads=1,
                           exchange=exchange,
                           broker=broker)


################## helper methods #################################################################

def get_dynamically_schedulable_scheduling_units(priority_queue: models.PriorityQueueType=None, include_triggered: bool=True, include_scheduled: bool=False) -> QuerySet:
    '''get a QuerySet of all dynamically schedulable scheduling_units for the given priority_queue (or all priority_queues if None)'''
    states = [models.SchedulingUnitStatus.Choices.SCHEDULABLE.value]
    if include_scheduled:
        states += [models.SchedulingUnitStatus.Choices.SCHEDULED.value]

    scheduling_units = models.SchedulingUnitBlueprint.objects.filter(status__value__in=states).filter(obsolete_since__isnull=True)
    scheduling_units = scheduling_units.filter(scheduling_constraints_doc__scheduler='dynamic')

    if priority_queue is not None:
        scheduling_units = scheduling_units.filter(priority_queue=priority_queue)

    if not include_triggered:
        scheduling_units = scheduling_units.exclude(interrupts_telescope=True)

    return scheduling_units


def get_fixed_time_schedulable_scheduling_units() -> QuerySet:
    '''get a QuerySet of all fixed_time schedulable scheduling_units'''
    scheduling_units = models.SchedulingUnitBlueprint.objects.filter(status__value=models.SchedulingUnitStatus.Choices.SCHEDULABLE.value).filter(obsolete_since__isnull=True)
    scheduling_units = scheduling_units.filter(scheduling_constraints_doc__scheduler='fixed_time')
    scheduling_units = scheduling_units.order_by('-updated_at')
    return scheduling_units


def get_scheduled_scheduling_units(lower_bound: datetime=None, upper_bound: datetime=None, scheduler: str=None, priority_queue: models.PriorityQueueType=None) -> QuerySet:
    '''get a QuerySet of all scheduled scheduling_units which fall within the given [lower_bound, upper_bound) window (if not None)'''
    scheduled_units = models.SchedulingUnitBlueprint.objects.filter(status__value=models.SchedulingUnitStatus.Choices.SCHEDULED.value).filter(obsolete_since__isnull=True)
    if lower_bound is not None:
        scheduled_units = scheduled_units.filter(scheduled_stop_time__gt=lower_bound)
    if upper_bound is not None:
        scheduled_units = scheduled_units.filter(scheduled_start_time__lte=upper_bound)
    if scheduler is not None:
        scheduled_units = scheduled_units.filter(scheduling_constraints_doc__scheduler=scheduler)
    if priority_queue is not None:
        scheduled_units = scheduled_units.filter(priority_queue=priority_queue)
    return scheduled_units
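
# Illustrative sketch (not executed here) of how the window helpers above can be combined by a caller;
# the 12-hour window is only an example value:
#
#   now = datetime.utcnow()
#   upcoming = get_scheduled_scheduling_units(lower_bound=now, upper_bound=now + timedelta(hours=12), scheduler='dynamic')
#   for su in upcoming.all():
#       logger.debug("scheduled unit id=%s runs ['%s', '%s')", su.id, su.scheduled_start_time, su.scheduled_stop_time)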


def get_observed_or_beyond_scheduling_units(lower_bound: datetime=None, upper_bound: datetime=None, scheduler: str=None) -> QuerySet:
    '''get a QuerySet of all scheduling_units which are observed or beyond (processing, ingesting, finished) and fall within the given [lower_bound, upper_bound) window (if not None)'''
    scheduled_units = models.SchedulingUnitBlueprint.objects.filter(status__value__in=(models.SchedulingUnitStatus.Choices.OBSERVED.value,
                                                                                       models.SchedulingUnitStatus.Choices.PROCESSING.value,
                                                                                       models.SchedulingUnitStatus.Choices.PROCESSED.value,
                                                                                       models.SchedulingUnitStatus.Choices.INGESTING.value,
                                                                                       models.SchedulingUnitStatus.Choices.INGESTED.value,
                                                                                       models.SchedulingUnitStatus.Choices.FINISHED.value))
    scheduled_units = scheduled_units.filter(obsolete_since__isnull=True)
    if lower_bound is not None:
        scheduled_units = scheduled_units.filter(scheduled_stop_time__gte=lower_bound)
    if upper_bound is not None:
        scheduled_units = scheduled_units.filter(scheduled_start_time__lt=upper_bound)
    if scheduler is not None:
        scheduled_units = scheduled_units.filter(scheduling_constraints_doc__scheduler=scheduler)
    return scheduled_units


def get_observing_scheduling_units(lower_bound: datetime=None, upper_bound: datetime=None, scheduler: str=None) -> QuerySet:
    '''get a QuerySet of all scheduling_units which are observing and fall within the given [lower_bound, upper_bound) window (if not None)'''
    observing_units = models.SchedulingUnitBlueprint.objects.filter(status__value=models.SchedulingUnitStatus.Choices.OBSERVING.value)
    observing_units = observing_units.filter(obsolete_since__isnull=True)
    if lower_bound is not None:
        observing_units = observing_units.filter(scheduled_stop_time__gte=lower_bound)
    if upper_bound is not None:
        observing_units = observing_units.filter(scheduled_start_time__lt=upper_bound)
    if scheduler is not None:
        observing_units = observing_units.filter(scheduling_constraints_doc__scheduler=scheduler)
    return observing_units


def get_triggered_schedulable_scheduling_units() -> QuerySet:
    '''get a QuerySet of all triggered schedulable scheduling_units, both dynamically and fixed_time scheduled'''
    scheduling_units = models.SchedulingUnitBlueprint.objects.filter(status__value=models.SchedulingUnitStatus.Choices.SCHEDULABLE.value)
    scheduling_units = scheduling_units.filter(interrupts_telescope=True)
    scheduling_units = scheduling_units.filter(obsolete_since__isnull=True)
    return scheduling_units


def get_running_observation_subtasks(stopping_after: datetime=None) -> [models.Subtask]:
    '''get a list of all starting/started observation subtasks, optionally filtered for those stopping at or after the provided time'''
    running_obs_subtasks = models.Subtask.objects.filter(state__value__in=[models.SubtaskState.Choices.STARTING.value, models.SubtaskState.Choices.STARTED.value],
                                                         specifications_template__type__value=models.SubtaskType.Choices.OBSERVATION.value)
    if stopping_after is not None:
        running_obs_subtasks = running_obs_subtasks.filter(scheduled_stop_time__gte=stopping_after)
    return list(running_obs_subtasks.all())


def unschededule_previously_scheduled_unit_if_needed_and_possible(candidate_scheduling_unit: models.SchedulingUnitBlueprint, start_time: datetime):
    '''check if the candidate was previously already scheduled, and if its start_time changed. Unschedule it if allowed.'''
    try:
        previously_scheduled_scheduling_unit = get_scheduled_scheduling_units().get(id=candidate_scheduling_unit.id)
        if previously_scheduled_scheduling_unit.scheduled_start_time != start_time:
            # start_time changed. unschedule it, so it can be rescheduled at the new start_time
            unschedule_subtasks_in_scheduling_unit_blueprint(previously_scheduled_scheduling_unit)
    except models.SchedulingUnitBlueprint.DoesNotExist:
        pass
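
# Illustrative sketch (not executed here): the helper above and the one below are meant to be called just
# before (re)scheduling a candidate at a proposed start_time; 'candidate', 'proposed_start_time' and
# 'gridder' are example variables, not defined in this module:
#
#   unschededule_previously_scheduled_unit_if_needed_and_possible(candidate, proposed_start_time)
#   unschededule_blocking_scheduled_units_if_needed_and_possible(candidate, proposed_start_time, gridder=gridder)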


def unschededule_blocking_scheduled_units_if_needed_and_possible(candidate_scheduling_unit: models.SchedulingUnitBlueprint, start_time: datetime, gridder: Gridder=None):
    '''check if there are any already scheduled units in the way, and unschedule them if allowed.'''
    # check any previously scheduled units, and unschedule if needed/allowed
    overlapping_scheduled_scheduling_units = get_blocking_scheduled_units(candidate_scheduling_unit, start_time)

    # B-prio units are not allowed to unschedule A-prio units
    if candidate_scheduling_unit.priority_queue.value == models.PriorityQueueType.Choices.B.value:
        overlapping_scheduled_scheduling_units = overlapping_scheduled_scheduling_units.exclude(priority_queue__value=models.PriorityQueueType.Choices.A.value)

    # non-triggered dynamically scheduled units are not allowed to unschedule fixed_time scheduled units
    if candidate_scheduling_unit.is_dynamically_scheduled and not candidate_scheduling_unit.interrupts_telescope:
        overlapping_scheduled_scheduling_units = overlapping_scheduled_scheduling_units.exclude(scheduling_constraints_doc__scheduler='fixed_time')

    if not overlapping_scheduled_scheduling_units.exists():
        logger.debug('no scheduled scheduling_units are blocking candidate scheduling_unit id=%s name=%s, nothing needs to be unscheduled', candidate_scheduling_unit.id, candidate_scheduling_unit.name)
        return

    overlapping_scheduled_scheduling_units = list(overlapping_scheduled_scheduling_units.all())
    lower_bound = min(start_time, min([s.scheduled_start_time for s in overlapping_scheduled_scheduling_units]))
    upper_bound = max(start_time + candidate_scheduling_unit.specified_main_observation_duration, max([s.scheduled_stop_time for s in overlapping_scheduled_scheduling_units]))

    to_be_rescored_units = list(set(overlapping_scheduled_scheduling_units + [candidate_scheduling_unit]))
    if len(to_be_rescored_units) <= 1:
        logger.debug('no scheduled scheduling_units are blocking candidate scheduling_unit id=%s name=%s, nothing needs to be unscheduled', candidate_scheduling_unit.id, candidate_scheduling_unit.name)
        return

    to_be_rescored_units = sorted(to_be_rescored_units, key=lambda s: s.id)
    logger.info("re-scoring units %s in window ['%s', '%s')", ','.join(str(s.id) for s in to_be_rescored_units), lower_bound, upper_bound)

    # compute weighted scores for all scheduled- and the candidate scheduling_units,
    # because the weighted scores are normalized over the given list of units.
    # So, to make a fair comparison of whether the candidate is 'better', it has to be scored in the same set.
    to_be_rescored_units_with_start_time = compute_start_times_for_units(to_be_rescored_units, lower_bound, upper_bound, gridder)
    scored_scheduling_units = compute_scores_for_units_with_start_time(to_be_rescored_units_with_start_time, lower_bound, upper_bound, gridder)
    if not scored_scheduling_units:
        logger.warning("re-scoring units %s in window ['%s', '%s') did not yield any results...", ','.join(str(s.id) for s in to_be_rescored_units), lower_bound, upper_bound)
        return

    # separate the re-scored candidate again from the re-scored scheduled scheduling_units
    rescored_candidate = next((x for x in scored_scheduling_units if x.scheduling_unit.id == candidate_scheduling_unit.id), None)
    if not rescored_candidate:
        logger.warning("cannot find the re-scored candidate id=%s among the re-scored units %s in window ['%s', '%s')...", candidate_scheduling_unit.id, ','.join(str(s.id) for s in to_be_rescored_units), lower_bound, upper_bound)
        return

    rescored_scheduled_units = [x for x in scored_scheduling_units if x.scheduling_unit.id != candidate_scheduling_unit.id]

    # check if we can and need to unschedule the blocking units
    for rescored_scheduled_unit in rescored_scheduled_units:
        scheduled_scheduling_unit = rescored_scheduled_unit.scheduling_unit
        logger.info("checking if scheduled scheduling_unit id=%s '%s' start_time='%s' weighted_score=%s which is blocking candidate id=%s '%s' start_time='%s' weighted_score=%s can be unscheduled",
                    scheduled_scheduling_unit.id, scheduled_scheduling_unit.name, scheduled_scheduling_unit.scheduled_start_time, rescored_scheduled_unit.weighted_score,
                    rescored_candidate.scheduling_unit.id, rescored_candidate.scheduling_unit.name, rescored_candidate.start_time, rescored_candidate.weighted_score)
        logger.info("the scheduled unit id=%s '%s' score=%.3f is in the way of the best candidate id=%s '%s' score=%.3f start_time=%s",
                    scheduled_scheduling_unit.id, scheduled_scheduling_unit.name, rescored_scheduled_unit.weighted_score,
                    candidate_scheduling_unit.id, candidate_scheduling_unit.name, rescored_candidate.weighted_score, candidate_scheduling_unit.scheduled_start_time)

        # in case of a triggered candidate with higher trigger priority than the scheduled unit, we don't care about
        # scores, but only check trigger priority.
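        # after this trigger check, the remaining branches below compare, in order: priority queue (A over B),
        # 'fixed_time' over 'dynamic' scheduling, the re-scored weighted_scores, and finally whether the
        # scheduled unit can simply be rescheduled without the candidate's stations.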
        if candidate_scheduling_unit.interrupts_telescope or scheduled_scheduling_unit.interrupts_telescope:
            if (not scheduled_scheduling_unit.interrupts_telescope) or (candidate_scheduling_unit.project.trigger_priority > scheduled_scheduling_unit.project.trigger_priority):
                logger.info("unscheduling id=%s '%s' because it is in the way and has a lower trigger_priority=%s than the best candidate id=%s '%s' trigger_priority=%s start_time=%s",
                            scheduled_scheduling_unit.id, scheduled_scheduling_unit.name, scheduled_scheduling_unit.project.trigger_priority,
                            candidate_scheduling_unit.id, candidate_scheduling_unit.name, candidate_scheduling_unit.project.trigger_priority, candidate_scheduling_unit.scheduled_start_time)
                unschedule_subtasks_in_scheduling_unit_blueprint(scheduled_scheduling_unit)

                if not can_run_after(scheduled_scheduling_unit, scheduled_scheduling_unit.scheduled_start_time, gridder):
                    logger.info("marking id=%s '%s' unschedulable because it cannot run after start_time=%s",
                                scheduled_scheduling_unit.id, scheduled_scheduling_unit.name, scheduled_scheduling_unit.scheduled_start_time)
                    mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(scheduled_scheduling_unit, "Blocked by higher priority trigger id=%s '%s'" % (candidate_scheduling_unit.id, candidate_scheduling_unit.name))

        # compare the priority queues of the rescored units
        elif rescored_candidate.scheduling_unit.priority_queue != rescored_scheduled_unit.scheduling_unit.priority_queue:
            if rescored_candidate.scheduling_unit.priority_queue.value == models.PriorityQueueType.Choices.A.value:
                # the candidate overrules the already scheduled unit based on priority
                # let's try to make some space by unscheduling the scheduled_scheduling_unit
                logger.info("unscheduling id=%s '%s' because it has a lower priority queue=%s than the best candidate id=%s '%s' queue=%s start_time=%s",
                            scheduled_scheduling_unit.id, scheduled_scheduling_unit.name, scheduled_scheduling_unit.priority_queue.value,
                            candidate_scheduling_unit.id, candidate_scheduling_unit.name, candidate_scheduling_unit.priority_queue.value, candidate_scheduling_unit.scheduled_start_time)
                unschedule_subtasks_in_scheduling_unit_blueprint(scheduled_scheduling_unit)

        # 'fixed' wins over 'dynamic'
        elif rescored_candidate.scheduling_unit.is_fixed_time_scheduled and rescored_scheduled_unit.scheduling_unit.is_dynamically_scheduled:
            # the candidate is fixed_time and should overrule the already scheduled dynamic unit
            # let's try to make some space by unscheduling the scheduled_scheduling_unit
            logger.info("unscheduling id=%s '%s' because it is dynamically scheduled and the best candidate id=%s '%s' is fixed_time scheduled",
                        scheduled_scheduling_unit.id, scheduled_scheduling_unit.name, candidate_scheduling_unit.id, candidate_scheduling_unit.name)
            unschedule_subtasks_in_scheduling_unit_blueprint(scheduled_scheduling_unit)

        # compare the re-scored weighted_scores
        elif rescored_candidate.weighted_score > rescored_scheduled_unit.weighted_score:
            # the candidate clearly wins, let's try to make some space by unscheduling the scheduled_scheduling_unit
            logger.info("unscheduling id=%s '%s' because it has a lower score than the best candidate id=%s '%s' start_time=%s",
                        scheduled_scheduling_unit.id, scheduled_scheduling_unit.name, candidate_scheduling_unit.id, candidate_scheduling_unit.name, candidate_scheduling_unit.scheduled_start_time)
            unschedule_subtasks_in_scheduling_unit_blueprint(scheduled_scheduling_unit)
        elif enough_stations_available_for_scheduling_unit(scheduled_scheduling_unit, unavailable_stations=candidate_scheduling_unit.main_observation_specified_stations):
            # the candidate does not win, but it's ok to unschedule the scheduled_scheduling_unit because it can run without the candidate's stations.
            logger.info("rescheduling id=%s '%s' start_time=%s without the stations of the best candidate id=%s '%s' start_time=%s",
                        scheduled_scheduling_unit.id, scheduled_scheduling_unit.name, scheduled_scheduling_unit.scheduled_start_time,
                        candidate_scheduling_unit.id, candidate_scheduling_unit.name, candidate_scheduling_unit.scheduled_start_time)

            # reschedule the scheduled_unit, and take out the candidate's stations
            rescheduled_unit = reschedule_subtasks_in_scheduling_unit_blueprint(scheduled_scheduling_unit, misc_unavailable_stations=candidate_scheduling_unit.main_observation_specified_stations)
            logger.info("rescheduled scheduling_unit id=%s queue=%s '%s' start_time=%s central_lst='%s' interrupts_telescope=%s\nspecified stations: %s\n used stations: %s",
                        rescheduled_unit.id, rescheduled_unit.priority_queue.value, rescheduled_unit.name, start_time,
                        rescheduled_unit.main_observation_scheduled_central_lst, rescheduled_unit.interrupts_telescope,
                        ','.join(rescheduled_unit.main_observation_specified_stations), ','.join(rescheduled_unit.main_observation_used_stations))
        else:
            logger.info("scheduling_unit id=%s '%s' start_time='%s' will not be unscheduled and thus it will keep on blocking candidate id=%s '%s' start_time='%s'",
                        scheduled_scheduling_unit.id, scheduled_scheduling_unit.name, scheduled_scheduling_unit.scheduled_start_time,
                        candidate_scheduling_unit.id, candidate_scheduling_unit.name, start_time)


def cancel_overlapping_running_observation_if_needed_and_possible(candidate_scheduling_unit: models.SchedulingUnitBlueprint, start_time: datetime):
    '''check if there are starting/started observation subtasks that block the candidate.
    Only triggered scheduling_units can cancel running observations, and only if they belong to a project
    with a higher trigger_priority than the projects of the subtasks to cancel.'''
    if candidate_scheduling_unit.interrupts_telescope:
        running_obs_subtasks = get_running_observation_subtasks(start_time)
        for obs in running_obs_subtasks:
            if obs.project is None:
                logger.warning('cannot cancel running subtask pk=%s for triggered scheduling_unit pk=%s because it does not belong to a project and hence has unknown priority' % (obs.pk, candidate_scheduling_unit.pk))
                continue
            if candidate_scheduling_unit.project.trigger_priority > obs.project.trigger_priority:
                logger.info('cancelling scheduling_unit id=%s for observation subtask pk=%s trigger_priority=%s because it blocks the triggered scheduling_unit pk=%s trigger_priority=%s' % (obs.task_blueprint.scheduling_unit_blueprint.id, obs.pk, obs.project.trigger_priority, candidate_scheduling_unit.pk, candidate_scheduling_unit.project.trigger_priority))
                cancel_scheduling_unit_blueprint(obs.task_blueprint.scheduling_unit_blueprint)
            else:
                logger.info('NOT cancelling subtask pk=%s trigger_priority=%s for triggered scheduling_unit pk=%s trigger_priority=%s because its priority is too low' % (obs.pk, obs.project.trigger_priority, candidate_scheduling_unit.pk, candidate_scheduling_unit.project.trigger_priority))
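
# Illustrative sketch (not executed here): 'triggered_candidate' and 'proposed_start_time' are example
# variables, not defined in this module. Per the docstring above, only a triggered (interrupts_telescope)
# unit whose project has a higher trigger_priority can cancel a running observation:
#
#   cancel_overlapping_running_observation_if_needed_and_possible(triggered_candidate, proposed_start_time)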