Select Git revision
-
Jorrit Schaap authoredJorrit Schaap authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
dynamic_scheduling.py 36.92 KiB
#!/usr/bin/env python3
# dynamic_scheduling.py
#
# Copyright (C) 2020
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: $
"""
"""
import os
import logging
logger = logging.getLogger(__name__)
from datetime import datetime, timedelta, time
from lofar.sas.tmss.tmss.tmssapp import models
from lofar.sas.tmss.tmss.tmssapp.tasks import schedule_independent_subtasks_in_scheduling_unit_blueprint, unschedule_subtasks_in_scheduling_unit_blueprint, mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable, mark_independent_subtasks_in_scheduling_unit_blueprint_as_schedulable, set_scheduling_unit_blueprint_start_times
from lofar.sas.tmss.tmss.tmssapp.subtasks import update_subtasks_start_times_for_scheduling_unit, clear_defined_subtasks_start_stop_times_for_scheduling_unit, cancel_subtask, mark_subtasks_and_successors_as_defined
from lofar.sas.tmss.client.tmssbuslistener import *
from lofar.common.datetimeutils import round_to_second_precision
from threading import Thread, Event
from django.db import transaction
from django.db.models import QuerySet
from lofar.sas.tmss.services.scheduling.constraints import *
# LOFAR needs to have a gap in between observations to (re)initialize hardware.
DEFAULT_INTER_OBSERVATION_GAP = timedelta(seconds=60)
# Offset which is added to 'utcnow' to obtain the earliest start time that is considered
# when searching for the next scheduling unit to schedule.
DEFAULT_NEXT_STARTTIME_GAP = timedelta(seconds=180)
################## core dynamic scheduling methods ################################################
# #
# This module starts with the core dynamic scheduling methods which are used in the dynamic #
# scheduling service. These high level methods only filter/score/sort in a generic way. #
# The detailed concrete filter/score/sort methods are picked by a strategy pattern in the #
# constraints package based on each scheduling unit's scheduling_constraints template. #
# #
###################################################################################################
def find_best_next_schedulable_unit(scheduling_units:[models.SchedulingUnitBlueprint], lower_bound_start_time: datetime, upper_bound_stop_time: datetime) -> ScoredSchedulingUnit:
    """
    Pick the best schedulable scheduling unit out of the given scheduling_units for the given time window.

    The units are first filtered by their constraints. If any of the remaining units interrupts the telescope
    (a 'trigger'), then the one from the project with the highest trigger_priority wins and no scores are computed.
    Otherwise the remaining units are scored by their constraints and the best scored one wins.

    :param scheduling_units: the scheduling units to evaluate.
    :param lower_bound_start_time: evaluate the constraints at and after this timestamp.
    :param upper_bound_stop_time: evaluate the constraints before this timestamp.
                                  It is raised to lower_bound_start_time when it is earlier than that.
    :return: a ScoredSchedulingUnit with the best unit and its proposed start_time,
             or None when no unit passed the constraints filter.
    """
    # a window can be empty, but it can never end before it starts
    upper_bound_stop_time = max(lower_bound_start_time, upper_bound_stop_time)

    candidates = filter_scheduling_units_using_constraints(scheduling_units, lower_bound_start_time, upper_bound_stop_time)

    if not candidates:
        logger.debug("No schedulable scheduling units found which meet the requirements between '%s' and '%s'", lower_bound_start_time, upper_bound_stop_time)
        return None

    triggers = [unit for unit in candidates if unit.interrupts_telescope]

    if not triggers:
        # the normal case: let the constraints decide by score
        return get_best_scored_scheduling_unit_scored_by_constraints(candidates, lower_bound_start_time, upper_bound_stop_time)

    # triggers always win from non-triggers, and among triggers only the project's trigger_priority counts
    winning_trigger = max(triggers, key=lambda su: su.project.trigger_priority)
    return ScoredSchedulingUnit(scheduling_unit=winning_trigger,
                                start_time=get_earliest_possible_start_time(winning_trigger, lower_bound_start_time),
                                scores={}, weighted_score=None)  # we don't care about scores in case of a trigger
def mark_fixed_time_scheduling_units_for_inactive_projects_unschedulable():
    '''Mark the independent subtasks of all fixed_time schedulable scheduling units as unschedulable
    when the unit's project is not in the active state.'''
    active_state = models.ProjectState.Choices.ACTIVE.value
    units_of_inactive_projects = get_fixed_time_schedulable_scheduling_units().exclude(draft__scheduling_set__project__project_state__value=active_state)

    if not units_of_inactive_projects.exists():
        return

    logger.info("making %s scheduling units with fixed_time at constraint unschedulable for inactive projects", units_of_inactive_projects.count())
    for unit in units_of_inactive_projects:
        mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(unit)
