diff --git a/SAS/TMSS/backend/services/scheduling/lib/constraints/template_constraints_v1.py b/SAS/TMSS/backend/services/scheduling/lib/constraints/template_constraints_v1.py index dd52adb95b8609518735e051a5f7bb99dcb98afc..34df77a4037f24109ce3b1ea8c0b4aec1159acaf 100644 --- a/SAS/TMSS/backend/services/scheduling/lib/constraints/template_constraints_v1.py +++ b/SAS/TMSS/backend/services/scheduling/lib/constraints/template_constraints_v1.py @@ -102,6 +102,10 @@ def can_run_anywhere_within_timewindow_with_daily_constraints(scheduling_unit: m """ main_observation_task_name = get_target_observation_task_name_from_requirements_doc(scheduling_unit) constraints = scheduling_unit.scheduling_constraints_doc + + if not "daily" in constraints: + return True + if constraints['daily']['require_day'] or constraints['daily']['require_night'] or constraints['daily']['avoid_twilight']: if (upper_bound - lower_bound).days >= 1: @@ -161,7 +165,7 @@ def can_run_within_timewindow_with_time_constraints(scheduling_unit: models.Sche constraints = scheduling_unit.scheduling_constraints_doc # Check the 'at' constraint and then only check can_run_anywhere for the single possible time window - if 'at' in constraints['time']: + if 'time' in constraints and 'at' in constraints['time']: at = parser.parse(constraints['time']['at'], ignoretz=True) if (at >= lower_bound and at + scheduling_unit.duration <= upper_bound): # todo: suggestion: use scheduling_unit.requirements_doc['tasks']['Observation']['specifications_doc']['duration'] return can_run_anywhere_within_timewindow_with_time_constraints(scheduling_unit, lower_bound=at, @@ -191,6 +195,9 @@ def can_run_anywhere_within_timewindow_with_time_constraints(scheduling_unit: mo can_run_not_between = True constraints = scheduling_unit.scheduling_constraints_doc + if not "time" in constraints: + return True + # given time window needs to end before constraint if 'before' in constraints['time']: before = parser.parse(constraints['time']['before'], ignoretz=True) @@ -355,8 +362,6 @@ def can_run_anywhere_within_timewindow_with_sky_constraints(scheduling_unit: mod average_transit_time = _reference_date + sum([date - _reference_date for date in sap_datetime_list], timedelta()) / len(sap_datetime_list) transit_times.get(station, []).append(average_transit_time) - logger.warning('##### %s' % transit_times) - for station, times in transit_times.items(): for i in range(len(timestamps)): offset = (timestamps[i] - times[i]).total_seconds() @@ -391,15 +396,15 @@ def get_earliest_possible_start_time(scheduling_unit: models.SchedulingUnitBluep main_observation_task_name = get_target_observation_task_name_from_requirements_doc(scheduling_unit) duration = timedelta(seconds=scheduling_unit.requirements_doc['tasks'][main_observation_task_name]['specifications_doc']['duration']) try: - if 'at' in constraints['time']: + if 'time' in constraints and 'at' in constraints['time']: at = parser.parse(constraints['time']['at'], ignoretz=True) return max(lower_bound, at) - if 'after' in constraints['time']: + if 'time' in constraints and 'after' in constraints['time']: after = parser.parse(constraints['time']['after'], ignoretz=True) return max(lower_bound, after) - if constraints['daily']['require_day'] or constraints['daily']['require_night'] or constraints['daily']['avoid_twilight']: + if 'daily' in constraints and (constraints['daily']['require_day'] or constraints['daily']['require_night'] or constraints['daily']['avoid_twilight']): station_groups = scheduling_unit.requirements_doc['tasks'][main_observation_task_name]['specifications_doc']["station_groups"] stations = list(set(sum([group['stations'] for group in station_groups], []))) # flatten all station_groups to single list all_sun_events = timestamps_and_stations_to_sun_rise_and_set(timestamps=(lower_bound,lower_bound+timedelta(days=1)), stations=tuple(stations)) @@ -467,7 +472,7 @@ def compute_scores(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound: # for now (as a proof of concept and sort of example), just return 1's. Return 1000 (placeholder value, change later) if the 'at' constraint is in, so it gets prioritised. scores = {'daily': 1.0, - 'time': 1000.0 if ('at' in constraints['time'] and constraints['time']['at'] is not None) else 1.0, + 'time': 1000.0 if ('time' in constraints and 'at' in constraints['time'] and constraints['time']['at'] is not None) else 1.0, 'sky': 1.0} # add "common" scores which do not depend on constraints, such as project rank and creation date diff --git a/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py b/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py index dd5ea7be4beee80fab8f995cd87f8b1a215ca47d..5769fa88fbda113e9cb37bc1b3ed98007abcbbc8 100644 --- a/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py +++ b/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py @@ -70,13 +70,13 @@ def find_best_next_schedulable_unit(scheduling_units:[models.SchedulingUnitBluep if filtered_scheduling_units: triggered_scheduling_units = [scheduling_unit for scheduling_unit in filtered_scheduling_units if scheduling_unit.is_triggered] if triggered_scheduling_units: - highest_priority_triggered_scheduling_unit = max(triggered_scheduling_units, key=lambda su: su.project.trigger_priority) - return ScoredSchedulingUnit(scheduling_unit=highest_priority_triggered_scheduling_unit, - start_time=get_earliest_possible_start_time(highest_priority_triggered_scheduling_unit, lower_bound_start_time), - scores={}, weighted_score=None) # we don't care about scores in case of a trigger + highest_priority_triggered_scheduling_unit = max(triggered_scheduling_units, key=lambda su: su.project.trigger_priority) + best_scored_scheduling_unit = ScoredSchedulingUnit(scheduling_unit=highest_priority_triggered_scheduling_unit, + start_time=get_earliest_possible_start_time(highest_priority_triggered_scheduling_unit, lower_bound_start_time), + scores={}, weighted_score=None) # we don't care about scores in case of a trigger else: best_scored_scheduling_unit = get_best_scored_scheduling_unit_scored_by_constraints(filtered_scheduling_units, lower_bound_start_time, upper_bound_stop_time) - return best_scored_scheduling_unit + return best_scored_scheduling_unit # no filtered scheduling units found... logger.debug("No schedulable scheduling units found which meet the requirements between '%s' and '%s'", lower_bound_start_time, upper_bound_stop_time) @@ -99,10 +99,14 @@ def schedule_next_scheduling_unit() -> models.SchedulingUnitBlueprint: lower_bound_start_time = get_min_earliest_possible_start_time(schedulable_units, datetime.utcnow()+DEFAULT_NEXT_STARTTIME_GAP) # estimate the upper_bound_stop_time, which may give us a small timewindow before any next scheduled unit, or a default window of a day - try: - upper_bound_stop_time = max(su.start_time for su in get_scheduled_scheduling_units(lower=lower_bound_start_time, upper=lower_bound_start_time + timedelta(days=1))) - except ValueError: + if any([su.is_triggered for su in schedulable_units]): + # ignore what's scheduled if we have triggers upper_bound_stop_time = lower_bound_start_time + timedelta(days=1) + else: + try: + upper_bound_stop_time = max(su.start_time for su in get_scheduled_scheduling_units(lower=lower_bound_start_time, upper=lower_bound_start_time + timedelta(days=1))) + except ValueError: + upper_bound_stop_time = lower_bound_start_time + timedelta(days=1) # no need to irritate user in log files with subsecond scheduling precision lower_bound_start_time = round_to_second_precision(lower_bound_start_time) @@ -327,7 +331,7 @@ def get_running_observation_subtasks(stopping_after:datetime=None) -> [models.Su running_obs_subtasks = models.Subtask.objects.filter(state__value__in=[models.SubtaskState.Choices.STARTING.value, models.SubtaskState.Choices.STARTED.value], specifications_template__type__value=models.SubtaskType.Choices.OBSERVATION.value) if stopping_after is not None: - running_obs_subtasks = running_obs_subtasks.filter(stop_time__lte=stopping_after) + running_obs_subtasks = running_obs_subtasks.filter(stop_time__gte=stopping_after) return list(running_obs_subtasks.all()) @@ -345,9 +349,12 @@ def unschededule_blocking_scheduled_units_if_needed_and_possible(candidate: Scor # scores, but only check trigger priority. if candidate.scheduling_unit.is_triggered: if (not scheduled_scheduling_unit.is_triggered) or candidate.scheduling_unit.project.trigger_priority > scheduled_scheduling_unit.project.trigger_priority: - unschedule_subtasks_in_scheduling_unit_blueprint(scheduled_scheduling_unit) + logger.info("unscheduling id=%s '%s' because it is in the way and has a lower trigger_priority=%s than the best candidate id=%s '%s' trigger_priority=%s start_time=%s", + scheduled_scheduling_unit.id, scheduled_scheduling_unit.name, scheduled_scheduling_unit.project.trigger_priority, + candidate.scheduling_unit.id, candidate.scheduling_unit.name, candidate.scheduling_unit.project.trigger_priority, candidate.scheduling_unit.start_time) + unschedule_subtasks_in_scheduling_unit_blueprint(scheduled_scheduling_unit) - if candidate.weighted_score > scheduled_score.weighted_score: + elif candidate.weighted_score > scheduled_score.weighted_score: # ToDo: also check if the scheduled_scheduling_unit is manually/dynamically scheduled logger.info("unscheduling id=%s '%s' because it is in the way and has a lower score than the best candidate id=%s '%s' score=%s start_time=%s", scheduled_scheduling_unit.id, scheduled_scheduling_unit.name, @@ -385,5 +392,13 @@ def cancel_running_observation_if_needed_and_possible(candidate: ScoredSchedulin continue if candidate.scheduling_unit.project.trigger_priority > obs.project.trigger_priority: logger.info('cancelling observation subtask pk=%s trigger_priority=%s because it blocks the triggered scheduling_unit pk=%s trigger_priority=%s' % - obs.pk, obs.project.trigger_priority, candidate.scheduling_unit.pk, candidate.scheduling_unit.project.trigger_priority) + (obs.pk, obs.project.trigger_priority, candidate.scheduling_unit.pk, candidate.scheduling_unit.project.trigger_priority)) + # todo: check if cancellation is really necessary or the trigger can be scheduled afterwards + # I guess we could just do can_run_after(candidate, obs.stop_time) here for that? + # We could also only do this, of there is a 'before' constraint on each trigger. + # -> Clarify and implemented with TMSS-704. cancel_subtask(obs) + else: + logger.info('NOT cancelling subtask pk=%s trigger_priority=%s for triggered scheduling_unit pk=%s trigger_priority=%s because its priority is too low' % + (obs.pk, obs.project.trigger_priority, candidate.scheduling_unit.pk, candidate.scheduling_unit.project.trigger_priority)) + diff --git a/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py b/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py index 9cb4e4059f30569a045fbd07c565d6fdb36a9743..93aea38eb9fee4974120462d18c712345810a3db 100755 --- a/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py +++ b/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py @@ -1788,6 +1788,12 @@ class TestTriggers(TestCase): self.scheduling_set.project.can_trigger = True self.scheduling_set.project.save() + # create a second scheduling set in a project that allows triggers and has higher trigger_priority + self.scheduling_set_high_trigger_priority = models.SchedulingSet.objects.create(**SchedulingSet_test_data()) + self.scheduling_set_high_trigger_priority.project.can_trigger = True + self.scheduling_set_high_trigger_priority.project.trigger_priority = self.scheduling_set_high_trigger_priority.project.trigger_priority + 1 + self.scheduling_set_high_trigger_priority.project.save() + def test_simple_triggered_scheduling_unit_gets_scheduled(self): scheduling_unit_draft = TestDynamicScheduling.create_simple_observation_scheduling_unit( @@ -1806,7 +1812,7 @@ class TestTriggers(TestCase): def test_triggered_scheduling_unit_with_at_constraint_gets_scheduled_at_correct_time(self): scheduling_unit_draft = TestDynamicScheduling.create_simple_observation_scheduling_unit( - 'scheduling_unit for at constraint', + 'scheduling_unit for at %s' % self._testMethodName, scheduling_set=self.scheduling_set) # Clear constraints scheduling_unit_draft.scheduling_constraints_doc['sky'] = {} @@ -1845,12 +1851,166 @@ class TestTriggers(TestCase): scheduled_scheduling_unit = do_dynamic_schedule() + # Assert the triggered scheduling_unit has been scheduled + self.assertIsNotNone(scheduled_scheduling_unit) + self.assertEqual(scheduled_scheduling_unit.id, triggered_scheduling_unit_blueprint.id) + self.assertEqual(regular_scheduling_unit_blueprint.status, 'schedulable') + self.assertEqual(triggered_scheduling_unit_blueprint.status, 'scheduled') + + def test_triggered_scheduling_unit_unschedules_regular_observation(self): + + # create a regular scheduling_unit + scheduling_unit_draft = TestDynamicScheduling.create_simple_observation_scheduling_unit( + "scheduling unit for %s" % self._testMethodName, + scheduling_set=self.scheduling_set, + is_triggered=False) + regular_scheduling_unit_blueprint = create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) + regular_scheduling_unit_blueprint.scheduling_constraints_doc = {} + regular_scheduling_unit_blueprint.save() + + scheduled_scheduling_unit = do_dynamic_schedule() + # Assert the scheduling_unit has been scheduled self.assertIsNotNone(scheduled_scheduling_unit) + self.assertEqual(scheduled_scheduling_unit.id, regular_scheduling_unit_blueprint.id) + self.assertEqual(regular_scheduling_unit_blueprint.status, 'scheduled') + + # add a triggered scheduling_unit + scheduling_unit_draft = TestDynamicScheduling.create_simple_observation_scheduling_unit( + "triggered scheduling unit for %s" % self._testMethodName, + scheduling_set=self.scheduling_set, + is_triggered=True) + triggered_scheduling_unit_blueprint = create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) + triggered_scheduling_unit_blueprint.scheduling_constraints_doc = {} + triggered_scheduling_unit_blueprint.save() + + scheduled_scheduling_unit = do_dynamic_schedule() + + # Assert now the new triggered scheduling_unit has been scheduled, and the regular one has been unscheduled + self.assertIsNotNone(scheduled_scheduling_unit) self.assertEqual(scheduled_scheduling_unit.id, triggered_scheduling_unit_blueprint.id) self.assertEqual(regular_scheduling_unit_blueprint.status, 'schedulable') self.assertEqual(triggered_scheduling_unit_blueprint.status, 'scheduled') + @mock.patch("lofar.sas.tmss.services.scheduling.dynamic_scheduling.cancel_subtask") + def test_triggered_scheduling_unit_cancels_regular_observation(self, cancel_mock): + + # create a regular scheduling_unit + scheduling_unit_draft = TestDynamicScheduling.create_simple_observation_scheduling_unit( + "scheduling unit for %s" % self._testMethodName, + scheduling_set=self.scheduling_set, + is_triggered=False) + regular_scheduling_unit_blueprint = create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) + regular_scheduling_unit_blueprint.scheduling_constraints_doc = {} + regular_scheduling_unit_blueprint.save() + + scheduled_scheduling_unit = do_dynamic_schedule() + + # Assert the scheduling_unit has been scheduled + self.assertIsNotNone(scheduled_scheduling_unit) + self.assertEqual(scheduled_scheduling_unit.id, regular_scheduling_unit_blueprint.id) + self.assertEqual(regular_scheduling_unit_blueprint.status, 'scheduled') + + # put obs to started state + subtask = scheduled_scheduling_unit.task_blueprints.first().subtasks.first() + subtask.state = models.SubtaskState.objects.get(value='starting') + subtask.save() + subtask.state = models.SubtaskState.objects.get(value='started') + subtask.save() + + # assert obs it detected as running + running_subtasks = get_running_observation_subtasks() + self.assertIn(subtask, running_subtasks) + + # also assert cut-off date is considered + running_subtasks = get_running_observation_subtasks(subtask.stop_time + timedelta(minutes=5)) + self.assertNotIn(subtask, running_subtasks) + + # add a triggered scheduling_unit with higher priority + scheduling_unit_draft = TestDynamicScheduling.create_simple_observation_scheduling_unit( + "triggered scheduling unit for %s" % self._testMethodName, + scheduling_set=self.scheduling_set_high_trigger_priority, + is_triggered=True) + triggered_scheduling_unit_blueprint = create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) + triggered_scheduling_unit_blueprint.scheduling_constraints_doc = {} + triggered_scheduling_unit_blueprint.save() + + # wipe all radb entries (via cascading deletes) so that we don't get a conflict later because we omöy mock + # the cancellation. + # todo: TMSS-704: if we don't do this, the triggered SU goes in error state (conflict due to the cancel being + # mocked?) - confirm that's ok. + with PostgresDatabaseConnection(tmss_test_env.ra_test_environment.radb_test_instance.dbcreds) as radb: + radb.executeQuery('DELETE FROM resource_allocation.specification;') + radb.executeQuery('TRUNCATE resource_allocation.resource_usage;') + radb.commit() + + scheduled_scheduling_unit = do_dynamic_schedule() + + # assert the subtask has been cancelled + cancel_mock.assert_called_with(subtask) + + # Note that we cannot check that the regular_scheduling_unit_blueprint or the subtask has been cancelled since + # we only mocked the cancellation. + + # Assert now the new triggered scheduling_unit has been scheduled + self.assertIsNotNone(scheduled_scheduling_unit) + self.assertEqual(scheduled_scheduling_unit.id, triggered_scheduling_unit_blueprint.id) + self.assertEqual(triggered_scheduling_unit_blueprint.status, 'scheduled') + + @mock.patch("lofar.sas.tmss.services.scheduling.dynamic_scheduling.cancel_subtask") + def test_triggered_scheduling_unit_does_not_cancel_regular_observation_with_same_trigger_priority(self, cancel_mock): + + # create a regular scheduling_unit + scheduling_unit_draft = TestDynamicScheduling.create_simple_observation_scheduling_unit( + "scheduling unit for %s" % self._testMethodName, + scheduling_set=self.scheduling_set, + is_triggered=False) + regular_scheduling_unit_blueprint = create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) + regular_scheduling_unit_blueprint.scheduling_constraints_doc = {} + regular_scheduling_unit_blueprint.save() + + scheduled_scheduling_unit = do_dynamic_schedule() + + # Assert the scheduling_unit has been scheduled + self.assertIsNotNone(scheduled_scheduling_unit) + self.assertEqual(scheduled_scheduling_unit.id, regular_scheduling_unit_blueprint.id) + self.assertEqual(regular_scheduling_unit_blueprint.status, 'scheduled') + + # put obs to started state + subtask = scheduled_scheduling_unit.task_blueprints.first().subtasks.first() + subtask.state = models.SubtaskState.objects.get(value='starting') + subtask.save() + subtask.state = models.SubtaskState.objects.get(value='started') + subtask.save() + + # assert obs it detected as running + running_subtasks = get_running_observation_subtasks() + self.assertIn(subtask, running_subtasks) + + # also assert cut-off date is considered + running_subtasks = get_running_observation_subtasks(subtask.stop_time + timedelta(minutes=5)) + self.assertNotIn(subtask, running_subtasks) + + # add a triggered scheduling_unit with higher priority + scheduling_unit_draft = TestDynamicScheduling.create_simple_observation_scheduling_unit( + "triggered scheduling unit for %s" % self._testMethodName, + scheduling_set=self.scheduling_set, + is_triggered=True) + triggered_scheduling_unit_blueprint = create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) + triggered_scheduling_unit_blueprint.scheduling_constraints_doc = {} + triggered_scheduling_unit_blueprint.save() + + scheduled_scheduling_unit = do_dynamic_schedule() + + # assert that the subtask has NOT been cancelled and is still in state 'started' + cancel_mock.assert_not_called() + self.assertEqual(subtask.state.value, 'started') + + # Assert that the new triggered scheduling_unit has NOT been scheduled, and the regular one is still observing + self.assertIsNone(scheduled_scheduling_unit) + self.assertEqual(regular_scheduling_unit_blueprint.status, 'observing') + self.assertEqual(triggered_scheduling_unit_blueprint.status, 'error') # todo: TMSS-704: this should be 'schedulable' as long as no constraint is blocking it, or unschedulable otherwise. + logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)