Skip to content
Snippets Groups Projects
Commit 2d49fb50 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-1939: minor tweaks handling observing/observed events, and determining lower search bound

parent ea2da5c8
Branches
No related tags found
1 merge request!955TMSS-1939
...@@ -63,7 +63,7 @@ from lofar.sas.tmss.client.tmssbuslistener import * ...@@ -63,7 +63,7 @@ from lofar.sas.tmss.client.tmssbuslistener import *
from lofar.common.datetimeutils import round_to_second_precision from lofar.common.datetimeutils import round_to_second_precision
from threading import Thread, Event from threading import Thread, Event
from django.db import transaction from django.db import transaction
from django.db.models import QuerySet from django.db.models import QuerySet, Q, Max
from lofar.sas.tmss.tmss.exceptions import SchedulerInterruptedException from lofar.sas.tmss.tmss.exceptions import SchedulerInterruptedException
from lofar.sas.tmss.services.scheduling.constraints import * from lofar.sas.tmss.services.scheduling.constraints import *
...@@ -310,6 +310,9 @@ class Scheduler: ...@@ -310,6 +310,9 @@ class Scheduler:
logger.info("find_best_next_schedulable_unit: units meeting constraints in window ['%s', '%s']: %s", lower_bound_start_time, upper_bound_stop_time, ','.join([str(su.id) for su in sorted(filtered_scheduling_units, key=lambda x: x.id)]) or 'None') logger.info("find_best_next_schedulable_unit: units meeting constraints in window ['%s', '%s']: %s", lower_bound_start_time, upper_bound_stop_time, ','.join([str(su.id) for su in sorted(filtered_scheduling_units, key=lambda x: x.id)]) or 'None')
if not filtered_scheduling_units:
return None
# then, check if there is a subset that can only run exclusively in this window and not later. # then, check if there is a subset that can only run exclusively in this window and not later.
exclusive_in_this_window_scheduling_units = filter_scheduling_units_which_can_only_run_in_this_window(filtered_scheduling_units, lower_bound_start_time, upper_bound_stop_time, self._raise_if_triggered, gridder=self.search_gridder) exclusive_in_this_window_scheduling_units = filter_scheduling_units_which_can_only_run_in_this_window(filtered_scheduling_units, lower_bound_start_time, upper_bound_stop_time, self._raise_if_triggered, gridder=self.search_gridder)
...@@ -377,6 +380,14 @@ class Scheduler: ...@@ -377,6 +380,14 @@ class Scheduler:
# When scheduling 'just in time' we need to allow the other services/controllers/stations/boards some startup time: DEFAULT_NEXT_STARTTIME_GAP # When scheduling 'just in time' we need to allow the other services/controllers/stations/boards some startup time: DEFAULT_NEXT_STARTTIME_GAP
lower_bound_start_time = max(datetime.utcnow() + DEFAULT_NEXT_STARTTIME_GAP, lower_bound_start_time = max(datetime.utcnow() + DEFAULT_NEXT_STARTTIME_GAP,
min([su.earliest_possible_start_time for su in candidate_units])) min([su.earliest_possible_start_time for su in candidate_units]))
# check if any unit was already observed and ended later
observed_or_beyond_scheduling_units = get_observed_or_beyond_scheduling_units()
if observed_or_beyond_scheduling_units.exists():
last_on_sky_stop_time = observed_or_beyond_scheduling_units.aggregate(Max('on_sky_stop_time'))['on_sky_stop_time__max']
lower_bound_start_time = max(lower_bound_start_time, last_on_sky_stop_time + DEFAULT_NEXT_STARTTIME_GAP)
# upper bound of search window is at least a week later, or up unit latest cycle end time
upper_bound_stop_time = max(lower_bound_start_time + timedelta(days=7), upper_bound_stop_time = max(lower_bound_start_time + timedelta(days=7),
max([su.latest_possible_start_time for su in candidate_units])) max([su.latest_possible_start_time for su in candidate_units]))
...@@ -625,7 +636,7 @@ class TMSSDynamicSchedulingMessageHandler(TMSSEventMessageHandler): ...@@ -625,7 +636,7 @@ class TMSSDynamicSchedulingMessageHandler(TMSSEventMessageHandler):
''' '''
def __init__(self): def __init__(self):
super().__init__(log_event_messages=False) super().__init__(log_event_messages=logger.level==logging.DEBUG)
self.scheduler = Scheduler() self.scheduler = Scheduler()
def start_handling(self): def start_handling(self):
...@@ -650,8 +661,9 @@ class TMSSDynamicSchedulingMessageHandler(TMSSEventMessageHandler): ...@@ -650,8 +661,9 @@ class TMSSDynamicSchedulingMessageHandler(TMSSEventMessageHandler):
def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str): def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str):
if (status in [models.SchedulingUnitStatus.Choices.SCHEDULABLE.value, if (status in [models.SchedulingUnitStatus.Choices.SCHEDULABLE.value,
models.SchedulingUnitStatus.Choices.UNSCHEDULABLE.value] and not self.scheduler.is_scheduling) or \ models.SchedulingUnitStatus.Choices.UNSCHEDULABLE.value] and not self.scheduler.is_scheduling) or \
status in [models.SchedulingUnitStatus.Choices.OBSERVING, status in [models.SchedulingUnitStatus.Choices.OBSERVING.value,
models.SchedulingUnitStatus.Choices.CANCELLED.value]: models.SchedulingUnitStatus.Choices.CANCELLED.value] or \
(status==models.SchedulingUnitStatus.Choices.OBSERVED.value and get_scheduled_scheduling_units(scheduler='dynamic').count()==0):
logger.info("onSchedulingUnitBlueprintStatusChanged(id=%s, status=%s): triggering update of dynamic & fixed_time schedule...", id, status) logger.info("onSchedulingUnitBlueprintStatusChanged(id=%s, status=%s): triggering update of dynamic & fixed_time schedule...", id, status)
# scheduling takes a long time, longer then creating many scheduling units in bulk # scheduling takes a long time, longer then creating many scheduling units in bulk
...@@ -828,6 +840,23 @@ def get_scheduled_scheduling_units(lower_bound: datetime=None, upper_bound: date ...@@ -828,6 +840,23 @@ def get_scheduled_scheduling_units(lower_bound: datetime=None, upper_bound: date
scheduled_units = scheduled_units.filter(scheduling_constraints_doc__scheduler=scheduler) scheduled_units = scheduled_units.filter(scheduling_constraints_doc__scheduler=scheduler)
return scheduled_units return scheduled_units
def get_observed_or_beyond_scheduling_units(lower_bound: datetime=None, upper_bound: datetime=None, scheduler: str=None) -> QuerySet:
'''get a queryset of all scheduling_units which are observed or beyond (processing, ingesting, finished) and fall within the given [lower_bound, upper_bound) window (if not None)'''
scheduled_units = models.SchedulingUnitBlueprint.objects.filter(status__value__in=(models.SchedulingUnitStatus.Choices.OBSERVED.value,
models.SchedulingUnitStatus.Choices.PROCESSING.value,
models.SchedulingUnitStatus.Choices.PROCESSED.value,
models.SchedulingUnitStatus.Choices.INGESTING.value,
models.SchedulingUnitStatus.Choices.INGESTED.value,
models.SchedulingUnitStatus.Choices.FINISHED.value))
if lower_bound is not None:
scheduled_units = scheduled_units.filter(scheduled_stop_time__gte=lower_bound)
if upper_bound is not None:
scheduled_units = scheduled_units.filter(scheduled_start_time__lt=upper_bound)
if scheduler is not None:
scheduled_units = scheduled_units.filter(scheduling_constraints_doc__scheduler=scheduler)
return scheduled_units
def get_triggered_schedulable_scheduling_units() -> QuerySet: def get_triggered_schedulable_scheduling_units() -> QuerySet:
'''get a list of all trigger dynamically and fixed_time schedulable scheduling_units''' '''get a list of all trigger dynamically and fixed_time schedulable scheduling_units'''
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment