diff --git a/LCS/PyCommon/postgres.py b/LCS/PyCommon/postgres.py index 0dfc460da351b91806ee0f7da477c095b8c7f9d1..d66eb070f51bae93aeab9e9e6c516511d0588e8a 100644 --- a/LCS/PyCommon/postgres.py +++ b/LCS/PyCommon/postgres.py @@ -29,6 +29,7 @@ from queue import Queue, Empty from datetime import datetime, timedelta import collections import time +from typing import Union import re import select import psycopg2 @@ -48,29 +49,35 @@ def truncate_notification_channel_name(notification_channel_name: str) -> str: return truncated_notification -def makePostgresNotificationQueries(schema, table, action, column_name=None, quote_column_value:bool=True, id_column_name='id', quote_id_value:bool=False): +def makePostgresNotificationQueries(schema, table, action, columns: list=None, quoted_columns: list=None, id_column_name: str='id', quote_id_value: bool=False): action = action.upper() if action not in ('INSERT', 'UPDATE', 'DELETE'): raise ValueError('''trigger_type '%s' not in ('INSERT', 'UPDATE', 'DELETE')''' % action) change_name = '''{table}_{action}'''.format(table=table, action=action) - if column_name is not None and column_name != id_column_name: - change_name += '_' + column_name + if columns is not None: + change_name += '_' + '_'.join([column_name for column_name in columns]) + if quoted_columns is not None: + change_name += '_' + '_'.join([column_name for column_name in quoted_columns]) function_name = '''NOTIFY_{change_name}'''.format(change_name=change_name) # build query string selecting the id:value (and col:col_value) into a json formatted object string - select_payload = '''SELECT '{po}"{id_column_name}": {id_value_quote}' || CAST({new_or_old}.{id_column_name} AS text) || '{id_value_quote}{column_key_value}{pc}' INTO payload;'''.format( + select_payload = '''SELECT '{po}"{id_column_name}": {id_value_quote}' || CAST({new_or_old}.{id_column_name} AS text) || '{id_value_quote}{columns_key_values}{quoted_columns_key_values}{pc}' INTO payload;'''.format( po="{", id_column_name=id_column_name, id_value_quote='"' if quote_id_value else '', new_or_old='OLD' if action=='DELETE' else 'NEW', - column_key_value=''', "{column_name}": {column_value_quote}' || CAST(NEW.{column_name} AS text) || '{column_value_quote}'''.format( - column_name=column_name, - column_value_quote='"' if quote_column_value else '') if column_name else '', + columns_key_values=', ' + ', '.join('''"{column_name}": ' || CAST(NEW.{column_name} AS text) || ' '''.format(column_name=column_name) for column_name in columns) if columns else '', + quoted_columns_key_values=', ' + ', '.join('''"{column_name}": "' || CAST(NEW.{column_name} AS text) || '"'''.format(column_name=column_name) for column_name in quoted_columns) if quoted_columns else '', pc = "}") if action == 'UPDATE': - begin_update_check = 'IF ROW(NEW.{what}) IS DISTINCT FROM ROW(OLD.{what}) THEN'.format(what='*' if column_name is None or column_name == id_column_name else column_name) + begin_update_check = 'IF ' + if columns or quoted_columns: + begin_update_check += ' OR '.join('ROW(NEW.{what}) IS DISTINCT FROM ROW(OLD.{what})'.format(what=column_name) for column_name in ((columns or []) + (quoted_columns or []))) + else: + begin_update_check += ' ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*)' + begin_update_check += ' THEN' end_update_check = 'END IF;' else: begin_update_check = '' diff --git a/SAS/TMSS/backend/services/postgres_listener/lib/postgres_listener.py b/SAS/TMSS/backend/services/postgres_listener/lib/postgres_listener.py index f65eb9c9cb14dc265c0459ed223c7bec3d4fd3f8..4f411fdb781f49ce9ee0cbca379e6153598763d7 100644 --- a/SAS/TMSS/backend/services/postgres_listener/lib/postgres_listener.py +++ b/SAS/TMSS/backend/services/postgres_listener/lib/postgres_listener.py @@ -63,7 +63,7 @@ class TMSSPGListener(PostgresListener): self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_subtask', 'delete')) self.subscribe('tmssapp_subtask_delete', self.onSubTaskDeleted) - self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_subtask', 'update', column_name='state_id', quote_column_value=True)) + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_subtask', 'update', quoted_columns=['state_id'])) self.subscribe('tmssapp_subtask_update_state_id', self.onSubTaskStateUpdated) @@ -80,10 +80,10 @@ class TMSSPGListener(PostgresListener): self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_taskblueprint', 'delete')) self.subscribe('tmssapp_taskblueprint_delete', self.onTaskBlueprintDeleted) - self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_taskblueprint', 'update', column_name='status_id', quote_column_value=True)) + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_taskblueprint', 'update', quoted_columns=['status_id'])) self.subscribe('tmssapp_taskblueprint_update_status_id', self.onTaskBlueprintStatusUpdated) - self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_taskblueprint', 'update', column_name='output_pinned', quote_column_value=False)) + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_taskblueprint', 'update', columns=['output_pinned'])) self.subscribe('tmssapp_taskblueprint_update_output_pinned', self.onTaskBlueprintOutputPinningUpdated) @@ -108,22 +108,22 @@ class TMSSPGListener(PostgresListener): self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'delete')) self.subscribe('tmssapp_schedulingunitblueprint_delete', self.onSchedulingUnitBlueprintDeleted) - self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'update', column_name='ingest_permission_granted_since', quote_column_value=True)) + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'update', quoted_columns=['ingest_permission_granted_since'])) self.subscribe('tmssapp_schedulingunitblueprint_update_ingest_permission_granted_since', self.onSchedulingUnitBlueprintIngestPermissionGranted) - self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'update', column_name='scheduling_constraints_doc', quote_column_value=False)) + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'update', columns=['scheduling_constraints_doc'])) self.subscribe('tmssapp_schedulingunitblueprint_update_scheduling_constraints_doc', self.onSchedulingUnitBlueprintConstraintsUpdated) - self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'update', column_name='status_id', quote_column_value=True)) + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'update', quoted_columns=['status_id'])) self.subscribe('tmssapp_schedulingunitblueprint_update_status_id', self.onSchedulingUnitBlueprintStatusUpdated) - self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'update', column_name='priority_queue_id', quote_column_value=True)) + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'update', quoted_columns=['priority_queue_id'])) self.subscribe('tmssapp_schedulingunitblueprint_update_priority_queue_id', self.onSchedulingUnitBlueprintPriorityQueueUpdated) - self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'update', column_name='rank', quote_column_value=False)) + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'update', columns=['rank'])) self.subscribe('tmssapp_schedulingunitblueprint_update_rank', self.onSchedulingUnitBlueprintRankUpdated) - self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'update', column_name='placed', quote_column_value=False)) + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'update', columns=['placed'])) self.subscribe('tmssapp_schedulingunitblueprint_update_placed', self.onSchedulingUnitBlueprintPlacedUpdated) @@ -144,11 +144,11 @@ class TMSSPGListener(PostgresListener): self.subscribe('tmssapp_schedulingunitdraft_delete', self.onSchedulingUnitDraftDeleted) # SubsystemStatus - self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_subsystem', 'update', id_column_name='name', quote_id_value=True, column_name='status_id', quote_column_value=True)) + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_subsystem', 'update', id_column_name='name', quote_id_value=True, quoted_columns=['status_id'])) self.subscribe('tmssapp_subsystem_update_status_id', self.onSubsystemStatusUpdated) # Setting - self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_setting', 'update', id_column_name='name_id', quote_id_value=True, column_name='value', quote_column_value=False)) + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_setting', 'update', id_column_name='name_id', quote_id_value=True, columns=['value'])) self.subscribe('tmssapp_setting_update_value', self.onSettingUpdated) # Project @@ -161,10 +161,10 @@ class TMSSPGListener(PostgresListener): self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_project', 'delete', id_column_name="name", quote_id_value=True)) self.subscribe('tmssapp_project_delete', self.onProjectDeleted) - self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_project', 'update', id_column_name="name", quote_id_value=True, column_name='project_state_id', quote_column_value=True)) + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_project', 'update', id_column_name="name", quote_id_value=True, quoted_columns=['project_state_id'])) self.subscribe('tmssapp_project_update_project_state_id', self.onProjectStatusUpdated) - self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_project', 'update', id_column_name="name", quote_id_value=True, column_name='rank', quote_column_value=False)) + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_project', 'update', id_column_name="name", quote_id_value=True, columns=['rank'])) self.subscribe('tmssapp_project_update_rank', self.onProjectRankUpdated) # ProjectQuotaArchiveLocation