Skip to content
Snippets Groups Projects
Commit eff8c74d authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-2492: refactored makePostgresNotificationQueries such that we can yield...

TMSS-2492: refactored makePostgresNotificationQueries such that we can yield more columns into the payload
parent 7a52c34d
No related branches found
No related tags found
2 merge requests!1161TMSS-2492,!1160extract labeled toggle, move filter elements to folder, fix tests and warnings
......@@ -29,6 +29,7 @@ from queue import Queue, Empty
from datetime import datetime, timedelta
import collections
import time
from typing import Union
import re
import select
import psycopg2
......@@ -48,29 +49,35 @@ def truncate_notification_channel_name(notification_channel_name: str) -> str:
return truncated_notification
def makePostgresNotificationQueries(schema, table, action, column_name=None, quote_column_value:bool=True, id_column_name='id', quote_id_value:bool=False):
def makePostgresNotificationQueries(schema, table, action, columns: list=None, quoted_columns: list=None, id_column_name: str='id', quote_id_value: bool=False):
action = action.upper()
if action not in ('INSERT', 'UPDATE', 'DELETE'):
raise ValueError('''trigger_type '%s' not in ('INSERT', 'UPDATE', 'DELETE')''' % action)
change_name = '''{table}_{action}'''.format(table=table, action=action)
if column_name is not None and column_name != id_column_name:
change_name += '_' + column_name
if columns is not None:
change_name += '_' + '_'.join([column_name for column_name in columns])
if quoted_columns is not None:
change_name += '_' + '_'.join([column_name for column_name in quoted_columns])
function_name = '''NOTIFY_{change_name}'''.format(change_name=change_name)
# build query string selecting the id:value (and col:col_value) into a json formatted object string
select_payload = '''SELECT '{po}"{id_column_name}": {id_value_quote}' || CAST({new_or_old}.{id_column_name} AS text) || '{id_value_quote}{column_key_value}{pc}' INTO payload;'''.format(
select_payload = '''SELECT '{po}"{id_column_name}": {id_value_quote}' || CAST({new_or_old}.{id_column_name} AS text) || '{id_value_quote}{columns_key_values}{quoted_columns_key_values}{pc}' INTO payload;'''.format(
po="{",
id_column_name=id_column_name,
id_value_quote='"' if quote_id_value else '',
new_or_old='OLD' if action=='DELETE' else 'NEW',
column_key_value=''', "{column_name}": {column_value_quote}' || CAST(NEW.{column_name} AS text) || '{column_value_quote}'''.format(
column_name=column_name,
column_value_quote='"' if quote_column_value else '') if column_name else '',
columns_key_values=', ' + ', '.join('''"{column_name}": ' || CAST(NEW.{column_name} AS text) || ' '''.format(column_name=column_name) for column_name in columns) if columns else '',
quoted_columns_key_values=', ' + ', '.join('''"{column_name}": "' || CAST(NEW.{column_name} AS text) || '"'''.format(column_name=column_name) for column_name in quoted_columns) if quoted_columns else '',
pc = "}")
if action == 'UPDATE':
begin_update_check = 'IF ROW(NEW.{what}) IS DISTINCT FROM ROW(OLD.{what}) THEN'.format(what='*' if column_name is None or column_name == id_column_name else column_name)
begin_update_check = 'IF '
if columns or quoted_columns:
begin_update_check += ' OR '.join('ROW(NEW.{what}) IS DISTINCT FROM ROW(OLD.{what})'.format(what=column_name) for column_name in ((columns or []) + (quoted_columns or [])))
else:
begin_update_check += ' ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*)'
begin_update_check += ' THEN'
end_update_check = 'END IF;'
else:
begin_update_check = ''
......
......@@ -63,7 +63,7 @@ class TMSSPGListener(PostgresListener):
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_subtask', 'delete'))
self.subscribe('tmssapp_subtask_delete', self.onSubTaskDeleted)
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_subtask', 'update', column_name='state_id', quote_column_value=True))
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_subtask', 'update', quoted_columns=['state_id']))
self.subscribe('tmssapp_subtask_update_state_id', self.onSubTaskStateUpdated)
......@@ -80,10 +80,10 @@ class TMSSPGListener(PostgresListener):
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_taskblueprint', 'delete'))
self.subscribe('tmssapp_taskblueprint_delete', self.onTaskBlueprintDeleted)
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_taskblueprint', 'update', column_name='status_id', quote_column_value=True))
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_taskblueprint', 'update', quoted_columns=['status_id']))
self.subscribe('tmssapp_taskblueprint_update_status_id', self.onTaskBlueprintStatusUpdated)
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_taskblueprint', 'update', column_name='output_pinned', quote_column_value=False))
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_taskblueprint', 'update', columns=['output_pinned']))
self.subscribe('tmssapp_taskblueprint_update_output_pinned', self.onTaskBlueprintOutputPinningUpdated)
......@@ -108,22 +108,22 @@ class TMSSPGListener(PostgresListener):
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'delete'))
self.subscribe('tmssapp_schedulingunitblueprint_delete', self.onSchedulingUnitBlueprintDeleted)
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'update', column_name='ingest_permission_granted_since', quote_column_value=True))
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'update', quoted_columns=['ingest_permission_granted_since']))
self.subscribe('tmssapp_schedulingunitblueprint_update_ingest_permission_granted_since', self.onSchedulingUnitBlueprintIngestPermissionGranted)
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'update', column_name='scheduling_constraints_doc', quote_column_value=False))
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'update', columns=['scheduling_constraints_doc']))
self.subscribe('tmssapp_schedulingunitblueprint_update_scheduling_constraints_doc', self.onSchedulingUnitBlueprintConstraintsUpdated)
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'update', column_name='status_id', quote_column_value=True))
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'update', quoted_columns=['status_id']))
self.subscribe('tmssapp_schedulingunitblueprint_update_status_id', self.onSchedulingUnitBlueprintStatusUpdated)
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'update', column_name='priority_queue_id', quote_column_value=True))
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'update', quoted_columns=['priority_queue_id']))
self.subscribe('tmssapp_schedulingunitblueprint_update_priority_queue_id', self.onSchedulingUnitBlueprintPriorityQueueUpdated)
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'update', column_name='rank', quote_column_value=False))
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'update', columns=['rank']))
self.subscribe('tmssapp_schedulingunitblueprint_update_rank', self.onSchedulingUnitBlueprintRankUpdated)
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'update', column_name='placed', quote_column_value=False))
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'update', columns=['placed']))
self.subscribe('tmssapp_schedulingunitblueprint_update_placed', self.onSchedulingUnitBlueprintPlacedUpdated)
......@@ -144,11 +144,11 @@ class TMSSPGListener(PostgresListener):
self.subscribe('tmssapp_schedulingunitdraft_delete', self.onSchedulingUnitDraftDeleted)
# SubsystemStatus
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_subsystem', 'update', id_column_name='name', quote_id_value=True, column_name='status_id', quote_column_value=True))
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_subsystem', 'update', id_column_name='name', quote_id_value=True, quoted_columns=['status_id']))
self.subscribe('tmssapp_subsystem_update_status_id', self.onSubsystemStatusUpdated)
# Setting
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_setting', 'update', id_column_name='name_id', quote_id_value=True, column_name='value', quote_column_value=False))
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_setting', 'update', id_column_name='name_id', quote_id_value=True, columns=['value']))
self.subscribe('tmssapp_setting_update_value', self.onSettingUpdated)
# Project
......@@ -161,10 +161,10 @@ class TMSSPGListener(PostgresListener):
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_project', 'delete', id_column_name="name", quote_id_value=True))
self.subscribe('tmssapp_project_delete', self.onProjectDeleted)
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_project', 'update', id_column_name="name", quote_id_value=True, column_name='project_state_id', quote_column_value=True))
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_project', 'update', id_column_name="name", quote_id_value=True, quoted_columns=['project_state_id']))
self.subscribe('tmssapp_project_update_project_state_id', self.onProjectStatusUpdated)
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_project', 'update', id_column_name="name", quote_id_value=True, column_name='rank', quote_column_value=False))
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_project', 'update', id_column_name="name", quote_id_value=True, columns=['rank']))
self.subscribe('tmssapp_project_update_rank', self.onProjectRankUpdated)
# ProjectQuotaArchiveLocation
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment