diff --git a/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py b/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py
index 133952cb4313d3b37c97e83fefe6afe1b7e70612..748c2ff9f908e72a5ef943d2befcb011d75c5092 100644
--- a/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py
+++ b/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py
@@ -40,6 +40,7 @@ from threading import Thread, Event
 
 from django.db import transaction
 from django.db.models import QuerySet
+from lofar.sas.tmss.tmss.exceptions import SchedulerInterruptedException
 from lofar.sas.tmss.services.scheduling.constraints import *
 
 
@@ -47,6 +48,126 @@ from lofar.sas.tmss.services.scheduling.constraints import *
 DEFAULT_INTER_OBSERVATION_GAP = timedelta(seconds=60)
 DEFAULT_NEXT_STARTTIME_GAP = timedelta(seconds=180)
 
+class Scheduler:
+    '''computes the fixed_time and the dynamic schedule in a background thread each time it is triggered.
+    Use start_scheduling/stop_scheduling, or use the instance as a context manager in a with-statement.'''
+    def __init__(self) -> None:
+        self._scheduling_thread = None
+        self._scheduling_thread_running = False
+        self._do_schedule_event = Event()
+        super().__init__()
+
+    def start_scheduling(self):
+        '''prepare the (un)schedulable scheduling units, and start the background scheduling thread.'''
+        self._initialize_statuses()
+
+        # start the background thread which waits until the _do_schedule_event event is set upon receiving the correct TMSS EventMessages.
+        self._scheduling_thread = Thread(target=self._run_scheduling_loop)
+        self._scheduling_thread.daemon = True
+        self._scheduling_thread_running = True
+        self._scheduling_thread.start()
+
+    def stop_scheduling(self):
+        '''stop the background scheduling thread and wait for it to finish. Safe to call when it was never started.'''
+        self._scheduling_thread_running = False
+        if self._scheduling_thread is not None:
+            self._scheduling_thread.join()
+            self._scheduling_thread = None
+
+    def __enter__(self):
+        self.start_scheduling()
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.stop_scheduling()
+
+    def trigger(self):
+        '''request a new scheduling round. A running fixed_time scheduling round is interrupted at its next scheduling unit.'''
+        self._do_schedule_event.set()
+
+    @property
+    def is_triggered(self) -> bool:
+        return self._do_schedule_event.is_set()
+
+    @property
+    def fixed_time_scheduling_enabled(self) -> bool:
+        return models.Setting.objects.get(name=models.SystemSettingFlag.Choices.FIXED_TIME_SCHEDULING_ENABLED.value).value
+
+    @property
+    def dynamic_scheduling_enabled(self) -> bool:
+        return models.Setting.objects.get(name=models.SystemSettingFlag.Choices.DYNAMIC_SCHEDULING_ENABLED.value).value
+
+    def _raise_if_triggered(self):
+        if self.is_triggered:
+            raise SchedulerInterruptedException()
+
+    def _initialize_statuses(self):
+        '''upon startup, do one round over all scheduling units for all projects marking them as (un)schedulable so they can be picked up in the _scheduling_loop later'''
+        logger.info("preparing (un)schedulable scheduling units for all (in)active projects")
+        mark_unschedulable_scheduling_units_for_active_projects_schedulable()
+        mark_scheduling_units_for_inactive_projects_unschedulable()
+        logger.info("prepared (un)schedulable scheduling units for all (in)active projects")
+
+    def _run_scheduling_loop(self):
+        '''thread target: wait for a trigger, then update the fixed_time and dynamic schedule (when enabled in the settings), until stopped.'''
+        logger.info("Scheduler starting scheduling thread...")
+        while self._scheduling_thread_running:
+            if self._do_schedule_event.wait(timeout=10):
+                logger.info("Scheduler was triggered to compute new schedule...")
+                self._do_schedule_event.clear()
+
+                try:
+                    if self.fixed_time_scheduling_enabled:
+                        self.schedule_fixed_time_scheduling_units()
+                    else:
+                        logger.info("Scheduler skipping update of fixed_time schedule because it is not enabled in the settings")
+
+                    if self.dynamic_scheduling_enabled:
+                        do_dynamic_schedule()
+                    else:
+                        logger.info("Scheduler skipping update of dynamic schedule because it is not enabled in the settings")
+
+                except SchedulerInterruptedException:
+                    # log the interruption, and continue with the next loop, cause _do_schedule_event was set
+                    logger.info("scheduler was interrupted while computing a new schedule")
+                except Exception as e:
+                    # log and just continue processing events. better luck next time...
+                    logger.exception(str(e))
+
+    def schedule_fixed_time_scheduling_units(self):
+        '''try to schedule each fixed_time schedulable scheduling unit of an active project at the start time given by get_earliest_possible_start_time.
+        Units that do not pass filter_scheduling_units_using_constraints, or raise a SubtaskSchedulingException, get their independent subtasks marked as unschedulable.
+        raises a SchedulerInterruptedException when the scheduler is triggered while looping over the units.'''
+        # get the fixed_timely schedulable scheduling_units in most-recently-updated order.
+        schedulable_units = get_fixed_time_schedulable_scheduling_units()
+
+        # only consider active projects
+        schedulable_units = schedulable_units.filter(draft__scheduling_set__project__project_state__value=models.ProjectState.Choices.ACTIVE.value)
+
+        logger.info("trying to schedule %s scheduling units with fixed_time at constraint for active projects", schedulable_units.count())
+
+        for i, schedulable_unit in enumerate(schedulable_units, 1):
+            self._raise_if_triggered() #interrupts the scheduling loop for a next round
+
+            start_time = get_earliest_possible_start_time(schedulable_unit)
+            stop_time = start_time + schedulable_unit.specified_main_observation_duration
+            set_scheduling_unit_blueprint_start_times(schedulable_unit, first_start_time=start_time)
+
+            logger.info("Scheduler checking if fixed_time scheduled scheduling unit %s/%s id=%d can be scheduled at '%s'",
+                        i, len(schedulable_units), schedulable_unit.id, start_time)
+
+            if filter_scheduling_units_using_constraints([schedulable_unit], lower_bound=start_time, upper_bound=stop_time):
+                try:
+                    logger.info("Scheduling fixed_time scheduled scheduling unit id=%d at '%s'", schedulable_unit.id, start_time)
+                    schedule_independent_subtasks_in_scheduling_unit_blueprint(schedulable_unit, start_time)
+                except SubtaskSchedulingException as e:
+                    logger.error(e)
+                    mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(schedulable_unit)
+            else:
+                logger.warning("fixed_time-scheduled scheduling unit id=%d cannot be scheduled at '%s'", schedulable_unit.id, start_time)
+                mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(schedulable_unit)
+
+
 ################## core dynamic scheduling methods ################################################
 #                                                                                                 #
 # This module starts with the core dynamic scheduling methods which are used in the dynamic       #
@@ -121,34 +242,6 @@ def mark_unschedulable_scheduling_units_for_active_projects_schedulable(projects
         logger.exception(e)
 
 
-def schedule_fixed_time_scheduling_units():
-    '''
-
-    '''
-    # get the fixed_timely schedulable scheduling_units in most-recently-updated order.
-    schedulable_units = get_fixed_time_schedulable_scheduling_units()
-
-    # only consider active projects
-    schedulable_units = schedulable_units.filter(draft__scheduling_set__project__project_state__value=models.ProjectState.Choices.ACTIVE.value)
-
-    logger.info("trying to schedule %s scheduling units with fixed_time at constraint for active projects", schedulable_units.count())
-
-    for schedulable_unit in schedulable_units:
-        start_time = get_earliest_possible_start_time(schedulable_unit)
-        stop_time = start_time + schedulable_unit.specified_main_observation_duration
-        set_scheduling_unit_blueprint_start_times(schedulable_unit, first_start_time=start_time)
-
-        logger.info("Checking if fixed_time scheduled scheduling unit id=%d can be scheduled at '%s'", schedulable_unit.id, start_time)
-        if filter_scheduling_units_using_constraints([schedulable_unit], lower_bound=start_time, upper_bound=stop_time):
-            try:
-                logger.info("Scheduling fixed_time scheduled scheduling unit id=%d at '%s'", schedulable_unit.id, start_time)
-                schedule_independent_subtasks_in_scheduling_unit_blueprint(schedulable_unit, start_time)
-            except SubtaskSchedulingException as e:
-                logger.error(e)
-                mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(schedulable_unit)
-        else:
-            logger.warning("fixed_time-scheduled scheduling unit id=%d cannot be scheduled at '%s'", schedulable_unit.id, start_time)
-            mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(schedulable_unit)
 
 
 def schedule_next_scheduling_unit() -> models.SchedulingUnitBlueprint:
@@ -317,25 +410,14 @@ class TMSSDynamicSchedulingMessageHandler(TMSSEventMessageHandler):
 
     def __init__(self):
         super().__init__(log_event_messages=True)
-        self._scheduling_thread = None
-        self._scheduling_thread_running = False
-        self._do_schedule_event = Event()
+        self.scheduler = Scheduler()
 
     def start_handling(self):
-        # prepare scheduling units (if needed)
-        self._scheduling_initialization()
-
-        # start the background thread which waits until the _do_schedule_event event is set upon receiving to the correct TMSS EVentMessages.
-        self._scheduling_thread = Thread(target=TMSSDynamicSchedulingMessageHandler._scheduling_loop, kwargs={'self':self})
-        self._scheduling_thread.daemon = True
-        self._scheduling_thread_running = True
-        self._scheduling_thread.start()
+        self.scheduler.start_scheduling()
         super().start_handling()
 
     def stop_handling(self):
-        self._scheduling_thread_running = False
-        self._scheduling_thread.join()
-        self._scheduling_thread = None
+        self.scheduler.stop_scheduling()
         super().stop_handling()
 
     def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str):
