From bada2c005acd15d56dfd10d6d9335ea490b358b1 Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Tue, 11 Jul 2023 16:40:51 +0200 Subject: [PATCH] TMSS-2618: fixed test, and in a more general sense fixed when a blocking unit us reschedule without conflicting stations, or when it is unscheduled to make space --- .../services/scheduling/lib/constraints.py | 17 +++-- .../scheduling/lib/dynamic_scheduling.py | 40 +++++++++--- .../scheduling/test/t_dynamic_scheduling.py | 62 ++++++++++++++----- .../src/tmss/tmssapp/models/scheduling.py | 4 ++ SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py | 55 +++++----------- SAS/TMSS/backend/src/tmss/tmssapp/tasks.py | 51 +++++++++++++-- 6 files changed, 152 insertions(+), 77 deletions(-) diff --git a/SAS/TMSS/backend/services/scheduling/lib/constraints.py b/SAS/TMSS/backend/services/scheduling/lib/constraints.py index a1948650853..ec2298855ed 100644 --- a/SAS/TMSS/backend/services/scheduling/lib/constraints.py +++ b/SAS/TMSS/backend/services/scheduling/lib/constraints.py @@ -42,8 +42,8 @@ from lofar.sas.tmss.tmss.tmssapp.conversions import * from lofar.common.util import noop from lofar.sas.tmss.tmss.tmssapp import models from lofar.sas.tmss.tmss.exceptions import * -from lofar.sas.tmss.tmss.tmssapp.subtasks import enough_stations_available, get_missing_stations, convert_station_groups_to_list_of_available_stations -from lofar.sas.tmss.tmss.tmssapp.tasks import mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable, get_schedulable_stations +from lofar.sas.tmss.tmss.tmssapp.subtasks import enough_stations_available_for_subtask, get_missing_stations +from lofar.sas.tmss.tmss.tmssapp.tasks import mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable, get_schedulable_stations, enough_stations_available_for_scheduling_unit,enough_stations_available_for_task import logging logger = logging.getLogger(__name__) @@ -1978,7 +1978,7 @@ def can_run_within_station_reservations(scheduling_unit: models.SchedulingUnitBl observation_subtasks = models.Subtask.independent_subtasks().filter(task_blueprint__scheduling_unit_blueprint_id=scheduling_unit.id).filter(specifications_template__type__value=models.SubtaskType.Choices.OBSERVATION.value).all() for subtask in observation_subtasks: - if not enough_stations_available(subtask, remove_reserved_stations=True, remove_used_stations=False, + if not enough_stations_available_for_subtask(subtask, remove_reserved_stations=True, remove_used_stations=False, lower_bound=lower_bound, upper_bound=upper_bound): return False @@ -1994,7 +1994,7 @@ def can_run_without_used_stations(scheduling_unit: models.SchedulingUnitBlueprin observation_subtasks = models.Subtask.independent_subtasks().filter(task_blueprint__scheduling_unit_blueprint_id=scheduling_unit.id).filter(specifications_template__type__value=models.SubtaskType.Choices.OBSERVATION.value).all() for subtask in observation_subtasks: - if not enough_stations_available(subtask, remove_reserved_stations=False, remove_used_stations=True, + if not enough_stations_available_for_subtask(subtask, remove_reserved_stations=False, remove_used_stations=True, lower_bound=lower_bound, upper_bound=upper_bound): return False @@ -2024,13 +2024,10 @@ def get_blocking_scheduled_units(scheduling_unit: models.SchedulingUnitBlueprint blocking_scheduled_unit_ids = set() for obs_task in scheduling_unit.observation_tasks.filter(obsolete_since__isnull=True).all(): for overlapping_scheduled_unit in observation_overlapping_scheduled_units: - for overlapping_scheduled_obs_task in overlapping_scheduled_unit.task_blueprints.filter(specifications_template__type__value=models.TaskType.Choices.OBSERVATION.value).all(): - try: - convert_station_groups_to_list_of_available_stations(obs_task.specified_station_groups, - unavailable_stations=overlapping_scheduled_obs_task.used_stations, - raise_when_too_many_missing=True) - except TooManyStationsUnavailableException: + for overlapping_scheduled_obs_task in overlapping_scheduled_unit.observation_tasks.all(): + if not enough_stations_available_for_task(obs_task, unavailable_stations=overlapping_scheduled_obs_task.used_stations): blocking_scheduled_unit_ids.add(overlapping_scheduled_unit.id) + continue # Finally, return the result as a queryset, so the caller can do further queries on it. return scheduled_units.filter(id__in=[x for x in blocking_scheduled_unit_ids]).all() diff --git a/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py b/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py index 1dff9918b26..3bb55ef060b 100644 --- a/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py +++ b/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py @@ -560,6 +560,10 @@ class Scheduler: '''Try scheduling the given scheduling_unit (at its scheduled_start_time). Cancel/unschedule any blocking/overlapping units depending on priorities and scores''' + logger.info("try_schedule_unit id=%s queue=%s '%s' start_time=%s central_lst='%s' interrupts_telescope=%s", + scheduling_unit.id, scheduling_unit.priority_queue.value, scheduling_unit.name, start_time, + scheduling_unit.main_observation_scheduled_central_lst, scheduling_unit.interrupts_telescope) + # cancel and/or unschedule current units-in-the-way... # (only if possible, depending on priorities and other rules...) cancel_overlapping_running_observation_if_needed_and_possible(scheduling_unit, start_time) @@ -578,8 +582,9 @@ class Scheduler: # there may still be uncancelled-running observations in the way -> SubtaskSchedulingException scheduled_scheduling_unit = schedule_independent_subtasks_in_scheduling_unit_blueprint(scheduling_unit, start_time=start_time) - logger.info("scheduled scheduling_unit id=%s queue=%s '%s' start_time=%s central_lst='%s' interrupts_telescope=%s", - scheduling_unit.id, scheduling_unit.priority_queue.value, scheduling_unit.name, start_time, scheduling_unit.main_observation_scheduled_central_lst, scheduling_unit.interrupts_telescope) + logger.info("scheduled scheduling_unit id=%s queue=%s '%s' start_time=%s central_lst='%s' interrupts_telescope=%s\nspecified stations: %s\n used stations: %s", + scheduling_unit.id, scheduling_unit.priority_queue.value, scheduling_unit.name, start_time, scheduling_unit.main_observation_scheduled_central_lst, scheduling_unit.interrupts_telescope, + ','.join(scheduling_unit.main_observation_specified_stations), ','.join(scheduling_unit.main_observation_used_stations)) self.log_schedule(log_level=logging.INFO) @@ -1211,6 +1216,11 @@ def unschededule_blocking_scheduled_units_if_needed_and_possible(candidate_sched scheduled_scheduling_unit.id, scheduled_scheduling_unit.name, scheduled_scheduling_unit.scheduled_start_time, rescored_scheduled_unit.weighted_score, rescored_candidate.scheduling_unit.id, rescored_candidate.scheduling_unit.name, rescored_candidate.start_time, rescored_candidate.weighted_score) + logger.info("the scheduled unit id=%s '%s' score=%.3f is in the way of the best candidate id=%s '%s' score=%.3f start_time=%s", + scheduled_scheduling_unit.id, scheduled_scheduling_unit.name, rescored_scheduled_unit.weighted_score, + candidate_scheduling_unit.id, candidate_scheduling_unit.name, rescored_candidate.weighted_score, + candidate_scheduling_unit.scheduled_start_time) + # in case of a triggered candidate with higher trigger priority than the scheduled unit, we don't care about # scores, but only check trigger priority. if candidate_scheduling_unit.interrupts_telescope or scheduled_scheduling_unit.interrupts_telescope: @@ -1226,12 +1236,26 @@ def unschededule_blocking_scheduled_units_if_needed_and_possible(candidate_sched # compare the re-scored weighted_scores elif rescored_candidate.weighted_score > rescored_scheduled_unit.weighted_score: - if can_run_after(scheduled_scheduling_unit, scheduled_scheduling_unit.scheduled_start_time, gridder): - logger.info("unscheduling id=%s '%s' because it is in the way and has a lower score than the best candidate id=%s '%s' start_time=%s", - scheduled_scheduling_unit.id, scheduled_scheduling_unit.name, - candidate_scheduling_unit.id, candidate_scheduling_unit.name, - candidate_scheduling_unit.scheduled_start_time) - unschedule_subtasks_in_scheduling_unit_blueprint(scheduled_scheduling_unit) + # the candidate clearly wins, let's try to make some space by unscheduling the scheduled_scheduling_unit + logger.info("unscheduling id=%s '%s' because it has a lower score than the best candidate id=%s '%s' start_time=%s", + scheduled_scheduling_unit.id, scheduled_scheduling_unit.name, + candidate_scheduling_unit.id, candidate_scheduling_unit.name, + candidate_scheduling_unit.scheduled_start_time) + unschedule_subtasks_in_scheduling_unit_blueprint(scheduled_scheduling_unit) + elif enough_stations_available_for_scheduling_unit(scheduled_scheduling_unit, unavailable_stations=candidate_scheduling_unit.main_observation_specified_stations): + # the candidate does not win, but it's ok to unschedule the scheduled_scheduling_unit because it can run without the candidate's stations. + logger.info("rescheduling id=%s '%s' start_time=%s without the stations of the best candidate id=%s '%s' start_time=%s", + scheduled_scheduling_unit.id, scheduled_scheduling_unit.name, scheduled_scheduling_unit.scheduled_start_time, + candidate_scheduling_unit.id, candidate_scheduling_unit.name, candidate_scheduling_unit.scheduled_start_time) + + # reschedule the scheduled_unit, and take out the candidate's stations + rescheduled_unit = reschedule_subtasks_in_scheduling_unit_blueprint(scheduled_scheduling_unit, misc_unavailable_stations=candidate_scheduling_unit.main_observation_specified_stations) + + logger.info("rescheduled scheduling_unit id=%s queue=%s '%s' start_time=%s central_lst='%s' interrupts_telescope=%s\nspecified stations: %s\n used stations: %s", + rescheduled_unit.id, rescheduled_unit.priority_queue.value, rescheduled_unit.name, start_time, + rescheduled_unit.main_observation_scheduled_central_lst, rescheduled_unit.interrupts_telescope, + ','.join(rescheduled_unit.main_observation_specified_stations), + ','.join(rescheduled_unit.main_observation_used_stations)) else: logger.info("scheduling_unit id=%s '%s' start_time='%s' will not be unscheduled and thus it will keep on blocking candidate id=%s '%s' start_time='%s'", scheduled_scheduling_unit.id, scheduled_scheduling_unit.name, diff --git a/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py b/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py index ad26fc4a7ad..bc28e6c3fbb 100755 --- a/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py +++ b/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py @@ -207,7 +207,8 @@ class TestFixedTimeScheduling(BaseDynamicSchedulingTestCase): @staticmethod def create_simple_observation_scheduling_unit_fixed_time(name:str=None, scheduling_set=None, at: datetime=None, - obs_duration: typing.Union[int, timedelta]=None) -> models.SchedulingUnitDraft: + obs_duration: typing.Union[int, timedelta]=None, + unit_rank: float=0.5) -> models.SchedulingUnitDraft: if name is None: name = 'scheduling_unit for fixed_time at constraint' @@ -220,7 +221,7 @@ class TestFixedTimeScheduling(BaseDynamicSchedulingTestCase): if isinstance(obs_duration, timedelta): obs_duration = obs_duration.total_seconds() - scheduling_unit_draft = BaseDynamicSchedulingTestCase.create_simple_observation_scheduling_unit(name, scheduling_set=scheduling_set, obs_duration=obs_duration) + scheduling_unit_draft = BaseDynamicSchedulingTestCase.create_simple_observation_scheduling_unit(name, scheduling_set=scheduling_set, obs_duration=obs_duration, unit_rank=unit_rank) BaseDynamicSchedulingTestCase._clear_constraints(scheduling_unit_draft) scheduling_unit_draft.scheduling_constraints_doc['scheduler'] = 'fixed_time' scheduling_unit_draft.scheduling_constraints_doc['time']['at'] = at.isoformat() @@ -234,7 +235,7 @@ class TestFixedTimeScheduling(BaseDynamicSchedulingTestCase): """ # Use at constraint in the future (noon, tomorrow) at = round_to_second_precision(datetime.utcnow().replace(hour=12, minute=0, second=0, microsecond=0) + timedelta(days=1)) - scheduling_unit_draft = self.create_simple_observation_scheduling_unit_fixed_time(at=at) + scheduling_unit_draft = self.create_simple_observation_scheduling_unit_fixed_time(at=at, unit_rank=0.5) scheduling_unit_blueprint = create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) # assert blueprint has correct constraints, and is schedulable @@ -327,6 +328,12 @@ class TestFixedTimeScheduling(BaseDynamicSchedulingTestCase): Test a simple observation with the 'at' constraint and 'fixed_time' scheduler. It should not be schedulable when overlapping an already scheduled unit. """ + # set all weight factors to 0, except for scheduling_rank so we can control which unit wins + self.reset_scheduling_constraints_weight_factors(default_value=0, default_age_value=0) + weight_factor, created = models.SchedulingConstraintsWeightFactor.objects.get_or_create(constraint_name="scheduling_unit_rank", scheduling_constraints_template=models.SchedulingConstraintsTemplate.get_latest(name="constraints")) + weight_factor.weight = 1 + weight_factor.save() + # re-use scheduled unit from test_fixed_time_in_the_future_can_be_scheduled # assume that test_fixed_time_in_the_future_can_be_scheduled passes. self.test_fixed_time_in_the_future_can_be_scheduled() @@ -342,7 +349,7 @@ class TestFixedTimeScheduling(BaseDynamicSchedulingTestCase): scheduling_unit_blueprint1.scheduled_start_time + duration - timedelta(seconds=1)) # head just touches tail for at in overlapping_start_times: - scheduling_unit_draft2 = self.create_simple_observation_scheduling_unit_fixed_time(at=at, obs_duration=duration) + scheduling_unit_draft2 = self.create_simple_observation_scheduling_unit_fixed_time(at=at, obs_duration=duration, unit_rank=1) scheduling_unit_blueprint2 = create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft2) # assert blueprint has correct constraints, and is schedulable @@ -819,7 +826,7 @@ class TestFixedTimeScheduling(BaseDynamicSchedulingTestCase): # mock the schedule_subtask method, make it raise an Exception with mock.patch('lofar.sas.tmss.tmss.tmssapp.subtasks.schedule_subtask') as mocked: - def mocked_schedule_subtask(subtask, start_time=None): + def mocked_schedule_subtask(*args, **kwargs): raise Exception("test-exception with test-unschedulable-reason") mocked.side_effect = mocked_schedule_subtask @@ -3488,21 +3495,27 @@ class TestDynamicScheduling(BaseDynamicSchedulingTestCase): # preparation complete # now start testing scheduling these # test all sequences of units, with and without reservations - for idols_rank, other_rank in ((models.SchedulingUnitRank.LOWEST.value, models.SchedulingUnitRank.HIGHEST.value), - (models.SchedulingUnitRank.HIGHEST.value, models.SchedulingUnitRank.LOWEST.value)): + for idols_rank, other_rank in ((models.SchedulingUnitRank.HIGHEST.value, models.SchedulingUnitRank.LOWEST.value), + (models.SchedulingUnitRank.LOWEST.value, models.SchedulingUnitRank.HIGHEST.value)): su_idols.rank = idols_rank su_idols.save() su_other.rank = other_rank su_other.save() - # 1st attempt, idols first, then the other, no reservation + # wipe any lingering reservation(s), and/or scheduled units + models.Reservation.objects.all().delete() + unschedule_subtasks_in_scheduling_unit_blueprint(su_idols) + unschedule_subtasks_in_scheduling_unit_blueprint(su_other) + + # 1st attempt/variant, idols first, then the other, no reservation + # both should be scheduled, idols should have cs032, and the other should have cs032 removed because it was not available su_idols_scheduled = self.scheduler.try_schedule_unit(su_idols, get_at_constraint_timestamp(su_idols)) su_other_scheduled = self.scheduler.try_schedule_unit(su_other, get_at_constraint_timestamp(su_other)) self.assertIsNotNone(su_idols_scheduled) self.assertIsNotNone(su_other_scheduled) self.assertEqual('scheduled', su_idols_scheduled.status.value) self.assertEqual('scheduled', su_other_scheduled.status.value) - self.assertIn('CS032', su_idols.main_observation_used_stations) + self.assertIn('CS032', su_idols_scheduled.main_observation_used_stations) self.assertNotIn('CS032', su_other_scheduled.main_observation_used_stations) self.scheduler.log_schedule() # for reference @@ -3510,25 +3523,40 @@ class TestDynamicScheduling(BaseDynamicSchedulingTestCase): unschedule_subtasks_in_scheduling_unit_blueprint(su_idols) unschedule_subtasks_in_scheduling_unit_blueprint(su_other) - # Skipping this attempt, as it is dependend on the calendar day for which unit "wins". This is also not a case that we use in production. - # 2nd attempt, the other first, then idols, no reservation + # 2nd attempt/variant, the other first, then idols, no reservation + # both should be scheduled, idols should have cs032, + # at first the other should use cs032, but after scheduling idols, cs032 should have been removed by implicit rescheduling su_other_scheduled = self.scheduler.try_schedule_unit(su_other, get_at_constraint_timestamp(su_other)) - su_idols_scheduled = self.scheduler.try_schedule_unit(su_idols, get_at_constraint_timestamp(su_idols)) self.assertIsNotNone(su_other_scheduled) - self.assertIsNone(su_idols_scheduled) # could not be scheduled because the other block idols by using CS0032, which is correct self.assertEqual('scheduled', su_other_scheduled.status.value) + self.assertIn('CS032', su_other_scheduled.main_observation_used_stations) + # by scheduling the idols one, the other may be unscheduled or rescheduled depending on their scores. + su_idols_scheduled = self.scheduler.try_schedule_unit(su_idols, get_at_constraint_timestamp(su_idols)) + self.assertIsNotNone(su_idols_scheduled) + self.assertEqual('scheduled', su_idols_scheduled.status.value) + self.assertIn('CS032', su_idols_scheduled.main_observation_used_stations) + su_other.refresh_from_db() + if su_idols.rank < su_other.rank: # lower rank means higher prio + # the other was unscheduled, so no need to check if cs032 was taken out + self.assertEqual('schedulable', su_other.status.value) + else: + # the other was (re)scheduled, and cs032 was taken out + self.assertEqual('scheduled', su_other.status.value) + self.assertNotIn('CS032', su_other_scheduled.main_observation_used_stations) self.scheduler.log_schedule() # for reference # revert/reset + unschedule_subtasks_in_scheduling_unit_blueprint(su_idols) unschedule_subtasks_in_scheduling_unit_blueprint(su_other) + # now test the cases with reservation, which is the best way, because then the user/operator is explicit in what they want. reservation_strategy_idols = models.ReservationStrategyTemplate.get_latest('IDOLS') reservation_idols = models.Reservation.objects.create(name="IDOLS", description="IDOLS", project=project_idols, specifications_template=reservation_strategy_idols.reservation_template, specifications_doc=reservation_strategy_idols.template, start_time=night_start, stop_time=night_end) - # 3rd attempt, idols first, then the other, with reservation + # 3rd attempt/variant, idols first, then the other, with reservation su_idols_scheduled = self.scheduler.try_schedule_unit(su_idols, get_at_constraint_timestamp(su_idols)) su_other_scheduled = self.scheduler.try_schedule_unit(su_other, get_at_constraint_timestamp(su_other)) self.assertIsNotNone(su_idols_scheduled) @@ -3543,11 +3571,11 @@ class TestDynamicScheduling(BaseDynamicSchedulingTestCase): unschedule_subtasks_in_scheduling_unit_blueprint(su_idols) unschedule_subtasks_in_scheduling_unit_blueprint(su_other) - # 4th attempt, the other first, then idols, with reservation + # 4th attempt/variant, the other first, then idols, with reservation su_other_scheduled = self.scheduler.try_schedule_unit(su_other, get_at_constraint_timestamp(su_other)) su_idols_scheduled = self.scheduler.try_schedule_unit(su_idols, get_at_constraint_timestamp(su_idols)) self.assertIsNotNone(su_other_scheduled) - self.assertIsNotNone(su_idols_scheduled) # could not be scheduled because the other block idols by using CS0032, which is correct + self.assertIsNotNone(su_idols_scheduled) self.assertEqual('scheduled', su_other_scheduled.status.value) self.assertEqual('scheduled', su_idols_scheduled.status.value) self.assertIn('CS032', su_idols.main_observation_used_stations) @@ -4823,4 +4851,4 @@ logging.getLogger('lofar.sas.tmss.services.tmss_postgres_listener').disabled = T if __name__ == '__main__': #run the unit tests - unittest.main() + unittest.main(defaultTest='TestDynamicScheduling.test_bugfix_TMSS_2618_order_of_scheduling_parallel_observations_and_reservations_should_not_matter') diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py b/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py index 9810dcdb143..a24881a956d 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py @@ -244,6 +244,10 @@ class Subtask(RefreshFromDbInvalidatesCachedPropertiesMixin, ProjectPropertyMixi return all(station in lofar2_stations for station in used_stations) return False + @property + def specified_stations(self) -> bool: + return self.task_blueprint.specified_stations + @cached_property def specified_duration(self) -> timedelta: '''get the specified (or estimated) duration of this subtask based on the specified task duration and the subtask type''' diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py index 063257b12cd..84dc1a9136f 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py @@ -994,8 +994,8 @@ def create_cleanup_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> # LOFAR needs to have a gap in between observations to (re)initialize hardware. DEFAULT_INTER_OBSERVATION_GAP = timedelta(seconds=60) -def schedule_subtask(subtask: Subtask, start_time: datetime=None) -> Subtask: - '''Generic scheduling method for subtasks. Calls the appropiate scheduling method based on the subtask's type.''' +def schedule_subtask(subtask: Subtask, start_time: datetime=None, misc_unavailable_stations: Iterable[str]=None) -> Subtask: + '''Generic scheduling method for subtasks. Calls the appropriate scheduling method based on the subtask's type.''' check_prerequities_for_scheduling(subtask) if start_time is not None: @@ -1019,7 +1019,7 @@ def schedule_subtask(subtask: Subtask, start_time: datetime=None) -> Subtask: return schedule_pipeline_subtask(subtask) if subtask.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value: - return schedule_observation_subtask(subtask) + return schedule_observation_subtask(subtask, misc_unavailable_stations=misc_unavailable_stations) if subtask.specifications_template.type.value == SubtaskType.Choices.QA_FILES.value: return schedule_qafile_subtask(subtask) @@ -1107,8 +1107,8 @@ def unschedule_subtasks_in_task_blueprint(task_blueprint: TaskBlueprint): unschedule_subtask(subtask) -def schedule_subtask_and_update_successor_start_times(subtask: Subtask) -> Subtask: - scheduled_subtask = schedule_subtask(subtask) +def schedule_subtask_and_update_successor_start_times(subtask: Subtask, misc_unavailable_stations: Iterable[str]=None) -> Subtask: + scheduled_subtask = schedule_subtask(subtask, misc_unavailable_stations=misc_unavailable_stations) shift_successors_until_after_stop_time(scheduled_subtask) return scheduled_subtask @@ -1452,7 +1452,7 @@ def used_pipeline_stations(pipeline_specs: dict, subtask_specs: dict): return stations -def schedule_observation_subtask(observation_subtask: Subtask): +def schedule_observation_subtask(observation_subtask: Subtask, misc_unavailable_stations: Iterable[str]=None): ''' Schedule the given observation_subtask For first observations in a 'train' of subtasks this method is typically called by hand, or by the short-term-scheduler. For subsequent observation subtasks this method is typically called by the subtask_scheduling_service upon the predecessor finished event. @@ -1502,7 +1502,7 @@ def schedule_observation_subtask(observation_subtask: Subtask): # the subtask spec station_list is currently filled in with the specified/requested/wanted stations. # now that we are scheduling, fill list with what is actually available. Raises if requirements are not met. try: - specifications_doc['stations']['station_list'] = convert_task_station_groups_specification_to_station_list_without_used_and_or_reserved_stations(observation_subtask, remove_reserved_stations=True, remove_used_stations=True) + specifications_doc['stations']['station_list'] = convert_task_station_groups_specification_to_station_list_without_used_and_or_reserved_stations(observation_subtask, remove_reserved_stations=True, remove_used_stations=True, misc_unavailable_stations=misc_unavailable_stations) except TooManyStationsUnavailableException as e: raise SubtaskSchedulingException('Cannot schedule subtask id=%s at start_time=\'%s\'. Too many stations unavailable because of reservations/observations.' % (observation_subtask.id, observation_subtask.scheduled_start_time)) @@ -1723,7 +1723,8 @@ def get_used_stations_in_timewindow(lower_bound: datetime, upper_bound: datetime def convert_task_station_groups_specification_to_station_list_without_used_and_or_reserved_stations(subtask: Subtask, remove_reserved_stations: bool=True, remove_used_stations: bool=True, raise_when_too_many_missing: bool=True, - lower_bound: datetime=None, upper_bound: datetime=None) -> []: + lower_bound: datetime=None, upper_bound: datetime=None, + misc_unavailable_stations: Iterable[str]=None) -> []: '''convert the station_groups specified in the subtask's task specification to a station list of available stations between the given lower/upper_bound, or the subtask's scheduled start/stop time when lower/upper_bound are None :raises: TooManyStationsUnavailableException when one or more of the specified station_groups is missing more stations than allowed. ''' @@ -1752,41 +1753,19 @@ def convert_task_station_groups_specification_to_station_list_without_used_and_o upper_bound=upper_bound)) unavailable_stations |= used_stations + if misc_unavailable_stations: + unavailable_stations |= set(misc_unavailable_stations) + try: + from .tasks import convert_station_groups_to_list_of_available_stations return convert_station_groups_to_list_of_available_stations(subtask.task_blueprint.specified_station_groups, unavailable_stations, raise_when_too_many_missing=raise_when_too_many_missing) except TooManyStationsUnavailableException as e: # add some nice subtask and time info to exception message raise TooManyStationsUnavailableException("subtask id=%s between '%s' and '%s' %s", subtask.id, lower_bound, upper_bound, str(e)) -def convert_station_groups_to_list_of_available_stations(station_groups: Iterable[dict], unavailable_stations: Iterable[str], raise_when_too_many_missing: bool=True): - # create station_list with stations that are actually available (and not reserved/used/unavailable) - station_list = [] - for i, station_group in enumerate(station_groups): - requested_stations = set(station_group['stations']) - available_stations = requested_stations - set(unavailable_stations) - missing_stations = requested_stations - available_stations - max_nr_missing = station_group.get('max_nr_missing', 0) - if raise_when_too_many_missing and len(missing_stations) > max_nr_missing: - # early exit. No need to evaluate more groups when one groups does not meet the requirements - raise TooManyStationsUnavailableException('missing more than max_nr_missing=%s stations\nrequested: %s\navailable: %s\nmissing: %s' % ( - max_nr_missing, - ','.join(sorted(list(requested_stations))), - ','.join(sorted(list(available_stations))), - ','.join(sorted(list(missing_stations))))) - - station_list.extend(available_stations) - - if logger.isEnabledFor(logging.DEBUG) and len(missing_stations) > 0: - logger.debug('unavailable requested stations in group \'%s\': %s (which is within the allowed max_nr_missing=%d)', i, sorted(list(missing_stations)), max_nr_missing) - - # collapse to sorted list of available unique stations - station_list = sorted(list(set(station_list))) - return station_list - - -def enough_stations_available(subtask: Subtask, remove_reserved_stations: bool=True, remove_used_stations: bool=True, - lower_bound: datetime=None, upper_bound: datetime=None) -> bool: +def enough_stations_available_for_subtask(subtask: Subtask, remove_reserved_stations: bool=True, remove_used_stations: bool=True, + lower_bound: datetime=None, upper_bound: datetime=None) -> bool: '''Are there enough stations available in the schedule for the given window between lower/upper_bound? (the subtask's specified scheduled start/stop time are used as lower/upper_bound when None)''' try: stations = convert_task_station_groups_specification_to_station_list_without_used_and_or_reserved_stations(subtask, remove_reserved_stations=remove_reserved_stations, remove_used_stations=remove_used_stations, @@ -2282,7 +2261,7 @@ def schedule_copy_subtask(copy_subtask: Subtask): # === Misc === -def schedule_independent_subtasks_in_task_blueprint(task_blueprint: TaskBlueprint, start_time: datetime=None) -> [Subtask]: +def schedule_independent_subtasks_in_task_blueprint(task_blueprint: TaskBlueprint, start_time: datetime=None, misc_unavailable_stations: Iterable[str]=None) -> [Subtask]: '''Convenience method: Schedule (and return) the subtasks in the task_blueprint that are not dependend on any predecessors''' with transaction.atomic(): independent_subtasks = list(Subtask.independent_subtasks().filter(task_blueprint__id=task_blueprint.id, state__value=SubtaskState.Choices.DEFINED.value).all()) @@ -2290,7 +2269,7 @@ def schedule_independent_subtasks_in_task_blueprint(task_blueprint: TaskBlueprin for subtask in independent_subtasks: if start_time is not None: subtask.scheduled_start_time = start_time - schedule_subtask_and_update_successor_start_times(subtask) + schedule_subtask_and_update_successor_start_times(subtask, misc_unavailable_stations=misc_unavailable_stations) return independent_subtasks diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/tasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/tasks.py index 69e4b4f7a8d..48fc5153fb8 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/tasks.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/tasks.py @@ -6,6 +6,7 @@ from lofar.sas.tmss.tmss.tmssapp.models.specification import TaskBlueprint, Sche from lofar.sas.tmss.tmss.tmssapp.subtasks import create_or_update_subtasks_from_task_blueprint, schedule_independent_subtasks_in_task_blueprint, update_subtasks_start_times_for_scheduling_unit, get_gaps_to_previous_and_next_observations from lofar.common.datetimeutils import round_to_minute_precision from functools import cmp_to_key +from collections.abc import Iterable import os from copy import deepcopy from typing import Tuple @@ -637,7 +638,7 @@ def update_task_blueprints_and_subtasks_graph_from_draft(scheduling_unit_bluepri return scheduling_unit_blueprint -def schedule_independent_subtasks_in_scheduling_unit_blueprint(scheduling_unit_blueprint: SchedulingUnitBlueprint, start_time: datetime=None) -> models.SchedulingUnitBlueprint: +def schedule_independent_subtasks_in_scheduling_unit_blueprint(scheduling_unit_blueprint: SchedulingUnitBlueprint, start_time: datetime=None, misc_unavailable_stations: Iterable[str]=None) -> models.SchedulingUnitBlueprint: '''Convenience method: Schedule the subtasks in the scheduling_unit_blueprint that are not dependend on predecessors''' if start_time is None: if scheduling_unit_blueprint.scheduled_start_time is None: @@ -648,7 +649,7 @@ def schedule_independent_subtasks_in_scheduling_unit_blueprint(scheduling_unit_b task_blueprints = list(scheduling_unit_blueprint.task_blueprints.all()) for task_blueprint in task_blueprints: - schedule_independent_subtasks_in_task_blueprint(task_blueprint, start_time=start_time+task_blueprint.relative_start_time) + schedule_independent_subtasks_in_task_blueprint(task_blueprint, start_time=start_time+task_blueprint.relative_start_time, misc_unavailable_stations=misc_unavailable_stations) scheduling_unit_blueprint.refresh_from_db() @@ -671,11 +672,11 @@ def unschedule_subtasks_in_scheduling_unit_blueprint(scheduling_unit_blueprint: return scheduling_unit_blueprint -def reschedule_subtasks_in_scheduling_unit_blueprint(scheduling_unit_blueprint: SchedulingUnitBlueprint) -> models.SchedulingUnitBlueprint: +def reschedule_subtasks_in_scheduling_unit_blueprint(scheduling_unit_blueprint: SchedulingUnitBlueprint, misc_unavailable_stations: Iterable[str]=None) -> models.SchedulingUnitBlueprint: '''Convenience method: Unschedule all scheduled subtasks and schedule them again in one transaction''' with transaction.atomic(): unschedule_subtasks_in_scheduling_unit_blueprint(scheduling_unit_blueprint) - schedule_independent_subtasks_in_scheduling_unit_blueprint(scheduling_unit_blueprint) + schedule_independent_subtasks_in_scheduling_unit_blueprint(scheduling_unit_blueprint, misc_unavailable_stations=misc_unavailable_stations) scheduling_unit_blueprint.refresh_from_db() return scheduling_unit_blueprint @@ -994,3 +995,45 @@ def mark_scheduling_unit_dynamically_scheduled(scheduling_unit_blueprint: Schedu return scheduling_unit_blueprint +def convert_station_groups_to_list_of_available_stations(station_groups: Iterable[dict], unavailable_stations: Iterable[str], raise_when_too_many_missing: bool=True): + # create station_list with stations that are actually available (and not reserved/used/unavailable) + station_list = [] + for i, station_group in enumerate(station_groups): + requested_stations = set(station_group['stations']) + available_stations = requested_stations - set(unavailable_stations) + missing_stations = requested_stations - available_stations + max_nr_missing = station_group.get('max_nr_missing', 0) + if raise_when_too_many_missing and len(missing_stations) > max_nr_missing: + # early exit. No need to evaluate more groups when one groups does not meet the requirements + raise TooManyStationsUnavailableException('missing more than max_nr_missing=%s stations\nrequested: %s\navailable: %s\nmissing: %s' % ( + max_nr_missing, + ','.join(sorted(list(requested_stations))), + ','.join(sorted(list(available_stations))), + ','.join(sorted(list(missing_stations))))) + + station_list.extend(available_stations) + + if logger.isEnabledFor(logging.DEBUG) and len(missing_stations) > 0: + logger.debug('unavailable requested stations in group \'%s\': %s (which is within the allowed max_nr_missing=%d)', i, sorted(list(missing_stations)), max_nr_missing) + + # collapse to sorted list of available unique stations + station_list = sorted(list(set(station_list))) + return station_list + +def enough_stations_available_for_task(task: TaskBlueprint, unavailable_stations: Iterable[str]) -> bool: + '''Are there enough stations available if the given unavailable_stations are removed? + Checks all stations_groups of the spec, and checks if there are less stations missing than max_nr_missing''' + try: + convert_station_groups_to_list_of_available_stations(task.specified_station_groups, + unavailable_stations, + raise_when_too_many_missing=True) + except TooManyStationsUnavailableException as e: + logger.debug(e) + return False + return True + + +def enough_stations_available_for_scheduling_unit(scheduling_unit: SchedulingUnitBlueprint, unavailable_stations: Iterable[str]) -> bool: + '''Are there enough stations available if the given unavailable_stations are removed? + Checks all stations_groups of the spec, and checks if there are less stations missing than max_nr_missing''' + return all(enough_stations_available_for_task(obs_task, unavailable_stations) for obs_task in scheduling_unit.observation_tasks.all()) -- GitLab