def mark_fixed_time_scheduling_units_for_active_projects_schedulable():
    '''Try to mark the independent subtasks of all unschedulable scheduling units of active projects
    as schedulable again. A failure for one unit is logged and does not stop the processing of the others.'''
    unschedulable_state = models.SchedulingUnitStatus.Choices.UNSCHEDULABLE.value
    active_state = models.ProjectState.Choices.ACTIVE.value

    candidates = models.SchedulingUnitBlueprint.objects.filter(status__value=unschedulable_state) \
                                                       .filter(draft__scheduling_set__project__project_state__value=active_state)

    if not candidates.exists():
        return

    logger.info("trying to make %s scheduling units with fixed_time at constraint schedulable for active projects", candidates.count())
    for unit in candidates.all():
        try:
            mark_independent_subtasks_in_scheduling_unit_blueprint_as_schedulable(unit)
        except Exception as e:
            # best effort: log it, and carry on with the next unit
            logger.exception(e)
def schedule_fixed_time_scheduling_units():
    '''Try to schedule each fixed_time schedulable scheduling unit of an active project at its earliest possible start time.

    A unit which does not pass the constraints filter for its own [start, stop] window, or for which scheduling
    raises a SubtaskSchedulingException, gets its independent subtasks marked as unschedulable.
    '''
    # fixed_time schedulable units (in most-recently-updated order), for active projects only
    candidates = get_fixed_time_schedulable_scheduling_units().filter(draft__scheduling_set__project__project_state__value=models.ProjectState.Choices.ACTIVE.value)

    logger.info("trying to schedule %s scheduling units with fixed_time at constraint for active projects", candidates.count())

    for unit in candidates:
        start_time = get_earliest_possible_start_time(unit)
        stop_time = start_time + unit.specified_main_observation_duration
        set_scheduling_unit_blueprint_start_times(unit, first_start_time=start_time)

        logger.info("Checking if fixed_time scheduled scheduling unit id=%d can be scheduled at '%s'", unit.id, start_time)

        if not filter_scheduling_units_using_constraints([unit], lower_bound=start_time, upper_bound=stop_time):
            # the constraints are not met in the requested window
            logger.warning("fixed_time-scheduled scheduling unit id=%d cannot be scheduled at '%s'", unit.id, start_time)
            mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(unit)
            continue

        try:
            logger.info("Scheduling fixed_time scheduled scheduling unit id=%d at '%s'", unit.id, start_time)
            schedule_independent_subtasks_in_scheduling_unit_blueprint(unit, start_time)
        except SubtaskSchedulingException as e:
            logger.error(e)
            mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(unit)
def schedule_next_scheduling_unit() -> models.SchedulingUnitBlueprint:
    '''Find the best next dynamically schedulable scheduling unit and try to schedule it.

    The priority queues are visited in order of their 'value'. For each queue a search window which starts at
    utcnow + DEFAULT_NEXT_STARTTIME_GAP is moved forward until a candidate could be scheduled, or until the
    lower bound passes the upper bound (one day after the initial lower bound).
    Running observations and already scheduled units which are in the way of the candidate are
    cancelled/unscheduled when the respective helper methods allow that.

    :return: the scheduled scheduling unit, or None when nothing could be scheduled.
    '''
    # loop over the different priority queue's. 'A'-type projects/scheduling units always go first, then 'B'-type etc.
    # if a scheduling unit can be scheduled for this priority_queue, then we exit early.
    for priority_queue in models.PriorityQueueType.objects.order_by('value').all():
        schedulable_units = get_dynamically_schedulable_scheduling_units(priority_queue=priority_queue)

        if len(schedulable_units) == 0:
            logger.info("No scheduling units found for priority_queue '%s'...", priority_queue.value)
            continue  # continue with the remaining priority_queue types

        # --- core routine ---
        # search in a forward sliding window for the best scheduling_unit that can be scheduled
        # once found and scheduled, exit. Otherwise slide window forward and try again.
        lower_bound_start_time = datetime.utcnow() + DEFAULT_NEXT_STARTTIME_GAP
        upper_bound_stop_time = lower_bound_start_time + timedelta(days=1)

        while lower_bound_start_time < upper_bound_stop_time:
            # reset per iteration, so the except-handler below never reports a stale (or unbound) candidate
            best_scheduling_unit = None

            try:
                # no need to irritate user in log files with sub-second scheduling precision
                lower_bound_start_time = round_to_second_precision(lower_bound_start_time)
                upper_bound_stop_time = max(round_to_second_precision(upper_bound_stop_time), lower_bound_start_time)

                # try to find the best next scheduling_unit
                logger.info("schedule_next_scheduling_unit: searching for best scheduling unit to schedule between '%s' and '%s'", lower_bound_start_time, upper_bound_stop_time)
                best_scored_scheduling_unit = find_best_next_schedulable_unit(schedulable_units, lower_bound_start_time, upper_bound_stop_time)

                if best_scored_scheduling_unit:
                    best_scheduling_unit = best_scored_scheduling_unit.scheduling_unit
                    best_scheduling_unit_score = best_scored_scheduling_unit.weighted_score
                    best_start_time = best_scored_scheduling_unit.start_time

                    # make sure we don't start earlier than allowed
                    best_start_time = max(best_start_time, lower_bound_start_time)

                    # make start_time "look nice" for us humans
                    best_start_time = round_to_second_precision(best_start_time)

                    logger.info("schedule_next_scheduling_unit: found best candidate id=%s '%s' weighted_score=%s start_time=%s interrupts_telescope=%s",
                                best_scheduling_unit.id, best_scheduling_unit.name, best_scheduling_unit_score, best_start_time, best_scheduling_unit.interrupts_telescope)

                    # cancel and/or unschedule current units-in-the-way...
                    cancel_running_observation_if_needed_and_possible(best_scored_scheduling_unit)

                    if unschededule_blocking_scheduled_units_if_needed_and_possible(best_scored_scheduling_unit):
                        # try schedule our candidate (there might still be unit(s) in the way, which will cause a SubtaskSchedulingException)
                        scheduled_scheduling_unit = schedule_independent_subtasks_in_scheduling_unit_blueprint(best_scheduling_unit, start_time=best_start_time)

                        logger.info("schedule_next_scheduling_unit: scheduled best candidate id=%s '%s' score=%s start_time=%s interrupts_telescope=%s",
                                    best_scheduling_unit.id, best_scheduling_unit.name, best_scheduling_unit_score, best_start_time, best_scheduling_unit.interrupts_telescope)

                        # return the scheduled scheduling_unit, early exit out of looping over priority queues.
                        return scheduled_scheduling_unit

            except SubtaskSchedulingException as e:
                if best_scheduling_unit is None:
                    # the exception was raised before a candidate was selected
                    logger.error("Could not find/schedule a scheduling_unit. Error: %s", e)
                else:
                    logger.error("Could not schedule scheduling_unit id=%s name='%s'. Error: %s", best_scheduling_unit.id, best_scheduling_unit.name, e)

                    if best_scheduling_unit.interrupts_telescope:
                        # this schedule_next_scheduling_unit call was 'triggered' by a interrupts_telescope-scheduling unit.
                        # apparently this triggered scheduling unit could not be scheduled due to a scheduled/running unit being in the way.
                        # no need to seek further,
                        # mark the remaining_defined_triggered_subtasks in the interrupts_telescope-scheduling unit as error and return None....
                        remaining_defined_triggered_subtasks = list(models.Subtask.independent_subtasks().filter(task_blueprints__scheduling_unit_blueprint_id=best_scheduling_unit.id,
                                                                                                                  state__value=models.SubtaskState.Choices.DEFINED.value).all())
                        if remaining_defined_triggered_subtasks:
                            error_state = models.SubtaskState.objects.get(value=models.SubtaskState.Choices.ERROR.value)
                            for subtask in remaining_defined_triggered_subtasks:
                                subtask.state = error_state
                                # persist the new state; without the save only the in-memory instance is changed
                                subtask.save()
                        return None

            # nothing was found, or an error occurred.
            # search again... (loop) with the remaining schedulable_units and new lower_bound_start_time
            # it may be that in the mean time some scheduling_units are not (dynamically) schedulable anymore, fetch list again.
            schedulable_units = get_dynamically_schedulable_scheduling_units(priority_queue=priority_queue)
            lower_bound_start_time = get_min_earliest_possible_start_time(schedulable_units, lower_bound=lower_bound_start_time+timedelta(hours=1))

            # TODO: update upper_bound_stop_time as well, stop when upper_bound_stop_time > cycle end.

    # none of the priority queues yielded a unit which could be scheduled
    return None