@@ -345,11 +427,12 @@ class TMSSDynamicSchedulingMessageHandler(TMSSEventMessageHandler):
                       models.SchedulingUnitStatus.Choices.FINISHED.value,
                       models.SchedulingUnitStatus.Choices.CANCELLED.value]:
             logger.info("onSchedulingUnitBlueprintStatusChanged(id=%s, status=%s): triggering update of dynamic & fixed_time schedule...", id, status)
+
             # scheduling takes a long time, longer then creating many scheduling units in bulk
             # so, we do not create a complete new schedule for each new unit,
             # but we only trigger a new schedule update.
             # This way we are sure that the latest units are always taken into account while scheduling, but we do not waste cpu cylces.
-            self._do_schedule_event.set()
+            self.scheduler.trigger()
 
     def onSchedulingUnitBlueprintConstraintsUpdated(self, id: int, scheduling_constraints_doc: dict):
         scheduling_unit_blueprint = models.SchedulingUnitBlueprint.objects.get(id=id)
@@ -363,7 +446,8 @@ class TMSSDynamicSchedulingMessageHandler(TMSSEventMessageHandler):
             unschedule_subtasks_in_scheduling_unit_blueprint(scheduling_unit_blueprint)
 
         logger.info("constraints for scheduling unit id=%s changed: triggering update of dynamic schedule...", id)
-        self._do_schedule_event.set()
+        self.scheduler.trigger()
+
 
     def onSubTaskStatusChanged(self, id: int, status: str):
         if status in (models.SubtaskState.Choices.DEFINED.value, models.SubtaskState.Choices.SCHEDULED.value):
@@ -389,13 +473,15 @@ class TMSSDynamicSchedulingMessageHandler(TMSSEventMessageHandler):
         if name in (models.SystemSettingFlag.Choices.DYNAMIC_SCHEDULING_ENABLED.value,
                     models.SystemSettingFlag.Choices.FIXED_TIME_SCHEDULING_ENABLED.value) and value:
             logger.info("%s was set to %s: triggering update of schedule...", name, value)
-            self._do_schedule_event.set()
+            self.scheduler.trigger()
+
 
     def onSchedulingConstraintsWeightFactorUpdated(self, id: int):
         weight_factor = models.SchedulingConstraintsWeightFactor.objects.get(id=id)
         logger.info("weight_factor %s for template %s version %s changed to %s: triggering update of dynamic schedule...",
                     weight_factor.constraint_name, weight_factor.scheduling_constraints_template.name, weight_factor.scheduling_constraints_template.version, weight_factor.weight)
