From ca1f45626e58060a99e2d364d4080a10c9a4ffe3 Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Thu, 14 Sep 2023 18:28:14 +0200 Subject: [PATCH] TMSS-2577: implemented project/cycle add/update/delete events, and handler, and made the scheduler run on such an event. Added test. --- .../lib/postgres_listener.py | 21 +++++++ .../services/scheduling/lib/constraints.py | 6 ++ .../scheduling/lib/dynamic_scheduling.py | 33 +++++++++-- .../scheduling/test/t_dynamic_scheduling.py | 58 +++++++++++++++++++ .../src/tmss/tmssapp/models/specification.py | 11 ++-- SAS/TMSS/client/lib/tmssbuslistener.py | 24 ++++++++ 6 files changed, 141 insertions(+), 12 deletions(-) diff --git a/SAS/TMSS/backend/services/postgres_listener/lib/postgres_listener.py b/SAS/TMSS/backend/services/postgres_listener/lib/postgres_listener.py index d94ca493c20..f65eb9c9cb1 100644 --- a/SAS/TMSS/backend/services/postgres_listener/lib/postgres_listener.py +++ b/SAS/TMSS/backend/services/postgres_listener/lib/postgres_listener.py @@ -177,6 +177,16 @@ class TMSSPGListener(PostgresListener): self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_projectquotaarchivelocation', 'delete')) self.subscribe('tmssapp_projectquotaarchivelocation_delete', self.onProjectQuotaArchiveLocationDeleted) + # ProjectCycle + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_project_cycles', 'insert')) + self.subscribe('tmssapp_project_cycles_insert', self.onProjectCyclesInserted) + + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_project_cycles', 'update')) + self.subscribe('tmssapp_project_cycles_update', self.onProjectCyclesUpdated) + + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_project_cycles', 'delete')) + self.subscribe('tmssapp_project_cycles_delete', self.onProjectCyclesDeleted) + # Reservation self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_reservation', 'insert')) @@ -335,6 +345,17 @@ class TMSSPGListener(PostgresListener): def onProjectQuotaArchiveLocationDeleted(self, payload = None): self._sendNotification(TMSS_PROJECTQUOTAARCHIVELOCATION_OBJECT_EVENT_PREFIX+'.Deleted', payload) + + def onProjectCyclesInserted(self, payload = None): + self._sendNotification(TMSS_PROJECTCYCLES_OBJECT_EVENT_PREFIX+'.Created', payload) + + def onProjectCyclesUpdated(self, payload = None): + self._sendNotification(TMSS_PROJECTCYCLES_OBJECT_EVENT_PREFIX+'.Updated', payload) + + def onProjectCyclesDeleted(self, payload = None): + self._sendNotification(TMSS_PROJECTCYCLES_OBJECT_EVENT_PREFIX+'.Deleted', payload) + + def onSubsystemStatusUpdated(self, payload = None): payload = json.loads(payload) payload['status'] = payload.pop('status_id') diff --git a/SAS/TMSS/backend/services/scheduling/lib/constraints.py b/SAS/TMSS/backend/services/scheduling/lib/constraints.py index 99b0f8bc2cd..662645e8141 100644 --- a/SAS/TMSS/backend/services/scheduling/lib/constraints.py +++ b/SAS/TMSS/backend/services/scheduling/lib/constraints.py @@ -578,6 +578,9 @@ def can_run_at_within_cycles_bounds(scheduling_unit: models.SchedulingUnitBluepr def can_run_within_cycles_bounds(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound: datetime, upper_bound: datetime) -> bool: '''determine if the given scheduling_unit can run withing the unit's cycle start/end times for the given timewindow [lower_bound, upper_bound] ''' + if scheduling_unit.latest_possible_cycle_stop_time is None or scheduling_unit.earliest_possible_cycle_start_time is None: + return False + if upper_bound > scheduling_unit.latest_possible_cycle_stop_time: return False @@ -2197,6 +2200,9 @@ def determine_unschedulable_reason_and_mark_unschedulable_if_needed(scheduling_u msg = "Stations %s are already used" % (','.join([str(s) for s in missing_stations]), ) return mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(scheduling_unit, msg) + if scheduling_unit.earliest_possible_cycle_start_time is None or scheduling_unit.latest_possible_cycle_stop_time is None: + return mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(scheduling_unit, "unknown cycle bounds") + at = get_at_constraint_timestamp(scheduling_unit) if at: if not can_run_at_within_cycles_bounds(scheduling_unit, at): diff --git a/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py b/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py index d2521fc297b..b3d3d020f9e 100644 --- a/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py +++ b/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py @@ -236,17 +236,14 @@ class Scheduler: at_timestamp = get_at_constraint_timestamp(schedulable_unit) set_scheduling_unit_blueprint_start_times(schedulable_unit, first_start_time=at_timestamp, placed=True) - # get the earliest possible starttime, which should be the fixed_time 'at' constraint, but can be None if the unit cannot run for some reason. - runnable_start_time = get_earliest_possible_start_time(schedulable_unit, datetime.utcnow(), gridder=self.search_gridder, raise_if_interruped=self._raise_if_triggered) - if runnable_start_time is None: + if not can_run_at(schedulable_unit, at_timestamp, self.fine_gridder): unschedulable_unit = determine_unschedulable_reason_and_mark_unschedulable_if_needed(schedulable_unit, at_timestamp, at_timestamp + schedulable_unit.specified_observation_duration, proposed_start_time=at_timestamp, gridder=self.search_gridder, raise_if_interruped=self._raise_if_triggered) logger.warning("Cannot schedule fixed_time unit [%s/%s] id=%d at '%s': %s", i, len(schedulable_units), unschedulable_unit.id, at_timestamp, unschedulable_unit.unschedulable_reason) continue - assert(at_timestamp == runnable_start_time) # make sure we schedule at the requested 'at' timestamp - scheduled_unit = self.try_schedule_unit(schedulable_unit, runnable_start_time) + scheduled_unit = self.try_schedule_unit(schedulable_unit, at_timestamp) if scheduled_unit: assert (scheduled_unit.status.value == models.SchedulingUnitStatus.Choices.SCHEDULED.value) logger.info("Scheduled fixed_time unit [%s/%s] id=%d at '%s'", i, len(schedulable_units), schedulable_unit.id, at_timestamp) @@ -543,6 +540,7 @@ class Scheduler: # make union over all schedulable and scheduled unit(s), # because the scheduled unit needs to be scored and evaluated as well candidate_units = list(schedulable_units | dynamicly_scheduled_scheduling_units) + candidate_units = [su for su in candidate_units if su.draft.scheduling_set.project.cycles.exists()] if len(candidate_units) == 0: continue # continue with the next in order schedulable_units @@ -646,7 +644,7 @@ class Scheduler: # so they are ignored next time. # It's up to the user/operator to tweak their constraints which makes them schedulable again, for a next try. for su in get_dynamically_schedulable_scheduling_units(): - determine_unschedulable_reason_and_mark_unschedulable_if_needed(su, lower_bound_start_time, upper_bound_stop_time, + determine_unschedulable_reason_and_mark_unschedulable_if_needed(su, datetime.utcnow(), datetime.utcnow()+timedelta(days=365), proposed_start_time=None, gridder=self.search_gridder, raise_if_interruped=self._raise_if_triggered) @@ -909,6 +907,15 @@ def mark_scheduling_units_for_inactive_projects_unschedulable(projects: [str]=No set_scheduling_unit_blueprint_start_times(scheduling_unit, first_start_time=round_to_second_precision(get_at_constraint_timestamp(scheduling_unit) or datetime.utcnow())) +def unschedule_scheduled_units_for_cycleless_projects(): + scheduled_units = models.SchedulingUnitBlueprint.objects.filter(status__value=models.SchedulingUnitStatus.Choices.SCHEDULED.value).filter(obsolete_since__isnull=True) + + for scheduling_unit in scheduled_units: + if not scheduling_unit.draft.scheduling_set.project.cycles.exists(): + logger.info("unscheduling unit id=%s project=%s because the project has no cycle(s) (anymore)", scheduling_unit.id, scheduling_unit.project.name) + unschedule_subtasks_in_scheduling_unit_blueprint(scheduling_unit) + + def mark_unschedulable_scheduling_units_for_active_projects_schedulable(projects: [str]=None): ''' mark all scheduling_units which are currently unschedulable (for the given project(s), or all projects if None) as schedulable again. @@ -1104,6 +1111,20 @@ class TMSSDynamicSchedulingMessageHandler(TMSSEventMessageHandler): logger.info("project '%s' rank changed to %s", name, rank) self.scheduler.trigger() + def _onProjectCyclesCreatedUpdatedDeleted(self, id: int): + logger.info("a project was added/removed to a cycle. triggering new scheduling round") + mark_unschedulable_scheduling_units_for_active_projects_schedulable() + unschedule_scheduled_units_for_cycleless_projects() + self.scheduler.trigger() + + def onProjectCyclesCreated(self, id: int): + self._onProjectCyclesCreatedUpdatedDeleted(id) + + def onProjectCyclesUpdated(self, id: int): + self._onProjectCyclesCreatedUpdatedDeleted(id) + + def onProjectCyclesDeleted(self, id: int): + self._onProjectCyclesCreatedUpdatedDeleted(id) def onReservationCreated(self, id: int): self._onReservationCreatedOrUpdated(id) diff --git a/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py b/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py index b3c88f8bb3e..0eea390589b 100755 --- a/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py +++ b/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py @@ -163,6 +163,11 @@ class BaseDynamicSchedulingTestCase(unittest.TestCase): scheduling_set.project.project_state = models.ProjectState.objects.get(value=models.ProjectState.Choices.ACTIVE.value) scheduling_set.project.save() + # ensure the (reused) project has a cycle + if not scheduling_set.project.cycles.exists(): + scheduling_set.project.cycles.add(models.Cycle.objects.create(**Cycle_test_data(start=datetime.utcnow()-timedelta(days=365), + stop=datetime.utcnow()+timedelta(days=365)))) + # add the scheduling_unit_doc to a new SchedulingUnitDraft instance, and were ready to use it! scheduling_unit_draft = models.SchedulingUnitDraft.objects.create(name=name, scheduling_set=scheduling_set, @@ -956,6 +961,57 @@ class TestFixedTimeScheduling(BaseDynamicSchedulingTestCase): self.assertEqual(at, scheduling_unit_blueprint.scheduled_start_time) + def test_project_cycle_event_handling(self): + """ + Test a simple observation with the 'at' constraint and 'fixed_time' scheduler. + First ensure it is unschedulable by making the project not part of any cycle, so there is no cycle-window to schedule it in. + Then add the project to a cycle, and see if the scheduler picks it up and schedules it. + """ + project = models.Project.objects.create(**Project_test_data(name=str(uuid.uuid4()), project_state=models.ProjectState.objects.get(value=models.ProjectState.Choices.ACTIVE.value))) + scheduling_set = models.SchedulingSet.objects.create(**SchedulingSet_test_data(project=project)) + + # Use at constraint in the near future + at = round_to_second_precision(datetime.utcnow() + timedelta(days=1)) + scheduling_unit_draft = self.create_simple_observation_scheduling_unit_fixed_time(at=at, scheduling_set=scheduling_set) + scheduling_unit_blueprint = create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) + + # make sure we start with no cycles + project.cycles.all().delete() + + # assert blueprint has correct constraints, and is schedulable + self.assertEqual('fixed_time', scheduling_unit_blueprint.scheduling_constraints_doc['scheduler']) + self.assertEqual(at.isoformat(), scheduling_unit_blueprint.scheduling_constraints_doc['time']['at']) + self.assertEqual(scheduling_unit_blueprint.status.value, models.SchedulingUnitStatus.Choices.SCHEDULABLE.value) + + # start the dynamic_scheduling_service, which includes eventmessage handling, and a running scheduler + with BusListenerJanitor(create_dynamic_scheduling_service()): + # at scheduler startup, a full scheduling run is done. + # wait and poll until unit is marked as unschedulable, or timeout + scheduling_unit_blueprint = wait_for_scheduling_unit_blueprint_status(scheduling_unit_blueprint.id, models.SchedulingUnitStatus.Choices.UNSCHEDULABLE.value) + + # add the project to a cycle + cycle = models.Cycle.objects.create(**Cycle_test_data(start=at - timedelta(days=7), + stop=at + timedelta(days=7))) + project.cycles.add(cycle) + + # now wait and poll until unit is scheduled, or timeout + scheduling_unit_blueprint = wait_for_scheduling_unit_blueprint_status(scheduling_unit_blueprint.id, models.SchedulingUnitStatus.Choices.SCHEDULED.value) + + # is it scheduled at the right time? + self.assertEqual(models.SchedulingUnitStatus.Choices.SCHEDULED.value, scheduling_unit_blueprint.status.value) + self.assertEqual(at, scheduling_unit_blueprint.scheduled_start_time) + + # remove the project from the cycle again + project.cycles.remove(cycle) + + # wait and poll until unit is unschedulable, or timeout + scheduling_unit_blueprint = wait_for_scheduling_unit_blueprint_status(scheduling_unit_blueprint.id, models.SchedulingUnitStatus.Choices.UNSCHEDULABLE.value) + + # is it unschedulable at the right time? + self.assertEqual(models.SchedulingUnitStatus.Choices.UNSCHEDULABLE.value, scheduling_unit_blueprint.status.value) + self.assertEqual(at, scheduling_unit_blueprint.scheduled_start_time) + + def test_constraints_update_event_handling(self): """ Test a simple observation with the 'at' constraint and 'fixed_time' scheduler. @@ -2340,6 +2396,8 @@ class TestDynamicScheduling(BaseDynamicSchedulingTestCase): ''' project = models.Project.objects.create(**Project_test_data(name=str(uuid.uuid4()), auto_ingest=True, project_state=models.ProjectState.objects.get(value=models.ProjectState.Choices.OPENED.value))) + project.cycles.add(models.Cycle.objects.create(**Cycle_test_data(start=datetime.utcnow() - timedelta(days=365), + stop=datetime.utcnow() + timedelta(days=365)))) scheduling_set = models.SchedulingSet.objects.create(**SchedulingSet_test_data(project=project)) strategy_template = models.SchedulingUnitObservingStrategyTemplate.get_latest(name="IM HBA - 1 Beam") su_draft = create_scheduling_unit_draft_from_observing_strategy_template(strategy_template, scheduling_set=scheduling_set) diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py b/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py index 53420f49cdc..68fe179316b 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py @@ -923,16 +923,13 @@ class Project(ProjectPropertyMixin, NamedCommonPK): def earliest_possible_cycle_start_time(self) -> datetime.datetime: '''return the earliest possible start time for this unit's project and cycle(s)''' min_cycle_start = self.cycles.all().aggregate(Min('start'))['start__min'] - return min_cycle_start or datetime.datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) + return min_cycle_start @cached_property def latest_possible_cycle_stop_time(self) -> datetime.datetime: '''return the latest possible start time for this unit's project and cycle(s)''' max_cycle_stop = self.cycles.all().aggregate(Max('stop'))['stop__max'] - if max_cycle_stop: - return max_cycle_stop - return datetime.datetime.utcnow().replace(day=datetime.datetime.today().day, - hour=0, minute=0, second=0, microsecond=0) + datetime.timedelta(days=7) + return max_cycle_stop class ProjectQuota(Model): project = ForeignKey('Project', related_name="quota", on_delete=PROTECT, help_text='Project to wich this quota belongs.') # protected to avoid accidents @@ -1087,7 +1084,9 @@ class SchedulingUnitCommonPropertiesMixin: @cached_property def latest_possible_cycle_start_time(self) -> datetime.datetime: '''return the latest possible start time for this unit's project and cycle(s)''' - return self.latest_possible_cycle_stop_time - self.specified_observation_duration + if self.latest_possible_cycle_stop_time is not None: + return self.latest_possible_cycle_stop_time - self.specified_observation_duration + return None @cached_property def latest_possible_cycle_stop_time(self) -> datetime.datetime: diff --git a/SAS/TMSS/client/lib/tmssbuslistener.py b/SAS/TMSS/client/lib/tmssbuslistener.py index 28f53b0baf8..e807a0eabee 100644 --- a/SAS/TMSS/client/lib/tmssbuslistener.py +++ b/SAS/TMSS/client/lib/tmssbuslistener.py @@ -51,6 +51,7 @@ TMSS_SCHEDULINGCONSTRAINTSWEIGHTFACTOR_OBJECT_EVENT_PREFIX = _TMSS_EVENT_PREFIX_ TMSS_PROJECT_OBJECT_EVENT_PREFIX = _TMSS_EVENT_PREFIX_TEMPLATE % 'Project.Object' TMSS_PROJECT_STATUS_EVENT_PREFIX = _TMSS_EVENT_PREFIX_TEMPLATE % 'Project.Status' TMSS_PROJECTQUOTAARCHIVELOCATION_OBJECT_EVENT_PREFIX = _TMSS_EVENT_PREFIX_TEMPLATE % 'ProjectQuotaArchiveLocation.Object' +TMSS_PROJECTCYCLES_OBJECT_EVENT_PREFIX = _TMSS_EVENT_PREFIX_TEMPLATE % 'ProjectCycles.Object' TMSS_SUBSYSTEM_STATUS_EVENT_PREFIX = _TMSS_EVENT_PREFIX_TEMPLATE % 'Subsystem.Status' TMSS_SETTING_OBJECT_EVENT_PREFIX = _TMSS_EVENT_PREFIX_TEMPLATE % 'Setting.Object' TMSS_RESERVATION_OBJECT_EVENT_PREFIX = _TMSS_EVENT_PREFIX_TEMPLATE % 'Reservation.Object' @@ -139,6 +140,12 @@ class TMSSEventMessageHandler(AbstractMessageHandler): self.onProjectQuotaArchiveLocationUpdated(**msg.content) elif stripped_subject == 'ProjectQuotaArchiveLocation.Object.Deleted': self.onProjectQuotaArchiveLocationDeleted(**msg.content) + elif stripped_subject == 'ProjectCycles.Object.Created': + self.onProjectCyclesCreated(**msg.content) + elif stripped_subject == 'ProjectCycles.Object.Updated': + self.onProjectCyclesUpdated(**msg.content) + elif stripped_subject == 'ProjectCycles.Object.Deleted': + self.onProjectCyclesDeleted(**msg.content) elif stripped_subject == 'SchedulingUnitBlueprint.Object.Placed.Updated': self.onSchedulingUnitBlueprintPlacedUpdated(**msg.content) elif stripped_subject == 'SchedulingUnitBlueprint.Object.Constraints.Updated': @@ -346,6 +353,23 @@ class TMSSEventMessageHandler(AbstractMessageHandler): ''' pass + def onProjectCyclesCreated(self, id: int): + '''onProjectCyclesCreated is called upon receiving a ProjectCycles.Object.Created message, which is sent when a ProjectCycles was created. + :param id: the TMSS id of the ProjectCycles + ''' + pass + + def onProjectCyclesUpdated(self, id: int): + '''onProjectCyclesUpdated is called upon receiving a ProjectCycles.Object.Updated message, which is sent when a ProjectCycles was created. + :param id: the TMSS id of the ProjectCycles + ''' + pass + + def onProjectCyclesDeleted(self, id: int): + '''onProjectCyclesDeleted is called upon receiving a ProjectCycles.Object.Deleted message, which is sent when a ProjectCycles was created. + :param id: the TMSS id of the ProjectCycles + ''' + pass def onSettingUpdated(self, name: str, value): '''onSettingUpdated is called upon receiving a Setting.Object.Updated message, which is sent when a Setting was updated. -- GitLab