From 04707c7f1294a0561ea3fe3aed67a47b14597ef3 Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Tue, 10 Oct 2023 21:31:29 +0200 Subject: [PATCH] TMSS-2809: logging and event handling --- .../services/scheduling/lib/constraints.py | 4 +- .../scheduling/lib/dynamic_scheduling.py | 39 +++++++------------ 2 files changed, 15 insertions(+), 28 deletions(-) diff --git a/SAS/TMSS/backend/services/scheduling/lib/constraints.py b/SAS/TMSS/backend/services/scheduling/lib/constraints.py index f13c85db670..228fa6c7c6c 100644 --- a/SAS/TMSS/backend/services/scheduling/lib/constraints.py +++ b/SAS/TMSS/backend/services/scheduling/lib/constraints.py @@ -398,7 +398,7 @@ def filter_scheduling_units_using_time_constraints(scheduling_units: [models.Sch if can_run_between_with_time_constraints(scheduling_unit, lower_bound, upper_bound): filtered_scheduling_units.append(scheduling_unit) - logger.info("filter_scheduling_units_using_time_constraints: checked unit [%d/%d] %.1f%% id=%d time-constraints are %smet in window ['%s', '%s'] project='%s' C/R/I=%s", + logger.debug("filter_scheduling_units_using_time_constraints: checked unit [%d/%d] %.1f%% id=%d time-constraints are %smet in window ['%s', '%s'] project='%s' C/R/I=%s", i+1, len(scheduling_units), 100.0*(i+1)/len(scheduling_units), scheduling_unit.id, 'yes ' if scheduling_unit in filtered_scheduling_units else 'not ', lower_bound, upper_bound, @@ -454,7 +454,7 @@ def filter_scheduling_units_which_can_only_run_in_this_window(scheduling_units: (lower_bound < datetime.utcnow() + timedelta(minutes=1)) ): # no unit can run before 'now' runnable_exclusive_in_this_window_scheduling_units.append(scheduling_unit) - logger.info("filter_scheduling_units_which_can_only_run_in_this_window: checked unit [%d/%d] %.1f%% id=%d can %srun outside of window ['%s', '%s']", + logger.debug("filter_scheduling_units_which_can_only_run_in_this_window: checked unit [%d/%d] %.1f%% id=%d can %srun outside of window ['%s', '%s']", i+1, len(scheduling_units), 100.0 *(i+1) / len(scheduling_units), scheduling_unit.id, 'not ' if scheduling_unit in runnable_exclusive_in_this_window_scheduling_units else '', lower_bound, upper_bound) diff --git a/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py b/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py index d564ab0ae49..87846f35728 100644 --- a/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py +++ b/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py @@ -590,7 +590,7 @@ class Scheduler: if len(candidate_units) == 0: continue # continue with the next in order schedulable_units - logger.debug("%s: candidate_units: %s", log_prefix, ','.join([str(su.id) for su in sorted(candidate_units, key=lambda x: x.id)]) or 'None') + logger.info("%s: %s candidate_units: %s", log_prefix, len(candidate_units), ','.join([str(su.id) for su in sorted(candidate_units, key=lambda x: x.id)]) or 'None') # search in a forward sliding window for the best scheduling_unit that can be scheduled if lower_bound is None: @@ -665,11 +665,13 @@ class Scheduler: # triggered observations cannot run later. mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(best_scheduling_unit, str(e)) return None - elif best_scheduling_unit.status.value == models.SchedulingUnitStatus.Choices.UNSCHEDULABLE.value: + else: if can_run_after(best_scheduling_unit, best_start_time, self.search_gridder) and not best_scheduling_unit.interrupts_telescope: logger.info("%s: Unschedulable scheduling_unit id=%s can run later than '%s'. Marking it as schedulable again...", log_prefix, best_scheduling_unit.id, best_start_time) # yep, can run later, so mark it as schedulable again, and let it be handled in a new scheduler-round mark_independent_subtasks_in_scheduling_unit_blueprint_as_schedulable(best_scheduling_unit) + else: + mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(best_scheduling_unit, str(e)) if not candidate_units: logger.debug("%s: no more candidate units...", log_prefix) @@ -1014,29 +1016,14 @@ class TMSSDynamicSchedulingMessageHandler(TMSSEventMessageHandler): logger.info("onSchedulingUnitBlueprintStatusChanged(id=%s, status=%s)",id, status) scheduling_unit = models.SchedulingUnitBlueprint.objects.get(id=id) - # we want the dynamic scheduler to schedule at most 1 A-prio unit at the time - # so, unschedule any lingering dynamically scheduled unit other than the just scheduled_scheduling_unit - if status == models.SchedulingUnitStatus.Choices.SCHEDULED.value: - if scheduling_unit.priority_queue.value == models.PriorityQueueType.Choices.A.value: - # unschedule all other dynamic A-prio units - other_scheduled_A_prio_units = get_scheduled_scheduling_units(scheduler='dynamic', priority_queue=models.PriorityQueueType.objects.get(value=models.PriorityQueueType.Choices.A.value)).exclude(id=scheduling_unit.id).all() - for other_scheduled_A_prio_unit in other_scheduled_A_prio_units: - unschedule_subtasks_in_scheduling_unit_blueprint(other_scheduled_A_prio_unit) - # trigger scheduler if needed - if (status in [models.SchedulingUnitStatus.Choices.SCHEDULABLE.value] and not self.scheduler.is_scheduling ) or \ - status in [models.SchedulingUnitStatus.Choices.OBSERVING.value, - models.SchedulingUnitStatus.Choices.CANCELLED.value] or \ - (status in [models.SchedulingUnitStatus.Choices.OBSERVED.value, - models.SchedulingUnitStatus.Choices.PROCESSING.value, - models.SchedulingUnitStatus.Choices.INGESTING.value] and get_scheduled_scheduling_units(scheduler='dynamic').count()==0): - logger.info("onSchedulingUnitBlueprintStatusChanged(id=%s, status=%s): triggering update of dynamic & fixed_time schedule...", id, status) - - # scheduling takes a long time, longer then creating many scheduling units in bulk - # so, we do not create a complete new schedule for each new unit, - # but we only trigger a new schedule update. - # This way we are sure that the latest units are always taken into account while scheduling, but we do not waste cpu cylces. - self.scheduler.trigger() + if not self.scheduler.is_scheduling: + if (status in [models.SchedulingUnitStatus.Choices.SCHEDULABLE.value] and not scheduling_unit.placed) or \ + status in [models.SchedulingUnitStatus.Choices.OBSERVING.value, + models.SchedulingUnitStatus.Choices.CANCELLED.value, + models.SchedulingUnitStatus.Choices.OBSERVED.value]: + logger.info("onSchedulingUnitBlueprintStatusChanged(id=%s, status=%s, placed=%s): triggering update of dynamic & fixed_time schedule...", id, status, scheduling_unit.placed) + self.scheduler.trigger() def onSchedulingUnitBlueprintConstraintsUpdated(self, id: int, scheduling_constraints_doc: dict): @@ -1045,7 +1032,7 @@ class TMSSDynamicSchedulingMessageHandler(TMSSEventMessageHandler): scheduling_unit_blueprint = models.SchedulingUnitBlueprint.objects.get(id=id) at = get_at_constraint_timestamp(scheduling_unit_blueprint) if at is not None: - update_subtasks_start_times_for_scheduling_unit(scheduling_unit_blueprint, at, placed=True) + update_subtasks_start_times_for_scheduling_unit(scheduling_unit_blueprint, at, placed=False) except: pass self.onSchedulingUnitBlueprintConstraintsRankOrQueueUpdated(id) @@ -1067,7 +1054,7 @@ class TMSSDynamicSchedulingMessageHandler(TMSSEventMessageHandler): elif scheduling_unit_blueprint.status.value == models.SchedulingUnitStatus.Choices.SCHEDULED.value: logger.info("constraints/queue/priority for scheduled scheduling unit id=%s changed: unscheduling it, which will triggering a dynamic scheduling update...", id) unschedule_subtasks_in_scheduling_unit_blueprint(scheduling_unit_blueprint) - elif scheduling_unit_blueprint.status.value == models.SchedulingUnitStatus.Choices.SCHEDULABLE.value: + elif scheduling_unit_blueprint.status.value == models.SchedulingUnitStatus.Choices.SCHEDULABLE.value and not scheduling_unit_blueprint.placed: logger.info("constraints/queue/priority for schedulable scheduling unit id=%s changed: triggering a dynamic scheduling update...", id) self.scheduler.trigger() -- GitLab