diff --git a/LCS/PyCommon/postgres.py b/LCS/PyCommon/postgres.py index d66eb070f51bae93aeab9e9e6c516511d0588e8a..4e5b2a02b7429377968ba0b24de67a877182a9da 100644 --- a/LCS/PyCommon/postgres.py +++ b/LCS/PyCommon/postgres.py @@ -61,15 +61,26 @@ def makePostgresNotificationQueries(schema, table, action, columns: list=None, q change_name += '_' + '_'.join([column_name for column_name in quoted_columns]) function_name = '''NOTIFY_{change_name}'''.format(change_name=change_name) - # build query string selecting the id:value (and col:col_value) into a json formatted object string - select_payload = '''SELECT '{po}"{id_column_name}": {id_value_quote}' || CAST({new_or_old}.{id_column_name} AS text) || '{id_value_quote}{columns_key_values}{quoted_columns_key_values}{pc}' INTO payload;'''.format( - po="{", - id_column_name=id_column_name, - id_value_quote='"' if quote_id_value else '', - new_or_old='OLD' if action=='DELETE' else 'NEW', - columns_key_values=', ' + ', '.join('''"{column_name}": ' || CAST(NEW.{column_name} AS text) || ' '''.format(column_name=column_name) for column_name in columns) if columns else '', - quoted_columns_key_values=', ' + ', '.join('''"{column_name}": "' || CAST(NEW.{column_name} AS text) || '"'''.format(column_name=column_name) for column_name in quoted_columns) if quoted_columns else '', - pc = "}") + def select_column_subquery(column_name: str, new_or_old: str, quote: bool) -> str: + return '''"{column_name}": {quote_or_not}' || CASE WHEN {new_or_old}.{column_name} IS NULL THEN 'NULL' ELSE CAST({new_or_old}.{column_name} AS text) END || '{quote_or_not}'''.format(column_name=column_name, new_or_old=new_or_old, quote_or_not='"' if quote else '') + + def select_columns_subquery(column_names: list, new_or_old: str, quote: bool) -> str: + return ', '.join(select_column_subquery(column_name, new_or_old, quote) for column_name in column_names or []) + + select_payload = '''SELECT '{ ''' + select_columns_subquery([id_column_name], new_or_old='OLD' if action=='DELETE' else 'NEW', quote=quote_id_value) + select_payload += ''', "new": { ''' + if action in ('INSERT', 'UPDATE'): + select_payload += select_columns_subquery(columns, new_or_old='NEW', quote=False) + if columns and quoted_columns: + select_payload += ', ' + select_payload += select_columns_subquery(quoted_columns, new_or_old='NEW', quote=True) + select_payload += ''' }, "old": { ''' + if action in ('DELETE', 'UPDATE'): + select_payload += select_columns_subquery(columns, new_or_old='OLD', quote=False) + if columns and quoted_columns: + select_payload += ', ' + select_payload += select_columns_subquery(quoted_columns, new_or_old='OLD', quote=True) + select_payload += ''' } }' INTO payload; ''' if action == 'UPDATE': begin_update_check = 'IF ' diff --git a/SAS/TMSS/backend/services/postgres_listener/lib/postgres_listener.py b/SAS/TMSS/backend/services/postgres_listener/lib/postgres_listener.py index 4f411fdb781f49ce9ee0cbca379e6153598763d7..395ea36da6c90580043fc485b16ca1720a9b596c 100644 --- a/SAS/TMSS/backend/services/postgres_listener/lib/postgres_listener.py +++ b/SAS/TMSS/backend/services/postgres_listener/lib/postgres_listener.py @@ -20,6 +20,8 @@ from optparse import OptionParser, OptionGroup import json +import dateutil.parser + from lofar.common.postgres import PostgresListener, makePostgresNotificationQueries from lofar.messaging.messagebus import ToBus from lofar.sas.tmss.client.tmssbuslistener import * @@ -195,8 +197,8 @@ class TMSSPGListener(PostgresListener): self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_reservation', 'update')) self.subscribe('tmssapp_reservation_update', self.onReservationUpdated) - self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_reservation', 'delete')) - self.subscribe('tmssapp_reservation_delete', self.onReservationDeleted) + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_reservation', 'delete', quoted_columns=['start_time', 'stop_time'])) + self.subscribe('tmssapp_reservation_delete_start_time_stop_time', self.onReservationDeleted) return super().start() @@ -206,11 +208,17 @@ class TMSSPGListener(PostgresListener): self.event_bus.exchange, self.event_bus.broker, self._dbcreds.stringWithHiddenPassword()) self.event_bus.close() - def _sendNotification(self, subject, contentDict): + def _sendNotification(self, subject, contentDict, delete_new_old_keys: bool=True): try: if isinstance(contentDict, str): contentDict = json.loads(contentDict) + if delete_new_old_keys: + if 'new' in contentDict: + del contentDict['new'] + if 'old' in contentDict: + del contentDict['old'] + msg = EventMessage(subject=subject, content=contentDict) logger.info('Sending %s to %s: %s', subject, self.event_bus.exchange, single_line_with_single_spaces(contentDict)) @@ -228,9 +236,9 @@ class TMSSPGListener(PostgresListener): self._sendNotification(TMSS_SUBTASK_OBJECT_EVENT_PREFIX+'.Deleted', payload) def onSubTaskStateUpdated(self, payload = None): - payload_dict = json.loads(payload) - subtask_id = payload_dict['id'] - subtask_state = payload_dict['state_id'] + payload = json.loads(payload) + subtask_id = payload['id'] + subtask_state = payload['new']['state_id'] self._sendNotification(TMSS_SUBTASK_STATUS_EVENT_PREFIX+'.'+subtask_state.capitalize(), {'id': subtask_id, 'status': subtask_state}) @@ -241,9 +249,9 @@ class TMSSPGListener(PostgresListener): self._sendNotification (TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX+'.Updated', payload) def onTaskBlueprintStatusUpdated(self, payload = None): - payload_dict = json.loads(payload) - taskblueprint_id = payload_dict['id'] - taskblueprint_state = payload_dict['status_id'] + payload = json.loads(payload) + taskblueprint_id = payload['id'] + taskblueprint_state = payload['new']['status_id'] self._sendNotification(TMSS_TASKBLUEPRINT_STATUS_EVENT_PREFIX+'.'+taskblueprint_state.capitalize(), {'id': taskblueprint_id, 'status': taskblueprint_state}) @@ -251,14 +259,16 @@ class TMSSPGListener(PostgresListener): self._sendNotification(TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX+'.Deleted', payload) def onTaskBlueprintOutputPinningUpdated(self, payload = None): - self._sendNotification(TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX+'.OutputPinningUpdated', payload) - - if isinstance(payload, str): - payload = json.loads(payload) + payload = json.loads(payload) + taskblueprint_id = payload['id'] + output_pinned = payload['new']['output_pinned'] + self._sendNotification(TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX+'.OutputPinningUpdated', + {'id': taskblueprint_id, 'output_pinned': output_pinned}) from lofar.sas.tmss.tmss.tmssapp.models import TaskBlueprint - task_blueprint = TaskBlueprint.objects.get(id=payload['id']) - self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX+'.Updated', {'id': task_blueprint.scheduling_unit_blueprint.id}) + task_blueprint = TaskBlueprint.objects.get(id=taskblueprint_id) + self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX+'.Updated', + {'id': task_blueprint.scheduling_unit_blueprint.id, 'output_pinned': output_pinned}) def onTaskDraftInserted(self, payload = None): self._sendNotification(TMSS_TASKDRAFT_OBJECT_EVENT_PREFIX+'.Created', payload) @@ -278,28 +288,44 @@ class TMSSPGListener(PostgresListener): def onSchedulingUnitBlueprintStatusUpdated(self, payload = None): payload_dict = json.loads(payload) sub_id = payload_dict['id'] - sub_state = payload_dict['status_id'] + sub_state = payload_dict['new']['status_id'] self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_STATUS_EVENT_PREFIX+'.'+sub_state.capitalize(), {'id': sub_id, 'status': sub_state}) def onSchedulingUnitBlueprintIngestPermissionGranted(self, payload=None): - self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX + '.IngestPermissionGranted', payload) + payload_dict = json.loads(payload) + id = payload_dict['id'] + ingest_permission_granted_since = payload_dict['new']['ingest_permission_granted_since'] + self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX + '.IngestPermissionGranted', + {'id': id, 'ingest_permission_granted_since': ingest_permission_granted_since}) def onSchedulingUnitBlueprintRankUpdated(self, payload=None): - self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX + '.Rank.Updated', payload) + payload_dict = json.loads(payload) + id = payload_dict['id'] + rank = payload_dict['new']['rank'] + self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX + '.Rank.Updated', + {'id': id, 'rank': rank}) def onSchedulingUnitBlueprintPlacedUpdated(self, payload=None): - self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX + '.Placed.Updated', payload) + payload_dict = json.loads(payload) + id = payload_dict['id'] + placed = payload_dict['new']['placed'] + self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX + '.Placed.Updated', + {'id': id, 'placed': placed}) def onSchedulingUnitBlueprintPriorityQueueUpdated(self, payload=None): payload_dict = json.loads(payload) id = payload_dict['id'] - priority_queue = payload_dict['priority_queue_id'] + priority_queue = payload_dict['new']['priority_queue_id'] self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX + '.PriorityQueue.Updated', {'id': id, 'priority_queue': priority_queue}) def onSchedulingUnitBlueprintConstraintsUpdated(self, payload=None): - self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX + '.Constraints.Updated', payload) + payload_dict = json.loads(payload) + id = payload_dict['id'] + scheduling_constraints_doc = payload_dict['new']['scheduling_constraints_doc'] + self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX + '.Constraints.Updated', + {'id': id, 'scheduling_constraints_doc': scheduling_constraints_doc}) def onSchedulingUnitBlueprintDeleted(self, payload = None): self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX+'.Deleted', payload) @@ -328,7 +354,7 @@ class TMSSPGListener(PostgresListener): def onProjectStatusUpdated(self, payload = None): payload_dict = json.loads(payload) project_name = payload_dict['name'] - project_status = payload_dict['project_state_id'] + project_status = payload_dict['new']['project_state_id'] self._sendNotification(TMSS_PROJECT_STATUS_EVENT_PREFIX+'.'+project_status.capitalize(), {'name': project_name, 'status': project_status}) @@ -358,13 +384,13 @@ class TMSSPGListener(PostgresListener): def onSubsystemStatusUpdated(self, payload = None): payload = json.loads(payload) - payload['status'] = payload.pop('status_id') + payload['status'] = payload['new']['status_id'] self._sendNotification(TMSS_SUBSYSTEM_STATUS_EVENT_PREFIX+'.Updated', payload) def onSettingUpdated(self, payload = None): payload = json.loads(payload) payload['name'] = payload['name_id'] - del payload['name_id'] + payload['value'] = payload['new']['value'] self._sendNotification(TMSS_SETTING_OBJECT_EVENT_PREFIX+'.Updated', payload) def onReservationInserted(self, payload = None): @@ -374,6 +400,12 @@ class TMSSPGListener(PostgresListener): self._sendNotification(TMSS_RESERVATION_OBJECT_EVENT_PREFIX+'.Updated', payload) def onReservationDeleted(self, payload = None): + payload = json.loads(payload) + payload['start_time'] = dateutil.parser.parse(payload['old']['start_time'], ignoretz=True) + try: + payload['stop_time'] = dateutil.parser.parse(payload['old']['stop_time'], ignoretz=True) + except: + payload['stop_time'] = None self._sendNotification(TMSS_RESERVATION_OBJECT_EVENT_PREFIX+'.Deleted', payload) diff --git a/SAS/TMSS/backend/services/postgres_listener/test/t_postgres_listener_service.py b/SAS/TMSS/backend/services/postgres_listener/test/t_postgres_listener_service.py index d3cc342dff4258d01c260982917ec80028d2c99c..a51a9370b94a65513e34bd0ef72752188d17d6ae 100755 --- a/SAS/TMSS/backend/services/postgres_listener/test/t_postgres_listener_service.py +++ b/SAS/TMSS/backend/services/postgres_listener/test/t_postgres_listener_service.py @@ -17,6 +17,7 @@ # You should have received a copy of the GNU General Public License along # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. import threading +import time import unittest import uuid @@ -87,7 +88,14 @@ class TestSubtaskSchedulingService(unittest.TestCase): self.event_received.clear() logger.info("TestTMSSPGListener detected db change: %s %s", subject, single_line_with_single_spaces(contentDict)) self.subjects.append(subject) - self.contentDicts.append(json.loads(contentDict) if isinstance(contentDict, str) else contentDict) + + contentDict = json.loads(contentDict) if isinstance(contentDict, str) else contentDict + if 'new' in contentDict: + del contentDict['new'] + if 'old' in contentDict: + del contentDict['old'] + self.contentDicts.append(contentDict) + self.event_received.set() # create and start the service (the object under test) @@ -208,6 +216,7 @@ class TestSubtaskSchedulingService(unittest.TestCase): # delete subtask, use direct http delete request on rest api requests.delete(subtask['url'], auth=self.test_data_creator.auth) + time.sleep(1) # sync and check subtask deleted with service.lock: diff --git a/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py b/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py index b3d3d020f9e95fccae7d32f544edeb94a3b945fc..aa8a4f1b3d673fae7273de848e0584df950736ab 100644 --- a/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py +++ b/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py @@ -1166,11 +1166,19 @@ class TMSSDynamicSchedulingMessageHandler(TMSSEventMessageHandler): logger.exception(e) - def onReservationDeleted(self, id: int): - # maybe some unschedulable/blocked units can use the spot that was used by the reservation - # mark them all schedulable, and do a scheduling round to see which ones can be scheduled - logger.info("reservation id=%s was deleted. checking/updating (un)schedulablity... as a result a new scheduling round might be triggered if needed.", id) + def onReservationDeleted(self, id: int, start_time: datetime, stop_time: datetime): + logger.info("reservation id=%s was deleted. unscheduling units in the old reservation window: ['%s', '%s'].", id, start_time, stop_time) + scheduled_units_under_deleted_reservation = get_scheduled_scheduling_units(start_time, stop_time) + for unit in scheduled_units_under_deleted_reservation.all(): + try: + unschedule_subtasks_in_scheduling_unit_blueprint(unit) + except Exception as e: + logger.error(e) + # also mark the unschedulable units as schedulable again... mark_unschedulable_scheduling_units_for_active_projects_schedulable() + # ... and let the scheduler run + self.scheduler.trigger() + diff --git a/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py b/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py index 0eea390589b7f4cef75f7a99237ffe1d7cc13bd9..80c2c5c0da3282391d8d3046dadb0a46625cd08f 100755 --- a/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py +++ b/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py @@ -1228,6 +1228,41 @@ class TestFixedTimeScheduling(BaseDynamicSchedulingTestCase): self.assertEqual(models.SchedulingUnitStatus.Choices.SCHEDULED.value, scheduling_unit_blueprint.status.value) self.assertEqual(at, scheduling_unit_blueprint.scheduled_start_time) + # create an new reservation overlapping with only one station + reservation_template = models.ReservationTemplate.objects.first() # there is only one + reservation_doc = reservation_template.get_default_json_document_for_schema() + reservation_doc['resources']['stations'] = ['CS002'] + reservation_doc['schedulability']['fixed_time'] = False + reservation = models.Reservation.objects.create(start_time=scheduling_unit_blueprint.scheduled_start_time, + stop_time=scheduling_unit_blueprint.scheduled_observation_stop_time, + specifications_doc=reservation_doc, + specifications_template=reservation_template, + description='') + + from time import sleep + sleep(2) #idealy we should not sleep in tests. + # now wait and poll until unit is unscheduled, and scheduled again, or timeout + scheduling_unit_blueprint = wait_for_scheduling_unit_blueprint_status(scheduling_unit_blueprint.id, models.SchedulingUnitStatus.Choices.SCHEDULED.value) + + # is it scheduled at the right time? and is reserved station CS002 taken out? + self.assertEqual(models.SchedulingUnitStatus.Choices.SCHEDULED.value, scheduling_unit_blueprint.status.value) + self.assertEqual(at, scheduling_unit_blueprint.scheduled_start_time) + self.assertEqual(['CS003'], scheduling_unit_blueprint.main_observation_used_stations) + self.assertEqual(['CS002', 'CS003'], scheduling_unit_blueprint.main_observation_specified_stations) + + # remove the reservation as a whole + reservation.delete() # should trigger the scheduler event handler.... + + # now wait and poll until unit is unscheduled, and scheduled again, or timeout + sleep(2) #idealy we should not sleep in tests. + scheduling_unit_blueprint = wait_for_scheduling_unit_blueprint_status(scheduling_unit_blueprint.id, models.SchedulingUnitStatus.Choices.SCHEDULED.value) + + # is it scheduled at the right time? and is free station CS002 taken in again? + self.assertEqual(models.SchedulingUnitStatus.Choices.SCHEDULED.value, scheduling_unit_blueprint.status.value) + self.assertEqual(at, scheduling_unit_blueprint.scheduled_start_time) + self.assertEqual(['CS002', 'CS003'], scheduling_unit_blueprint.main_observation_used_stations) + self.assertEqual(['CS002', 'CS003'], scheduling_unit_blueprint.main_observation_specified_stations) + def test_can_schedule_all_observing_strategy_templates_with_default_constraints(self): '''''' diff --git a/SAS/TMSS/client/lib/tmssbuslistener.py b/SAS/TMSS/client/lib/tmssbuslistener.py index e807a0eabeecb8f5b36e0fa2ded8ed423c1703df..282e6d98abe4da8a329a07f723e4aac27404819b 100644 --- a/SAS/TMSS/client/lib/tmssbuslistener.py +++ b/SAS/TMSS/client/lib/tmssbuslistener.py @@ -410,7 +410,7 @@ class TMSSEventMessageHandler(AbstractMessageHandler): ''' pass - def onReservationDeleted(self, id: int): + def onReservationDeleted(self, id: int, start_time, stop_time): '''onReservationDeleted is called upon receiving a Reservation.Object.Deleted message, which is sent when a Reservation was created. ''' pass