Skip to content
Snippets Groups Projects
Merged Jorrit Schaap requested to merge TMSS-2492 into master
11 files
+ 160
289
Compare changes
  • Side-by-side
  • Inline
Files
11
+ 32
14
@@ -29,6 +29,7 @@ from queue import Queue, Empty
from datetime import datetime, timedelta
import collections
import time
from typing import Union
import re
import select
import psycopg2
@@ -48,29 +49,46 @@ def truncate_notification_channel_name(notification_channel_name: str) -> str:
return truncated_notification
def makePostgresNotificationQueries(schema, table, action, column_name=None, quote_column_value:bool=True, id_column_name='id', quote_id_value:bool=False):
def makePostgresNotificationQueries(schema, table, action, columns: list=None, quoted_columns: list=None, id_column_name: str='id', quote_id_value: bool=False):
action = action.upper()
if action not in ('INSERT', 'UPDATE', 'DELETE'):
raise ValueError('''trigger_type '%s' not in ('INSERT', 'UPDATE', 'DELETE')''' % action)
change_name = '''{table}_{action}'''.format(table=table, action=action)
if column_name is not None and column_name != id_column_name:
change_name += '_' + column_name
if columns is not None:
change_name += '_' + '_'.join([column_name for column_name in columns])
if quoted_columns is not None:
change_name += '_' + '_'.join([column_name for column_name in quoted_columns])
function_name = '''NOTIFY_{change_name}'''.format(change_name=change_name)
# build query string selecting the id:value (and col:col_value) into a json formatted object string
select_payload = '''SELECT '{po}"{id_column_name}": {id_value_quote}' || CAST({new_or_old}.{id_column_name} AS text) || '{id_value_quote}{column_key_value}{pc}' INTO payload;'''.format(
po="{",
id_column_name=id_column_name,
id_value_quote='"' if quote_id_value else '',
new_or_old='OLD' if action=='DELETE' else 'NEW',
column_key_value=''', "{column_name}": {column_value_quote}' || CAST(NEW.{column_name} AS text) || '{column_value_quote}'''.format(
column_name=column_name,
column_value_quote='"' if quote_column_value else '') if column_name else '',
pc = "}")
def select_column_subquery(column_name: str, new_or_old: str, quote: bool) -> str:
return '''"{column_name}": {quote_or_not}' || CASE WHEN {new_or_old}.{column_name} IS NULL THEN 'NULL' ELSE CAST({new_or_old}.{column_name} AS text) END || '{quote_or_not}'''.format(column_name=column_name, new_or_old=new_or_old, quote_or_not='"' if quote else '')
def select_columns_subquery(column_names: list, new_or_old: str, quote: bool) -> str:
return ', '.join(select_column_subquery(column_name, new_or_old, quote) for column_name in column_names or [])
select_payload = '''SELECT '{ ''' + select_columns_subquery([id_column_name], new_or_old='OLD' if action=='DELETE' else 'NEW', quote=quote_id_value)
select_payload += ''', "new": { '''
if action in ('INSERT', 'UPDATE'):
select_payload += select_columns_subquery(columns, new_or_old='NEW', quote=False)
if columns and quoted_columns:
select_payload += ', '
select_payload += select_columns_subquery(quoted_columns, new_or_old='NEW', quote=True)
select_payload += ''' }, "old": { '''
if action in ('DELETE', 'UPDATE'):
select_payload += select_columns_subquery(columns, new_or_old='OLD', quote=False)
if columns and quoted_columns:
select_payload += ', '
select_payload += select_columns_subquery(quoted_columns, new_or_old='OLD', quote=True)
select_payload += ''' } }' INTO payload; '''
if action == 'UPDATE':
begin_update_check = 'IF ROW(NEW.{what}) IS DISTINCT FROM ROW(OLD.{what}) THEN'.format(what='*' if column_name is None or column_name == id_column_name else column_name)
begin_update_check = 'IF '
if columns or quoted_columns:
begin_update_check += ' OR '.join('ROW(NEW.{what}) IS DISTINCT FROM ROW(OLD.{what})'.format(what=column_name) for column_name in ((columns or []) + (quoted_columns or [])))
else:
begin_update_check += ' ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*)'
begin_update_check += ' THEN'
end_update_check = 'END IF;'
else:
begin_update_check = ''
Loading