def assign_start_stop_times_to_schedulable_scheduling_units(lower_bound_start_time: datetime):
    '''Estimate the mid-term schedule by assigning start times to the dynamically schedulable scheduling units.

    The priority queues are processed in order of their 'value'. Within a queue, the best next unit is
    searched repeatedly (using find_best_next_schedulable_unit) in a window of 365 days, its subtasks get the
    found start_time, and the search continues after that unit's scheduled_stop_time plus the inter-observation gap.
    When the window cannot be advanced anymore, the start/stop times of the remaining units in the queue are cleared.

    :param lower_bound_start_time: the earliest timestamp at which the first unit may start.
                                   The advanced lower bound carries over from one priority queue to the next.
    '''
    logger.info("Estimating mid-term schedule with lower_bound_start_time=%s ...", lower_bound_start_time)

    for priority_queue in models.PriorityQueueType.objects.order_by('value').all():
        scheduling_units = get_dynamically_schedulable_scheduling_units(priority_queue=priority_queue)

        if len(scheduling_units) == 0:
            logger.info("No scheduling units found...")
            # an empty queue should not prevent the remaining priority queues from being processed
            continue

        upper_bound_stop_time = lower_bound_start_time + timedelta(days=365)

        # update the start_times of the remaining ones (so they form queue, and can be visualized in a timeline)
        while scheduling_units and lower_bound_start_time < upper_bound_stop_time:
            best_scored_scheduling_unit = find_best_next_schedulable_unit(scheduling_units, lower_bound_start_time, upper_bound_stop_time)

            if best_scored_scheduling_unit:
                scheduling_unit = best_scored_scheduling_unit.scheduling_unit
                start_time = round_to_second_precision(best_scored_scheduling_unit.start_time)
                logger.info("mid-term schedule: next scheduling unit id=%s '%s' start_time=%s", scheduling_unit.id, scheduling_unit.name, start_time)
                update_subtasks_start_times_for_scheduling_unit(scheduling_unit, start_time)

                # TODO check this?
                # If the start_time of the subtasks are updated, should the start_time (and stop_time) of the
                # scheduling_unit also be updated? Currently its a cached property

                # keep track of the lower_bound_start_time based on last sub.stoptime and gap
                lower_bound_start_time = scheduling_unit.scheduled_stop_time + DEFAULT_INTER_OBSERVATION_GAP

                scheduling_units.remove(scheduling_unit)
            else:
                # search again in a later timeslot
                min_earliest_possible_start_time = get_min_earliest_possible_start_time(scheduling_units, lower_bound_start_time+timedelta(minutes=10))
                logger.info("lower_bound_start_time='%s', min_earliest_possible_start_time='%s'", lower_bound_start_time, min_earliest_possible_start_time)

                if min_earliest_possible_start_time > lower_bound_start_time:
                    lower_bound_start_time = min_earliest_possible_start_time
                else:
                    # cannot advance anymore to find more
                    logger.warning("Cannot assign start/stop times to remaining scheduling units for mid-term schedule...")
                    for su in scheduling_units:
                        logger.warning("Remaining scheduling unit: id=%s '%s'", su.id, su.name)

                        # clear start/stop times, so they don't show up in the timeline,
                        # and we can filter/show them in a separate list which the user can tinker on the constraints
                        clear_defined_subtasks_start_stop_times_for_scheduling_unit(su)
                    break

    logger.info("Estimating mid-term schedule... finished")
def do_dynamic_schedule() -> models.SchedulingUnitBlueprint:
    '''Do a full update of the schedule: schedule the next scheduling unit, and assign start/stop times
    to the remaining schedulable scheduling units.

    :return: the scheduling unit which was scheduled by schedule_next_scheduling_unit, or None if there was none.
    '''
    logger.info("Updating schedule....")

    scheduled_unit = schedule_next_scheduling_unit()

    # determine next possible start time for remaining scheduling_units
    if scheduled_unit:
        lower_bound_start_time = scheduled_unit.scheduled_stop_time + DEFAULT_INTER_OBSERVATION_GAP
    else:
        # nothing was scheduled just now, so continue after the unit(s) which are scheduled around 'now', if any.
        stop_times = []
        try:
            now = datetime.utcnow()
            stop_times = [s.scheduled_stop_time
                          for s in get_scheduled_scheduling_units(now, now)
                          if s.scheduled_stop_time is not None]
        except Exception:
            # best effort: fall back to 'now' below, but do not hide the cause
            logger.exception("Could not determine the currently scheduled scheduling units")

        if stop_times:
            lower_bound_start_time = max(stop_times) + DEFAULT_INTER_OBSERVATION_GAP
        else:
            lower_bound_start_time = datetime.utcnow()

    # round up to next nearest second
    lower_bound_start_time += timedelta(microseconds=1000000-lower_bound_start_time.microsecond)

    # determine mid-term schedule by assigning start/stop times to remaining schedulable units using the same search strategy
    assign_start_stop_times_to_schedulable_scheduling_units(lower_bound_start_time)

    logger.info("Finished updating dynamic schedule")

    return scheduled_unit
################## service/messagebus handler class ###############################################
class TMSSDynamicSchedulingMessageHandler(TMSSEventMessageHandler):
    '''
    The TMSSDynamicSchedulingMessageHandler reacts to TMSS EventMessages by triggering a new full update of the dynamic
    schedule.
    The actual schedule-update method runs on a background thread, and can take some time to complete ranging from a
    few seconds to several minutes. In the mean time new EventMessages may be received. These are handled by raising a flag
    that signals the schedule-update-thread that a new full update is needed. This way, a burst of Events results in
    a single update, and it also ensures that we always compute the schedule with the latest data.
    '''

    def __init__(self):
        super().__init__(log_event_messages=True)
        self._scheduling_thread = None
        self._scheduling_thread_running = False
        # set by the on* event handlers, waited for and cleared by the scheduling thread
        self._do_schedule_event = Event()

    def start_handling(self):
        '''Start the background scheduling thread, and then start handling messages.'''
        # start the background thread which waits until the _do_schedule_event event is set upon receiving the correct TMSS EventMessages.
        self._scheduling_thread = Thread(target=self._scheduling_loop)
        self._scheduling_thread.daemon = True
        self._scheduling_thread_running = True
        self._scheduling_thread.start()
        super().start_handling()

    def stop_handling(self):
        '''Stop and join the background scheduling thread (if it was started), and then stop handling messages.'''
        self._scheduling_thread_running = False
        if self._scheduling_thread is not None:
            # the loop polls the running-flag with a timeout of 10 seconds, so the join can take a while
            self._scheduling_thread.join()
            self._scheduling_thread = None
        super().stop_handling()

    def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str):
        '''Trigger a schedule update when a scheduling unit reached one of the statuses which can affect the schedule.'''
        if status in [models.SchedulingUnitStatus.Choices.SCHEDULABLE.value,
                      models.SchedulingUnitStatus.Choices.UNSCHEDULABLE.value,
                      models.SchedulingUnitStatus.Choices.OBSERVED.value,
                      models.SchedulingUnitStatus.Choices.FINISHED.value,
                      models.SchedulingUnitStatus.Choices.CANCELLED.value]:
            logger.info("onSchedulingUnitBlueprintStatusChanged(id=%s, status=%s): triggering update of dynamic & fixed_time schedule...", id, status)

            # scheduling takes a long time, longer then creating many scheduling units in bulk
            # so, we do not create a complete new schedule for each new unit,
            # but we only trigger a new schedule update.
            # This way we are sure that the latest units are always taken into account while scheduling, but we do not waste cpu cycles.
            self._do_schedule_event.set()

    def onSchedulingUnitBlueprintConstraintsUpdated(self, id: int, scheduling_constraints_doc: dict):
        '''Make an unschedulable unit schedulable again, unschedule a scheduled fixed_time unit, and trigger a schedule update.'''
        scheduling_unit_blueprint = models.SchedulingUnitBlueprint.objects.get(id=id)

        if scheduling_unit_blueprint.status.value == models.SchedulingUnitStatus.Choices.UNSCHEDULABLE.value:
            logger.info("constraints for unschedulable scheduling unit id=%s changed: setting status to schedulable which will triggering a dynamic scheduling update...", id)
            mark_independent_subtasks_in_scheduling_unit_blueprint_as_schedulable(scheduling_unit_blueprint)

        if scheduling_unit_blueprint.scheduling_constraints_doc.get('scheduler') == 'fixed_time' and \
           scheduling_unit_blueprint.status.value == models.SchedulingUnitStatus.Choices.SCHEDULED.value:
            logger.info("constraints for fixed_time scheduled scheduling unit id=%s changed: unscheduling it...", id)
            unschedule_subtasks_in_scheduling_unit_blueprint(scheduling_unit_blueprint)

        logger.info("constraints for scheduling unit id=%s changed: triggering update of dynamic schedule...", id)
        self._do_schedule_event.set()

    def onSubTaskStatusChanged(self, id: int, status: str):
        '''When a subtask becomes defined, mark overlapping unschedulable independent subtasks (and successors) as defined.'''
        if status == models.SubtaskState.Choices.DEFINED.value:
            # this subtask is either new, or is now defined after being scheduled/unschedulable
            # check if there are any overlapping unschedulable subtasks, and mark these as defined.
            # This will result in a status update event, on which the fixed_time scheduling will be triggered.
            # Summary: this subtask may have moved out of the way, as a result consider unschedulable overlapping units.
            subtask = models.Subtask.objects.get(id=id)
            affected_subtasks = models.Subtask.independent_subtasks()\
                .filter(state__value=models.SubtaskState.Choices.UNSCHEDULABLE.value)\
                .filter(scheduled_start_time__lte=subtask.scheduled_stop_time) \
                .filter(scheduled_stop_time__gte=subtask.scheduled_start_time)
            for affected_subtask in affected_subtasks.all():
                mark_subtasks_and_successors_as_defined(affected_subtask)

    def onSettingUpdated(self, name: str, value: bool):
        '''Trigger a schedule update when the dynamic scheduling setting was switched on.'''
        if name == models.SystemSettingFlag.Choices.DYNAMIC_SCHEDULING_ENABLED.value and value:
            logger.info("%s was set to %s: triggering update of dynamic schedule...", name, value)
            self._do_schedule_event.set()

    def onSchedulingConstraintsWeightFactorUpdated(self, id: int):
        '''Trigger a schedule update when a constraints weight factor changed.'''
        weight_factor = models.SchedulingConstraintsWeightFactor.objects.get(id=id)
        logger.info("weight_factor %s for template %s version %s changed to %s: triggering update of dynamic schedule...",
                    weight_factor.constraint_name, weight_factor.scheduling_constraints_template.name, weight_factor.scheduling_constraints_template.version, weight_factor.weight)
        self._do_schedule_event.set()

    def onProjectStatusUpdated(self, name: str, status: str):
        '''Re-evaluate the (un)schedulable state of units for active/inactive projects, and trigger a schedule update.'''
        mark_fixed_time_scheduling_units_for_active_projects_schedulable()
        mark_fixed_time_scheduling_units_for_inactive_projects_unschedulable()
        self._do_schedule_event.set()

    def onReservationCreated(self, id: int):
        self._onReservationCreatedOrUpdated(id)

    def onReservationUpdated(self, id: int):
        self._onReservationCreatedOrUpdated(id)

    def _onReservationCreatedOrUpdated(self, id: int):
        '''Mark unschedulable units of active projects as schedulable, and trigger a schedule update.'''
        # The looked-up instance is not used yet (see TODO), but the lookup is kept
        # so that an unknown id still raises like before.
        models.Reservation.objects.get(id=id)

        # TODO: consider reservation time window. unschedule scheduled units
        # improperly use mark_fixed_time_scheduling_units_for_active_projects_schedulable for now
        mark_fixed_time_scheduling_units_for_active_projects_schedulable()
        self._do_schedule_event.set()

    def onReservationDeleted(self, id: int):
        # maybe some unschedulable/blocked units can use the spot that was used by the reservation
        # mark them all schedulable, and do a scheduling round to see which ones can be scheduled
        mark_fixed_time_scheduling_units_for_active_projects_schedulable()
        self._do_schedule_event.set()

    def _scheduling_loop(self):
        '''Body of the background thread: update the fixed_time and dynamic schedule each time the schedule-event is set.'''
        while self._scheduling_thread_running:
            # wake up at least every 10 seconds to re-check the running flag
            if self._do_schedule_event.wait(timeout=10):
                # clear before scheduling, so events arriving during the update trigger another round
                self._do_schedule_event.clear()
                try:
                    fixed_time_flag = models.SystemSettingFlag.Choices.FIXED_TIME_SCHEDULING_ENABLED.value
                    fixed_time_enabled = models.Setting.objects.get(name=fixed_time_flag).value
                    if fixed_time_enabled:
                        schedule_fixed_time_scheduling_units()
                    else:
                        logger.warning("Skipping update of fixed_time schedule because the setting %s=%s", fixed_time_flag, fixed_time_enabled)

                    # read the dynamic setting only now: the fixed_time update above may have taken a while
                    dynamic_flag = models.SystemSettingFlag.Choices.DYNAMIC_SCHEDULING_ENABLED.value
                    dynamic_enabled = models.Setting.objects.get(name=dynamic_flag).value
                    if dynamic_enabled:
                        do_dynamic_schedule()
                    else:
                        logger.warning("Skipping update of dynamic schedule because the setting %s=%s", dynamic_flag, dynamic_enabled)
                except Exception as e:
                    logger.exception(str(e))
                    # just continue processing events. better luck next time...
def create_dynamic_scheduling_service(exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER):
    '''Create a TMSSBusListener which handles the messages on the given exchange and broker
    with a TMSSDynamicSchedulingMessageHandler.'''
    listener_kwargs = {'handler_type': TMSSDynamicSchedulingMessageHandler,
                       'handler_kwargs': None,
                       'exchange': exchange,
                       'broker': broker}
    return TMSSBusListener(**listener_kwargs)
################## helper methods #################################################################
def get_dynamically_schedulable_scheduling_units(priority_queue: models.PriorityQueueType=None) -> [models.SchedulingUnitBlueprint]:
    '''Return a list of the scheduling units with the 'dynamic' scheduler in their constraints which have
    at least one independent subtask in state 'defined', and which have the schedulable status.
    Only the units in the given priority_queue are returned, or the units in all queues if priority_queue is None.'''
    unit_id_key = 'task_blueprint__scheduling_unit_blueprint_id'

    # the ids of the units which have one or more defined independent subtasks
    defined_subtasks = models.Subtask.independent_subtasks().filter(state__value='defined')
    unit_ids = [row[unit_id_key] for row in defined_subtasks.values(unit_id_key).distinct().all()]

    units = models.SchedulingUnitBlueprint.objects.filter(id__in=unit_ids) \
                                                  .filter(scheduling_constraints_doc__scheduler='dynamic')
    if priority_queue is not None:
        units = units.filter(priority_queue=priority_queue)

    # the status is checked per instance in python here, not in the database query
    schedulable = models.SchedulingUnitStatus.Choices.SCHEDULABLE.value
    result = []
    for unit in units.all():
        if unit.status.value == schedulable:
            result.append(unit)
    return result
def get_fixed_time_schedulable_scheduling_units() -> QuerySet:
    '''Return a QuerySet of the schedulable scheduling units with the 'fixed_time' scheduler in their constraints
    which have at least one independent subtask in the defined state, most recently updated first.'''
    unit_id_key = 'task_blueprint__scheduling_unit_blueprint_id'

    # the ids of the units which have one or more defined independent subtasks
    defined_subtasks = models.Subtask.independent_subtasks().filter(state__value=models.SubtaskState.Choices.DEFINED.value)
    unit_ids = [row[unit_id_key] for row in defined_subtasks.values(unit_id_key).distinct().all()]

    return models.SchedulingUnitBlueprint.objects.filter(id__in=unit_ids) \
                                                 .filter(scheduling_constraints_doc__scheduler='fixed_time') \
                                                 .filter(status__value=models.SchedulingUnitStatus.Choices.SCHEDULABLE.value) \
                                                 .order_by('-updated_at')
def get_scheduled_scheduling_units(lower:datetime=None, upper:datetime=None) -> [models.SchedulingUnitBlueprint]:
    '''Return a list of the scheduling units which have at least one scheduled 'independent' subtask.
    When given, lower and upper limit the result to subtasks which stop at or after lower, and start at or before upper.'''
    subtasks = models.Subtask.independent_subtasks().filter(state__value='scheduled')

    # optional time-window overlap filters
    if lower is not None:
        subtasks = subtasks.filter(scheduled_stop_time__gte=lower)
    if upper is not None:
        subtasks = subtasks.filter(scheduled_start_time__lte=upper)

    unit_ids = subtasks.values('task_blueprint__scheduling_unit_blueprint_id').distinct()
    units = models.SchedulingUnitBlueprint.objects.filter(id__in=unit_ids)
    return list(units.all())
def get_running_observation_subtasks(stopping_after:datetime=None) -> [models.Subtask]:
    '''Return a list of the observation subtasks which are in the starting or started state.
    When stopping_after is given, only the subtasks with a scheduled_stop_time at or after that timestamp are returned.'''
    running_states = [models.SubtaskState.Choices.STARTING.value,
                      models.SubtaskState.Choices.STARTED.value]

    subtasks = models.Subtask.objects.filter(state__value__in=running_states,
                                             specifications_template__type__value=models.SubtaskType.Choices.OBSERVATION.value)

    if stopping_after is None:
        return list(subtasks.all())

    return list(subtasks.filter(scheduled_stop_time__gte=stopping_after).all())