-        self._do_schedule_event.set()
+        self.scheduler.trigger()
+
 
     def onProjectStatusUpdated(self, name: str, status: str):
         logger.info("project '%s' status changed to %s", name, status)
@@ -405,7 +491,8 @@ class TMSSDynamicSchedulingMessageHandler(TMSSEventMessageHandler):
         else:
             mark_scheduling_units_for_inactive_projects_unschedulable(projects=[name])
 
-        self._do_schedule_event.set()
+        self.scheduler.trigger()
+
 
     def onReservationCreated(self, id: int):
         self._onReservationCreatedOrUpdated(id)
@@ -420,39 +507,15 @@ class TMSSDynamicSchedulingMessageHandler(TMSSEventMessageHandler):
         # improperlu use mark_unschedulable_scheduling_units_for_active_projects_schedulable for now
         mark_unschedulable_scheduling_units_for_active_projects_schedulable()
 
-        self._do_schedule_event.set()
+        self.scheduler.trigger()
+
 
     def onReservationDeleted(self, id: int):
         # maybe some unschedulable/blocked units can use the spot that was used by the reservation
         # mark them all schedulable, and do a scheduling round to see which ones can be scheduled
         mark_unschedulable_scheduling_units_for_active_projects_schedulable()
-        self._do_schedule_event.set()
+        self.scheduler.trigger()
 
-    def _scheduling_initialization(self):
-        '''upon startup, do one round over all scheduling units for all projects marking them as (un)schedulable so they can be picked up in the _scheduling_loop later'''
-        logger.info("preparing (un)schedulable scheduling units for all (in)active projects")
-        mark_unschedulable_scheduling_units_for_active_projects_schedulable()
-        mark_scheduling_units_for_inactive_projects_unschedulable()
-        logger.info("prepared (un)schedulable scheduling units for all (in)active projects")
-
-    def _scheduling_loop(self):
-        logger.info("starting scheduling thread...")
-        while self._scheduling_thread_running:
-            if self._do_schedule_event.wait(timeout=10):
-                self._do_schedule_event.clear()
-                try:
-                    if models.Setting.objects.get(name=models.SystemSettingFlag.Choices.FIXED_TIME_SCHEDULING_ENABLED.value).value:
-                        schedule_fixed_time_scheduling_units()
-                    else:
-                        logger.warning("Skipping update of fixed_time schedule because the setting %s=%s", models.SystemSettingFlag.Choices.FIXED_TIME_SCHEDULING_ENABLED.value, models.Setting.objects.get(name=models.SystemSettingFlag.Choices.DYNAMIC_SCHEDULING_ENABLED.value).value)
-
-                    if models.Setting.objects.get(name=models.SystemSettingFlag.Choices.DYNAMIC_SCHEDULING_ENABLED.value).value:
-                        do_dynamic_schedule()
-                    else:
-                        logger.warning("Skipping update of dynamic schedule because the setting %s=%s", models.SystemSettingFlag.Choices.DYNAMIC_SCHEDULING_ENABLED.value, models.Setting.objects.get(name=models.SystemSettingFlag.Choices.DYNAMIC_SCHEDULING_ENABLED.value).value)
-                except Exception as e:
-                    logger.exception(str(e))
-                    # just continue processing events. better luck next time...
 
 
 def create_dynamic_scheduling_service(exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER):
diff --git a/SAS/TMSS/backend/src/tmss/exceptions.py b/SAS/TMSS/backend/src/tmss/exceptions.py
index dbf961049ef18fd0c041d7be9673bdd581a3adf6..dc3c26f56199a402ec5fdf9ec393d3dbf9b5d79e 100644
--- a/SAS/TMSS/backend/src/tmss/exceptions.py
+++ b/SAS/TMSS/backend/src/tmss/exceptions.py
@@ -26,6 +26,9 @@ class SubtaskInvalidStateException(TMSSException):
 class SchedulingException(TMSSException):
     pass
 
+class SchedulerInterruptedException(SchedulingException):
+    pass
+
 class SubtaskSchedulingException(SubtaskException, SchedulingException):
     pass
 