from lofar.sas.tmss.client.tmssbuslistener import *
from lofar.sas.tmss.tmss.tmssapp.models import Subtask, SubtaskState, SubtaskType, SchedulingUnitBlueprint
from lofar.sas.tmss.tmss.tmssapp.subtasks import schedule_subtask
from lofar.sas.tmss.tmss.exceptions import TMSSException
from lofar.common.datetimeutils import round_to_second_precision
from datetime import datetime, timedelta


class TMSSSubTaskSchedulingEventMessageHandler(TMSSEventMessageHandler):
    '''
    Event message handler which tries to schedule subtasks in reaction to events:
    a created subtask, a subtask status change, a granted ingest permission
    and an updated output-pinning flag.
    All handlers funnel into try_schedule_subtasks, which checks the prerequisites.
    '''

    def try_schedule_subtasks(self, subtasks) -> None:
        '''Try to schedule each subtask in the given iterable (a list or a queryset of Subtasks).

        A subtask is skipped (with an informative log line) when:
         - it is obsolete,
         - its state is not 'defined',
         - one of its non-obsolete predecessors is not finished,
         - it is an ingest subtask and the required ingest permission is not granted (yet),
         - it is a cleanup subtask and the output of one of its predecessors' task blueprints is pinned,
           or a non-obsolete ingest subtask in the same scheduling unit is not finished.

        A TMSSException raised for one subtask is logged, and the loop continues with the next subtask.
        Any other exception type is not caught here and propagates to the caller.
        '''
        for subtask in subtasks:
            try:
                # start with checking the prerequisites before we try to schedule the subtask.
                # We could of course just try to schedule it and check for errors,
                # but that pollutes the logs with ugly error messages indicating something bad happened.
                # It's much nicer to present informative log messages here with reasonable explanations why a subtask cannot be scheduled.

                if subtask.is_obsolete:
                    logger.info("skipping scheduling of obsolete subtask %s", subtask.id)
                    continue

                if subtask.state.value != SubtaskState.Choices.DEFINED.value:
                    logger.info("skipping scheduling of subtask %s with state=%s", subtask.id, subtask.state)
                    continue

                predecessors = subtask.predecessors.all()
                if any(predecessor.state.value != SubtaskState.Choices.FINISHED.value and not predecessor.is_obsolete
                       for predecessor in predecessors):
                    logger.info("skipping scheduling of subtask %s because not all its (non-obsolete) predecessors are finished", subtask.id)
                    continue

                subtask_type = subtask.specifications_template.type.value

                if subtask_type == SubtaskType.Choices.INGEST.value:
                    # check ingest permission: it has to be granted, and the grant has to be in effect already
                    scheduling_unit_blueprint = subtask.task_blueprint.scheduling_unit_blueprint
                    if scheduling_unit_blueprint.ingest_permission_required:
                        granted_since = scheduling_unit_blueprint.ingest_permission_granted_since
                        if granted_since is None or granted_since > datetime.utcnow():
                            logger.info("skipping scheduling of ingest subtask %s which requires permission from scheduling_unit_blueprint id=%s first", subtask.id, scheduling_unit_blueprint.id)
                            continue

                if subtask_type == SubtaskType.Choices.CLEANUP.value:
                    # can we schedule this cleanup subtask already, or do we need to wait for ingest to finish and/or pinning

                    # check pinning (once), reusing the predecessors fetched above
                    predecessor_tasks_with_pinned_data = [predecessor.task_blueprint for predecessor in predecessors
                                                          if predecessor.task_blueprint.output_pinned]
                    if predecessor_tasks_with_pinned_data:
                        logger.info("skipping scheduling of cleanup subtask id=%s for which the data of to-be-cleaned-up task_bluerints %s has been pinned", subtask.id, ','.join(str(t.id) for t in predecessor_tasks_with_pinned_data))
                        continue

                    # check ingest status(es): any non-obsolete ingest subtask in the same scheduling unit which is not finished blocks the cleanup
                    unfinished_ingest_subtasks = Subtask.objects.filter(task_blueprint__scheduling_unit_blueprint__id=subtask.task_blueprint.scheduling_unit_blueprint.id) \
                                                                .filter(specifications_template__type__value=SubtaskType.Choices.INGEST.value) \
                                                                .exclude(state__value=SubtaskState.Choices.FINISHED.value) \
                                                                .filter(obsolete_since__isnull=True)

                    if unfinished_ingest_subtasks.exists():
                        logger.info("skipping scheduling of cleanup subtask id=%s because not all ingest subtasks in the same scheduling unit are finished yet", subtask.id)
                        continue

                # ok, all prerequisites have been checked.
                # try scheduling the subtask.
                # if it succeeds, then the state will be 'scheduled' afterwards
                # if there is a specification error, then the state will be 'error' afterwards
                # if there is another kind of error (like needing ingest-permission), then the state will be 'defined' afterwards, so you can retry.
                # for the ingest-permission we will retry automatically when that permission is granted
                schedule_subtask(subtask, start_time=round_to_second_precision(datetime.utcnow()+timedelta(seconds=30)))
            except TMSSException as e:
                # just log the error (including which subtask it concerns) and continue with the next subtask.
                # deliberately no traceback here, to keep the logs readable.
                # the user should take action on "fixing" the unschedulable subtask
                logger.error("could not schedule subtask id=%s: %s", subtask.id, e)

    def onSubTaskCreated(self, id: int) -> None:
        '''Try to schedule the just created subtask when it has predecessors which are all finished or obsolete.'''
        super().onSubTaskCreated(id)
        subtask = Subtask.objects.get(id=id)
        # NOTE(review): this assignment lies between the two visible diff hunks;
        # it is reconstructed from how 'predecessors' is used below - confirm against the full file.
        predecessors = subtask.predecessors.all()

        if predecessors.exists() and all(p.state.value == SubtaskState.Choices.FINISHED.value or p.is_obsolete for p in predecessors):
            logger.info("subtask %s was just created and all its predecessors are finished or obsolete. trying to schedule it...", id)
            self.try_schedule_subtasks([subtask])

    def onSubTaskStatusChanged(self, id: int, status: str) -> None:
        '''When a subtask finishes, try to schedule its defined non-obsolete successors.

        When the finished subtask is an ingest subtask, the defined non-obsolete cleanup subtasks
        in the same scheduling unit are tried as well.
        '''
        super().onSubTaskStatusChanged(id, status)

        if status != SubtaskState.Choices.FINISHED.value:
            return

        subtask = Subtask.objects.get(id=id)

        # evaluate the queryset once, so the logged ids are exactly the subtasks which are tried
        successors = list(subtask.successors.filter(state__value=SubtaskState.Choices.DEFINED.value)
                                            .filter(obsolete_since__isnull=True)
                                            .order_by('id'))
        if successors:
            logger.info("subtask %s finished. trying to schedule %s defined non-obsolete successors: %s",
                        id, len(successors), ' '.join(str(s.id) for s in successors))
            self.try_schedule_subtasks(successors)

        if subtask.specifications_template.type.value == SubtaskType.Choices.INGEST.value:
            # ingest subtasks have a special type of "successors": the cleanup subtask(s)
            # these cleanup subtasks do not depend on the ingest output data, so they are no direct data successor
            # these cleanup subtasks do depend on the ingest with the same scheduling unit to be finished.
            cleanup_subtasks = list(Subtask.objects.filter(task_blueprint__scheduling_unit_blueprint__id=subtask.task_blueprint.scheduling_unit_blueprint.id)
                                                   .filter(specifications_template__type__value=SubtaskType.Choices.CLEANUP.value)
                                                   .filter(state__value=SubtaskState.Choices.DEFINED.value)
                                                   .filter(obsolete_since__isnull=True)
                                                   .order_by('id'))
            if cleanup_subtasks:
                logger.info("ingest subtask %s finished. trying to schedule %s defined non-obsolete cleanup successor(s): %s",
                            id, len(cleanup_subtasks), ' '.join(str(s.id) for s in cleanup_subtasks))
                self.try_schedule_subtasks(cleanup_subtasks)

    def onSchedulingUnitBlueprintIngestPermissionGranted(self, id: int, ingest_permission_granted_since: datetime) -> None:
        '''Try to schedule the defined non-obsolete ingest subtasks of the scheduling unit blueprint with the given id.'''
        logger.info("ingest_permission_granted_since='%s' for scheduling_unit_blueprint id=%s, trying to schedule ingest subtasks if any, and prerequisites are met", ingest_permission_granted_since, id)
        scheduling_unit_ingest_subtasks = Subtask.objects.filter(task_blueprint__scheduling_unit_blueprint__id=id) \
                                                         .filter(specifications_template__type__value=SubtaskType.Choices.INGEST.value) \
                                                         .filter(state__value=SubtaskState.Choices.DEFINED.value) \
                                                         .filter(obsolete_since__isnull=True).order_by('id').all()
        self.try_schedule_subtasks(scheduling_unit_ingest_subtasks)

    def onTaskBlueprintOutputPinningUpdated(self, id: int, output_pinned: bool) -> None:
        '''Try to schedule the defined non-obsolete cleanup subtasks of the (unpinned) task blueprint with the given id.'''
        logger.info("task_blueprint id=%s output_pinned changed to '%s'", id, output_pinned)
        # NOTE(review): this selects cleanup subtasks by their own task_blueprint (and its output_pinned flag),
        # whereas try_schedule_subtasks checks output_pinned on the task blueprints of the cleanup subtask's predecessors.
        # Confirm that selecting on the cleanup subtask's own task_blueprint is intended.
        task_cleanup_subtasks = Subtask.objects.filter(task_blueprint__id=id) \
                                               .filter(specifications_template__type__value=SubtaskType.Choices.CLEANUP.value) \
                                               .filter(task_blueprint__output_pinned=False) \
                                               .filter(state__value=SubtaskState.Choices.DEFINED.value) \
                                               .filter(obsolete_since__isnull=True).order_by('id').all()
        self.try_schedule_subtasks(task_cleanup_subtasks)


# NOTE(review): the module-level factory create_subtask_scheduling_service(exchange, broker) follows here.
# Its definition is cut off in this excerpt, so it is not reproduced - keep the existing definition.