def unschededule_blocking_scheduled_units_if_needed_and_possible(candidate: ScoredSchedulingUnit) -> bool:
    '''Check if there are any already scheduled units in the way of the candidate, and unschedule them if allowed.

    A blocking scheduled unit is unscheduled when:
     - the candidate interrupts the telescope, and the blocking unit either does not, or belongs to a project with a lower trigger_priority;
     - or the candidate does not interrupt the telescope and has a higher weighted_score than the blocking unit;
     - or the blocking unit is the candidate itself, scheduled at another start_time than the proposed one.

    :param candidate: the scored scheduling unit with its proposed start_time.
    :return: True if nothing is blocking (anymore) in the candidate's time window, else False.
    '''
    candidate_unit = candidate.scheduling_unit
    window_start = candidate.start_time
    window_stop = candidate.start_time + candidate_unit.duration

    # check any previously scheduled units, and unschedule if needed/allowed
    scheduled_scheduling_units = get_scheduled_scheduling_units(lower=window_start, upper=window_stop)

    if not scheduled_scheduling_units:
        logger.debug('no scheduled scheduling_units are blocking candidate scheduling_unit id=%s name=%s, nothing needs to be unscheduled', candidate_unit.id, candidate_unit.name)
        return True

    scheduled_scored_scheduling_units = compute_scores(scheduled_scheduling_units, window_start, window_stop)

    logger.info("checking scheduled scheduling_unit(s) which are blocking candidate id=%s '%s' start_time='%s': %s",
                candidate_unit.id, candidate_unit.name, candidate.start_time,
                "; ".join("id=%s '%s' start_time='%s'" % (su.id, su.name, su.scheduled_start_time) for su in scheduled_scheduling_units))

    # check if we can and need to unschedule the blocking units
    for scheduled_scored_scheduling_unit in scheduled_scored_scheduling_units:
        scheduled_scheduling_unit = scheduled_scored_scheduling_unit.scheduling_unit

        # in case of a triggered candidate with higher trigger priority than the scheduled unit, we don't care about
        # scores, but only check trigger priority.
        if candidate_unit.interrupts_telescope:
            if (not scheduled_scheduling_unit.interrupts_telescope) or candidate_unit.project.trigger_priority > scheduled_scheduling_unit.project.trigger_priority:
                logger.info("unscheduling id=%s '%s' because it is in the way and has a lower trigger_priority=%s than the best candidate id=%s '%s' trigger_priority=%s start_time=%s",
                            scheduled_scheduling_unit.id, scheduled_scheduling_unit.name, scheduled_scheduling_unit.project.trigger_priority,
                            candidate_unit.id, candidate_unit.name, candidate_unit.project.trigger_priority, candidate_unit.scheduled_start_time)
                unschedule_subtasks_in_scheduling_unit_blueprint(scheduled_scheduling_unit)

        elif candidate.weighted_score > scheduled_scored_scheduling_unit.weighted_score:
            # ToDo: also check if the scheduled_scheduling_unit is fixed_time/dynamically scheduled
            logger.info("unscheduling id=%s '%s' because it is in the way and has a lower score than the best candidate id=%s '%s' score=%s start_time=%s",
                        scheduled_scheduling_unit.id, scheduled_scheduling_unit.name,
                        candidate_unit.id, candidate_unit.name, candidate.weighted_score, candidate_unit.scheduled_start_time)
            unschedule_subtasks_in_scheduling_unit_blueprint(scheduled_scheduling_unit)

        # the proposed start time lives in candidate.start_time (a ScoredSchedulingUnit has no scheduled_start_time)
        elif candidate_unit.id == scheduled_scheduling_unit.id and candidate.start_time != scheduled_scheduling_unit.scheduled_start_time:
            logger.info("unscheduling id=%s '%s' start_time='%s' because it has a new proposed start_time id=%s '%s' start_time=%s",
                        scheduled_scheduling_unit.id, scheduled_scheduling_unit.name, scheduled_scheduling_unit.scheduled_start_time,
                        candidate_unit.id, candidate_unit.name, candidate.start_time)
            unschedule_subtasks_in_scheduling_unit_blueprint(scheduled_scheduling_unit)

    # check again... are still there any scheduled_scheduling_units in the way?
    scheduled_scheduling_units = get_scheduled_scheduling_units(lower=window_start, upper=window_stop)

    if scheduled_scheduling_units:
        # accept current solution with current scheduled_scheduling_units
        logger.info("keeping current scheduled unit(s) which have a better (or equal) score: %s", "; ".join(
            "id=%s '%s' start_time='%s'" % (su.id, su.name, su.scheduled_start_time) for su in scheduled_scheduling_units))

        # indicate there are still blocking units
        return False

    # all clear, nothing is blocking anymore
    return True
def cancel_running_observation_if_needed_and_possible(candidate: ScoredSchedulingUnit) -> bool:
    '''Check if there are starting/started observation subtasks that block the candidate, and cancel them if allowed.

    Only triggered scheduling_units (interrupts_telescope) can cancel running observations, and only if they
    belong to a project with a higher trigger_priority than the projects of the subtasks to cancel.
    Running observation subtasks without a project are skipped with a warning.

    :param candidate: the scored scheduling unit with its proposed start_time.
    NOTE(review): the signature is annotated with bool, but no value is returned explicitly (so None is returned).
    '''
    # todo: is it sufficient to cancel the subtasks, or do we cancel the whole scheduling unit?
    if candidate.scheduling_unit.interrupts_telescope:
        # only the observations which are still running at/after the candidate's start_time can be in the way
        running_obs_subtasks = get_running_observation_subtasks(candidate.start_time)
        for obs in running_obs_subtasks:
            if obs.project is None:
                logger.warning('cannot cancel running subtask pk=%s for triggered scheduling_unit pk=%s because it does not belong to a project and hence has unknown priority',
                               obs.pk, candidate.scheduling_unit.pk)
                continue

            if candidate.scheduling_unit.project.trigger_priority > obs.project.trigger_priority:
                logger.info('cancelling observation subtask pk=%s trigger_priority=%s because it blocks the triggered scheduling_unit pk=%s trigger_priority=%s',
                            obs.pk, obs.project.trigger_priority, candidate.scheduling_unit.pk, candidate.scheduling_unit.project.trigger_priority)
                # todo: check if cancellation is really necessary or the trigger can be scheduled afterwards
                # I guess we could just do can_run_after(candidate, obs.scheduled_stop_time) here for that?
                # We could also only do this, of there is a 'before' constraint on each trigger.
                # -> Clarify and implemented with TMSS-704.
                cancel_subtask(obs)
            else:
                logger.info('NOT cancelling subtask pk=%s trigger_priority=%s for triggered scheduling_unit pk=%s trigger_priority=%s because its priority is too low',
                            obs.pk, obs.project.trigger_priority, candidate.scheduling_unit.pk, candidate.scheduling_unit.project.trigger_priority)