Select Git revision
comms_client.py
-
Jan David Mol authored
L2SS-333: CommClient.connect now does not return, but raises an exception upon failure. This replaces using 'return True' and raising an exception.
Jan David Mol authoredL2SS-333: CommClient.connect now does not return, but raises an exception upon failure. This replaces using 'return True' and raising an exception.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
dynamic_scheduling.py 36.92 KiB
#!/usr/bin/env python3
# dynamic_scheduling.py
#
# Copyright (C) 2020
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: $
"""
"""
import os
import logging
logger = logging.getLogger(__name__)
from datetime import datetime, timedelta, time
from lofar.sas.tmss.tmss.tmssapp import models
from lofar.sas.tmss.tmss.tmssapp.tasks import schedule_independent_subtasks_in_scheduling_unit_blueprint, unschedule_subtasks_in_scheduling_unit_blueprint, mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable, mark_independent_subtasks_in_scheduling_unit_blueprint_as_schedulable, set_scheduling_unit_blueprint_start_times
from lofar.sas.tmss.tmss.tmssapp.subtasks import update_subtasks_start_times_for_scheduling_unit, clear_defined_subtasks_start_stop_times_for_scheduling_unit, cancel_subtask, mark_subtasks_and_successors_as_defined
from lofar.sas.tmss.client.tmssbuslistener import *
from lofar.common.datetimeutils import round_to_second_precision
from threading import Thread, Event
from django.db import transaction
from django.db.models import QuerySet
from lofar.sas.tmss.services.scheduling.constraints import *
# LOFAR needs to have a gap in between observations to (re)initialize hardware.
DEFAULT_INTER_OBSERVATION_GAP = timedelta(seconds=60)
DEFAULT_NEXT_STARTTIME_GAP = timedelta(seconds=180)
################## core dynamic scheduling methods ################################################
# #
# This module starts with the core dynamic scheduling methods which are used in the dynamic #
# scheduling service. These high level methods only filter/score/sort in a generic way. #
# The detailed concrete filter/score/sort methods are pick by a strategy pattern in the #
# constraints package based on each scheduling unit's scheduling_constrains template. #
# #
###################################################################################################
def find_best_next_schedulable_unit(scheduling_units:[models.SchedulingUnitBlueprint], lower_bound_start_time: datetime, upper_bound_stop_time: datetime) -> ScoredSchedulingUnit:
"""
find the best schedulable scheduling_unit which can run withing the given time window from the given scheduling_units.
:param lower_bound_start_time: evaluate the constrains at and after lower_bound_start_time. The returned unit has a start_time guaranteed at or after lower_bound_start_time.
:param upper_bound_stop_time: evaluate the constrains before upper_bound_stop_time. The returned unit has a stop_time guaranteed before upper_bound_stop_time.
:param scheduling_units: evaluate these scheduling_units.
Returns a ScoredSchedulingUnit struct with the best next schedulable scheduling unit and its proposed start_time where it best fits its contraints.
"""
# ensure upper is greater than or equal to lower
upper_bound_stop_time = max(lower_bound_start_time, upper_bound_stop_time)
filtered_scheduling_units = filter_scheduling_units_using_constraints(scheduling_units, lower_bound_start_time, upper_bound_stop_time)
if filtered_scheduling_units:
triggered_scheduling_units = [scheduling_unit for scheduling_unit in filtered_scheduling_units if scheduling_unit.interrupts_telescope]
if triggered_scheduling_units:
highest_priority_triggered_scheduling_unit = max(triggered_scheduling_units, key=lambda su: su.project.trigger_priority)
best_scored_scheduling_unit = ScoredSchedulingUnit(scheduling_unit=highest_priority_triggered_scheduling_unit,
start_time=get_earliest_possible_start_time(highest_priority_triggered_scheduling_unit, lower_bound_start_time),
scores={}, weighted_score=None) # we don't care about scores in case of a trigger
else:
best_scored_scheduling_unit = get_best_scored_scheduling_unit_scored_by_constraints(filtered_scheduling_units, lower_bound_start_time, upper_bound_stop_time)
return best_scored_scheduling_unit
# no filtered scheduling units found...
logger.debug("No schedulable scheduling units found which meet the requirements between '%s' and '%s'", lower_bound_start_time, upper_bound_stop_time)
return None
def mark_fixed_time_scheduling_units_for_inactive_projects_unschedulable():
# get the fixed_time schedulable scheduling_units in most-recently-updated order.
schedulable_units = get_fixed_time_schedulable_scheduling_units()
unschedulable_units = schedulable_units.exclude(draft__scheduling_set__project__project_state__value=models.ProjectState.Choices.ACTIVE.value)
if unschedulable_units.exists():
logger.info("making %s scheduling units with fixed_time at constraint unschedulable for inactive projects", unschedulable_units.count())
for unschedulable_unit in unschedulable_units:
mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(unschedulable_unit)
def mark_fixed_time_scheduling_units_for_active_projects_schedulable():
# mark unschedulable scheduling_units for active projects as schedulable again
scheduling_units = models.SchedulingUnitBlueprint.objects.filter(status__value=models.SchedulingUnitStatus.Choices.UNSCHEDULABLE.value)
scheduling_units = scheduling_units.filter(draft__scheduling_set__project__project_state__value=models.ProjectState.Choices.ACTIVE.value)
if scheduling_units.exists():
logger.info("trying to make %s scheduling units with fixed_time at constraint schedulable for active projects", scheduling_units.count())
for scheduling_unit in scheduling_units.all():
try:
mark_independent_subtasks_in_scheduling_unit_blueprint_as_schedulable(scheduling_unit)
except Exception as e:
logger.exception(e)
def schedule_fixed_time_scheduling_units():
'''
'''
# get the fixed_timely schedulable scheduling_units in most-recently-updated order.
schedulable_units = get_fixed_time_schedulable_scheduling_units()
# only consider active projects
schedulable_units = schedulable_units.filter(draft__scheduling_set__project__project_state__value=models.ProjectState.Choices.ACTIVE.value)
logger.info("trying to schedule %s scheduling units with fixed_time at constraint for active projects", schedulable_units.count())
for schedulable_unit in schedulable_units:
start_time = get_earliest_possible_start_time(schedulable_unit)
stop_time = start_time + schedulable_unit.specified_main_observation_duration
set_scheduling_unit_blueprint_start_times(schedulable_unit, first_start_time=start_time)
logger.info("Checking if fixed_time scheduled scheduling unit id=%d can be scheduled at '%s'", schedulable_unit.id, start_time)
if filter_scheduling_units_using_constraints([schedulable_unit], lower_bound=start_time, upper_bound=stop_time):
try:
logger.info("Scheduling fixed_time scheduled scheduling unit id=%d at '%s'", schedulable_unit.id, start_time)
schedule_independent_subtasks_in_scheduling_unit_blueprint(schedulable_unit, start_time)
except SubtaskSchedulingException as e:
logger.error(e)
mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(schedulable_unit)
else:
logger.warning("fixed_time-scheduled scheduling unit id=%d cannot be scheduled at '%s'", schedulable_unit.id, start_time)
mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(schedulable_unit)
def schedule_next_scheduling_unit() -> models.SchedulingUnitBlueprint:
'''find the best next schedulable scheduling unit and try to schedule it.
Overlapping existing scheduled units are unscheduled if their score is lower.
:return: the scheduled scheduling unit.'''
# loop over the different priority queue's. 'A'-type projects/scheduling units always go first, then 'B'-type etc.
# if a scheduling unit can be scheduled for this priority_queue, then we exit early.
for priority_queue in models.PriorityQueueType.objects.order_by('value').all():
schedulable_units = get_dynamically_schedulable_scheduling_units(priority_queue=priority_queue)
if len(schedulable_units) == 0:
logger.info("No scheduling units found for priority_queue '%s'...", priority_queue.value)
continue # continue with the remaining priority_queue types
# --- core routine ---
# search in a forward sliding window for the best scheduling_unit that can be scheduled
# once found and scheduled, exit. Otherwise slide window forward and try again.
lower_bound_start_time = datetime.utcnow() + DEFAULT_NEXT_STARTTIME_GAP
upper_bound_stop_time = lower_bound_start_time + timedelta(days=1)
while lower_bound_start_time < upper_bound_stop_time:
try:
# no need to irritate user in log files with sub-second scheduling precision
lower_bound_start_time = round_to_second_precision(lower_bound_start_time)
upper_bound_stop_time = max(round_to_second_precision(upper_bound_stop_time), lower_bound_start_time)
# try to find the best next scheduling_unit
logger.info("schedule_next_scheduling_unit: searching for best scheduling unit to schedule between '%s' and '%s'", lower_bound_start_time, upper_bound_stop_time)
best_scored_scheduling_unit = find_best_next_schedulable_unit(schedulable_units, lower_bound_start_time, upper_bound_stop_time)
if best_scored_scheduling_unit:
best_scheduling_unit = best_scored_scheduling_unit.scheduling_unit
best_scheduling_unit_score = best_scored_scheduling_unit.weighted_score
best_start_time = best_scored_scheduling_unit.start_time
# make sure we don't start earlier than allowed
best_start_time = max(best_start_time, lower_bound_start_time)
# make start_time "look nice" for us humans
best_start_time = round_to_second_precision(best_start_time)
logger.info("schedule_next_scheduling_unit: found best candidate id=%s '%s' weighted_score=%s start_time=%s interrupts_telescope=%s",
best_scheduling_unit.id, best_scheduling_unit.name, best_scheduling_unit_score, best_start_time, best_scheduling_unit.interrupts_telescope)
# cancel and/or unschedule current units-in-the-way...
cancel_running_observation_if_needed_and_possible(best_scored_scheduling_unit)
if unschededule_blocking_scheduled_units_if_needed_and_possible(best_scored_scheduling_unit):
# try schedule our candidate (there might still be unit(s) in the way, which will cause a SubtaskSchedulingException)
scheduled_scheduling_unit = schedule_independent_subtasks_in_scheduling_unit_blueprint(best_scheduling_unit, start_time=best_start_time)
logger.info("schedule_next_scheduling_unit: scheduled best candidate id=%s '%s' score=%s start_time=%s interrupts_telescope=%s",
best_scheduling_unit.id, best_scheduling_unit.name, best_scheduling_unit_score, best_start_time, best_scheduling_unit.interrupts_telescope)
# return the scheduled scheduling_unit, early exit out of looping over priority queues.
return scheduled_scheduling_unit
except SubtaskSchedulingException as e:
logger.error("Could not schedule scheduling_unit id=%s name='%s'. Error: %s", best_scheduling_unit.id, best_scheduling_unit.name, e)
if best_scheduling_unit.interrupts_telescope:
# this schedule_next_scheduling_unit call was 'triggered' by a interrupts_telescope-scheduling unit.
# aparently this triggered schedunit could not be scheduled due to a scheduled/running unit being in the way.
# no need to seek furter,
# mark the remaining_defined_triggered_subtasks in the interrupts_telescope-scheduling unit as error and return None....
remaining_defined_triggered_subtasks = list(models.Subtask.independent_subtasks().filter(task_blueprints__scheduling_unit_blueprint_id=best_scheduling_unit.id,
state__value=models.SubtaskState.Choices.DEFINED.value).all())
for subtask in remaining_defined_triggered_subtasks:
subtask.state = models.SubtaskState.objects.get(value=models.SubtaskState.Choices.ERROR.value)
return None
# nothing was found, or an error occurred.
# search again... (loop) with the remaining schedulable_units and new lower_bound_start_time
# it may be that in the mean time some scheduling_units are not (dynamically) schedulable anymore, fetch list again.
schedulable_units = get_dynamically_schedulable_scheduling_units(priority_queue=priority_queue)
lower_bound_start_time = get_min_earliest_possible_start_time(schedulable_units, lower_bound=lower_bound_start_time+timedelta(hours=1))
# TODO: update upper_bound_stop_time as well, stop when upper_bound_stop_time > cycle end.
def assign_start_stop_times_to_schedulable_scheduling_units(lower_bound_start_time: datetime):
''''''
logger.info("Estimating mid-term schedule with lower_bound_start_time=%s ..." % lower_bound_start_time)
for priority_queue in models.PriorityQueueType.objects.order_by('value').all():
scheduling_units = get_dynamically_schedulable_scheduling_units(priority_queue=priority_queue)
if len(scheduling_units) == 0:
logger.info("No scheduling units found...")
return
upper_bound_stop_time = lower_bound_start_time + timedelta(days=365)
# update the start_times of the remaining ones (so they form queue, and can be visualized in a timeline)
while scheduling_units and lower_bound_start_time < upper_bound_stop_time:
best_scored_scheduling_unit = find_best_next_schedulable_unit(scheduling_units, lower_bound_start_time, upper_bound_stop_time)
if best_scored_scheduling_unit:
scheduling_unit = best_scored_scheduling_unit.scheduling_unit
start_time = round_to_second_precision(best_scored_scheduling_unit.start_time)
logger.info("mid-term schedule: next scheduling unit id=%s '%s' start_time=%s", scheduling_unit.id, scheduling_unit.name, start_time)
update_subtasks_start_times_for_scheduling_unit(scheduling_unit, start_time)
# TODO check this?
# If the start_time of the subtasks are updated, should the start_time (and stop_time) of the
# scheduling_unit also be updated? Currently its a cached property
# keep track of the lower_bound_start_time based on last sub.stoptime and gap
lower_bound_start_time = scheduling_unit.scheduled_stop_time + DEFAULT_INTER_OBSERVATION_GAP
scheduling_units.remove(scheduling_unit)
else:
# search again in a later timeslot
min_earliest_possible_start_time = get_min_earliest_possible_start_time(scheduling_units, lower_bound_start_time+timedelta(minutes=10))
logger.info("lower_bound_start_time='%s', min_earliest_possible_start_time='%s'", lower_bound_start_time, min_earliest_possible_start_time)
if min_earliest_possible_start_time > lower_bound_start_time:
lower_bound_start_time = min_earliest_possible_start_time
else:
# cannot advance anymore to find more
logger.warning("Cannot assign start/stop times to remaining scheduling units for mid-term schedule...")
for su in scheduling_units:
logger.warning("Remaining scheduling unit: id=%s '%s'", su.id, su.name)
# clear start/stop times, so they don't show up in the timeline,
# and we can filter/show them in a seperate list which the user can tinker on the constraints
clear_defined_subtasks_start_stop_times_for_scheduling_unit(su)
break
logger.info("Estimating mid-term schedule... finished")
def do_dynamic_schedule() -> models.SchedulingUnitBlueprint:
'''do a full update of the schedule: schedule next scheduling unit and assign start stop times to remaining schedulable scheduling units'''
logger.info("Updating schedule....")
scheduled_unit = schedule_next_scheduling_unit()
# determine next possible start time for remaining scheduling_units
if scheduled_unit:
lower_bound_start_time = scheduled_unit.scheduled_stop_time + DEFAULT_INTER_OBSERVATION_GAP
else:
try:
scheduled_units = get_scheduled_scheduling_units(datetime.utcnow(), datetime.utcnow())
lower_bound_start_time = max([s.scheduled_stop_time for s in scheduled_units if s.scheduled_stop_time is not None]) + DEFAULT_INTER_OBSERVATION_GAP
except:
lower_bound_start_time = datetime.utcnow()
# round up to next nearest second
lower_bound_start_time += timedelta(microseconds=1000000-lower_bound_start_time.microsecond)
# determine mid-term schedule by assigning start/stop times to remaining schedulable units using the same search strategy
assign_start_stop_times_to_schedulable_scheduling_units(lower_bound_start_time)
logger.info("Finished updating dynamic schedule")
return scheduled_unit
################## service/messagebug handler class ###############################################
class TMSSDynamicSchedulingMessageHandler(TMSSEventMessageHandler):
'''
The TMSSDynamicSchedulingMessageHandler reacts to TMSS EventMessages by triggering a new full update of the dynamic
schedule.
The actual schedule-update method runs on a backround thread, and can take some time to complete ranging from a
few seconds to several minutes. In the mean time new EventMessages may be received. These are handled by raising a flag
that signals the schedule-update-thread that a new full update is needed. This way, a burst of Events results in
a single update, and it also ensures that we always compute the schedule with the latest data.
'''
def __init__(self):
super().__init__(log_event_messages=True)
self._scheduling_thread = None
self._scheduling_thread_running = False
self._do_schedule_event = Event()
def start_handling(self):
# start the background thread which waits until the _do_schedule_event event is set upon receiving to the correct TMSS EVentMessages.
self._scheduling_thread = Thread(target=TMSSDynamicSchedulingMessageHandler._scheduling_loop, kwargs={'self':self})
self._scheduling_thread.daemon = True
self._scheduling_thread_running = True
self._scheduling_thread.start()
super().start_handling()
def stop_handling(self):
self._scheduling_thread_running = False
self._scheduling_thread.join()
self._scheduling_thread = None
super().stop_handling()
def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str):
if status in [models.SchedulingUnitStatus.Choices.SCHEDULABLE.value,
models.SchedulingUnitStatus.Choices.UNSCHEDULABLE.value,
models.SchedulingUnitStatus.Choices.OBSERVED,
models.SchedulingUnitStatus.Choices.FINISHED.value,
models.SchedulingUnitStatus.Choices.CANCELLED.value]:
logger.info("onSchedulingUnitBlueprintStatusChanged(id=%s, status=%s): triggering update of dynamic & fixed_time schedule...", id, status)
# scheduling takes a long time, longer then creating many scheduling units in bulk
# so, we do not create a complete new schedule for each new unit,
# but we only trigger a new schedule update.
# This way we are sure that the latest units are always taken into account while scheduling, but we do not waste cpu cylces.
self._do_schedule_event.set()
def onSchedulingUnitBlueprintConstraintsUpdated(self, id: int, scheduling_constraints_doc: dict):
scheduling_unit_blueprint = models.SchedulingUnitBlueprint.objects.get(id=id)
if scheduling_unit_blueprint.status.value == models.SchedulingUnitStatus.Choices.UNSCHEDULABLE.value:
logger.info("constraints for unschedulable scheduling unit id=%s changed: setting status to schedulable which will triggering a dynamic scheduling update...", id)
mark_independent_subtasks_in_scheduling_unit_blueprint_as_schedulable(scheduling_unit_blueprint)
if scheduling_unit_blueprint.scheduling_constraints_doc.get('scheduler') == 'fixed_time' and \
scheduling_unit_blueprint.status.value == models.SchedulingUnitStatus.Choices.SCHEDULED.value:
logger.info("constraints for fixed_time scheduled scheduling unit id=%s changed: unscheduling it...", id)
unschedule_subtasks_in_scheduling_unit_blueprint(scheduling_unit_blueprint)
logger.info("constraints for scheduling unit id=%s changed: triggering update of dynamic schedule...", id)
self._do_schedule_event.set()
def onSubTaskStatusChanged(self, id: int, status: str):
if status == models.SubtaskState.Choices.DEFINED.value:
# this subtask is either new, or is now defined after being scheduled/unschedulable
# check if there are any overlapping unschedulable subtasks, and mark these as defined.
# This will result in a status update event, on which the fixed_time scheduling will be triggered.
# Summary: this subtask may have moved out of the way, as a result consider unschedulable overlapping units.
subtask = models.Subtask.objects.get(id=id)
affected_subtasks = models.Subtask.independent_subtasks()\
.filter(state__value=models.SubtaskState.Choices.UNSCHEDULABLE.value)\
.filter(scheduled_start_time__lte=subtask.scheduled_stop_time) \
.filter(scheduled_stop_time__gte=subtask.scheduled_start_time)
for affected_subtask in affected_subtasks.all():
mark_subtasks_and_successors_as_defined(affected_subtask)
def onSettingUpdated(self, name: str, value: bool):
if name == models.SystemSettingFlag.Choices.DYNAMIC_SCHEDULING_ENABLED.value and value:
logger.info("%s was set to %s: triggering update of dynamic schedule...", name, value)
self._do_schedule_event.set()
def onSchedulingConstraintsWeightFactorUpdated(self, id: int):
weight_factor = models.SchedulingConstraintsWeightFactor.objects.get(id=id)
logger.info("weight_factor %s for template %s version %s changed to %s: triggering update of dynamic schedule...",
weight_factor.constraint_name, weight_factor.scheduling_constraints_template.name, weight_factor.scheduling_constraints_template.version, weight_factor.weight)
self._do_schedule_event.set()
def onProjectStatusUpdated(self, name: str, status: str):
mark_fixed_time_scheduling_units_for_active_projects_schedulable()
mark_fixed_time_scheduling_units_for_inactive_projects_unschedulable()
self._do_schedule_event.set()
def onReservationCreated(self, id: int):
self._onReservationCreatedOrUpdated(id)
def onReservationUpdated(self, id: int):
self._onReservationCreatedOrUpdated(id)
def _onReservationCreatedOrUpdated(self, id: int):
reveration = models.Reservation.objects.get(id=id)
# TODO: consider reservation time window. unschedule scheduled units
# improperlu use mark_fixed_time_scheduling_units_for_active_projects_schedulable for now
mark_fixed_time_scheduling_units_for_active_projects_schedulable()
self._do_schedule_event.set()
def onReservationDeleted(self, id: int):
# maybe some unschedulable/blocked units can use the spot that was used by the reservation
# mark them all schedulable, and do a scheduling round to see which ones can be scheduled
mark_fixed_time_scheduling_units_for_active_projects_schedulable()
self._do_schedule_event.set()
def _scheduling_loop(self):
while self._scheduling_thread_running:
if self._do_schedule_event.wait(timeout=10):
self._do_schedule_event.clear()
try:
if models.Setting.objects.get(name=models.SystemSettingFlag.Choices.FIXED_TIME_SCHEDULING_ENABLED.value).value:
schedule_fixed_time_scheduling_units()
else:
logger.warning("Skipping update of fixed_time schedule because the setting %s=%s", models.SystemSettingFlag.Choices.FIXED_TIME_SCHEDULING_ENABLED.value, models.Setting.objects.get(name=models.SystemSettingFlag.Choices.DYNAMIC_SCHEDULING_ENABLED.value).value)
if models.Setting.objects.get(name=models.SystemSettingFlag.Choices.DYNAMIC_SCHEDULING_ENABLED.value).value:
do_dynamic_schedule()
else:
logger.warning("Skipping update of dynamic schedule because the setting %s=%s", models.SystemSettingFlag.Choices.DYNAMIC_SCHEDULING_ENABLED.value, models.Setting.objects.get(name=models.SystemSettingFlag.Choices.DYNAMIC_SCHEDULING_ENABLED.value).value)
except Exception as e:
logger.exception(str(e))
# just continue processing events. better luck next time...
def create_dynamic_scheduling_service(exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER):
return TMSSBusListener(handler_type=TMSSDynamicSchedulingMessageHandler,
handler_kwargs=None,
exchange=exchange,
broker=broker)
################## helper methods #################################################################
def get_dynamically_schedulable_scheduling_units(priority_queue: models.PriorityQueueType=None) -> [models.SchedulingUnitBlueprint]:
'''get a list of all dynamically schedulable scheduling_units for the given priority_queue (or all priority_queues if None)'''
defined_independend_subtasks = models.Subtask.independent_subtasks().filter(state__value='defined')
scheduling_unit_ids = [x['task_blueprint__scheduling_unit_blueprint_id'] for x in defined_independend_subtasks.values('task_blueprint__scheduling_unit_blueprint_id').distinct().all()]
scheduling_units = models.SchedulingUnitBlueprint.objects.filter(id__in=scheduling_unit_ids)
scheduling_units = scheduling_units.filter(scheduling_constraints_doc__scheduler='dynamic')
if priority_queue is not None:
scheduling_units = scheduling_units.filter(priority_queue=priority_queue)
return [su for su in scheduling_units.all() if su.status.value == models.SchedulingUnitStatus.Choices.SCHEDULABLE.value]
def get_fixed_time_schedulable_scheduling_units() -> QuerySet:
'''get a result QuerySet of all fixed_time schedulable scheduling_units'''
defined_independend_subtasks = models.Subtask.independent_subtasks().filter(state__value=models.SubtaskState.Choices.DEFINED.value)
scheduling_unit_ids = [x['task_blueprint__scheduling_unit_blueprint_id'] for x in defined_independend_subtasks.values('task_blueprint__scheduling_unit_blueprint_id').distinct().all()]
scheduling_units = models.SchedulingUnitBlueprint.objects.filter(id__in=scheduling_unit_ids)
scheduling_units = scheduling_units.filter(scheduling_constraints_doc__scheduler='fixed_time')
scheduling_units = scheduling_units.filter(status__value=models.SchedulingUnitStatus.Choices.SCHEDULABLE.value)
scheduling_units = scheduling_units.order_by('-updated_at')
return scheduling_units
def get_scheduled_scheduling_units(lower:datetime=None, upper:datetime=None) -> [models.SchedulingUnitBlueprint]:
'''get a list of all scheduling_units for which at least one 'independent' (with no predecessor like an observation) subtask is scheduled'''
scheduled_subtasks = models.Subtask.independent_subtasks().filter(state__value='scheduled')
if lower is not None:
scheduled_subtasks = scheduled_subtasks.filter(scheduled_stop_time__gte=lower)
if upper is not None:
scheduled_subtasks = scheduled_subtasks.filter(scheduled_start_time__lte=upper)
return list(models.SchedulingUnitBlueprint.objects.filter(id__in=scheduled_subtasks.values('task_blueprint__scheduling_unit_blueprint_id').distinct()).all())
def get_running_observation_subtasks(stopping_after:datetime=None) -> [models.Subtask]:
'''get a list of all starting/started subtasks, optionally filter for those finishing after the provided time'''
running_obs_subtasks = models.Subtask.objects.filter(state__value__in=[models.SubtaskState.Choices.STARTING.value, models.SubtaskState.Choices.STARTED.value],
specifications_template__type__value=models.SubtaskType.Choices.OBSERVATION.value)
if stopping_after is not None:
running_obs_subtasks = running_obs_subtasks.filter(scheduled_stop_time__gte=stopping_after)
return list(running_obs_subtasks.all())
def unschededule_blocking_scheduled_units_if_needed_and_possible(candidate: ScoredSchedulingUnit) -> bool:
'''check if there are any already scheduled units in the way, and unschedule them if allowed. Return True if nothing is blocking anymore.'''
# check any previously scheduled units, and unschedule if needed/allowed
scheduled_scheduling_units = get_scheduled_scheduling_units(lower=candidate.start_time,
upper=candidate.start_time + candidate.scheduling_unit.duration)
if not scheduled_scheduling_units:
logger.debug('no scheduled scheduling_units are blocking candidate scheduling_unit id=%s name=%s, nothing needs to be unscheduled', candidate.scheduling_unit.id, candidate.scheduling_unit.name)
return True
scheduled_scored_scheduling_units = compute_scores(scheduled_scheduling_units, candidate.start_time,
candidate.start_time + candidate.scheduling_unit.duration)
logger.info("checking scheduled scheduling_unit(s) which are blocking candidate id=%s '%s' start_time='%s': %s",
candidate.scheduling_unit.id, candidate.scheduling_unit.name, candidate.start_time,
"; ".join("id=%s '%s' start_time='%s'" % (su.id, su.name, su.scheduled_start_time) for su in scheduled_scheduling_units))
# check if we can and need to unschedule the blocking units
for scheduled_scored_scheduling_unit in scheduled_scored_scheduling_units:
scheduled_scheduling_unit = scheduled_scored_scheduling_unit.scheduling_unit
# in case of a triggered candidate with higher trigger priority than the scheduled unit, we don't care about
# scores, but only check trigger priority.
if candidate.scheduling_unit.interrupts_telescope:
if (not scheduled_scheduling_unit.interrupts_telescope) or candidate.scheduling_unit.project.trigger_priority > scheduled_scheduling_unit.project.trigger_priority:
logger.info("unscheduling id=%s '%s' because it is in the way and has a lower trigger_priority=%s than the best candidate id=%s '%s' trigger_priority=%s start_time=%s",
scheduled_scheduling_unit.id, scheduled_scheduling_unit.name, scheduled_scheduling_unit.project.trigger_priority,
candidate.scheduling_unit.id, candidate.scheduling_unit.name, candidate.scheduling_unit.project.trigger_priority, candidate.scheduling_unit.scheduled_start_time)
unschedule_subtasks_in_scheduling_unit_blueprint(scheduled_scheduling_unit)
elif candidate.weighted_score > scheduled_scored_scheduling_unit.weighted_score:
# ToDo: also check if the scheduled_scheduling_unit is fixed_time/dynamically scheduled
logger.info("unscheduling id=%s '%s' because it is in the way and has a lower score than the best candidate id=%s '%s' score=%s start_time=%s",
scheduled_scheduling_unit.id, scheduled_scheduling_unit.name,
candidate.scheduling_unit.id, candidate.scheduling_unit.name, candidate.weighted_score, candidate.scheduling_unit.scheduled_start_time)
unschedule_subtasks_in_scheduling_unit_blueprint(scheduled_scheduling_unit)
elif candidate.scheduling_unit.id==scheduled_scheduling_unit.id and candidate.scheduled_start_time!=scheduled_scheduling_unit.scheduled_start_time:
logger.info("unscheduling id=%s '%s' start_time='%s' because it has a new proposed start_time id=%s '%s' start_time=%s",
scheduled_scheduling_unit.id, scheduled_scheduling_unit.name, scheduled_scheduling_unit.scheduled_start_time,
candidate.scheduling_unit.id, candidate.scheduling_unit.name, candidate.weighted_score, candidate.scheduling_unit.scheduled_start_time)
unschedule_subtasks_in_scheduling_unit_blueprint(scheduled_scheduling_unit)
# check again... are still there any scheduled_scheduling_units in the way?
scheduled_scheduling_units = get_scheduled_scheduling_units(lower=candidate.start_time,
upper=candidate.start_time + candidate.scheduling_unit.duration)
if scheduled_scheduling_units:
# accept current solution with current scheduled_scheduling_units
logger.info("keeping current scheduled unit(s) which have a better (or equal) score: %s", "; ".join(
"id=%s '%s' start_time='%s'" % (su.id, su.name, su.scheduled_start_time) for su in scheduled_scheduling_units))
# indicate there are still blocking units
return False
# all clear, nothing is blocking anymore
return True
def cancel_running_observation_if_needed_and_possible(candidate: ScoredSchedulingUnit) -> bool:
'''check if there are starting/started observation subtasks that block the candidate.
Only triggered scheduling_units can cancel running observations and only if they belong to a project with a higher
trigger_priority than the projects of the subtasks to cancel'''
# todo: is it sufficient to cancel the subtasks, or do we cancel the whole scheduling unit?
if candidate.scheduling_unit.interrupts_telescope:
running_obs_subtasks = get_running_observation_subtasks(candidate.start_time)
for obs in running_obs_subtasks:
if obs.project is None:
logger.warning('cannot cancel running subtask pk=%s for triggered scheduling_unit pk=%s because it does belong to a project and hence has unknown priority' %
(obs.pk, candidate.scheduling_unit.name))
continue
if candidate.scheduling_unit.project.trigger_priority > obs.project.trigger_priority:
logger.info('cancelling observation subtask pk=%s trigger_priority=%s because it blocks the triggered scheduling_unit pk=%s trigger_priority=%s' %
(obs.pk, obs.project.trigger_priority, candidate.scheduling_unit.pk, candidate.scheduling_unit.project.trigger_priority))
# todo: check if cancellation is really necessary or the trigger can be scheduled afterwards
# I guess we could just do can_run_after(candidate, obs.scheduled_stop_time) here for that?
# We could also only do this, of there is a 'before' constraint on each trigger.
# -> Clarify and implemented with TMSS-704.
cancel_subtask(obs)
else:
logger.info('NOT cancelling subtask pk=%s trigger_priority=%s for triggered scheduling_unit pk=%s trigger_priority=%s because its priority is too low' %
(obs.pk, obs.project.trigger_priority, candidate.scheduling_unit.pk, candidate.scheduling_unit.project.trigger_priority))