Skip to content
Snippets Groups Projects
Commit c5802a56 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-671: moved dynamic scheduling methods into new interruptable Scheduler...

TMSS-671: moved dynamic scheduling methods into new interruptable Scheduler class. Added _raise_if_triggered calls in high level dynamic scheduling loops
parent b56aca48
No related branches found
No related tags found
1 merge request!715TMSS-671 & TMSS-1135 & TMSS-1332
......@@ -115,7 +115,7 @@ class Scheduler:
logger.info("Scheduler skipping update of fixed_time schedule because it is not enabled in the settings")
if self.dynamic_scheduling_enabled:
do_dynamic_schedule()
self.do_dynamic_schedule()
else:
logger.info("Scheduler skipping update of dynamic schedule because it is not enabled in the settings")
......@@ -159,17 +159,32 @@ class Scheduler:
logger.warning("fixed_time-scheduled scheduling unit id=%d cannot be scheduled at '%s'", schedulable_unit.id, start_time)
mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(schedulable_unit)
def do_dynamic_schedule(self) -> models.SchedulingUnitBlueprint:
'''do a full update of the schedule: schedule next scheduling unit and assign start stop times to remaining schedulable scheduling units'''
logger.info("Updating schedule....")
################## core dynamic scheduling methods ################################################
# #
# This module starts with the core dynamic scheduling methods which are used in the dynamic #
# scheduling service. These high level methods only filter/score/sort in a generic way. #
# The detailed concrete filter/score/sort methods are pick by a strategy pattern in the #
# constraints package based on each scheduling unit's scheduling_constrains template. #
# #
###################################################################################################
scheduled_unit = self.schedule_next_scheduling_unit()
# determine next possible start time for remaining scheduling_units
if scheduled_unit:
lower_bound_start_time = scheduled_unit.scheduled_stop_time + DEFAULT_INTER_OBSERVATION_GAP
else:
try:
scheduled_units = get_scheduled_scheduling_units(datetime.utcnow(), datetime.utcnow())
lower_bound_start_time = max([s.scheduled_stop_time for s in scheduled_units if s.scheduled_stop_time is not None]) + DEFAULT_INTER_OBSERVATION_GAP
except:
lower_bound_start_time = datetime.utcnow()
# round up to next nearest second
lower_bound_start_time += timedelta(microseconds=1000000-lower_bound_start_time.microsecond)
# determine mid-term schedule by assigning start/stop times to remaining schedulable units using the same search strategy
self.assign_start_stop_times_to_schedulable_scheduling_units(lower_bound_start_time)
logger.info("Finished updating dynamic schedule")
return scheduled_unit
def find_best_next_schedulable_unit(scheduling_units:[models.SchedulingUnitBlueprint], lower_bound_start_time: datetime, upper_bound_stop_time: datetime) -> ScoredSchedulingUnit:
def find_best_next_schedulable_unit(self, scheduling_units:[models.SchedulingUnitBlueprint], lower_bound_start_time: datetime, upper_bound_stop_time: datetime) -> ScoredSchedulingUnit:
"""
find the best schedulable scheduling_unit which can run withing the given time window from the given scheduling_units.
:param lower_bound_start_time: evaluate the constrains at and after lower_bound_start_time. The returned unit has a start_time guaranteed at or after lower_bound_start_time.
......@@ -197,46 +212,7 @@ def find_best_next_schedulable_unit(scheduling_units:[models.SchedulingUnitBluep
logger.debug("No schedulable scheduling units found which meet the requirements between '%s' and '%s'", lower_bound_start_time, upper_bound_stop_time)
return None
def mark_scheduling_units_for_inactive_projects_unschedulable(projects: [str]=None):
scheduling_units = models.SchedulingUnitBlueprint.objects.filter(status__value__in=(models.SchedulingUnitStatus.Choices.SCHEDULABLE.value,
models.SchedulingUnitStatus.Choices.SCHEDULED.value))
maybe_unschedulable_units = scheduling_units.exclude(draft__scheduling_set__project__project_state__value=models.ProjectState.Choices.ACTIVE.value)
if projects:
maybe_unschedulable_units = maybe_unschedulable_units.filter(draft__scheduling_set__project__name__in=projects)
if maybe_unschedulable_units.exists():
logger.info("marking %s scheduled units as unschedulable for inactive project(s)%s",
maybe_unschedulable_units.count(),
(' ' + ', '.join(p for p in projects)) if projects else '')
for scheduling_unit in maybe_unschedulable_units:
mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(scheduling_unit)
def mark_unschedulable_scheduling_units_for_active_projects_schedulable(projects: [str]=None):
unschedulable_units = models.SchedulingUnitBlueprint.objects.filter(status__value=models.SchedulingUnitStatus.Choices.UNSCHEDULABLE.value)
maybe_schedulable_units = unschedulable_units.filter(draft__scheduling_set__project__project_state__value=models.ProjectState.Choices.ACTIVE.value)
if projects:
maybe_schedulable_units = maybe_schedulable_units.filter(draft__scheduling_set__project__name__in=projects)
if maybe_schedulable_units.exists():
logger.info("trying to make %s unschedulable units schedulable for active project(s)%s",
maybe_schedulable_units.count(),
(' ' + ', '.join(p for p in projects)) if projects else '')
for scheduling_unit in maybe_schedulable_units.all():
try:
mark_independent_subtasks_in_scheduling_unit_blueprint_as_schedulable(scheduling_unit)
except Exception as e:
logger.exception(e)
def schedule_next_scheduling_unit() -> models.SchedulingUnitBlueprint:
def schedule_next_scheduling_unit(self) -> models.SchedulingUnitBlueprint:
'''find the best next schedulable scheduling unit and try to schedule it.
Overlapping existing scheduled units are unscheduled if their score is lower.
:return: the scheduled scheduling unit.'''
......@@ -257,6 +233,8 @@ def schedule_next_scheduling_unit() -> models.SchedulingUnitBlueprint:
upper_bound_stop_time = lower_bound_start_time + timedelta(days=1)
while lower_bound_start_time < upper_bound_stop_time:
self._raise_if_triggered() # interrupts the scheduling loop for a next round
try:
# no need to irritate user in log files with sub-second scheduling precision
lower_bound_start_time = round_to_second_precision(lower_bound_start_time)
......@@ -264,7 +242,7 @@ def schedule_next_scheduling_unit() -> models.SchedulingUnitBlueprint:
# try to find the best next scheduling_unit
logger.info("schedule_next_scheduling_unit: searching for best scheduling unit to schedule between '%s' and '%s'", lower_bound_start_time, upper_bound_stop_time)
best_scored_scheduling_unit = find_best_next_schedulable_unit(schedulable_units, lower_bound_start_time, upper_bound_stop_time)
best_scored_scheduling_unit = self.find_best_next_schedulable_unit(schedulable_units, lower_bound_start_time, upper_bound_stop_time)
if best_scored_scheduling_unit:
best_scheduling_unit = best_scored_scheduling_unit.scheduling_unit
......@@ -312,7 +290,7 @@ def schedule_next_scheduling_unit() -> models.SchedulingUnitBlueprint:
# TODO: update upper_bound_stop_time as well, stop when upper_bound_stop_time > cycle end.
def assign_start_stop_times_to_schedulable_scheduling_units(lower_bound_start_time: datetime):
def assign_start_stop_times_to_schedulable_scheduling_units(self, lower_bound_start_time: datetime):
''''''
logger.info("Estimating mid-term schedule with lower_bound_start_time=%s ..." % lower_bound_start_time)
......@@ -327,7 +305,7 @@ def assign_start_stop_times_to_schedulable_scheduling_units(lower_bound_start_ti
# update the start_times of the remaining ones (so they form queue, and can be visualized in a timeline)
while scheduling_units and lower_bound_start_time < upper_bound_stop_time:
best_scored_scheduling_unit = find_best_next_schedulable_unit(scheduling_units, lower_bound_start_time, upper_bound_stop_time)
best_scored_scheduling_unit = self.find_best_next_schedulable_unit(scheduling_units, lower_bound_start_time, upper_bound_stop_time)
if best_scored_scheduling_unit:
scheduling_unit = best_scored_scheduling_unit.scheduling_unit
......@@ -362,30 +340,52 @@ def assign_start_stop_times_to_schedulable_scheduling_units(lower_bound_start_ti
logger.info("Estimating mid-term schedule... finished")
def do_dynamic_schedule() -> models.SchedulingUnitBlueprint:
'''do a full update of the schedule: schedule next scheduling unit and assign start stop times to remaining schedulable scheduling units'''
logger.info("Updating schedule....")
################## core dynamic scheduling methods ################################################
# #
# This module starts with the core dynamic scheduling methods which are used in the dynamic #
# scheduling service. These high level methods only filter/score/sort in a generic way. #
# The detailed concrete filter/score/sort methods are pick by a strategy pattern in the #
# constraints package based on each scheduling unit's scheduling_constrains template. #
# #
###################################################################################################
scheduled_unit = schedule_next_scheduling_unit()
# determine next possible start time for remaining scheduling_units
if scheduled_unit:
lower_bound_start_time = scheduled_unit.scheduled_stop_time + DEFAULT_INTER_OBSERVATION_GAP
else:
try:
scheduled_units = get_scheduled_scheduling_units(datetime.utcnow(), datetime.utcnow())
lower_bound_start_time = max([s.scheduled_stop_time for s in scheduled_units if s.scheduled_stop_time is not None]) + DEFAULT_INTER_OBSERVATION_GAP
except:
lower_bound_start_time = datetime.utcnow()
def mark_scheduling_units_for_inactive_projects_unschedulable(projects: [str]=None):
scheduling_units = models.SchedulingUnitBlueprint.objects.filter(status__value__in=(models.SchedulingUnitStatus.Choices.SCHEDULABLE.value,
models.SchedulingUnitStatus.Choices.SCHEDULED.value))
# round up to next nearest second
lower_bound_start_time += timedelta(microseconds=1000000-lower_bound_start_time.microsecond)
maybe_unschedulable_units = scheduling_units.exclude(draft__scheduling_set__project__project_state__value=models.ProjectState.Choices.ACTIVE.value)
# determine mid-term schedule by assigning start/stop times to remaining schedulable units using the same search strategy
assign_start_stop_times_to_schedulable_scheduling_units(lower_bound_start_time)
logger.info("Finished updating dynamic schedule")
if projects:
maybe_unschedulable_units = maybe_unschedulable_units.filter(draft__scheduling_set__project__name__in=projects)
if maybe_unschedulable_units.exists():
logger.info("marking %s scheduled units as unschedulable for inactive project(s)%s",
maybe_unschedulable_units.count(),
(' ' + ', '.join(p for p in projects)) if projects else '')
for scheduling_unit in maybe_unschedulable_units:
mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(scheduling_unit)
def mark_unschedulable_scheduling_units_for_active_projects_schedulable(projects: [str]=None):
unschedulable_units = models.SchedulingUnitBlueprint.objects.filter(status__value=models.SchedulingUnitStatus.Choices.UNSCHEDULABLE.value)
maybe_schedulable_units = unschedulable_units.filter(draft__scheduling_set__project__project_state__value=models.ProjectState.Choices.ACTIVE.value)
if projects:
maybe_schedulable_units = maybe_schedulable_units.filter(draft__scheduling_set__project__name__in=projects)
if maybe_schedulable_units.exists():
logger.info("trying to make %s unschedulable units schedulable for active project(s)%s",
maybe_schedulable_units.count(),
(' ' + ', '.join(p for p in projects)) if projects else '')
for scheduling_unit in maybe_schedulable_units.all():
try:
mark_independent_subtasks_in_scheduling_unit_blueprint_as_schedulable(scheduling_unit)
except Exception as e:
logger.exception(e)
return scheduled_unit
################## service/messagebug handler class ###############################################
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment