    #!/usr/bin/env python3
    
    # dynamic_scheduling.py
    #
    # Copyright (C) 2020
    # ASTRON (Netherlands Institute for Radio Astronomy)
    # P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
    #
    # This file is part of the LOFAR software suite.
    # The LOFAR software suite is free software: you can redistribute it
    # and/or modify it under the terms of the GNU General Public License as
    # published by the Free Software Foundation, either version 3 of the
    # License, or (at your option) any later version.
    #
    # The LOFAR software suite is distributed in the hope that it will be
    # useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License along
    # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
    #
    # $Id:  $
    
    """
    """
    
    import os
    
    import logging
    logger = logging.getLogger(__name__)
    from datetime import datetime, timedelta, time
    
    from lofar.sas.tmss.tmss.tmssapp import models
    from lofar.sas.tmss.tmss.tmssapp.tasks import schedule_independent_subtasks_in_scheduling_unit_blueprint, unschedule_subtasks_in_scheduling_unit_blueprint
    from lofar.sas.tmss.tmss.tmssapp.subtasks import update_subtasks_start_times_for_scheduling_unit, clear_defined_subtasks_start_stop_times_for_scheduling_unit, cancel_subtask
    from lofar.sas.tmss.client.tmssbuslistener import *
    from lofar.common.datetimeutils import round_to_second_precision
    from threading import Thread, Event
    
    from lofar.sas.tmss.services.scheduling.constraints import *
    
    
    # LOFAR needs to have a gap in between observations to (re)initialize hardware.
    DEFAULT_INTER_OBSERVATION_GAP = timedelta(seconds=60)
    DEFAULT_NEXT_STARTTIME_GAP = timedelta(seconds=180)
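    # For example: with a DEFAULT_INTER_OBSERVATION_GAP of 60 seconds, an observation ending at
    # 12:00:00 allows the next observation to start at 12:01:00 at the earliest.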
    
    ################## core dynamic scheduling methods ################################################
    #                                                                                                 #
    # This module starts with the core dynamic scheduling methods which are used in the dynamic       #
    # scheduling service. These high level methods only filter/score/sort in a generic way.           #
    # The detailed concrete filter/score/sort methods are picked by a strategy pattern in the     #
    # constraints package based on each scheduling unit's scheduling_constraints template.        #
    #                                                                                                 #
    ###################################################################################################
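
    # As an illustrative sketch (using only names from this module and the constraints package),
    # the generic flow is roughly:
    #
    #   units = get_dynamically_schedulable_scheduling_units()
    #   units = filter_scheduling_units_using_constraints(units, lower_bound, upper_bound)
    #   best  = get_best_scored_scheduling_unit_scored_by_constraints(units, lower_bound, upper_bound)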
    
    def find_best_next_schedulable_unit(scheduling_units:[models.SchedulingUnitBlueprint], lower_bound_start_time: datetime, upper_bound_stop_time: datetime) -> ScoredSchedulingUnit:
        """
        find the best schedulable scheduling_unit which can run withing the given time window from the given scheduling_units.
        :param lower_bound_start_time: evaluate the constrains at and after lower_bound_start_time. The returned unit has a start_time guaranteed at or after lower_bound_start_time.
        :param upper_bound_stop_time: evaluate the constrains before upper_bound_stop_time. The returned unit has a stop_time guaranteed before upper_bound_stop_time.
        :param scheduling_units: evaluate these scheduling_units.
        Returns a ScoredSchedulingUnit struct with the best next schedulable scheduling unit and its proposed start_time where it best fits its contraints.
        """
        # ensure upper is greater than or equal to lower
        upper_bound_stop_time = max(lower_bound_start_time, upper_bound_stop_time)
    
        filtered_scheduling_units = filter_scheduling_units_using_constraints(scheduling_units, lower_bound_start_time, upper_bound_stop_time)
    
        if filtered_scheduling_units:
            triggered_scheduling_units = [scheduling_unit for scheduling_unit in filtered_scheduling_units if scheduling_unit.interrupts_telescope]
            if triggered_scheduling_units:
                highest_priority_triggered_scheduling_unit = max(triggered_scheduling_units, key=lambda su: su.project.trigger_priority)
                best_scored_scheduling_unit = ScoredSchedulingUnit(scheduling_unit=highest_priority_triggered_scheduling_unit,
                                                                   start_time=get_earliest_possible_start_time(highest_priority_triggered_scheduling_unit, lower_bound_start_time),
                                                                   scores={}, weighted_score=None)  # we don't care about scores in case of a trigger
            else:
                best_scored_scheduling_unit = get_best_scored_scheduling_unit_scored_by_constraints(filtered_scheduling_units, lower_bound_start_time, upper_bound_stop_time)
            return best_scored_scheduling_unit
    
        # no filtered scheduling units found...
        logger.debug("No schedulable scheduling units found which meet the requirements between '%s' and '%s'", lower_bound_start_time, upper_bound_stop_time)
        return None
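
    # A minimal usage sketch (illustrative only, assuming a populated TMSS database):
    #
    #   candidates = get_dynamically_schedulable_scheduling_units()
    #   best = find_best_next_schedulable_unit(candidates, datetime.utcnow(),
    #                                          datetime.utcnow() + timedelta(days=1))
    #   if best:
    #       logger.info("best candidate: %s at %s", best.scheduling_unit.name, best.start_time)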
    
    
    def schedule_next_scheduling_unit() -> models.SchedulingUnitBlueprint:
        '''find the best next schedulable scheduling unit and try to schedule it.
        Overlapping existing scheduled units are unscheduled if their score is lower.
        :return: the scheduled scheduling unit.'''
    
        # --- setup of needed variables ---
        schedulable_units = get_dynamically_schedulable_scheduling_units()
    
        if len(schedulable_units) == 0:
            logger.info("No scheduling units found...")
            return
    
        # estimate the lower_bound_start_time
        earliest_possible_start_time = get_min_earliest_possible_start_time(schedulable_units, datetime.utcnow()+DEFAULT_NEXT_STARTTIME_GAP)
        # no need to irritate user in log files with subsecond scheduling precision
        lower_bound_start_time = round_to_second_precision(earliest_possible_start_time)
        upper_bound_stop_time = lower_bound_start_time + timedelta(days=1)
    
        # --- core routine ---
        try:
            # try to find the best next scheduling_unit
            logger.info("schedule_next_scheduling_unit: searching for best scheduling unit to schedule between '%s' and '%s'", lower_bound_start_time, upper_bound_stop_time)
            best_scored_scheduling_unit = find_best_next_schedulable_unit(schedulable_units, lower_bound_start_time, upper_bound_stop_time)
            if best_scored_scheduling_unit:
                best_scheduling_unit = best_scored_scheduling_unit.scheduling_unit
                best_scheduling_unit_score = best_scored_scheduling_unit.weighted_score
                best_start_time = best_scored_scheduling_unit.start_time
    
                # make sure we don't start earlier than allowed
                best_start_time = max(best_start_time, earliest_possible_start_time)
    
                # make start_time "look nice" for us humans
                best_start_time = round_to_second_precision(best_start_time)
    
                logger.info("schedule_next_scheduling_unit: found best candidate id=%s '%s' weighted_score=%s start_time=%s interrupts_telescope=%s",
                            best_scheduling_unit.id, best_scheduling_unit.name, best_scheduling_unit_score, best_start_time, best_scheduling_unit.interrupts_telescope)
    
                if best_scheduling_unit.interrupts_telescope:
                    cancel_running_observation_if_needed_and_possible(best_scored_scheduling_unit)
    
                if unschedule_blocking_scheduled_units_if_needed_and_possible(best_scored_scheduling_unit):
                    # no (old) scheduled scheduling_units in the way, so schedule our candidate!
                    scheduled_scheduling_unit = schedule_independent_subtasks_in_scheduling_unit_blueprint(best_scheduling_unit, start_time=best_start_time)
    
                    logger.info("schedule_next_scheduling_unit: scheduled best candidate id=%s '%s' score=%s start_time=%s interrupts_telescope=%s",
                                best_scheduling_unit.id, best_scheduling_unit.name, best_scheduling_unit_score, best_start_time, best_scheduling_unit.interrupts_telescope)
                    return scheduled_scheduling_unit
    
        except SubtaskSchedulingException as e:
            # note: best_scheduling_unit is expected to be bound here, since a SubtaskSchedulingException
            # is raised while (un)scheduling, i.e. after a best candidate was found.
            logger.error("Could not schedule scheduling_unit id=%s name='%s'. Error: %s", best_scheduling_unit.id, best_scheduling_unit.name, e)
    
    
    def assign_start_stop_times_to_schedulable_scheduling_units(lower_bound_start_time: datetime):
        '''assign estimated start/stop times to the schedulable scheduling units, so they form a queue (a mid-term schedule) which can be visualized in a timeline'''
        logger.info("Estimating mid-term schedule with lower_bound_start_time=%s ...", lower_bound_start_time)
    
        scheduling_units = get_dynamically_schedulable_scheduling_units()
    
        if len(scheduling_units) == 0:
            logger.info("No scheduling units found...")
            return
    
        upper_bound_stop_time = lower_bound_start_time + timedelta(days=365)
    
        # update the start_times of the remaining ones (so they form a queue, and can be visualized in a timeline)
        while scheduling_units and lower_bound_start_time < upper_bound_stop_time:
            best_scored_scheduling_unit = find_best_next_schedulable_unit(scheduling_units, lower_bound_start_time, upper_bound_stop_time)
    
            if best_scored_scheduling_unit:
                scheduling_unit = best_scored_scheduling_unit.scheduling_unit
                start_time = round_to_second_precision(best_scored_scheduling_unit.start_time)
                logger.info("mid-term schedule: next scheduling unit id=%s '%s' start_time=%s", scheduling_unit.id, scheduling_unit.name, start_time)
                update_subtasks_start_times_for_scheduling_unit(scheduling_unit, start_time)
                # TODO: check this?
                # If the start_times of the subtasks are updated, should the start_time (and stop_time) of the
                # scheduling_unit also be updated? Currently it's a cached property.
    
                # keep track of the lower_bound_start_time based on last sub.stoptime and gap
                lower_bound_start_time = scheduling_unit.stop_time + DEFAULT_INTER_OBSERVATION_GAP
    
                scheduling_units.remove(scheduling_unit)
            else:
                # search again in a later timeslot
                min_earliest_possible_start_time = get_min_earliest_possible_start_time(scheduling_units, lower_bound_start_time+timedelta(minutes=10))
                logger.info("lower_bound_start_time='%s', min_earliest_possible_start_time='%s'", lower_bound_start_time, min_earliest_possible_start_time)
                if min_earliest_possible_start_time > lower_bound_start_time:
                    lower_bound_start_time = min_earliest_possible_start_time
                else:
                    # cannot advance anymore to find more
                    logger.warning("Cannot assign start/stop times to remaining scheduling units for mid-term schedule...")
                    for su in scheduling_units:
                        logger.warning("Remaining scheduling unit: id=%s '%s'", su.id, su.name)
    
                        # clear start/stop times, so they don't show up in the timeline,
                        # and we can filter/show them in a separate list where the user can tinker with the constraints
                        clear_defined_subtasks_start_stop_times_for_scheduling_unit(su)
                    break
    
        logger.info("Estimating mid-term schedule... finished")
    
    
    def do_dynamic_schedule() -> models.SchedulingUnitBlueprint:
        '''do a full update of the schedule: schedule next scheduling unit and assign start stop times to remaining schedulable scheduling units'''
        logger.info("Updating dynamic schedule....")
        scheduled_unit = schedule_next_scheduling_unit()
    
        # determine next possible start time for remaining scheduling_units
        if scheduled_unit:
            lower_bound_start_time = scheduled_unit.stop_time + DEFAULT_INTER_OBSERVATION_GAP
        else:
            try:
                scheduled_units = get_scheduled_scheduling_units(datetime.utcnow(), datetime.utcnow())
                lower_bound_start_time = max([s.stop_time for s in scheduled_units if s.stop_time is not None]) + DEFAULT_INTER_OBSERVATION_GAP
            except Exception:
                # no scheduled units (with a known stop_time): fall back to now
                lower_bound_start_time = datetime.utcnow()
    
        # round up to the next whole second (unless already at a whole second, to avoid adding a full second)
        if lower_bound_start_time.microsecond > 0:
            lower_bound_start_time += timedelta(microseconds=1000000-lower_bound_start_time.microsecond)
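        # worked example: 12:34:56.250000 -> 12:34:57.000000, while 12:34:56.000000 stays unchanged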
    
        # determine mid-term schedule by assigning start/stop times to remaining schedulable units using the same search strategy
        assign_start_stop_times_to_schedulable_scheduling_units(lower_bound_start_time)
        logger.info("Finished updating dynamic schedule")
    
        return scheduled_unit
    
    
    ################## service/messagebus handler class ###############################################
    
    class TMSSDynamicSchedulingMessageHandler(TMSSEventMessageHandler):
        '''
        The TMSSDynamicSchedulingMessageHandler reacts to TMSS EventMessages by triggering a new full update of the dynamic
        schedule.
        The actual schedule-update method runs on a background thread, and can take some time to complete, ranging from a
        few seconds to several minutes. In the meantime new EventMessages may be received. These are handled by raising a flag
        that signals the schedule-update thread that a new full update is needed. This way, a burst of events results in
        a single update, and it also ensures that we always compute the schedule with the latest data.
        '''
    
        def __init__(self):
            super().__init__(log_event_messages=True)
            self._scheduling_thread = None
            self._scheduling_thread_running = False
            self._do_schedule_event = Event()
    
        def start_handling(self):
            # start the background thread which waits until the _do_schedule_event event is set upon receiving the relevant TMSS EventMessages.
            self._scheduling_thread = Thread(target=self._scheduling_loop)
            self._scheduling_thread.daemon = True
            self._scheduling_thread_running = True
            self._scheduling_thread.start()
            super().start_handling()
    
        def stop_handling(self):
            self._scheduling_thread_running = False
            self._scheduling_thread.join()
            self._scheduling_thread = None
            super().stop_handling()
    
        def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str):
            if status in ["schedulable", "observed", "finished", "cancelled"]:
                logger.info("onSchedulingUnitBlueprintStatusChanged(id=%s, status=%s): triggering update of dynamic schedule...", id, status)
                # scheduling takes a long time, longer than creating many scheduling units in bulk.
                # So, we do not create a complete new schedule for each new unit,
                # but only trigger a new schedule update.
                # This way we are sure that the latest units are always taken into account while scheduling, but we do not waste CPU cycles.
                self._do_schedule_event.set()
    
        def onSchedulingUnitDraftConstraintsUpdated(self, id: int, scheduling_constraints_doc: dict):  # todo: Does this now have to be onSchedulingUnitBlueprintConstraintsUpdated (since we now have those on the blueprint as well)?
            affected_scheduling_units = models.SchedulingUnitBlueprint.objects.filter(draft__id=id).all()
            for scheduling_unit in affected_scheduling_units:
                if scheduling_unit.status == 'scheduled':
                    unschedule_subtasks_in_scheduling_unit_blueprint(scheduling_unit)
    
            self._do_schedule_event.set()
        
        def onSettingUpdated(self, name: str, value: bool):
            if name == models.SystemSettingFlag.Choices.DYNAMIC_SCHEDULING_ENABLED.value and value:
                logger.info("%s was set to %s: triggering update of dynamic schedule...", name, value)
                self._do_schedule_event.set()
    
    
        def _scheduling_loop(self):
            while self._scheduling_thread_running:
                # wait with a timeout, so the loop promptly notices when stop_handling() clears the running flag
                if self._do_schedule_event.wait(timeout=10):
                    self._do_schedule_event.clear()
                    try:
                        # fetch the setting value once and reuse it for both the check and the log message
                        dynamic_scheduling_enabled = models.Setting.objects.get(name=models.SystemSettingFlag.Choices.DYNAMIC_SCHEDULING_ENABLED.value).value
                        if dynamic_scheduling_enabled:
                            do_dynamic_schedule()
                        else:
                            logger.warning("Skipping update of dynamic schedule because the setting %s=%s", models.SystemSettingFlag.Choices.DYNAMIC_SCHEDULING_ENABLED.value, dynamic_scheduling_enabled)
                    except Exception as e:
                        logger.exception(str(e))
                        # just continue processing events. better luck next time...
    
    
    def create_dynamic_scheduling_service(exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER):
        return TMSSBusListener(handler_type=TMSSDynamicSchedulingMessageHandler,
                               handler_kwargs=None,
                               exchange=exchange,
                               broker=broker)
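
    # A minimal usage sketch (assuming the common LOFAR service pattern; the actual entry point
    # for this service may differ):
    #
    #   with create_dynamic_scheduling_service():
    #       waitForInterrupt()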
    
    
    ################## helper methods #################################################################
    
    def get_dynamically_schedulable_scheduling_units() -> [models.SchedulingUnitBlueprint]:
        '''get a list of all dynamically schedulable scheduling_units'''
        defined_independent_subtasks = models.Subtask.independent_subtasks().filter(state__value='defined')
        defined_independent_subtask_ids = defined_independent_subtasks.values('task_blueprints__scheduling_unit_blueprint_id').distinct().all()
        scheduling_units = models.SchedulingUnitBlueprint.objects.filter(id__in=defined_independent_subtask_ids) \
                                                                 .filter(scheduling_constraints_template__isnull=False).all()
        return [su for su in scheduling_units if su.status == 'schedulable']
    
    
    def get_scheduled_scheduling_units(lower:datetime=None, upper:datetime=None) -> [models.SchedulingUnitBlueprint]:
        '''get a list of all scheduled scheduling_units'''
        scheduled_subtasks = models.Subtask.objects.filter(state__value='scheduled')
        if lower is not None:
            scheduled_subtasks = scheduled_subtasks.filter(stop_time__gte=lower)
        if upper is not None:
            scheduled_subtasks = scheduled_subtasks.filter(start_time__lte=upper)
        return list(models.SchedulingUnitBlueprint.objects.filter(id__in=scheduled_subtasks.values('task_blueprints__scheduling_unit_blueprint_id').distinct()).all())
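
    # Note on the window semantics: a unit counts as scheduled within the window when it overlaps
    # it (stop_time >= lower and start_time <= upper). For example,
    # get_scheduled_scheduling_units(datetime.utcnow(), datetime.utcnow()) yields the units
    # scheduled to be running right now.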
    
    
    def get_running_observation_subtasks(stopping_after:datetime=None) -> [models.Subtask]:
        '''get a list of all starting/started subtasks, optionally filter for those finishing after the provided time'''
        running_obs_subtasks = models.Subtask.objects.filter(state__value__in=[models.SubtaskState.Choices.STARTING.value, models.SubtaskState.Choices.STARTED.value],
                                                             specifications_template__type__value=models.SubtaskType.Choices.OBSERVATION.value)
        if stopping_after is not None:
            running_obs_subtasks = running_obs_subtasks.filter(stop_time__gte=stopping_after)
        return list(running_obs_subtasks.all())
    
    
    def unschedule_blocking_scheduled_units_if_needed_and_possible(candidate: ScoredSchedulingUnit) -> bool:
        '''check if there are any already scheduled units in the way, and unschedule them if allowed. Return True if nothing is blocking anymore.'''
        # check any previously scheduled units, and unschedule if needed/allowed
        scheduled_scheduling_units = get_scheduled_scheduling_units(lower=candidate.start_time,
                                                                    upper=candidate.start_time + candidate.scheduling_unit.duration)
    
        # check if we can and need to unschedule the blocking units
        for scheduled_scheduling_unit in scheduled_scheduling_units:
            scheduled_score = compute_scores(scheduled_scheduling_unit, candidate.start_time, candidate.start_time + candidate.scheduling_unit.duration)
    
            # in case of a triggered candidate with higher trigger priority than the scheduled unit, we don't care about
            # scores, but only check trigger priority.
            if candidate.scheduling_unit.interrupts_telescope:
                if (not scheduled_scheduling_unit.interrupts_telescope) or candidate.scheduling_unit.project.trigger_priority > scheduled_scheduling_unit.project.trigger_priority:
                    logger.info("unscheduling id=%s '%s' because it is in the way and has a lower trigger_priority=%s than the best candidate id=%s '%s' trigger_priority=%s start_time=%s",
                        scheduled_scheduling_unit.id, scheduled_scheduling_unit.name, scheduled_scheduling_unit.project.trigger_priority,
                        candidate.scheduling_unit.id, candidate.scheduling_unit.name, candidate.scheduling_unit.project.trigger_priority, candidate.scheduling_unit.start_time)
                    unschedule_subtasks_in_scheduling_unit_blueprint(scheduled_scheduling_unit)
    
            elif candidate.weighted_score > scheduled_score.weighted_score:
                # ToDo: also check if the scheduled_scheduling_unit is manually/dynamically scheduled
                logger.info("unscheduling id=%s '%s' because it is in the way and has a lower score than the best candidate id=%s '%s' score=%s start_time=%s",
                    scheduled_scheduling_unit.id, scheduled_scheduling_unit.name,
                    candidate.scheduling_unit.id, candidate.scheduling_unit.name, candidate.weighted_score, candidate.scheduling_unit.start_time)
    
                unschedule_subtasks_in_scheduling_unit_blueprint(scheduled_scheduling_unit)
    
        # check again... are there still any scheduled_scheduling_units in the way?
        scheduled_scheduling_units = get_scheduled_scheduling_units(lower=candidate.start_time,
                                                                    upper=candidate.start_time + candidate.scheduling_unit.duration)
        if scheduled_scheduling_units:
            # accept current solution with current scheduled_scheduling_units
            logger.info("keeping current scheduled unit(s) which have a better (or equal) score: %s", "; ".join(
                "id=%s '%s' start_time='%s'" % (su.id, su.name, su.start_time) for su in scheduled_scheduling_units))
    
            # indicate there are still blocking units
            return False
    
        # all clear, nothing is blocking anymore
        return True
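
    # Note: ties favour the already-scheduled unit: unscheduling only happens when the candidate
    # scores strictly higher (or, for triggers, has a strictly higher trigger_priority).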
    
    
    def cancel_running_observation_if_needed_and_possible(candidate: ScoredSchedulingUnit):
        '''check if there are starting/started observation subtasks that block the candidate.
         Only triggered scheduling_units can cancel running observations and only if they belong to a project with a higher
         trigger_priority than the projects of the subtasks to cancel'''
    
        # todo: is it sufficient to cancel the subtasks, or do we cancel the whole scheduling unit?
        if candidate.scheduling_unit.interrupts_telescope:
            running_obs_subtasks = get_running_observation_subtasks(candidate.start_time)
            for obs in running_obs_subtasks:
                if obs.project is None:
                    logger.warning('cannot cancel running subtask pk=%s for triggered scheduling_unit pk=%s because it does not belong to a project and hence has unknown priority',
                                   obs.pk, candidate.scheduling_unit.pk)
                    continue
                if candidate.scheduling_unit.project.trigger_priority > obs.project.trigger_priority:
                    logger.info('cancelling observation subtask pk=%s trigger_priority=%s because it blocks the triggered scheduling_unit pk=%s trigger_priority=%s',
                                obs.pk, obs.project.trigger_priority, candidate.scheduling_unit.pk, candidate.scheduling_unit.project.trigger_priority)
                    # todo: check if cancellation is really necessary or the trigger can be scheduled afterwards
                    #  I guess we could just do can_run_after(candidate, obs.stop_time) here for that?
                    #  We could also only do this, of there is a 'before' constraint on each trigger.
                    #  -> Clarify and implemented with TMSS-704.
                    cancel_subtask(obs)
                else:
                    logger.info('NOT cancelling subtask pk=%s trigger_priority=%s for triggered scheduling_unit pk=%s trigger_priority=%s because its priority is too low',
                                obs.pk, obs.project.trigger_priority, candidate.scheduling_unit.pk, candidate.scheduling_unit.project.trigger_priority)