Select Git revision
constraints.py

TMSS-1939: minor improvement
Jorrit Schaap authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
constraints.py 82.92 KiB
#!/usr/bin/env python3
# Copyright (C) 2020
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
"""
This module defines the 'API' to:
- filter a list of schedulable scheduling_units by checking their constraints: see method filter_scheduling_units_using_constraints
- sort a (possibly filtered) list of schedulable scheduling_units evaluating their constraints and computing a 'fitness' score: see method get_sorted_scheduling_units_scored_by_constraints
These main methods are used in the dynamic_scheduler to pick the next best scheduling unit, and compute the midterm schedule.
"""
import logging
logger = logging.getLogger(__name__)
from datetime import datetime, timedelta
from dateutil import parser
from typing import Callable
from astropy.coordinates import Angle
from astropy.coordinates.earth import EarthLocation
import astropy.units
from functools import reduce, lru_cache
from copy import deepcopy
from lofar.common.datetimeutils import round_to_second_precision
from lofar.sas.tmss.tmss.tmssapp.conversions import timestamps_and_stations_to_sun_rise_and_set, coordinates_and_timestamps_to_separation_from_bodies, coordinates_timestamps_and_stations_to_target_rise_and_set, coordinates_timestamps_and_stations_to_target_transit, Pointing
from lofar.common.util import noop
from lofar.sas.tmss.tmss.tmssapp import models
from lofar.sas.tmss.tmss.exceptions import *
from lofar.sas.tmss.tmss.tmssapp.subtasks import enough_stations_available
from lofar.sas.tmss.tmss.tmssapp.tasks import mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable
# rough estimate of how many units and timestamp evaluations can be cached
CACHE_SIZE = 1000 * 24 * 12 * 365
class Gridder:
VERY_COARSE_TIME_GRID = 60 # minimum accuracy for distance
COARSE_TIME_GRID = 30 # minimum accuracy for elevation
DEFAULT_TIME_GRID = 5 # minimum accuracy for daily, transit (tolerance > 5 minutes)
FINE_TIME_GRID = 1 # minimum accuracy for transit (tolerance <= 5 minutes)
NO_TIME_GRID = None
def __init__(self, grid_minutes=DEFAULT_TIME_GRID):
self.grid_minutes = grid_minutes
def __str__(self):
return "Gridder(grid_minutes=%s, n_grid_points=%s)" % (self.grid_minutes, self.n_grid_points())
def __hash__(self):
return hash(self.grid_minutes)
def __eq__(self, other):
return isinstance(other, Gridder) and self.grid_minutes == other.grid_minutes
def n_grid_points(self) -> int:
# provides an appropriate value for the n_grid_points parameter of astropy methods, reflecting an accuracy similar to the time grid of this Gridder.
# According to https://readthedocs.org/projects/astroplan/downloads/pdf/latest/, n_grid_points of
# - 150 means better than one minute precision (default)
# - 10 means better than five minute precision
# A value of 10 however breaks test_can_run_within_timewindow_with_sky_constraints_with_transit_offset_constraint_returns_true_when_met
# because the calculated transit_time seems to be quite a bit more than 5 minutes off. So what this returns now is a more conservative
# guesstimate.
# todo: figure out how exactly this can be computed properly so that the required precision is always met.
if self.grid_minutes <= 1:
return 150
if self.grid_minutes < 5:
return 75
if self.grid_minutes >= 5:
return 30
def as_timedelta(self):
return timedelta(minutes=self.grid_minutes) if self.grid_minutes is not None else None
def grid_time(self, dt: datetime) -> datetime:
"""
Rounds a datetime object to the nearest tick on a configurable regular time grid.
This is supposed to be used when evaluating constraint, so that caching is more effective.
Checking contraints on a gridded time means that the check outcome may be inaccurate. We assume that this is
acceptable, given that the difference between checked and real values is marginal and specified constraints
include a certain margin of safety. This should be good enough at least for the medium or long-term scheduling,
and a final non-gridded check before actually scheduling a unit can make sure that constraints are actually met.
:param grid_minutes: Number of minutes between ticks on the time grid. Must be a number with multiple of 60 or a
multiple of 60. A higher grid_minutes value increases the chance of cache hits, but also of false positive/negative
results of the constraints check.
"""
if self.grid_minutes:
if 60 % self.grid_minutes not in [0, 60]:
raise ValueError('Please provide a time grid that has a multiple of 60 or is a multiple of 60')
timestamp = round(dt.timestamp() / (60*self.grid_minutes)) * (60*self.grid_minutes) # seconds since epoch, rounded to grid
return datetime.fromtimestamp(timestamp)
else:
return dt
def plus_margin(self, dt: datetime) -> datetime:
"""
Adds a 'margin' of half grid_minutes to the given datetime.
"""
if self.grid_minutes:
return dt + timedelta(minutes=0.5*self.grid_minutes)
return dt
def minus_margin(self, dt: datetime) -> datetime:
"""
Subtracts a 'margin' of half grid_minutes from the given datetime.
"""
if self.grid_minutes:
return dt - timedelta(minutes=0.5*self.grid_minutes)
return dt
# todo: add celestial coordinate grid
class ScoredSchedulingUnit():
'''struct for collecting scores per constraint and a weighted_score for a scheduling_unit at the given start_time
'''
def __init__(self, scheduling_unit: models.SchedulingUnitBlueprint, scores: dict, start_time: datetime, weighted_score: float=0):
for score_key, score_value in scores.items():
assert score_value >= 0.0 and score_value <= 1.0, "scheduling_unit id=%s score %s=%.3f should be between 0.0 and 1.0" % (scheduling_unit.id, score_key, score_value)
self.scheduling_unit = scheduling_unit
self.scores = scores
self.start_time = start_time
self.weighted_score = weighted_score
def __str__(self):
return "SUB id:%s start='%s' weighted_score=%.4f scores: %s" % (
self.scheduling_unit.id,
self.start_time,
self.weighted_score,
', '.join(['%s=%.4f' % (key, self.scores[key]) for key in sorted(self.scores.keys())]))
class ConstraintResult():
'''struct for the results of a constraint evaluation for a scheduling_unit at the given start_time
'''
def __init__(self, scheduling_unit: models.SchedulingUnitBlueprint, constraint_key: str, evaluation_timestamp: datetime, score: float=1.0, earliest_possible_start_time: datetime=None, optimal_start_time: datetime=None):
self.scheduling_unit = scheduling_unit
self.constraint_key = constraint_key
self.evaluation_timestamp = evaluation_timestamp
self.score = score
self.earliest_possible_start_time = earliest_possible_start_time
self.optimal_start_time = optimal_start_time
@property
def is_constraint_met(self) -> bool:
'''is the constraint met? implicitly we interpret having an earliest_possible_start_time and a score>0 as a yes'''
return self.earliest_possible_start_time is not None and self.score > 0
@property
def has_constraint(self) -> bool:
'''does the scheduling_unit have a constraint with the given constraint_key?'''
return self.constraint_value is not None
@property
def constraint_value(self):
'''get the constraint value for the given self.constraint_key from the scheduling_unit constraints doc'''
try:
return reduce(dict.get, self.constraint_key.split('.'), self.scheduling_unit.scheduling_constraints_doc or {})
except:
return None
def __str__(self):
if self.has_constraint:
return "SUB id=%s constraint %s is %smet at evaluation='%s': score=%.4f earliest_possible='%s' optimal='%s'" % (
self.scheduling_unit.id,
self.constraint_key.ljust(18), # longest constrain_key is 18 chars. printing out many results with this ljust makes the log lines readible like a table.
'yes ' if self.is_constraint_met else 'not ', # yes/not are same length for same reason: reading as table in log.
self.evaluation_timestamp,
self.score,
self.earliest_possible_start_time,
self.optimal_start_time)
return "SUB id=%s has no '%s' constraint" % (self.scheduling_unit.id, self.constraint_key)
def get_boundary_stations_from_list(stations: [str]) -> (str, str, str, str):
'''
utility function to determine the stations at boundary locations. Meant to be used for onstraint checks, since when
constraints are met at these stations, this is typically also true at the other stations.
Note: There are probably cases where this is not strictly true, but the error is acceptable to us. # todo: is something like geometric distance from core and per quartile a better criterion?
returns a tuple with the most (northern, eastern, southern, western) station from the input list
'''
from lofar.lta.sip import station_coordinates
min_lat, max_lat, min_lon, max_lon = None, None, None, None
for station in stations:
coords = station_coordinates.parse_station_coordinates()["%s_LBA" % station.upper()]
loc = EarthLocation.from_geocentric(x=coords['x'], y=coords['y'], z=coords['z'], unit=astropy.units.m)
if not min_lat or loc.lat < min_lat:
min_lat = loc.lat
most_southern = station
if not max_lat or loc.lat > max_lat:
max_lat = loc.lat
most_northern = station
if not min_lon or loc.lon < min_lon:
min_lon = loc.lon
most_western = station
if not max_lon or loc.lon > max_lon:
max_lon = loc.lon
most_eastern = station
return most_northern, most_eastern, most_southern, most_western
def filter_scheduling_units_using_constraints(scheduling_units: [models.SchedulingUnitBlueprint], lower_bound: datetime, upper_bound: datetime, raise_if_interruped: Callable=noop, gridder: Gridder=Gridder()) -> [models.SchedulingUnitBlueprint]:
"""
Filter the given scheduling_units by whether their constraints are met within the given timewindow.
:param lower_bound: evaluate and score the constrains at and after lower_bound_start_time. The returned unit has a start_time guaranteed at or after lower_bound_start_time.
:param upper_bound: evaluate and score the constrains before upper_bound_stop_time. The returned unit has a stop_time guaranteed before upper_bound_stop_time.
:param scheduling_units: evaluate/filter these scheduling_units.
:param raise_if_interruped: a callable function which raises under an externally set condition (an 'interrupt' flag was set). This function is/can_be used to interrupt a long-running scheduling call to do an early exit and start a new scheduling call. Default used function is noop (no-operation), thus no interruptable behaviour.
Returns a list scheduling_units for which their constraints are met within the given timewindow.
"""
_method_start_timestamp = datetime.utcnow()
runnable_scheduling_units = []
for scheduling_unit in scheduling_units:
raise_if_interruped() # interrupts the scheduling loop
try:
# should not happen, these are non-nullable in db
assert(scheduling_unit.draft is not None)
assert(scheduling_unit.scheduling_constraints_template is not None)
# TODo: check su within cycle
if can_run_within_station_reservations(scheduling_unit):
if can_run_within_timewindow(scheduling_unit, lower_bound, upper_bound, raise_if_interruped, gridder):
runnable_scheduling_units.append(scheduling_unit)
except Exception as e:
if isinstance(e, SchedulerInterruptedException):
raise # bubble up
else:
logger.exception(e)
mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(scheduling_unit, reason=str(e))
_method_elapsed = datetime.utcnow() - _method_start_timestamp
logger.debug("filter_scheduling_units_using_constraints: filtered %d units (took %.1f[s])", len(scheduling_units), _method_elapsed.total_seconds())
return runnable_scheduling_units
def filter_scheduling_units_which_can_only_run_in_this_window(scheduling_units: [models.SchedulingUnitBlueprint], lower_bound: datetime, upper_bound: datetime, raise_if_interruped: Callable=noop, gridder: Gridder=Gridder()) -> [models.SchedulingUnitBlueprint]:
"""
Filter the given scheduling_units and return those which can run exclusively in the given timewindow.
:param lower_bound: evaluate and score the constrains at and after lower_bound_start_time. The returned unit has a start_time guaranteed at or after lower_bound_start_time.
:param upper_bound: evaluate and score the constrains before upper_bound_stop_time. The returned unit has a stop_time guaranteed before upper_bound_stop_time.
:param scheduling_units: evaluate/filter these scheduling_units.
:param raise_if_interruped: a callable function which raises under an externally set condition (an 'interrupt' flag was set). This function is/can_be used to interrupt a long-running scheduling call to do an early exit and start a new scheduling call. Default used function is noop (no-operation), thus no interruptable behaviour.
Returns a list scheduling_units which can only run in the given timewindow.
"""
runnable_exclusive_in_this_window_scheduling_units = []
for scheduling_unit in scheduling_units:
raise_if_interruped() # interrupts the scheduling loop
try:
if can_run_within_timewindow(scheduling_unit, lower_bound, upper_bound, gridder=gridder):
# ok, this unit can run within this window...
# but can it also run later than this estimated earliest stop_time? If not, then it is exclusively bound to this window
# what is the earliest_possible_start_time for this unit within this window?
earliest_possible_start_time = get_earliest_possible_start_time(scheduling_unit, lower_bound, gridder=gridder)
assert earliest_possible_start_time is not None, "SUB id=%s can run in timewindow [%s, %s] so it should have an earliest_possible_start_time as well" % (scheduling_unit.id, lower_bound, upper_bound)
earliest_possible_start_time = max(earliest_possible_start_time, lower_bound)
earliest_possible_start_time = min(earliest_possible_start_time, upper_bound)
if not can_run_after(scheduling_unit, earliest_possible_start_time, gridder=gridder):
if not can_run_before(scheduling_unit, lower_bound, gridder=gridder):
runnable_exclusive_in_this_window_scheduling_units.append(scheduling_unit)
if lower_bound < datetime.utcnow() + timedelta(minutes=1):
# no unit can run before 'now'
runnable_exclusive_in_this_window_scheduling_units.append(scheduling_unit)
except Exception as e:
logger.exception(e)
return runnable_exclusive_in_this_window_scheduling_units
def get_best_scored_scheduling_unit_scored_by_constraints(scheduling_units: [models.SchedulingUnitBlueprint], lower_bound_start_time:datetime, upper_bound_stop_time:datetime, coarse_gridder: Gridder, fine_gridder: Gridder) -> ScoredSchedulingUnit:
"""
get the best scored schedulable scheduling_unit which can run withing the given time window from the given scheduling_units.
:param lower_bound_start_time: evaluate and score the constrains at and after lower_bound_start_time. The returned unit has a start_time guaranteed at or after lower_bound_start_time.
:param upper_bound_stop_time: evaluate and score the constrains before upper_bound_stop_time. The returned unit has a stop_time guaranteed before upper_bound_stop_time.
:param scheduling_units: evaluate these scheduling_units.
Returns a ScoredSchedulingUnit struct with the best next schedulable scheduling unit and its proposed start_time where it best fits its contraints.
"""
_method_start_timestamp = datetime.utcnow()
# First score everything based on coarse grid
sorted_scored_scheduling_units_coarse = sort_scheduling_units_scored_by_constraints(scheduling_units, lower_bound_start_time, upper_bound_stop_time, coarse_gridder)
if sorted_scored_scheduling_units_coarse:
if logger.level == logging.DEBUG:
logger.debug("sorted %d scored_scheduling_units at coarse grid of %s[min]:", len(sorted_scored_scheduling_units_coarse), coarse_gridder.grid_minutes)
for i, scored_scheduling_unit in enumerate(sorted_scored_scheduling_units_coarse):
logger.debug(" [%03d] %s", i, scored_scheduling_unit)
# Re-evaluate top 5(or less) with fine grid
top_sorted_scored_scheduling_units_coarse = sorted_scored_scheduling_units_coarse[:5]
logger.debug("get_best_scored_scheduling_unit_scored_by_constraints: scoring and sorting %d units at fine grid of %s[min]...", len(top_sorted_scored_scheduling_units_coarse), fine_gridder.grid_minutes)
top_scheduling_units = [x.scheduling_unit for x in top_sorted_scored_scheduling_units_coarse]
top_sorted_scored_scheduling_units_fine = sort_scheduling_units_scored_by_constraints(top_scheduling_units, lower_bound_start_time, upper_bound_stop_time, fine_gridder)
_method_elapsed = datetime.utcnow() - _method_start_timestamp
logger.debug("get_best_scored_scheduling_unit_scored_by_constraints: scored and sorted %d units (took %.1f[s])", len(scheduling_units), _method_elapsed.total_seconds())
if top_sorted_scored_scheduling_units_fine:
logger.info("sorted top %d scored_scheduling_units at fine grid of %s[min]:", len(top_sorted_scored_scheduling_units_fine), fine_gridder.grid_minutes)
for i, scored_scheduling_unit in enumerate(top_sorted_scored_scheduling_units_fine):
logger.info(" [%03d] %s", i, scored_scheduling_unit)
# they are sorted best to worst, so return/use first.
best_scored_scheduling_unit = top_sorted_scored_scheduling_units_fine[0]
return best_scored_scheduling_unit
return None
def sort_scheduling_units_scored_by_constraints(scheduling_units: [models.SchedulingUnitBlueprint], lower_bound_start_time: datetime, upper_bound_stop_time: datetime, gridder: Gridder) -> [ScoredSchedulingUnit]:
"""
Compute the score and proposed start_time for all given scheduling_units. Return them sorted by their weighted_score.
:param lower_bound_start_time: evaluate and score the constrains at and after lower_bound_start_time. The returned unit has a start_time guaranteed at or after lower_bound_start_time.
:param upper_bound_stop_time: evaluate and score the constrains before upper_bound_stop_time. The returned unit has a stop_time guaranteed before upper_bound_stop_time.
:param scheduling_units: evaluate these scheduling_units.
Returns a list of ScoredSchedulingUnit structs with the score details, a weighted_score and a proposed start_time where it best fits its contraints.
"""
scored_scheduling_units = compute_individual_and_weighted_scores(scheduling_units, lower_bound_start_time, upper_bound_stop_time, gridder)
unstartable_scheduling_units = [sssu for sssu in scored_scheduling_units if sssu.start_time is None]
if unstartable_scheduling_units:
logger.warning("unstartable_scheduling_units: %s", ','.join([x.scheduling_unit.id for x in unstartable_scheduling_units]))
# filter out unstartable (and thus unsortable) units.
scored_scheduling_units = [sssu for sssu in scored_scheduling_units if sssu.start_time is not None]
# sort by weighted_score, then by start_time, then by created at
return sorted(scored_scheduling_units, key=lambda x: (x.weighted_score,
(x.start_time-lower_bound_start_time).total_seconds(),
x.scheduling_unit.created_at), reverse=True)
def can_run_within_timewindow(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound: datetime, upper_bound: datetime, raise_if_interruped: Callable=noop, gridder: Gridder=Gridder()) -> bool:
'''determine if the given scheduling_unit can run withing the given timewindow evaluating all constraints from the "constraints" version 1 template
:param raise_if_interruped: a callable function which raises under an externally set condition (an 'interrupt' flag was set). This function is/can_be used to interrupt a long-running scheduling call to do an early exit and start a new scheduling call. Default used function is noop (no-operation), thus no interruptable behaviour.
'''
# Seek the earliest_possible_start_time. If existing and within window, then the unit can run within this window
earliest_possible_start_time = get_earliest_possible_start_time(scheduling_unit, lower_bound, upper_bound, gridder)
if earliest_possible_start_time is not None:
earliest_possible_stop_time = earliest_possible_start_time + scheduling_unit.specified_main_observation_duration
if earliest_possible_start_time >= lower_bound and earliest_possible_stop_time <= upper_bound:
return True
return False
def can_run_at(scheduling_unit: models.SchedulingUnitBlueprint, proposed_start_time: datetime, gridder: Gridder=None) -> bool:
'''determine if the given scheduling_unit can run withing the given timewindow evaluating all constraints from the "constraints" version 1 template
:param raise_if_interruped: a callable function which raises under an externally set condition (an 'interrupt' flag was set). This function is/can_be used to interrupt a long-running scheduling call to do an early exit and start a new scheduling call. Default used function is noop (no-operation), thus no interruptable behaviour.
'''
if gridder is None:
gridder = Gridder()
if not can_run_at_with_time_constraints(scheduling_unit, proposed_start_time):
return False
if not can_run_at_with_daily_constraints(scheduling_unit, proposed_start_time, gridder=gridder):
return False
if not can_run_at_with_sky_constraints(scheduling_unit, proposed_start_time, gridder=gridder):
return False
return True
def can_run_before(scheduling_unit: models.SchedulingUnitBlueprint, proposed_start_time: datetime, gridder: Gridder) -> bool:
'''Check if the given scheduling_unit can run somewhere before the given proposed_start_time timestamp depending on the sub's constrains-template/doc and its duration.
:param scheduling_unit: the scheduling_unit for which we want to check if it can run after the proposed_start_time given its constraints.
:param proposed_start_time: the proposed start_time for this scheduling_unit. Can it run after the proposed start_time, or is that forbidden by the constraints?
'''
logger.info("can_run_before id=%s, proposed_start_time='%s'", scheduling_unit.id, proposed_start_time)
# check all fine-grained can_run_before_with_*_constraints
# opt for early exit to save time.
if not can_run_before_with_time_constraints(scheduling_unit, proposed_start_time):
return False
if not can_run_before_with_daily_constraints(scheduling_unit, proposed_start_time):
return False
if not can_run_before_with_sky_constraints(scheduling_unit, proposed_start_time, gridder):
return False
# no constraints above are blocking, so return True.
return True
def can_run_after(scheduling_unit: models.SchedulingUnitBlueprint, proposed_start_time: datetime, gridder: Gridder) -> bool:
'''Check if the given scheduling_unit can run somewhere after the given proposed_start_time timestamp depending on the sub's constrains-template/doc and its duration.
:param scheduling_unit: the scheduling_unit for which we want to check if it can run after the proposed_start_time given its constraints.
:param proposed_start_time: the proposed start_time for this scheduling_unit. Can it run after the proposed start_time as well, or is that forbidden by the constraints?
'''
# check all fine-grained can_run_after_with_*_constraints
# opt for early exit to save computations and time.
if not can_run_after_with_time_constraints(scheduling_unit, proposed_start_time):
return False
if not can_run_after_with_daily_constraints(scheduling_unit, proposed_start_time):
return False
if not can_run_after_with_sky_constraints(scheduling_unit, proposed_start_time, gridder):
return False
# no constraints above are blocking, so return True.
return True
def can_run_before_with_time_constraints(scheduling_unit: models.SchedulingUnitBlueprint, proposed_start_time: datetime) -> bool:
'''Check if the given scheduling_unit can run somewhere before the given proposed_start_time timestamp depending on the sub's time-constrains-template/doc and its duration.
:param scheduling_unit: the scheduling_unit for which we want to check if it can run after the proposed_start_time given its constraints.
:param proposed_start_time: the proposed start_time for this scheduling_unit. Can it run after the proposed start_time, or is that forbidden by the constraints?
'''
constraints = scheduling_unit.scheduling_constraints_doc
if 'time' not in constraints:
return True
if 'after' in constraints['time']:
start_after = parser.parse(constraints['time']['after'], ignoretz=True)
return proposed_start_time >= start_after
if 'before' in constraints['time']:
# the full main observation should be able to complete before the 'before'timestamp
stop_before = parser.parse(constraints['time']['before'], ignoretz=True)
latest_possible_start_time = stop_before - scheduling_unit.specified_main_observation_duration
return proposed_start_time <= latest_possible_start_time
if 'at' in constraints['time']:
at = parser.parse(constraints['time']['at'], ignoretz=True)
return at <= proposed_start_time
if 'between' in constraints['time']:
# loop over all between intervals.
# exit early when a constraint cannot be met
for between in constraints['time']['between']:
start_after = parser.parse(between["from"], ignoretz=True)
if proposed_start_time < start_after:
return False
stop_before = parser.parse(between["to"], ignoretz=True)
latest_possible_start_time = stop_before - scheduling_unit.specified_main_observation_duration
if latest_possible_start_time >= proposed_start_time:
return False
if 'not_between' in constraints['time']:
for not_between in constraints['time']['not_between']:
raise NotImplemented()
# no constraint yielded an early False exit,
# so this unit can run before the proposed start_time
return True
def can_run_before_with_daily_constraints(scheduling_unit: models.SchedulingUnitBlueprint, proposed_start_time: datetime) -> bool:
'''Check if the given scheduling_unit can run somewhere before the given proposed_start_time timestamp depending on the sub's daily-constrains-template/doc and its duration.
:param scheduling_unit: the scheduling_unit for which we want to check if it can run before the proposed_start_time given its constraints.
:param proposed_start_time: the proposed start_time for this scheduling_unit. Can it run before the proposed start_time, or is that forbidden by the constraints?
'''
# 'daily' day/night/twilight constraints can always be met before the proposed_start_time,
# for example one day earlier, or a week earlier, etc.
return True
def can_run_before_with_sky_constraints(scheduling_unit: models.SchedulingUnitBlueprint, proposed_start_time: datetime, gridder: Gridder) -> bool:
'''Check if the given scheduling_unit can run somewhere before the given proposed_start_time timestamp depending on the sub's sky-constrains-template/doc and its duration.
:param scheduling_unit: the scheduling_unit for which we want to check if it can run after the proposed_start_time given its constraints.
:param proposed_start_time: the proposed start_time for this scheduling_unit. Can it run after the proposed start_time, or is that forbidden by the constraints?
'''
constraints = scheduling_unit.scheduling_constraints_doc
if 'sky' not in constraints:
return True
# do expensive search from proposed_start_time-24hours until proposed_start_time with half-hour steps
# (sky constrains are (almost) cyclic over 24 hours).
earlier_start_time = proposed_start_time - timedelta(hours=24)
while earlier_start_time < proposed_start_time:
if can_run_at_with_sky_constraints(scheduling_unit, earlier_start_time, gridder):
return True
earlier_start_time += gridder.as_timedelta()
return False
def can_run_after_with_time_constraints(scheduling_unit: models.SchedulingUnitBlueprint, proposed_start_time: datetime) -> bool:
'''Check if the given scheduling_unit can run somewhere after the given proposed_start_time timestamp depending on the sub's time-constrains-template/doc and its duration.
:param scheduling_unit: the scheduling_unit for which we want to check if it can run after the proposed_start_time given its constraints.
:param proposed_start_time: the proposed start_time for this scheduling_unit. Can it run after the proposed start_time as well, or is that forbidden by the constraints?
'''
constraints = scheduling_unit.scheduling_constraints_doc
if 'time' not in constraints:
return True
if 'before' in constraints['time']:
# the full main observation should be able to complete before the 'before'timestamp
stop_before = parser.parse(constraints['time']['before'], ignoretz=True)
latest_possible_start_time = stop_before - scheduling_unit.specified_main_observation_duration
return proposed_start_time < latest_possible_start_time
if 'after' in constraints['time']:
start_after = parser.parse(constraints['time']['after'], ignoretz=True)
return start_after >= proposed_start_time
if 'at' in constraints['time']:
at = parser.parse(constraints['time']['at'], ignoretz=True)
return at > proposed_start_time
if 'between' in constraints['time'] and constraints['time']['between']:
stop_before = max([parser.parse(between["to"], ignoretz=True) for between in constraints['time']['between']])
latest_possible_start_time = stop_before - scheduling_unit.specified_main_observation_duration
return proposed_start_time < latest_possible_start_time
return True
def can_run_after_with_daily_constraints(scheduling_unit: models.SchedulingUnitBlueprint, proposed_start_time: datetime) -> bool:
'''Check if the given scheduling_unit can run somewhere after the given proposed_start_time timestamp depending on the sub's daily-constrains-template/doc and its duration.
:param scheduling_unit: the scheduling_unit for which we want to check if it can run after the proposed_start_time given its constraints.
:param proposed_start_time: the proposed start_time for this scheduling_unit. Can it run after the proposed start_time, or is that forbidden by the constraints?
'''
# 'daily' day/night/twilight constraints can always be met after the proposed_start_time,
# for example one day earlier, or a week later, etc.
return True
def can_run_after_with_sky_constraints(scheduling_unit: models.SchedulingUnitBlueprint, proposed_start_time: datetime, gridder: Gridder) -> bool:
'''Check if the given scheduling_unit can run somewhere after the given proposed_start_time timestamp depending on the sub's sky-constrains-template/doc and its duration.
:param scheduling_unit: the scheduling_unit for which we want to check if it can run after the proposed_start_time given its constraints.
:param proposed_start_time: the proposed start_time for this scheduling_unit. Can it run after the proposed start_time as well, or is that forbidden by the constraints?
'''
constraints = scheduling_unit.scheduling_constraints_doc
if 'sky' not in constraints:
return True
# do expensive search from proposed_start_time until 24h later with 15min steps
# (sky constrains are (almost) cyclic over 24 hours).
later_start_time = proposed_start_time
while later_start_time < proposed_start_time + timedelta(hours=24):
if can_run_at_with_sky_constraints(scheduling_unit, later_start_time, gridder):
return True
later_start_time += gridder.as_timedelta()
return False
def can_run_at_with_time_constraints(scheduling_unit: models.SchedulingUnitBlueprint, proposed_start_time: datetime) -> bool:
"""
Checks whether it is possible to place the scheduling unit arbitrarily in the given time window,
i.e. the time constraints must be met over the full time window.
:return: True if all time constraints are met over the entire time window, else False.
"""
constraints = scheduling_unit.scheduling_constraints_doc
if not "time" in constraints:
return True
can_run_at = True
can_run_before = True
can_run_with_after = True
can_run_between = True
can_run_not_between = True
lower_bound = proposed_start_time
upper_bound = proposed_start_time + scheduling_unit.specified_main_observation_duration
if 'at' in constraints['time']:
at = parser.parse(constraints['time']['at'], ignoretz=True)
can_run_at = (at >= lower_bound and at + scheduling_unit.specified_main_observation_duration <= upper_bound)
# given time window needs to end before constraint
if 'before' in constraints['time']:
end_before = parser.parse(constraints['time']['before'], ignoretz=True)
can_run_before = (upper_bound <= end_before)
# given time window needs to start after constraint
if 'after' in constraints['time']:
after = parser.parse(constraints['time']['after'], ignoretz=True)
can_run_with_after = (lower_bound >= after)
# Run within one of these time windows
if 'between' in constraints['time']:
can_run_between = True # empty list is no constraint
for between in constraints['time']['between']:
time_from = parser.parse(between["from"], ignoretz=True)
time_to = parser.parse(between["to"], ignoretz=True)
if time_from <= lower_bound and time_to >= upper_bound:
can_run_between = True
break # constraint window completely covering the boundary, so True and don't look any further
else:
can_run_between = False
# Do NOT run within any of these time windows
if 'not_between' in constraints['time']:
can_run_not_between = True # empty list is no constraint
for not_between in constraints['time']['not_between']:
time_from = parser.parse(not_between["from"], ignoretz=True)
time_to = parser.parse(not_between["to"], ignoretz=True)
if time_from <= upper_bound and time_to >= lower_bound:
can_run_not_between = False
break # constraint window at least partially inside the boundary, so False and don't look any further
else:
can_run_not_between = True
return can_run_at & can_run_before & can_run_with_after & can_run_between & can_run_not_between
def can_run_at_with_daily_constraints(scheduling_unit: models.SchedulingUnitBlueprint, proposed_start_time: datetime, gridder: Gridder) -> bool:
"""
Checks whether it is possible to run the scheduling unit /somewhere/ in the given time window,
considering the duration of the involved observation.
:return: True if there is at least one possibility to place the scheduling unit in a way that all time
constraints are met over the runtime of the observation, else False.
"""
result = evaluate_daily_constraints(scheduling_unit, proposed_start_time, gridder)
logger.debug("can_run_at_with_daily_constraints: %s", result)
if result.has_constraint:
return result.is_constraint_met
return True
def can_run_at_with_sky_constraints(scheduling_unit: models.SchedulingUnitBlueprint, proposed_start_time: datetime, gridder: Gridder) -> bool:
"""
Checks whether it is possible to place the scheduling unit arbitrarily in the given time window, i.e. the sky constraints must be met over the full time window.
:return: True if all sky constraints are met at the proposed_start_time, else False.
"""
constraints = scheduling_unit.scheduling_constraints_doc
if "sky" not in constraints:
return True
# evaluate all sky constraints at gridded proposed_start_time
gridded_proposed_start_time = gridder.grid_time(proposed_start_time)
for evaluate_constraint_method in (evaluate_sky_min_elevation_constraint,
evaluate_sky_transit_constraint,
evaluate_sky_min_distance_constraint):
result = evaluate_constraint_method(scheduling_unit, gridded_proposed_start_time, gridder=gridder)
logger.debug("can_run_at_with_sky_constraints: %s", result)
if result.has_constraint and not result.is_constraint_met:
return False
return True
@lru_cache(10000)
def evaluate_sky_transit_constraint(scheduling_unit: models.SchedulingUnitBlueprint, proposed_start_time: datetime, gridder: Gridder) -> ConstraintResult:
""" Evaluate the sky transit offset constraint: is it met? and what are the score, optimal- and earliest_possible_start_time?
For this sky transit offset constraint we always compute the optimal_start_time, which is by definition the transit_time.
If no sky transit offset constraint limits are given, the full +/- 12h limits are used.
"""
result = ConstraintResult(scheduling_unit, 'sky.transit_offset', proposed_start_time)
constraints = scheduling_unit.scheduling_constraints_doc
constraint = constraints.get('sky', {}).get('transit_offset', {})
# Determine the bounds/limits for the transit_offset constraint.
# If not given, use default of +/- 12 hours, because transit is 'cyclic' over ~24h periods.
# With these limits we can compute a score
transit_from_limit = constraint.get('from', -12 * 60 * 60)
transit_to_limit = constraint.get('to', +12 * 60 * 60)
# add margins to take gridding/rounding effects into account
transit_from_limit_with_margin = transit_from_limit - 60*gridder.grid_minutes
transit_to_limit_with_margin = transit_to_limit + 60*gridder.grid_minutes
# determine the pointing(s) to use for checking the sky constraints.
# for transit, use reference pointing if given, else use all sap pointings
transit_pointings = []
if constraints.get('sky', {}).get('reference_pointing', {}).get('enabled', False):
transit_pointings = [constraints['sky']['reference_pointing']['pointing']]
elif scheduling_unit.main_observation_task is not None:
transit_pointings = [sap['digital_pointing'] for sap in scheduling_unit.main_observation_task.specifications_doc.get('station_configuration', {}).get('SAPs', [])]
if not transit_pointings:
logger.warning("SUB id=%s could not determine pointing to evaluate sky transit constraint", scheduling_unit.id)
return result
# since the constraint only applies to the middle of the obs, only consider the proposed_center_time
duration = scheduling_unit.main_observation_task.specified_duration
proposed_center_time = proposed_start_time + duration / 2
gridded_timestamps = (gridder.grid_time(proposed_center_time),)
# TODO: determine transit opt/earliest times over all boundery stations
stations = ('CS002',) #set(get_boundary_stations_from_list(scheduling_unit.main_observation_task.specified_stations)) or ('CS002',)
# currently we only check at bounds and center, we probably want to add some more samples in between later on
for transit_pointing in transit_pointings:
pointing = Pointing(**transit_pointing)
transits = coordinates_timestamps_and_stations_to_target_transit(pointing=pointing,
timestamps=gridded_timestamps,
stations=tuple(stations),
n_grid_points=gridder.n_grid_points())
for station, transit_timestamps in transits.items():
assert len(transit_timestamps) == 1 # only one center time
transit_timestamp = round_to_second_precision(transit_timestamps[0])
# transit minus half duration is by definition the optimal start_time
result.optimal_start_time = transit_timestamp - duration / 2
# earliest_possible_start_time is the transit plus the lower limit (which is usually negative)
result.earliest_possible_start_time = transit_timestamp + timedelta(seconds=transit_from_limit)
# now check if the constraint is met, and compute/set score
# include the margins for gridding effects when checking offset-within-window,
# but not when computing score
offset = (transit_timestamp-proposed_center_time).total_seconds()
if offset > transit_from_limit_with_margin and offset < transit_to_limit_with_margin:
# constraint is met. compute score.
# 1.0 when proposed_center_time==transit_timestamp
# 0.0 at either translit offset limit
score_from = min(1.0, max(0.0, (transit_from_limit - offset)/transit_from_limit))
score_to = min(1.0, max(0.0, (transit_to_limit - offset)/transit_to_limit))
result.score = 0.5*score_from + 0.5*score_to
else:
result.score = 0
return result
@lru_cache(10000)
def evaluate_sky_min_elevation_constraint(scheduling_unit: models.SchedulingUnitBlueprint, proposed_start_time: datetime, gridder: Gridder) -> ConstraintResult:
""" Evaluate the sky min_elevation constraint: is it met? and what are the score, optimal- and earliest_possible_start_time?
"""
result = ConstraintResult(scheduling_unit, 'sky.min_elevation', proposed_start_time)
constraints = scheduling_unit.scheduling_constraints_doc
if 'sky' not in constraints or 'min_elevation' not in constraints['sky']:
return result
for task in scheduling_unit.task_blueprints.filter(specifications_template__type__value=models.TaskType.Choices.OBSERVATION.value).all():
if 'station_configuration' not in task.specifications_doc:
# skipping calibrator tasks
continue
# determine the pointing(s) to use for checking the sky constraints.
# gather all sap pointings, used for min_distance and/or min_elevation. (and for transit if no reference_pointing is given)
sap_pointings = [sap['digital_pointing'] for sap in task.specifications_doc.get('station_configuration', {}).get('SAPs',[])]
if not sap_pointings:
logger.warning("SUB id=%s could not determine pointing to evaluate sky constraints", scheduling_unit.id)
return result
# evaluate at start/stop/center of observation
proposed_end_time = proposed_start_time + task.specified_duration
proposed_center_time = proposed_start_time + task.specified_duration / 2
# currently we only check at bounds and center, we probably want to add some more samples in between later on
for timestamp in (proposed_start_time, proposed_center_time, proposed_end_time):
for sap_pointing in sap_pointings:
pointing = Pointing(**sap_pointing)
if 'calibrator' in task.specifications_template.name:
min_elevation = Angle(constraints['sky']['min_elevation']['calibrator'], unit=astropy.units.rad)
stations = scheduling_unit.main_observation_task.specified_stations
else:
# target imaging, and beamforming or combined observations all use min_elevation.target
min_elevation = Angle(constraints['sky']['min_elevation']['target'], unit=astropy.units.rad)
stations = task.specified_stations
stations = set(get_boundary_stations_from_list(stations)) or ('CS002',)
gridded_timestamps = (gridder.grid_time(timestamp),)
station_target_rise_and_set_times = coordinates_timestamps_and_stations_to_target_rise_and_set(pointing=pointing,
timestamps=gridded_timestamps,
stations=tuple(stations),
angle_to_horizon=min_elevation,
n_grid_points=gridder.n_grid_points())
for station, target_rise_and_set_times in station_target_rise_and_set_times.items():
assert len(target_rise_and_set_times) == 1
rise_and_set_time = target_rise_and_set_times[0]
if rise_and_set_time['rise'] is not None:
rise_and_set_time['rise'] = round_to_second_precision(rise_and_set_time['rise'])
if rise_and_set_time['set'] is not None:
rise_and_set_time['set'] = round_to_second_precision(rise_and_set_time['set'])
if not rise_and_set_time['always_below_horizon']:
# when crossing the horizon somewhere,
# determine earliest_possible_start_time, even when constraint is not met, which may never be earlier than lower_bound
earliest_possible_start_time = rise_and_set_time['rise'] or proposed_start_time
if result.earliest_possible_start_time is None:
result.earliest_possible_start_time = earliest_possible_start_time
else:
result.earliest_possible_start_time = max(earliest_possible_start_time, result.earliest_possible_start_time)
# check if constraint is met...
if rise_and_set_time['always_below_horizon'] or \
(rise_and_set_time['rise'] is not None and timestamp < gridder.plus_margin(rise_and_set_time['rise'])) or \
(rise_and_set_time['set'] is not None and timestamp > gridder.mius_margin(rise_and_set_time['set'])):
# constraint not met. update result, and do early exit.
result.score = 0
result.evaluation_timestamp = timestamp
result.optimal_start_time = None
# logger.debug("%s min_elevation=%.3f[deg] rise='%s' set='%s'", result, min_elevation.degree, rise_and_set_time['rise'], rise_and_set_time['set'])
return result
# constraint is met for this station and timestamp
# for now, just use a score=1 (it's above the min_elevation)
# ToDo: compute score based on actual elevation at earliest/optimal timestamp
result.score = 1
# for min_elevation there is no optimal start time.
# any timestamp meeting the constraint is good enough.
result.optimal_start_time = None
return result
@lru_cache(10000)
def evaluate_sky_min_distance_constraint(scheduling_unit: models.SchedulingUnitBlueprint, proposed_start_time: datetime, gridder: Gridder) -> ConstraintResult:
""" Evaluate the sky min_distance constraint: is it met? and what are the score, optimal- and earliest_possible_start_time?
"""
result = ConstraintResult(scheduling_unit, 'sky.min_distance', proposed_start_time)
constraints = scheduling_unit.scheduling_constraints_doc
if 'sky' not in constraints or 'min_distance' not in constraints['sky']:
return result
for task in scheduling_unit.task_blueprints.filter(specifications_template__type__value=models.TaskType.Choices.OBSERVATION.value).all():
if 'station_configuration' not in task.specifications_doc:
# skipping calibrator tasks
continue
# determine the pointing(s) to use for checking the sky constraints.
sap_pointings = [sap['digital_pointing'] for sap in task.specifications_doc.get('station_configuration', {}).get('SAPs',[])]
if not sap_pointings:
logger.warning("SUB id=%s could not determine pointing to evaluate sky constraints", scheduling_unit.id)
return True
# evaluate at start/stop/center of observation
proposed_end_time = proposed_start_time + task.specified_duration
proposed_center_time = proposed_start_time + task.specified_duration / 2
gridded_timestamps = (gridder.grid_time(proposed_start_time),
gridder.grid_time(proposed_center_time),
gridder.grid_time(proposed_end_time))
# currently we only check at bounds and center, we probably want to add some more samples in between later on
for gridded_timestamp in gridded_timestamps:
for sap_pointing in sap_pointings:
pointing = Pointing(**sap_pointing)
bodies = tuple(constraints['sky']['min_distance'].keys())
distances = coordinates_and_timestamps_to_separation_from_bodies(pointing=pointing,
timestamps=(gridded_timestamp,),
bodies=bodies)
# loop over all bodies and their respective min_distance constraints
for body, min_distance in constraints['sky']['min_distance'].items():
actual_distances = distances[body]
assert (len(actual_distances) == 1)
actual_distance = actual_distances[gridded_timestamp]
if actual_distance.rad < min_distance:
# constraint not met. update result, and do early exit.
result.score = 0
result.earliest_possible_start_time = None
result.optimal_start_time = None
logger.debug("%s body=%s actual_distance=%.3f[deg] < min_distance=%.3f[deg]", result, body, actual_distance.degree, Angle(min_distance, astropy.units.rad).degree)
return result
# no early exit, so constraint is met for this station and timestamp
# continue with rest of stations & timestamps
# no early exit, so constraint is met for all stations and timestamps
result.earliest_possible_start_time = proposed_start_time
# ToDo: compute score based on actual distance at earliest/optimal timestamp
# for now, just use a score=1 (it's far enough away)
result.score = 1
# for min_distance there is no optimal start time.
# any timestamp meeting the constraint is good enough.
result.optimal_start_time = None
return result
def get_earliest_possible_start_time_for_sky_min_elevation(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound: datetime, gridder: Gridder=Gridder()) -> datetime:
# do expensive search from lower_bound until 24 hours later with small steps
# (sky constrains are (almost) cyclic over 24 hours).
# first occurrence where min_elevation constraint is met is taken as rough estimate of earliest_possible_start_time
gridded_lower_bound = gridder.grid_time(lower_bound)
possible_start_time = gridded_lower_bound
while possible_start_time < lower_bound + timedelta(hours=24):
result = evaluate_sky_min_elevation_constraint(scheduling_unit, possible_start_time, gridder=gridder)
if result.earliest_possible_start_time is None or result.earliest_possible_start_time < lower_bound:
# advance with a grid step, and evaluate again
possible_start_time += gridder.as_timedelta()
continue
if not result.is_constraint_met and result.earliest_possible_start_time is not None:
# advance straight to earliest_possible_start_time, and evaluate again to ensure the constraint is met
possible_start_time = gridder.grid_time(result.earliest_possible_start_time)
continue
if result.is_constraint_met:
return result.earliest_possible_start_time
return None
def get_earliest_possible_start_time_for_sky_transit_offset(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound: datetime, gridder: Gridder) -> datetime:
# compute the transit time, and thus the optimal_start_time and earliest_possible_start_time
gridded_lower_bound = gridder.grid_time(lower_bound)
possible_start_time = gridded_lower_bound
result = evaluate_sky_transit_constraint(scheduling_unit, possible_start_time, gridder=gridder)
logger.debug('get_earliest_possible_start_time_for_sky_transit_offset %s', result)
if result.earliest_possible_start_time is not None and result.earliest_possible_start_time < lower_bound:
return lower_bound
return result.earliest_possible_start_time
def get_earliest_possible_start_time_for_sky_min_distance(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound: datetime, gridder: Gridder) -> datetime:
# do expensive search from lower_bound until 24 hours later with small steps
# (sky constrains are (almost) cyclic over 24 hours).
# first occurrence where min_distance constraint is met is taken as rough estimate of earliest_possible_start_time
gridded_lower_bound = gridder.grid_time(lower_bound)
possible_start_time = gridded_lower_bound
while possible_start_time < lower_bound+timedelta(hours=24):
result = evaluate_sky_min_distance_constraint(scheduling_unit, possible_start_time, gridder=gridder)
logger.debug('get_earliest_possible_start_time_for_min_distance %s', result)
if not result.has_constraint:
return None
if result.earliest_possible_start_time is None or result.earliest_possible_start_time < lower_bound:
# advance with a grid step, and evaluate again
possible_start_time += gridder.as_timedelta()
continue
if not result.is_constraint_met and result.earliest_possible_start_time is not None:
# advance straight to earliest_possible_start_time, and evaluate again to ensure the constraint is met
possible_start_time = gridder.grid_time(result.earliest_possible_start_time)
continue
if result.is_constraint_met:
return result.earliest_possible_start_time
return None
@lru_cache(10000)
def evaluate_daily_constraints(scheduling_unit: models.SchedulingUnitBlueprint, proposed_start_time: datetime, gridder: Gridder) -> ConstraintResult:
""" Evaluate the daily constraints: are they met? and what are the score, optimal- and earliest_possible_start_time?
"""
result = ConstraintResult(scheduling_unit, 'daily', proposed_start_time)
constraints = scheduling_unit.scheduling_constraints_doc
if 'daily' not in constraints or not constraints['daily']:
return result
daily_constraints = constraints['daily']
if not daily_constraints.get('require_day') and not daily_constraints.get('require_night') and not daily_constraints.get('avoid_twilight'):
# no constraints
result.score = 1
result.earliest_possible_start_time = proposed_start_time
return result
if daily_constraints.get('require_day') and daily_constraints.get('require_night'):
# mutually exclusive options
result.score = 0
return result
duration = scheduling_unit.specified_main_observation_duration
proposed_end_time = proposed_start_time + duration
stations = set(get_boundary_stations_from_list(scheduling_unit.main_observation_stations)) or ('CS002',)
# the sun rise/set events do not depend on the actual time-of-day, but only on the date.
# so, use one timstamp for 'today'-noon, one for 'yesterday'-noon and one for 'tomorrow'-noon
# by using a 'gridded' noon timestamp, we get many cachehits for any proposed_start_time on this date.
# We need the previous and next day events as well for reasoning about observations passing midnight
today_noon = proposed_start_time.replace(hour=12, minute=0, second=0, microsecond=0)
prevday_noon = today_noon - timedelta(days=1)
nextday_noon = today_noon + timedelta(days=1)
timestamps = (today_noon, nextday_noon, prevday_noon)
all_sun_events = timestamps_and_stations_to_sun_rise_and_set(timestamps=timestamps,
stations=tuple(stations),
n_grid_points=gridder.n_grid_points())
# start with a score of 1, assume constraint is met.
# then loop over all station-sun-events below, and set score to 0 if one is not met.
# if none are not met (thus all are met), then we keep this initial score of 1, indicating all are met.
result.score = 1
# start with earliest_possible_start_time = min
# find latest (max) earliest_possible_start_time looping over all stations
result.earliest_possible_start_time = datetime.min
for station in stations:
sun_events = all_sun_events[station]
for event_name, event in list(sun_events.items()):
if len(event) < len(timestamps):
logger.warning("get_earliest_possible_start_time for SUB id=%s: not all %s events could be computed for station %s lower_bound=%s.", scheduling_unit.id, event_name, station, proposed_start_time)
# use datetime.max as defaults, which have no influence on getting an earliest start_time
sun_events[event_name] = [{'start': datetime.max, 'end': datetime.max} for _ in timestamps]
day = sun_events['day'][0]
night = sun_events['night'][0]
next_day = sun_events['day'][1]
next_night = sun_events['night'][1]
prev_day = sun_events['day'][2]
prev_night = sun_events['night'][2]
if daily_constraints.get('require_day'):
if proposed_start_time >= gridder.minus_margin(day['start']) and proposed_end_time <= gridder.plus_margin(day['end']):
# in principle, we could start immediately at the day.start
# but, we're not allowed to start earlier than the proposed_start_time
# so, use proposed_start_time as earliest_possible_start_time
result.earliest_possible_start_time = max(proposed_start_time, result.earliest_possible_start_time)
elif proposed_end_time > gridder.plus_margin(day['end']):
# cannot start this day
result.score = 0
# but can start next day
result.earliest_possible_start_time = max(next_day['start'], result.earliest_possible_start_time)
else:
result.score = 0
result.earliest_possible_start_time = max(day['start'], result.earliest_possible_start_time)
if daily_constraints.get('require_night'):
if proposed_start_time >= gridder.minus_margin(night['start']) and proposed_end_time <= gridder.plus_margin(night['end']):
# in principle, we could start immediately at the night.start
# but, we're not allowed to start earlier than the proposed_start_time
# so, use proposed_start_time as earliest_possible_start_time
result.earliest_possible_start_time = max(proposed_start_time, result.earliest_possible_start_time)
elif proposed_end_time > gridder.minus_margin(prev_night['end']):
result.earliest_possible_start_time = max(night['start'], result.earliest_possible_start_time)
result.score = 0
elif proposed_start_time >= gridder.minus_margin(prev_night['start']) and proposed_end_time <= gridder.plus_margin(prev_night['end']):
# in principle, we could start immediately at the prev_night.start
# but, we're not allowed to start earlier than the proposed_start_time
# so, use proposed_start_time as earliest_possible_start_time
result.earliest_possible_start_time = max(proposed_start_time, result.earliest_possible_start_time)
else:
result.earliest_possible_start_time = max(night['start'], result.earliest_possible_start_time)
if daily_constraints.get('avoid_twilight'):
if proposed_start_time >= gridder.minus_margin(day['start']) and proposed_end_time <= gridder.plus_margin(day['end']):
# obs fits in today's daytime
result.score = 1
result.earliest_possible_start_time = max(day['start'], result.earliest_possible_start_time)
elif proposed_start_time >= gridder.minus_margin(night['start']) and proposed_end_time <= gridder.plus_margin(night['end']):
# obs fits in today's upcoming nighttime
result.score = 1
result.earliest_possible_start_time = max(night['start'], result.earliest_possible_start_time)
elif proposed_start_time >= gridder.minus_margin(prev_night['start']) and proposed_end_time <= gridder.plus_margin(prev_night['end']):
# obs fits in today's past nighttime
result.score = 1
result.earliest_possible_start_time = max(prev_night['start'], result.earliest_possible_start_time)
elif proposed_start_time < gridder.plus_margin(night['start']) and proposed_end_time >= gridder.minus_margin(night['start']):
# obs starts in afternoon twilight
result.score = 0
result.earliest_possible_start_time = max(night['start'], result.earliest_possible_start_time)
elif proposed_start_time < gridder.plus_margin(day['end']) and proposed_end_time >= gridder.minus_margin(day['end']):
# obs ends in afternoon twilight
result.score = 0
result.earliest_possible_start_time = max(night['start'], result.earliest_possible_start_time)
elif proposed_start_time >= gridder.minus_margin(prev_night['end']) and proposed_start_time < gridder.plus_margin(day['start']):
# obs starts in morning twilight
result.score = 0
result.earliest_possible_start_time = max(day['start'], result.earliest_possible_start_time)
elif proposed_end_time >= gridder.minus_margin(prev_night['end']) and proposed_end_time < gridder.plus_margin(day['start']):
# obs ends in morning twilight
result.score = 0
result.earliest_possible_start_time = max(day['start'], result.earliest_possible_start_time)
elif proposed_start_time >= gridder.minus_margin(night['end']) and proposed_start_time < gridder.plus_margin(next_day['start']):
# obs starts in next day in twilight
result.score = 0
result.earliest_possible_start_time = max(next_day['start'], result.earliest_possible_start_time)
else:
result.score = 0
if result.earliest_possible_start_time == datetime.min:
# No start possible at any station. Set to None.
result.earliest_possible_start_time = None
return result
def get_earliest_possible_start_time_for_daily_constraints(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound: datetime, gridder: Gridder) -> datetime:
# search from lower_bound until 24 hours later with 6 hour steps
# (daily constrains are (almost) cyclic over 24 hours)
gridded_lower_bound = gridder.grid_time(lower_bound)
possible_start_time = gridded_lower_bound
while possible_start_time < lower_bound+timedelta(hours=24):
result = evaluate_daily_constraints(scheduling_unit, possible_start_time, gridder=gridder)
logger.debug('get_earliest_possible_start_time_for_daily_constraints %s', result)
if result.earliest_possible_start_time is not None:
return max(result.earliest_possible_start_time, gridded_lower_bound)
# TODO: do smarter search, can we advance immediately to result.earliest_possible_start_time if not None?
possible_start_time = gridder.grid_time(possible_start_time+timedelta(hours=6))
return None
def get_earliest_possible_start_time_for_time_constraints(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound: datetime, gridder: Gridder=None) -> datetime:
'''
'''
constraints = scheduling_unit.scheduling_constraints_doc
# collect the earliest_possible_start_times per subtype of time-constraints.
# then return the latest of all earliest_possible_start_times
earliest_possible_start_times = set()
if 'time' in constraints:
if 'at' in constraints['time']:
at = parser.parse(constraints['time']['at'], ignoretz=True)
if constraints.get('scheduler', '') == 'fixed_time':
return at
if lower_bound is not None:
earliest_possible_start_times.add(max(lower_bound, at))
else:
earliest_possible_start_times.add(at)
if 'after' in constraints['time']:
after = parser.parse(constraints['time']['after'], ignoretz=True)
if lower_bound is not None:
earliest_possible_start_times.add(max(lower_bound, after))
else:
earliest_possible_start_times.add(after)
if 'before' in constraints['time']:
before = parser.parse(constraints['time']['before'], ignoretz=True)
start_before = before - scheduling_unit.specified_main_observation_duration
if lower_bound is not None and lower_bound <= start_before:
earliest_possible_start_times.add(lower_bound)
if 'between' in constraints['time'] and constraints['time']['between']:
from_timestamps = [parser.parse(between["from"], ignoretz=True)
for between in constraints['time']['between']
if lower_bound is None or parser.parse(between["to"], ignoretz=True) > lower_bound]
if from_timestamps:
earliest_possible_start_times.add(min(from_timestamps))
if earliest_possible_start_times:
return max(earliest_possible_start_times)
def get_earliest_possible_start_time(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound: datetime, upper_bound: datetime=None, gridder: Gridder=None) -> datetime:
'''
'''
_method_start_timestamp = datetime.utcnow()
if gridder is None:
gridder = Gridder()
if 'at' in scheduling_unit.scheduling_constraints_doc.get('time', {}):
at = parser.parse(scheduling_unit.scheduling_constraints_doc['time']['at'], ignoretz=True)
if at >= lower_bound and can_run_at(scheduling_unit, at):
logger.debug("get_earliest_possible_start_time SUB id=%s lower_bound='%s' earliest_possible_start_time='%s'", scheduling_unit.id, lower_bound, at)
return at
return None
# collect the earliest_possible_start_times per type of constraint.
# then return the latest of all earliest_possible_start_times
earliest_possible_start_times = set()
for get_earliest_possible_start_time_method in (get_earliest_possible_start_time_for_time_constraints,
get_earliest_possible_start_time_for_daily_constraints,
get_earliest_possible_start_time_for_sky_min_elevation,
get_earliest_possible_start_time_for_sky_transit_offset,
get_earliest_possible_start_time_for_sky_min_distance):
try:
earliest_possible_start_time = get_earliest_possible_start_time_method(scheduling_unit, lower_bound, gridder)
if earliest_possible_start_time is not None:
earliest_possible_start_times.add(earliest_possible_start_time)
except Exception as e:
logger.exception(str(e))
if len(earliest_possible_start_times) == 0:
# it's possible that none of the above constraints yielded an earliest_possible_start_time (or that there are no constraints)
# this might mean that the unit can start right away at the lower_bound
# so, always add lower_bound, and evaluate it below (along with the other possible earliest_possible_start_times) if it can actually run.
earliest_possible_start_times.add(lower_bound)
# filter for non-None and within bound(s)
earliest_possible_start_times = set([t for t in earliest_possible_start_times if t is not None and t >= lower_bound])
if upper_bound is not None:
earliest_possible_start_times = set([t for t in earliest_possible_start_times if t <= upper_bound])
logger.debug("get_earliest_possible_start_time SUB id=%s lower_bound='%s' earliest_possible_start_times: %s", scheduling_unit.id, lower_bound, ', '.join([str(t) for t in sorted(earliest_possible_start_times)]))
# the earliest_possible_start_times were computed per constraint-type.
# it is possible that a unit can run at a certain time for one constraint, but cannot for the other.
# filter and keep only those for which all constraints are met.
runnable_earliest_possible_start_times = [t for t in earliest_possible_start_times if can_run_at(scheduling_unit, t)]
if not runnable_earliest_possible_start_times and earliest_possible_start_times:
# none of the possible start_times is actually runnable and > lower_bound...
# so, 'advance' to the max of the known earliest_possible_start_times and try again (recurse)
advanced_lower_bound = max(earliest_possible_start_times)
if advanced_lower_bound > lower_bound and upper_bound is not None and advanced_lower_bound < upper_bound:
return get_earliest_possible_start_time(scheduling_unit, advanced_lower_bound, upper_bound, gridder)
_method_elapsed = datetime.utcnow() - _method_start_timestamp
if runnable_earliest_possible_start_times:
logger.debug("get_earliest_possible_start_time SUB id=%s lower_bound='%s' runnable_earliest_possible_start_times: %s", scheduling_unit.id, lower_bound, ', '.join([str(t) for t in sorted(runnable_earliest_possible_start_times)]))
# return the first of all runnable earliest_possible_start_times
earliest_possible_start_time = min(runnable_earliest_possible_start_times)
# make sure it's past lower_bound, and nicely rounded
earliest_possible_start_time = max(lower_bound, earliest_possible_start_time)
earliest_possible_start_time = round_to_second_precision(earliest_possible_start_time)
logger.debug("get_earliest_possible_start_time SUB id=%s lower_bound='%s' earliest_possible_start_time='%s' (took %.1f[s])", scheduling_unit.id, lower_bound, earliest_possible_start_time, _method_elapsed.total_seconds())
return earliest_possible_start_time
# no constraints yielding a runnable earliest_possible_start_time?
logger.debug("get_earliest_possible_start_time SUB id=%s lower_bound='%s' earliest_possible_start_time=None (took %.1f[s])", scheduling_unit.id, lower_bound, _method_elapsed.total_seconds())
return None
def get_optimal_start_time(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound: datetime, gridder: Gridder) -> datetime:
'''get the most optimal start_time (where exposure is highest) AND where all constraints are met.
If lower_bound is given and optimal>lower_bound then the earliest possible start_time is returned (which is close to optimal)'''
_method_start_timestamp = datetime.utcnow()
# seek nearest transit around lower_bound
result = evaluate_sky_transit_constraint(scheduling_unit, gridder.grid_time(lower_bound), gridder=gridder)
optimal_start_time = result.optimal_start_time
if lower_bound is not None and optimal_start_time is not None and optimal_start_time < lower_bound:
logger.debug("get_optimal_start_time SUB id=%s optimal_start_time='%s' < lower_bound='%s' computing earliest_possible_start_time...", scheduling_unit.id, optimal_start_time, lower_bound)
# use the earliest_possible_start_time limited to lower_bound as optimal_start_time
optimal_start_time = get_earliest_possible_start_time(scheduling_unit, lower_bound, lower_bound+timedelta(hours=24), gridder=gridder)
logger.debug("get_optimal_start_time SUB id=%s earliest_possible_start_time and optimal_start_time='%s'", scheduling_unit.id, optimal_start_time)
assert optimal_start_time is not None and optimal_start_time >= lower_bound, "SUB id=%s cannot find optimal start_time > %s" % (scheduling_unit.id, lower_bound)
optimal_start_time = round_to_second_precision(optimal_start_time)
logger.debug("get_optimal_start_time SUB id=%s lower_bound='%s' optimal_start_time='%s' (took %.1f[s])", scheduling_unit.id, lower_bound, optimal_start_time, (datetime.utcnow() - _method_start_timestamp).total_seconds())
return optimal_start_time
def get_weighted_start_time(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound: datetime, gridder: Gridder) -> datetime:
'''get a weighted start_time balanced between the earliest_possible_start_time and the optimal_start_time.'''
earliest_possible_start_time = get_earliest_possible_start_time(scheduling_unit, lower_bound, lower_bound+timedelta(hours=24), gridder=gridder)
if earliest_possible_start_time is not None:
assert earliest_possible_start_time >= lower_bound, "SUB id=%s earliest_possible_start_time='%s' should be >= lower_bound='%s'" % (scheduling_unit.id, earliest_possible_start_time, lower_bound)
# get the optimal_start_time with the earliest_possible_start_time as lower_bound
optimal_start_time = get_optimal_start_time(scheduling_unit, earliest_possible_start_time, gridder=gridder)
if optimal_start_time is not None:
assert optimal_start_time >= earliest_possible_start_time, "SUB id=%s optimal_start_time='%s' should be >= earliest_possible_start_time='%s'" % (scheduling_unit.id, optimal_start_time, earliest_possible_start_time)
# with the density_vs_optimal the user can optimize for:
# - a dense schedule where all units are packed near to each other (weight=0)
# - an optimal schedule where all units are positioned at/near the optimal (transit) time (weight=1)
# - or a mix between both (0<weight<1)
density_vs_optimal, created = models.SchedulingConstraintsWeightFactor.objects.get_or_create(
scheduling_constraints_template=scheduling_unit.scheduling_constraints_template,
constraint_name='density_vs_optimal',
defaults={'weight': 0.5})
weighted_start_time = earliest_possible_start_time + density_vs_optimal.weight * (optimal_start_time-earliest_possible_start_time)
logger.debug("get_weighted_start_time: SUB id=%s weight=%.3f earliest='%s' optimal='%s' weighted='%s'", scheduling_unit.id, density_vs_optimal.weight, earliest_possible_start_time, optimal_start_time, weighted_start_time)
return weighted_start_time
logger.debug("get_weighted_start_time: SUB id=%s returning earliest='%s' because optimal is None", scheduling_unit.id, earliest_possible_start_time)
return earliest_possible_start_time
logger.debug("get_weighted_start_time: SUB id=%s could not compute weighted_start_time", scheduling_unit.id)
return None
def compute_individual_and_weighted_scores(scheduling_units: [models.SchedulingUnitBlueprint], lower_bound:datetime, upper_bound:datetime, gridder: Gridder=Gridder()) -> [ScoredSchedulingUnit]:
scored_scheduling_units = []
for su in scheduling_units:
try:
scored_su = compute_scheduling_unit_scores(su, lower_bound, upper_bound, gridder)
scored_scheduling_units.append(scored_su)
except Exception as e:
logger.exception(e)
# compute weighted total score
for scored_scheduling_unit in sorted(scored_scheduling_units, key=lambda x: x.scheduling_unit.id):
scored_scheduling_unit.weighted_score = 0.0
count = len(scored_scheduling_unit.scores)
if count > 0:
weighted_score = 0.0
for score_key, score_value in scored_scheduling_unit.scores.items():
weight_factor, created = models.SchedulingConstraintsWeightFactor.objects.get_or_create(scheduling_constraints_template=scored_scheduling_unit.scheduling_unit.scheduling_constraints_template,
constraint_name=score_key,
defaults={'weight': 1.0})
weighted_score += weight_factor.weight * score_value
scored_scheduling_unit.weighted_score = weighted_score / float(count)
logger.debug("computed (weighted) score(s): %s", scored_scheduling_unit)
return scored_scheduling_units
def compute_scheduling_unit_scores(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound:datetime, upper_bound:datetime, gridder: Gridder) -> ScoredSchedulingUnit:
'''Compute the "fitness" scores per constraint for the given scheduling_unit at the given start_time depending on the sub's constrains-template/doc.'''
scores = {}
# add individual scheduling_unit rank: rank is "inverse", meaning that a lower value is better. also, normalize it.
scores['scheduling_unit_rank'] = float(models.SchedulingUnitRank.LOWEST.value - scheduling_unit.rank) / float(models.SchedulingUnitRank.LOWEST.value-models.SchedulingUnitRank.HIGHEST.value)
# add "common" scores which do not depend on constraints, such as project rank and creation date
# rank is "inverse", meaning that a lower value is better. also, normalize it.
scores['project_rank'] = float(models.ProjectRank.LOWEST.value - scheduling_unit.project.rank)/float(models.ProjectRank.LOWEST.value-models.ProjectRank.HIGHEST.value)
weighted_start_time = get_weighted_start_time(scheduling_unit, lower_bound, gridder=gridder) or lower_bound
gridded_weighted_start_time = gridder.grid_time(weighted_start_time)
if 'at' in scheduling_unit.scheduling_constraints_doc.get('time',{}):
at = parser.parse(scheduling_unit.scheduling_constraints_doc['time']['at'], ignoretz=True)
scores['time.at'] = 1.0 if abs((at - weighted_start_time).total_seconds()) < 60 else 0.0
# density means less and smaller gaps in the schedule
# so, if we start at lower_bound, then density should be 1
# so, if we start at upper_bound, then density should be 0
scores['density'] = min(1, max(0, (upper_bound-weighted_start_time).total_seconds() / (upper_bound-lower_bound).total_seconds()))
# compute/get scores per constraint type
for evaluate_method in (evaluate_sky_transit_constraint,
evaluate_sky_min_elevation_constraint,
evaluate_sky_min_distance_constraint,
evaluate_daily_constraints):
# compute the result at the gridded weighted_start_time for cache hits. (margins at bounds are taken into account)
result = evaluate_method(scheduling_unit, gridded_weighted_start_time, gridder)
if result.is_constraint_met:
scores[result.constraint_key] = result.score
return ScoredSchedulingUnit(scheduling_unit=scheduling_unit,
scores=scores,
# return the actual (not the gridded) weighted_start_time
start_time=weighted_start_time)
def get_min_earliest_possible_start_time(scheduling_units: [models.SchedulingUnitBlueprint], lower_bound: datetime, upper_bound: datetime=None, raise_if_interruped: Callable=noop, gridder: Gridder=Gridder()) -> datetime:
'''deterimine the earliest possible starttime over all given scheduling units, taking into account all their constraints
:param raise_if_interruped: a callable function which raises under an externally set condition (an 'interrupt' flag was set). This function is/can_be used to interrupt a long-running scheduling call to do an early exit and start a new scheduling call. Default used function is noop (no-operation), thus no interruptable behaviour.
'''
min_earliest_possible_start_time = None
for scheduling_unit in scheduling_units:
raise_if_interruped()
if scheduling_unit.scheduling_constraints_template is not None:
earliest_possible_start_time = get_earliest_possible_start_time(scheduling_unit, lower_bound, upper_bound, gridder)
if earliest_possible_start_time is not None:
if min_earliest_possible_start_time is None or earliest_possible_start_time < min_earliest_possible_start_time:
min_earliest_possible_start_time = earliest_possible_start_time
return min_earliest_possible_start_time
def can_run_within_station_reservations(scheduling_unit: models.SchedulingUnitBlueprint) -> bool:
"""
Check if the given scheduling_unit can run if the reserved stations are taken into account.
The station requirement will be evaluated. If a reserved station will be used within the time window of
the given boundaries (start/stop time) for this scheduling unit then this function will return False.
"""
observation_subtasks = models.Subtask.independent_subtasks().filter(task_blueprint__scheduling_unit_blueprint_id=scheduling_unit.id).filter(specifications_template__type__value=models.SubtaskType.Choices.OBSERVATION.value).all()
for subtask in observation_subtasks:
if not enough_stations_available(subtask, remove_reserved_stations=True, remove_used_stations=False):
return False
return True