diff --git a/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py b/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py index da5ddb2df3d93121557f8d2b816d77ff1f0559f1..5b5c44614805d5cec621db41f14a2765b276210a 100644 --- a/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py +++ b/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py @@ -326,18 +326,13 @@ class Scheduler: # find and schedule the next best unit # ignore/exclude as candidates the unit(s) which are already scheduled in this round. # when new candidates are overlapping with already scheduled units, they are re-evaluated to see who wins. - schedulable_units = list(get_dynamically_schedulable_scheduling_units().all()) - to_be_excluded_units = set(scheduled_units) - - logger.info("do_dynamic_schedule: lower_bound='%s' upper_bound='%s' scheduled_units=%s schedulable_units=%s to_be_excluded_units=%s", + logger.info("do_dynamic_schedule: lower_bound='%s' upper_bound='%s' already_scheduled_and_thus_to_be_ignored_units: %s", lower_bound, upper_bound, - ",".join([str(s.id) for s in sorted(list(scheduled_units), key=id)]), - ",".join([str(s.id) for s in sorted(list(schedulable_units), key=id)]), - ",".join([str(s.id) for s in sorted(list(to_be_excluded_units), key=id)])) + ",".join([str(s.id) for s in sorted(list(scheduled_units), key=id)])) scheduled_unit = self.schedule_next_scheduling_unit(lower_bound, min(lower_bound + timedelta(hours=24), upper_bound), - exclude_units=to_be_excluded_units) + exclude_units=scheduled_units) if scheduled_unit: scheduled_units.append(scheduled_unit) @@ -346,7 +341,7 @@ class Scheduler: scheduled_units.extend(self.try_schedule_relational_units(scheduled_unit)) # see if we can fit any B-prio units in the new gap(s) in the schedule? - scheduled_B_units = self.schedule_B_priority_units_in_gaps_around_scheduling_unit(scheduled_unit, exclude_units=to_be_excluded_units, lower_bound=lower_bound, upper_bound=upper_bound) + scheduled_B_units = self.schedule_B_priority_units_in_gaps_around_scheduling_unit(scheduled_unit, exclude_units=scheduled_units, lower_bound=lower_bound, upper_bound=upper_bound) scheduled_units.extend(scheduled_B_units) else: # advance window and search again @@ -1047,16 +1042,17 @@ class TMSSDynamicSchedulingMessageHandler(TMSSEventMessageHandler): self.scheduler.stop_scheduling() super().stop_handling() + def onSchedulingUnitBlueprintCreated(self, id: int): '''prepare the new scheduling_unit for scheduling. Set unschedulable if project not active.''' logger.info("onSchedulingUnitBlueprintCreated(id=%s)",id) scheduling_unit = models.SchedulingUnitBlueprint.objects.get(id=id) - # mark unschedulable if project not active - if scheduling_unit.project.project_state.value != models.ProjectState.Choices.ACTIVE.value: - mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(scheduling_unit, reason="project is not active") - try: + # mark unschedulable if project not active + if scheduling_unit.project.project_state.value != models.ProjectState.Choices.ACTIVE.value: + mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(scheduling_unit, reason="project is not active") + # if this is a new unit, and it is related to another one, then scheduling it right away. if scheduling_unit.scheduling_constraints_template.name == "relational": related_unit_id = scheduling_unit.scheduling_constraints_doc.get('other', -1) @@ -1064,30 +1060,34 @@ class TMSSDynamicSchedulingMessageHandler(TMSSEventMessageHandler): self.scheduler.try_schedule_relational_units(related_unit) except Exception as e: logger.error(str(e)) + finally: + # trigger next schedule computation + self.scheduler.trigger() - # trigger next schedule computation - self.scheduler.trigger() def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str): logger.info("onSchedulingUnitBlueprintStatusChanged(id=%s, status=%s)",id, status) scheduling_unit = models.SchedulingUnitBlueprint.objects.get(id=id) - if status == models.SchedulingUnitStatus.Choices.SCHEDULABLE.value: - self.scheduler.unschedule_relational_units(scheduling_unit) - self.scheduler.mark_relational_units_as_schedulable(scheduling_unit) - - # trigger scheduler if needed - if scheduling_unit.is_fixed_time_scheduled and self.scheduler.fixed_time_scheduling_enabled: + try: if status == models.SchedulingUnitStatus.Choices.SCHEDULABLE.value: - logger.info("triggering scheduler for fixed_time unit id=%s status=%s", id, status) - self.scheduler.trigger() - elif scheduling_unit.is_dynamically_scheduled and self.scheduler.dynamic_scheduling_enabled and not self.scheduler.is_scheduling: - if status in [models.SchedulingUnitStatus.Choices.SCHEDULABLE.value, - models.SchedulingUnitStatus.Choices.OBSERVING.value, - models.SchedulingUnitStatus.Choices.CANCELLED.value, - models.SchedulingUnitStatus.Choices.OBSERVED.value]: - logger.info("triggering scheduler for dynamic unit id=%s status=%s", id, status) - self.scheduler.trigger() + self.scheduler.unschedule_relational_units(scheduling_unit) + self.scheduler.mark_relational_units_as_schedulable(scheduling_unit) + except Exception as e: + logger.exception("onSchedulingUnitBlueprintStatusChanged error: %s", str(e)) + finally: + # trigger scheduler if needed + if scheduling_unit.is_fixed_time_scheduled and self.scheduler.fixed_time_scheduling_enabled: + if status == models.SchedulingUnitStatus.Choices.SCHEDULABLE.value: + logger.info("onSchedulingUnitBlueprintStatusChanged: triggering scheduler for fixed_time unit id=%s status=%s", id, status) + self.scheduler.trigger() + elif scheduling_unit.is_dynamically_scheduled and self.scheduler.dynamic_scheduling_enabled and not self.scheduler.is_scheduling: + if status in [models.SchedulingUnitStatus.Choices.SCHEDULABLE.value, + models.SchedulingUnitStatus.Choices.OBSERVING.value, + models.SchedulingUnitStatus.Choices.CANCELLED.value, + models.SchedulingUnitStatus.Choices.OBSERVED.value]: + logger.info("onSchedulingUnitBlueprintStatusChanged triggering scheduler for dynamic unit id=%s status=%s", id, status) + self.scheduler.trigger() def onSchedulingUnitBlueprintConstraintsUpdated(self, id: int, scheduling_constraints_doc: dict): @@ -1159,40 +1159,52 @@ class TMSSDynamicSchedulingMessageHandler(TMSSEventMessageHandler): def onSettingUpdated(self, name: str, value: bool): if name in (models.SystemSettingFlag.Choices.DYNAMIC_SCHEDULING_ENABLED.value, models.SystemSettingFlag.Choices.FIXED_TIME_SCHEDULING_ENABLED.value) and value: - logger.info("%s was set to %s: triggering update of schedule...", name, value) + logger.info("onSettingUpdated: %s was set to %s: triggering update of schedule...", name, value) self.scheduler.trigger() def onSchedulingConstraintsWeightFactorUpdated(self, id: int): weight_factor = models.SchedulingConstraintsWeightFactor.objects.get(id=id) - logger.info("weight_factor %s changed to %s: triggering update of dynamic schedule...", weight_factor.constraint_name, weight_factor.weight) - wipe_evaluate_constraints_caches() - for scheduled_unit in get_scheduled_scheduling_units(datetime.utcnow(), scheduler='dynamic'): - unschedule_subtasks_in_scheduling_unit_blueprint(scheduled_unit) - mark_unschedulable_scheduling_units_for_active_projects_schedulable() - self.scheduler.trigger() + logger.info("onSchedulingConstraintsWeightFactorUpdated: weight_factor %s changed to %s: triggering update of dynamic schedule...", weight_factor.constraint_name, weight_factor.weight) + try: + wipe_evaluate_constraints_caches() + for scheduled_unit in get_scheduled_scheduling_units(datetime.utcnow(), scheduler='dynamic'): + unschedule_subtasks_in_scheduling_unit_blueprint(scheduled_unit) + mark_unschedulable_scheduling_units_for_active_projects_schedulable() + except Exception as e: + logger.error(str(e)) + finally: + self.scheduler.trigger() - def onProjectStatusUpdated(self, name: str, status: str): - logger.info("project '%s' status changed to %s", name, status) - if status == models.ProjectState.Choices.ACTIVE.value: - mark_unschedulable_scheduling_units_for_active_projects_schedulable(projects=[name]) - else: - mark_scheduling_units_for_inactive_projects_unschedulable(projects=[name]) + def onProjectStatusUpdated(self, name: str, status: str): + logger.info("onProjectStatusUpdated: project '%s' status changed to %s", name, status) - self.scheduler.trigger() + try: + if status == models.ProjectState.Choices.ACTIVE.value: + mark_unschedulable_scheduling_units_for_active_projects_schedulable(projects=[name]) + else: + mark_scheduling_units_for_inactive_projects_unschedulable(projects=[name]) + except Exception as e: + logger.error(str(e)) + finally: + self.scheduler.trigger() def onProjectRankUpdated(self, name: str, rank: float): - logger.info("project '%s' rank changed to %s", name, rank) + logger.info("onProjectRankUpdated: project '%s' rank changed to %s", name, rank) self.scheduler.trigger() def _onProjectCyclesCreatedUpdatedDeleted(self, id: int): logger.info("a project was added/removed to a cycle. triggering new scheduling round") - mark_unschedulable_scheduling_units_for_active_projects_schedulable() - unschedule_scheduled_units_for_cycleless_projects() - self.scheduler.trigger() + try: + mark_unschedulable_scheduling_units_for_active_projects_schedulable() + unschedule_scheduled_units_for_cycleless_projects() + except Exception as e: + logger.error(str(e)) + finally: + self.scheduler.trigger() def onProjectCyclesCreated(self, id: int): self._onProjectCyclesCreatedUpdatedDeleted(id) @@ -1251,12 +1263,15 @@ class TMSSDynamicSchedulingMessageHandler(TMSSEventMessageHandler): unschedule_subtasks_in_scheduling_unit_blueprint(unit) except Exception as e: logger.error(e) - # also mark the unschedulable units as schedulable again... - mark_unschedulable_scheduling_units_for_active_projects_schedulable() - # ... and let the scheduler run - self.scheduler.trigger() - + try: + # also mark the unschedulable units as schedulable again... + mark_unschedulable_scheduling_units_for_active_projects_schedulable() + except Exception as e: + logger.error(str(e)) + finally: + # ... and let the scheduler run + self.scheduler.trigger() def create_dynamic_scheduling_service(exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER):