From fe8ae886111df41ec255a411a4ce2c53b0a6744c Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Tue, 17 Nov 2020 11:17:34 +0100 Subject: [PATCH] TMSS-190: reshuffled layout of module. Added documentation --- .../scheduling/lib/dynamic_scheduling.py | 130 +++++++++++------- 1 file changed, 77 insertions(+), 53 deletions(-) diff --git a/SAS/TMSS/services/scheduling/lib/dynamic_scheduling.py b/SAS/TMSS/services/scheduling/lib/dynamic_scheduling.py index 70ce3e89f9b..c87973ebb2b 100644 --- a/SAS/TMSS/services/scheduling/lib/dynamic_scheduling.py +++ b/SAS/TMSS/services/scheduling/lib/dynamic_scheduling.py @@ -42,56 +42,14 @@ from lofar.sas.tmss.services.scheduling.constraints import * # LOFAR needs to have a gap in between observations to (re)initialize hardware. DEFAULT_INTER_OBSERVATION_GAP = timedelta(seconds=60) -def get_schedulable_scheduling_units() -> [models.SchedulingUnitBlueprint]: - '''get a list of all schedulable scheduling_units''' - defined_independend_subtasks = models.Subtask.independent_subtasks().filter(state__value='defined') - defined_independend_subtask_ids = defined_independend_subtasks.values('task_blueprint__scheduling_unit_blueprint_id').distinct().all() - scheduling_units = models.SchedulingUnitBlueprint.objects.filter(id__in=defined_independend_subtask_ids).select_related('draft', 'draft__scheduling_constraints_template').all() - return [su for su in scheduling_units if su.status == 'schedulable'] - - -def get_scheduled_scheduling_units(lower:datetime=None, upper:datetime=None) -> [models.SchedulingUnitBlueprint]: - '''get a list of all scheduled scheduling_units''' - scheduled_subtasks = models.Subtask.objects.filter(state__value='scheduled') - if lower is not None: - scheduled_subtasks = scheduled_subtasks.filter(stop_time__gte=lower) - if upper is not None: - scheduled_subtasks = scheduled_subtasks.filter(start_time__lte=upper) - return list(models.SchedulingUnitBlueprint.objects.filter(id__in=scheduled_subtasks.values('task_blueprint__scheduling_unit_blueprint_id').distinct()).all()) - - -def unschededule_blocking_scheduled_units_if_needed_and_possible(candidate: ScoredSchedulingUnit) -> bool: - '''check if there are any already scheduled units in the way, and unschedule them if allowed. Return True if nothing is blocking anymore.''' - # check any previously scheduled units, and unschedule if needed/allowed - scheduled_scheduling_units = get_scheduled_scheduling_units(lower=candidate.start_time, - upper=candidate.start_time + candidate.scheduling_unit.duration) - - # check if we can and need to unschedule the blocking units - for scheduled_scheduling_unit in scheduled_scheduling_units: - scheduled_score = compute_scores(scheduled_scheduling_unit, candidate.start_time, candidate.start_time + candidate.scheduling_unit.duration) - - if candidate.weighted_score > scheduled_score.weighted_score: - # ToDo: also check if the scheduled_scheduling_unit is manually/dynamically scheduled - logger.info("unscheduling id=%s '%s' because it is in the way and has a lower score than the best candidate id=%s '%s' score=%s start_time=%s", - scheduled_scheduling_unit.id, scheduled_scheduling_unit.name, - candidate.scheduling_unit.id, candidate.scheduling_unit.name, candidate.weighted_score, candidate.scheduling_unit.start_time) - - unschedule_subtasks_in_scheduling_unit_blueprint(scheduled_scheduling_unit) - - # check again... are still there any scheduled_scheduling_units in the way? - scheduled_scheduling_units = get_scheduled_scheduling_units(lower=candidate.start_time, - upper=candidate.start_time + candidate.scheduling_unit.duration) - if scheduled_scheduling_units: - # accept current solution with current scheduled_scheduling_units - logger.info("keeping current scheduled unit(s) which have a better (or equal) score: %s", "; ".join( - "id=%s '%s' start_time='%s'" % (su.id, su.name, su.start_time) for su in scheduled_scheduling_units)) - - # indicate there are still blocking units - return False - - # all clear, nothing is blocking anymore - return True - +################## core dynamic scheduling methods ################################################ +# # +# This module starts with the core dynamic scheduling methods which are used in the dynamic # +# scheduling service. These high level methods only filter/score/sort in a generic way. # +# The detailed concrete filter/score/sort methods are pick by a strategy pattern in the # +# constraints package based on each scheduling unit's scheduling_constrains template. # +# # +################################################################################################### def find_best_next_schedulable_unit(scheduling_units:[models.SchedulingUnitBlueprint], lower_bound_start_time: datetime, upper_bound_stop_time: datetime) -> ScoredSchedulingUnit: """ @@ -236,8 +194,16 @@ def schedule_next_scheduling_unit_and_assign_start_stop_times_to_remaining_sched return scheduled_unit -class TMSSSchedulingUnitBlueprintDynamicSchedulingMessageHandler(TMSSEventMessageHandler): +################## service/messagebug handler class ############################################### + +class TMSSDynamicSchedulingMessageHandler(TMSSEventMessageHandler): ''' + The TMSSDynamicSchedulingMessageHandler reacts to TMSS EventMessages by triggering a new full update of the dynamic + schedule. + The actual schedule-update method runs on a backround thread, and can take some time to complete ranging from a + few seconds to several minutes. In the mean time new EventMessages may be received. These are handled by raising a flag + that signals the schedule-update-thread that a new full update is needed. This way, a burst of Events results in + a single update, and it also ensures that we always compute the schedule with the latest data. ''' def __init__(self): @@ -248,7 +214,7 @@ class TMSSSchedulingUnitBlueprintDynamicSchedulingMessageHandler(TMSSEventMessag def start_handling(self): # start the background thread which waits until the _do_schedule_event event is set upon receiving to the correct TMSS EVentMessages. - self._scheduling_thread = Thread(target=TMSSSchedulingUnitBlueprintDynamicSchedulingMessageHandler._scheduling_loop, kwargs={'self':self}) + self._scheduling_thread = Thread(target=TMSSDynamicSchedulingMessageHandler._scheduling_loop, kwargs={'self':self}) self._scheduling_thread.daemon = True self._scheduling_thread_running = True self._scheduling_thread.start() @@ -297,8 +263,66 @@ class TMSSSchedulingUnitBlueprintDynamicSchedulingMessageHandler(TMSSEventMessag def create_dynamic_scheduling_service(exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER): - return TMSSBusListener(handler_type=TMSSSchedulingUnitBlueprintDynamicSchedulingMessageHandler, + return TMSSBusListener(handler_type=TMSSDynamicSchedulingMessageHandler, handler_kwargs=None, exchange=exchange, broker=broker) + + + + +################## helper methods ################################################################# + +def get_schedulable_scheduling_units() -> [models.SchedulingUnitBlueprint]: + '''get a list of all schedulable scheduling_units''' + defined_independend_subtasks = models.Subtask.independent_subtasks().filter(state__value='defined') + defined_independend_subtask_ids = defined_independend_subtasks.values('task_blueprint__scheduling_unit_blueprint_id').distinct().all() + scheduling_units = models.SchedulingUnitBlueprint.objects.filter(id__in=defined_independend_subtask_ids).select_related('draft', 'draft__scheduling_constraints_template').all() + return [su for su in scheduling_units if su.status == 'schedulable'] + + +def get_scheduled_scheduling_units(lower:datetime=None, upper:datetime=None) -> [models.SchedulingUnitBlueprint]: + '''get a list of all scheduled scheduling_units''' + scheduled_subtasks = models.Subtask.objects.filter(state__value='scheduled') + if lower is not None: + scheduled_subtasks = scheduled_subtasks.filter(stop_time__gte=lower) + if upper is not None: + scheduled_subtasks = scheduled_subtasks.filter(start_time__lte=upper) + return list(models.SchedulingUnitBlueprint.objects.filter(id__in=scheduled_subtasks.values('task_blueprint__scheduling_unit_blueprint_id').distinct()).all()) + + +def unschededule_blocking_scheduled_units_if_needed_and_possible(candidate: ScoredSchedulingUnit) -> bool: + '''check if there are any already scheduled units in the way, and unschedule them if allowed. Return True if nothing is blocking anymore.''' + # check any previously scheduled units, and unschedule if needed/allowed + scheduled_scheduling_units = get_scheduled_scheduling_units(lower=candidate.start_time, + upper=candidate.start_time + candidate.scheduling_unit.duration) + + # check if we can and need to unschedule the blocking units + for scheduled_scheduling_unit in scheduled_scheduling_units: + scheduled_score = compute_scores(scheduled_scheduling_unit, candidate.start_time, candidate.start_time + candidate.scheduling_unit.duration) + + if candidate.weighted_score > scheduled_score.weighted_score: + # ToDo: also check if the scheduled_scheduling_unit is manually/dynamically scheduled + logger.info("unscheduling id=%s '%s' because it is in the way and has a lower score than the best candidate id=%s '%s' score=%s start_time=%s", + scheduled_scheduling_unit.id, scheduled_scheduling_unit.name, + candidate.scheduling_unit.id, candidate.scheduling_unit.name, candidate.weighted_score, candidate.scheduling_unit.start_time) + + unschedule_subtasks_in_scheduling_unit_blueprint(scheduled_scheduling_unit) + + # check again... are still there any scheduled_scheduling_units in the way? + scheduled_scheduling_units = get_scheduled_scheduling_units(lower=candidate.start_time, + upper=candidate.start_time + candidate.scheduling_unit.duration) + if scheduled_scheduling_units: + # accept current solution with current scheduled_scheduling_units + logger.info("keeping current scheduled unit(s) which have a better (or equal) score: %s", "; ".join( + "id=%s '%s' start_time='%s'" % (su.id, su.name, su.start_time) for su in scheduled_scheduling_units)) + + # indicate there are still blocking units + return False + + # all clear, nothing is blocking anymore + return True + + + -- GitLab