From 3d080ab5ff31cfda9abf13338cb5289a2b49faad Mon Sep 17 00:00:00 2001
From: Jorrit Schaap <schaap@astron.nl>
Date: Mon, 5 Sep 2016 10:11:45 +0000
Subject: [PATCH] Task #9607: changed postgres notification mechanism. Do not
 return the full row as json anymore, because the payload is not allowed to be
 larger than 8000 chars. Now, just return one value as payload, usually the id,
 and handle the rest in the PostgresListener subclass
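
Illustrative sketch (not part of this patch) of how a client consumes the new
id-only payloads with plain psycopg2; the DSN is a placeholder, the channel
name and view are taken from the triggers below:

    import select
    import psycopg2
    import psycopg2.extensions

    conn = psycopg2.connect("dbname=resourceassignment")  # placeholder DSN
    conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    cur = conn.cursor()
    cur.execute("LISTEN task_update;")  # channel name as generated by makePostgresNotificationQueries

    while True:
        # wait (max 5s) until the connection has something to read
        if select.select([conn], [], [], 5) == ([], [], []):
            continue
        conn.poll()
        while conn.notifies:
            notification = conn.notifies.pop(0)
            # the payload is now just the changed row's id as text, e.g. '42'
            task_id = int(notification.payload)
            cur.execute("SELECT * FROM resource_allocation.task_view WHERE id = %s", (task_id,))
            print(cur.fetchone())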

---
 LCS/PyCommon/postgres.py                      |  74 ++---
 .../ResourceAssigner/lib/assignment.py        |  40 +--
 .../ResourceAssigner/lib/schedulechecker.py   |  32 ++-
 .../ResourceAssignmentDatabase/radb.py        |  52 ++--
 .../radbbuslistener.py                        |  44 ++-
 .../radbpglistener.py                         | 142 ++++------
 .../sql/add_notifications.sql                 | 252 +++++++++++-------
 .../sql/create_add_notifications.sql.py       |  23 +-
 .../lib/radbchangeshandler.py                 |  62 +++--
 .../ResourceAssignmentService/rpc.py          |  12 +-
 .../ResourceAssignmentService/service.py      |  14 +-
 11 files changed, 399 insertions(+), 348 deletions(-)
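
Usage sketch of the changed generator signature (illustrative only; the import
path is an assumption, the calls mirror create_add_notifications.sql.py below):

    from lofar.common.postgres import makePostgresNotificationQueries  # assumed import path

    with open('add_notifications.sql', 'wt') as f:
        # default column 'id' -> NOTIFY channel 'task_update'
        f.writelines(makePostgresNotificationQueries('resource_allocation', 'task', 'UPDATE'))
        # explicit column -> NOTIFY channel 'task_predecessor_update_column_task_id'
        f.writelines(makePostgresNotificationQueries('resource_allocation', 'task_predecessor',
                                                     'UPDATE', column_name='task_id'))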

diff --git a/LCS/PyCommon/postgres.py b/LCS/PyCommon/postgres.py
index 2db1316e191..1251ede07ad 100644
--- a/LCS/PyCommon/postgres.py
+++ b/LCS/PyCommon/postgres.py
@@ -33,60 +33,34 @@ import psycopg2.extensions
 
 logger = logging.getLogger(__name__)
 
-def makePostgresNotificationQueries(schema, table, action, view_for_row=None, view_selection_id=None):
+def makePostgresNotificationQueries(schema, table, action, column_name='id'):
     action = action.upper()
     if action not in ('INSERT', 'UPDATE', 'DELETE'):
         raise ValueError('''trigger_type '%s' not in ('INSERT', 'UPDATE', 'DELETE')''' % action)
 
-    if view_for_row and action == 'DELETE':
-        raise ValueError('You cannot use a view for results on action DELETE')
-
-    if view_for_row:
-        change_name = '''{table}_{action}_with_{view_for_row}'''.format(schema=schema,
-                                                                        table=table,
-                                                                        action=action,
-                                                                        view_for_row=view_for_row)
-        function_name = '''NOTIFY_{change_name}'''.format(change_name=change_name)
-        function_sql = '''
-        CREATE OR REPLACE FUNCTION {schema}.{function_name}()
-        RETURNS TRIGGER AS $$
-        DECLARE
-        new_row_from_view {schema}.{view_for_row}%ROWTYPE;
-        BEGIN
-        select * into new_row_from_view from {schema}.{view_for_row} where {view_selection_id} = NEW.id LIMIT 1;
-        PERFORM pg_notify(CAST('{change_name}' AS text),
-        '{{"old":' || {old} || ',"new":' || row_to_json(new_row_from_view)::text || '}}');
-        RETURN NEW;
-        END;
-        $$ LANGUAGE plpgsql;
-        '''.format(schema=schema,
-                   function_name=function_name,
-                   table=table,
-                   action=action,
-                   old='row_to_json(OLD)::text' if action == 'UPDATE' or action == 'DELETE' else '\'null\'',
-                   view_for_row=view_for_row,
-                   view_selection_id=view_selection_id if view_selection_id else 'id',
-                   change_name=change_name.lower())
-    else:
-        change_name = '''{table}_{action}'''.format(table=table, action=action)
-        function_name = '''NOTIFY_{change_name}'''.format(change_name=change_name)
-        function_sql = '''
-        CREATE OR REPLACE FUNCTION {schema}.{function_name}()
-        RETURNS TRIGGER AS $$
-        BEGIN
-        PERFORM pg_notify(CAST('{change_name}' AS text),
-        '{{"old":' || {old} || ',"new":' || {new} || '}}');
-        RETURN {value};
-        END;
-        $$ LANGUAGE plpgsql;
-        '''.format(schema=schema,
-                   function_name=function_name,
-                   table=table,
-                   action=action,
-                   old='row_to_json(OLD)::text' if action == 'UPDATE' or action == 'DELETE' else '\'null\'',
-                   new='row_to_json(NEW)::text' if action == 'UPDATE' or action == 'INSERT' else '\'null\'',
-                   value='OLD' if action == 'DELETE' else 'NEW',
-                   change_name=change_name.lower())
+    change_name = '''{table}_{action}'''.format(table=table, action=action)
+    if column_name != 'id':
+        change_name += '_column_' + column_name
+    function_name = '''NOTIFY_{change_name}'''.format(change_name=change_name)
+    function_sql = '''
+    CREATE OR REPLACE FUNCTION {schema}.{function_name}()
+    RETURNS TRIGGER AS $$
+    DECLARE payload text;
+    BEGIN
+    {begin_update_check}SELECT CAST({column_value} AS text) INTO payload;
+    PERFORM pg_notify(CAST('{change_name}' AS text), payload);{end_update_check}
+    RETURN {value};
+    END;
+    $$ LANGUAGE plpgsql;
+    '''.format(schema=schema,
+                function_name=function_name,
+                table=table,
+                action=action,
+                column_value=('OLD' if action == 'DELETE' else 'NEW') + '.' + column_name,
+                value='OLD' if action == 'DELETE' else 'NEW',
+                change_name=change_name.lower(),
+                begin_update_check='IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN\n' if action == 'UPDATE' else '',
+                end_update_check='\nEND IF;' if action == 'UPDATE' else '')
 
     trigger_name = 'TRIGGER_NOTIFY_%s' % function_name
 
diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py
index f78e06beb3e..f2117648b52 100755
--- a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py
+++ b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py
@@ -135,8 +135,8 @@ class ResourceAssigner():
         clusterIsCEP4 = self.checkClusterIsCEP4(mainParset)
         clusterName = 'CEP4' if clusterIsCEP4 else 'CEP2'
 
-        if clusterIsCEP4:
-            def applySaneStartEndTime():
+        def applySaneStartEndTime():
+            if clusterIsCEP4:
                 startTime = datetime.utcnow() + timedelta(minutes=1)
 
                 maxPredecessorEndTime = self.getMaxPredecessorEndTime(specification_tree)
@@ -160,26 +160,26 @@ class ResourceAssigner():
 
                 return startTime, endTime
 
-            try:
-                startTime = datetime.strptime(mainParset.getString('Observation.startTime'), '%Y-%m-%d %H:%M:%S')
-                endTime = datetime.strptime(mainParset.getString('Observation.stopTime'), '%Y-%m-%d %H:%M:%S')
-                if startTime < datetime.utcnow():
-                    startTime, endTime = applySaneStartEndTime()
-            except ValueError:
-                logger.warning('cannot parse for start/end time from specification for otdb_id=%s. searching for sane defaults...', otdb_id)
+        try:
+            startTime = datetime.strptime(mainParset.getString('Observation.startTime'), '%Y-%m-%d %H:%M:%S')
+            endTime = datetime.strptime(mainParset.getString('Observation.stopTime'), '%Y-%m-%d %H:%M:%S')
+            if startTime < datetime.utcnow():
                 startTime, endTime = applySaneStartEndTime()
+        except ValueError:
+            logger.warning('cannot parse for start/end time from specification for otdb_id=%s. searching for sane defaults...', otdb_id)
+            startTime, endTime = applySaneStartEndTime()
 
-            try:
-                # fix for MoM bug introduced before NV's holiday
-                # MoM sets ProcessingCluster.clusterName to CEP2 even when inputxml says CEP4
-                # so, override it here if needed, and update to otdb
-                processingClusterName = mainParset.getString('Observation.Cluster.ProcessingCluster.clusterName', '')
-                if processingClusterName != clusterName:
-                    logger.info('overwriting and uploading processingClusterName to otdb from %s to %s for otdb_id=%s',
-                                 processingClusterName, clusterName, otdb_id)
-                    self.otdbrpc.taskSetSpecification(otdb_id, { 'LOFAR.ObsSW.Observation.Cluster.ProcessingCluster.clusterName': clusterName })
-            except Exception as e:
-                logger.error(e)
+        try:
+            # fix for MoM bug introduced before NV's holiday
+            # MoM sets ProcessingCluster.clusterName to CEP2 even when inputxml says CEP4
+            # so, override it here if needed, and update to otdb
+            processingClusterName = mainParset.getString('Observation.Cluster.ProcessingCluster.clusterName', '')
+            if processingClusterName != clusterName:
+                logger.info('overwriting and uploading processingClusterName to otdb from %s to %s for otdb_id=%s',
+                                processingClusterName, clusterName, otdb_id)
+                self.otdbrpc.taskSetSpecification(otdb_id, { 'LOFAR.ObsSW.Observation.Cluster.ProcessingCluster.clusterName': clusterName })
+        except Exception as e:
+            logger.error(e)
 
         # insert new task and specification in the radb
         # any existing specification and task with same otdb_id will be deleted automatically
diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py
index d309b2a229c..7e6d3371ede 100644
--- a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py
+++ b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py
@@ -53,12 +53,32 @@ def movePipelineAfterItsPredecessors(task, radbrpc, min_start_timestamp=None):
 
             if (task['starttime'] < max_pred_endtime) or (min_start_timestamp and task['starttime'] > min_start_timestamp):
                 shift = max_pred_endtime - task['starttime']
-                logger.info("Moving %s pipeline radb_id=%s otdb_id=%s by %s from \'%s\' to \'%s\'", task['status'], task['id'], task['otdb_id'], shift, task['starttime'], max_pred_endtime)
-                radbrpc.updateTaskAndResourceClaims(task['id'], starttime=task['starttime']+shift, endtime=task['endtime']+shift)
-                updated_task = radbrpc.getTask(task['id'])
-                if updated_task['status'] != task['status']:
-                    logger.warn("Moving of pipeline radb_id=%s otdb_id=%s caused the status to change from %s to %s", updated_task['id'], updated_task['otdb_id'], task['status'], updated_task['status'])
-                    #TODO: automatically resolve conflict status by moved pipeline in first free time slot.
+                newStartTime = task['starttime']+shift
+                newEndTime = task['endtime']+shift
+
+                # move pipeline even further ahead in case there are 2 or more other overlapping scheduled/queued/active/completing pipelines
+                while True:
+                    overlapping_pipelines = radbrpc.getTasks(lower_bound=newStartTime, upper_bound=newEndTime, task_type='pipeline', task_status='scheduled', cluster='CEP4')
+                    overlapping_pipelines += radbrpc.getTasks(lower_bound=newStartTime, upper_bound=newEndTime, task_type='pipeline', task_status='queued', cluster='CEP4')
+                    overlapping_pipelines += radbrpc.getTasks(lower_bound=newStartTime, upper_bound=newEndTime, task_type='pipeline', task_status='active', cluster='CEP4')
+                    overlapping_pipelines += radbrpc.getTasks(lower_bound=newStartTime, upper_bound=newEndTime, task_type='pipeline', task_status='completing', cluster='CEP4')
+                    overlapping_pipelines = [pl for pl in overlapping_pipelines if pl['id'] != task['id']]
+
+                    if len(overlapping_pipelines) >= 2:
+                        max_overlapping_pipeline_endtime = max([t['endtime'] for t in overlapping_pipelines])
+                        shift = max_overlapping_pipeline_endtime + timedelta(minutes=1) - task['starttime']
+                        newStartTime = task['starttime']+shift
+                        newEndTime = task['endtime']+shift
+                    else:
+                        break
+
+                if shift != timedelta(seconds=0):
+                    logger.info("Moving %s pipeline radb_id=%s otdb_id=%s by %s from \'%s\' to \'%s\'", task['status'], task['id'], task['otdb_id'], shift, task['starttime'], newStartTime)
+                    radbrpc.updateTaskAndResourceClaims(task['id'], starttime=newStartTime, endtime=newEndTime)
+                    updated_task = radbrpc.getTask(task['id'])
+                    if updated_task['status'] != task['status']:
+                        logger.warn("Moving of pipeline radb_id=%s otdb_id=%s caused the status to change from %s to %s", updated_task['id'], updated_task['otdb_id'], task['status'], updated_task['status'])
+                        #TODO: automatically resolve conflict status by moved pipeline in first free time slot.
     except Exception as e:
         logger.error("Error while checking pipeline starttime: %s", e)
 
diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py
index 79126379d7e..7f69d9723fa 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py
@@ -231,21 +231,24 @@ class RADatabase:
         return tasks
 
 
-    def getTask(self, id=None, mom_id=None, otdb_id=None):
-        '''get a task for either the given (task)id, or for the given mom_id, or for the given otdb_id'''
-        ids = [id, mom_id, otdb_id]
+    def getTask(self, id=None, mom_id=None, otdb_id=None, specification_id=None):
+        '''get a task for either the given (task)id, or for the given mom_id, or for the given otdb_id, or for the given specification_id'''
+        ids = [id, mom_id, otdb_id, specification_id]
         validIds = [x for x in ids if x != None]
 
         if len(validIds) != 1:
-            raise KeyError("Provide one and only one id: id=%s, mom_id=%s, otdb_id=%s" % (id, mom_id, otdb_id))
+            raise KeyError("Provide one and only one id: id=%s, mom_id=%s, otdb_id=%s, specification_id=%s" % (id, mom_id, otdb_id, specification_id))
 
         query = '''SELECT * from resource_allocation.task_view tv '''
-        if id:
+        if id is not None:
             query += '''where tv.id = (%s);'''
-        elif mom_id:
+        elif mom_id is not None:
             query += '''where tv.mom_id = (%s);'''
-        elif otdb_id:
+        elif otdb_id is not None:
             query += '''where tv.otdb_id = (%s);'''
+        elif specification_id is not None:
+            query += '''where tv.specification_id = (%s);'''
+
         result = self._executeQuery(query, validIds, fetch=_FETCH_ONE)
 
         task = dict(result) if result else None
@@ -272,10 +275,10 @@ class RADatabase:
         return task_status, task_type
 
     def insertTask(self, mom_id, otdb_id, task_status, task_type, specification_id, commit=True):
-        if isinstance(mom_id, int) and mom_id <= 0:
+        if isinstance(mom_id, int) and mom_id < 0:
             mom_id = None
 
-        if isinstance(otdb_id, int) and otdb_id <= 0:
+        if isinstance(otdb_id, int) and otdb_id < 0:
             otdb_id = None
 
         logger.info('insertTask mom_id=%s, otdb_id=%s, task_status=%s, task_type=%s, specification_id=%s' %
@@ -358,9 +361,14 @@ class RADatabase:
 
         return self.cursor.rowcount > 0
 
-    def getTaskPredecessorIds(self):
-        query = '''SELECT * from resource_allocation.task_predecessor tp;'''
-        items = list(self._executeQuery(query, fetch=_FETCH_ALL))
+    def getTaskPredecessorIds(self, id=None):
+        query = '''SELECT * from resource_allocation.task_predecessor tp'''
+
+        if id is not None:
+            query += ' WHERE id=%s'
+
+        items = list(self._executeQuery(query, [id] if id is not None else None, fetch=_FETCH_ALL))
+
         predIdDict = {}
         for item in items:
             taskId = item['task_id']
@@ -369,9 +377,14 @@ class RADatabase:
             predIdDict[taskId].append(item['predecessor_id'])
         return predIdDict
 
-    def getTaskSuccessorIds(self):
+    def getTaskSuccessorIds(self, id=None):
-        query = '''SELECT * from resource_allocation.task_predecessor tp;'''
+        query = '''SELECT * from resource_allocation.task_predecessor tp'''
-        items = list(self._executeQuery(query, fetch=_FETCH_ALL))
+
+        if id is not None:
+            query += ' WHERE id=%s'
+
+        items = list(self._executeQuery(query, [id] if id is not None else None, fetch=_FETCH_ALL))
+
         succIdDict = {}
         for item in items:
             predId = item['predecessor_id']
@@ -387,11 +400,11 @@ class RADatabase:
         items = list(self._executeQuery(query, [task_id], fetch=_FETCH_ALL))
         return [x['predecessor_id'] for x in items]
 
-    def getTaskSuccessorIdsForTask(self, task_):
+    def getTaskSuccessorIdsForTask(self, task_id):
         query = '''SELECT * from resource_allocation.task_predecessor tp
         WHERE tp.predecessor_id = %s;'''
 
-        items = list(self._executeQuery(query, [task_], fetch=_FETCH_ALL))
+        items = list(self._executeQuery(query, [task_id], fetch=_FETCH_ALL))
         return [x['task_id'] for x in items]
 
     def insertTaskPredecessor(self, task_id, predecessor_id, commit=True):
@@ -1325,7 +1338,8 @@ class RADatabase:
                 if commit:
                     self.commit()
                 return {'inserted': True, 'specification_id': specId, 'task_id': taskId}
-        except:
+        except Exception as e:
+            logger.error(e)
             self.rollback()
 
         return {'inserted': False, 'specification_id': None, 'task_id': None}
@@ -1526,7 +1540,11 @@ if __name__ == '__main__':
     logger.info("Using dbcreds: %s" % dbcreds.stringWithHiddenPassword())
 
     db = RADatabase(dbcreds=dbcreds, log_queries=True)
+    st1 = db.insertSpecificationAndTask(0, 0, 'approved', 'observation', datetime.utcnow(), datetime.utcnow() + timedelta(hours=1), 'foo', 'CEP4')
+    st2 = db.insertSpecificationAndTask(1, 1, 'approved', 'pipeline', datetime.utcnow(), datetime.utcnow() + timedelta(hours=1), 'foo', 'CEP4')
 
+    db.insertTaskPredecessor(st2['task_id'], st1['task_id'])
+    exit()
 
     def resultPrint(method):
         print '\n-- ' + str(method.__name__) + ' --'
diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbbuslistener.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbbuslistener.py
index fcf3ff0cf91..7f108839fba 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbbuslistener.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbbuslistener.py
@@ -61,28 +61,27 @@ class RADBBusListener(AbstractBusListener):
         logger.info("on%s: %s" % (msg.subject.replace(self.subject_prefix, ''), str(msg.content).replace('\n', ' ')))
 
         if msg.subject == '%sTaskUpdated' % self.subject_prefix:
-            self.onTaskUpdated(msg.content.get('old'), msg.content.get('new'))
+            self.onTaskUpdated(msg.content)
         elif msg.subject == '%sTaskInserted' % self.subject_prefix:
-            self.onTaskInserted(msg.content.get('new'))
+            self.onTaskInserted(msg.content)
         elif msg.subject == '%sTaskDeleted' % self.subject_prefix:
-            self.onTaskDeleted(msg.content.get('old'))
+            self.onTaskDeleted(msg.content)
         elif msg.subject == '%sResourceClaimUpdated' % self.subject_prefix:
-            self.onResourceClaimUpdated(msg.content.get('old'), msg.content.get('new'))
+            self.onResourceClaimUpdated(msg.content)
         elif msg.subject == '%sResourceClaimInserted' % self.subject_prefix:
-            self.onResourceClaimInserted(msg.content.get('new'))
+            self.onResourceClaimInserted(msg.content)
         elif msg.subject == '%sResourceClaimDeleted' % self.subject_prefix:
-            self.onResourceClaimDeleted(msg.content.get('old'))
+            self.onResourceClaimDeleted(msg.content)
         elif msg.subject == '%sResourceAvailabilityUpdated' % self.subject_prefix:
-            self.onResourceAvailabilityUpdated(msg.content.get('old'), msg.content.get('new'))
+            self.onResourceAvailabilityUpdated(msg.content)
         elif msg.subject == '%sResourceCapacityUpdated' % self.subject_prefix:
-            self.onResourceCapacityUpdated(msg.content.get('old'), msg.content.get('new'))
+            self.onResourceCapacityUpdated(msg.content)
         else:
             logger.error("RADBBusListener.handleMessage: unknown subject: %s" %str(msg.subject))
 
-    def onTaskUpdated(self, old_task, new_task):
+    def onTaskUpdated(self, updated_task):
         '''onTaskUpdated is called upon receiving a TaskUpdated message.
-        :param old_task: dictionary with the task before the update
-        :param new_task: dictionary with the updated task'''
+        :param updated_task: dictionary with the updated task'''
         pass
 
     def onTaskInserted(self, new_task):
@@ -90,15 +89,14 @@ class RADBBusListener(AbstractBusListener):
         :param new_task: dictionary with the inserted task'''
         pass
 
-    def onTaskDeleted(self, old_task):
+    def onTaskDeleted(self, old_task_id):
         '''onTaskDeleted is called upon receiving a TaskDeleted message.
-        :param old_task: dictionary with the deleted task'''
+        :param old_task_id: id of the deleted task'''
         pass
 
-    def onResourceClaimUpdated(self, old_claim, new_claim):
+    def onResourceClaimUpdated(self, updated_claim):
         '''onResourceClaimUpdated is called upon receiving a ResourceClaimUpdated message.
-        :param old_claim: dictionary with the claim before the update
-        :param new_claim: dictionary with the updated claim'''
+        :param updated_claim: dictionary with the updated claim'''
         pass
 
     def onResourceClaimInserted(self, new_claim):
@@ -106,21 +104,19 @@ class RADBBusListener(AbstractBusListener):
         :param new_claim: dictionary with the inserted claim'''
         pass
 
-    def onResourceClaimDeleted(self, old_claim):
+    def onResourceClaimDeleted(self, old_claim_id):
         '''onResourceClaimDeleted is called upon receiving a ResourceClaimDeleted message.
-        :param old_claim: dictionary with the deleted claim'''
+        :param old_claim_id: id of the deleted claim'''
         pass
 
-    def onResourceAvailabilityUpdated(self, old_availability, new_availability):
+    def onResourceAvailabilityUpdated(self, updated_availability):
         '''onResourceAvailabilityUpdated is called upon receiving a ResourceAvailabilityUpdated message.
-        :param old_availability: dictionary with the resource availability before the update
-        :param new_availability: dictionary with the updated availability'''
+        :param updated_availability: dictionary with the updated availability'''
         pass
 
-    def onResourceCapacityUpdated(self, old_capacity, new_capacity):
+    def onResourceCapacityUpdated(self, updated_capacity):
         '''onResourceCapacityUpdated is called upon receiving a ResourceCapacityUpdated message.
-        :param old_capacity: dictionary with the resource capacity before the update
-        :param new_capacity: dictionary with the updated capacity'''
+        :param updated_capacity: dictionary with the updated capacity'''
         pass
 
 
diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbpglistener.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbpglistener.py
index 03065ae1c72..499821a7e8c 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbpglistener.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbpglistener.py
@@ -56,85 +56,67 @@ class RADBPGListener(PostgresListener):
 
         self.rarpc = RARPC(busname=radb_busname, servicename=radb_servicename, broker=broker)
 
-        self.subscribe('task_update_with_task_view', self.onTaskUpdated)
-        self.subscribe('task_insert_with_task_view', self.onTaskInserted)
+        self.subscribe('task_update', self.onTaskUpdated)
+        self.subscribe('task_insert', self.onTaskInserted)
         self.subscribe('task_delete', self.onTaskDeleted)
 
-        self.subscribe('task_predecessor_insert', self.onTaskPredecessorInserted)
-        self.subscribe('task_predecessor_update', self.onTaskPredecessorUpdated)
-        self.subscribe('task_predecessor_delete', self.onTaskPredecessorDeleted)
+        self.subscribe('task_predecessor_insert_column_task_id', self.onTaskPredecessorChanged)
+        self.subscribe('task_predecessor_update_column_task_id', self.onTaskPredecessorChanged)
+        self.subscribe('task_predecessor_delete_column_task_id', self.onTaskPredecessorChanged)
+
+        self.subscribe('task_predecessor_insert_column_predecessor_id', self.onTaskSuccessorChanged)
+        self.subscribe('task_predecessor_update_column_predecessor_id', self.onTaskSuccessorChanged)
+        self.subscribe('task_predecessor_delete_column_predecessor_id', self.onTaskSuccessorChanged)
 
         # when the specification starttime and endtime are updated, then that effects the task as well
-        # so subscribe to specification_update, and use task_view as view_for_row
-        self.subscribe('specification_update_with_task_view', self.onSpecificationUpdated)
+        self.subscribe('specification_update', self.onSpecificationUpdated)
 
-        self.subscribe('resource_claim_update_with_resource_claim_view', self.onResourceClaimUpdated)
-        self.subscribe('resource_claim_insert_with_resource_claim_view', self.onResourceClaimInserted)
+        self.subscribe('resource_claim_update', self.onResourceClaimUpdated)
+        self.subscribe('resource_claim_insert', self.onResourceClaimInserted)
         self.subscribe('resource_claim_delete', self.onResourceClaimDeleted)
 
         self.subscribe('resource_availability_update', self.onResourceAvailabilityUpdated)
         self.subscribe('resource_capacity_update', self.onResourceCapacityUpdated)
 
     def onTaskUpdated(self, payload = None):
-        self._convertPayloadAndSendNotification('TaskUpdated', payload, ['starttime', 'endtime'])
+        self._sendNotification('TaskUpdated', self.rarpc.getTask(payload))
 
     def onTaskInserted(self, payload = None):
-        self._convertPayloadAndSendNotification('TaskInserted', payload, ['starttime', 'endtime'])
+        self._sendNotification('TaskInserted', self.rarpc.getTask(payload))
 
     def onTaskDeleted(self, payload = None):
-        self._convertPayloadAndSendNotification('TaskDeleted', payload)
-
-    def _onTaskPredecessorChanged(self, change_type, payload = None):
-        logger.info(payload)
-        try:
-            content = json.loads(payload)
-        except Exception as e:
-            logger.error('Could not parse payload: %s\n%s' % (payload, e))
-            return
-
-        try:
-            task_content = {}
-
-            if change_type == 'Inserted' or change_type == 'Updated':
-                task_id = content['new']['task_id']
-                task = self.rarpc.getTask(task_id)
-                task_content = {'new': task }
-            elif change_type == 'Deleted':
-                task_id = content['old']['task_id']
-                task_content = {'old': {'id':task_id} }
-
-            self._sendNotification('Task%s' % change_type, task_content)
-        except Exception as e:
-            logger.error('Could not parse payload: %s\n%s' % (payload, e))
-
-    def onTaskPredecessorUpdated(self, payload = None):
-        self._onTaskPredecessorChanged('Updated', payload)
+        self._sendNotification('TaskDeleted', {'id': payload })
 
-    def onTaskPredecessorInserted(self, payload = None):
-        self._onTaskPredecessorChanged('Inserted', payload)
+    def onTaskPredecessorChanged(self, task_id):
+        logger.info('onTaskPredecessorChanged(task_id=%s)', task_id)
+        self._sendNotification('TaskUpdated', self.rarpc.getTask(task_id))
 
-    def onTaskPredecessorDeleted(self, payload = None):
-        self._onTaskPredecessorChanged('Deleted', payload)
+    def onTaskSuccessorChanged(self, task_id):
+        logger.info('onTaskSuccessorChanged(task_id=%s)', task_id)
+        self._sendNotification('TaskUpdated', self.rarpc.getTask(task_id))
 
     def onSpecificationUpdated(self, payload = None):
         # when the specification starttime and endtime are updated, then that effects the task as well
-        # so send a TaskUpdated notification
-        self._convertPayloadAndSendNotification('TaskUpdated', payload, ['starttime', 'endtime'])
+        self._sendNotification('TaskUpdated', self.rarpc.getTask(specification_id=payload))
 
     def onResourceClaimUpdated(self, payload = None):
-        self._convertPayloadAndSendNotification('ResourceClaimUpdated', payload, ['starttime', 'endtime'])
+        self._sendNotification('ResourceClaimUpdated', self.rarpc.getResourceClaim(payload))
 
     def onResourceClaimInserted(self, payload = None):
-        self._convertPayloadAndSendNotification('ResourceClaimInserted', payload, ['starttime', 'endtime'])
+        self._sendNotification('ResourceClaimInserted', self.rarpc.getResourceClaim(payload))
 
     def onResourceClaimDeleted(self, payload = None):
-        self._convertPayloadAndSendNotification('ResourceClaimDeleted', payload)
+        self._sendNotification('ResourceClaimDeleted', {'id': payload })
 
     def onResourceAvailabilityUpdated(self, payload = None):
-        self._convertPayloadAndSendNotification('ResourceAvailabilityUpdated', payload)
+        r = self.rarpc.getResources(resource_ids=[payload], include_availability=True)[0]
+        r = {k:r[k] for k in ['id', 'active']}
+        self._sendNotification('ResourceAvailabilityUpdated', r)
 
     def onResourceCapacityUpdated(self, payload = None):
-        self._convertPayloadAndSendNotification('ResourceCapacityUpdated', payload)
+        r = self.rarpc.getResources(resource_ids=[payload], include_availability=True)[0]
+        r = {k:r[k] for k in ['id', 'total_capacity', 'available_capacity', 'used_capacity']}
+        self._sendNotification('ResourceCapacityUpdated', r)
 
     def __enter__(self):
         super(RADBPGListener, self).__enter__()
@@ -155,58 +137,28 @@ class RADBPGListener(PostgresListener):
         So, parse the requested fields, and return them as datetime.
         '''
         try:
-            for state in ('old', 'new'):
-                if state in contentDict:
-                    for field in fields:
-                        try:
-                            if contentDict[state] and field in contentDict[state]:
-                                timestampStr = contentDict[state][field]
-                                formatStr = '%Y-%m-%dT%H:%M:%S' if 'T' in timestampStr else '%Y-%m-%d %H:%M:%S'
-                                if timestampStr.rfind('.') > -1:
-                                    formatStr += '.%f'
-
-                                timestamp = datetime.strptime(timestampStr, formatStr)
-
-                                contentDict[state][field] = timestamp
-                        except Exception as e:
-                            logger.error('Could not convert field \'%s\' to datetime: %s' % (field, e))
+            for field in fields:
+                try:
+                    if field in contentDict:
+                        timestampStr = contentDict[field]
+                        formatStr = '%Y-%m-%dT%H:%M:%S' if 'T' in timestampStr else '%Y-%m-%d %H:%M:%S'
+                        if timestampStr.rfind('.') > -1:
+                            formatStr += '.%f'
 
-            return contentDict
-        except Exception as e:
-            logger.error('Error while convering timestamp fields \'%s\'in %s\n%s' % (fields, contentDict, e))
-
-
-    def _convertPayloadAndSendNotification(self, subject, payload, timestampFields = None):
-        try:
-            content = json.loads(payload)
-        except Exception as e:
-            logger.error('Could not parse payload: %s\n%s' % (payload, e))
-            content=None
+                        timestamp = datetime.strptime(timestampStr, formatStr)
 
-        return self._sendNotification(subject, content, timestampFields)
+                        contentDict[field] = timestamp
+                except Exception as e:
+                    logger.error('Could not convert field \'%s\' to datetime: %s' % (field, e))
 
-    def _sendNotification(self, subject, content, timestampFields = None):
-        try:
-            if 'new' in content and content['new'] and 'old' in content and content['old']:
-                # check if new and old are equal.
-                # however, new and old can be based on different views,
-                # so, only check the values for the keys they have in common
-                new_keys = set(content['new'].keys())
-                old_keys = set(content['old'].keys())
-                common_keys = new_keys & old_keys
-                equal_valued_keys = [k for k in common_keys if content['new'][k] == content['old'][k]]
-                if len(equal_valued_keys) == len(common_keys):
-                    logger.info('new and old values are equal, not sending notification. %s' % (content['new']))
-                    return
-
-            if timestampFields:
-                content = self._formatTimestampsAsIso(timestampFields, content)
+            return contentDict
         except Exception as e:
-            logger.error('Exception while anayzing content: %s\n%s' % (content, e))
+            logger.error('Error while converting timestamp fields \'%s\' in %s\n%s' % (fields, contentDict, e))
 
+    def _sendNotification(self, subject, contentDict):
         try:
-            msg = EventMessage(context=self.notification_prefix + subject, content=content)
-            logger.info('Sending notification %s: %s' % (subject, str(content).replace('\n', ' ')))
+            msg = EventMessage(context=self.notification_prefix + subject, content=contentDict)
+            logger.info('Sending notification %s: %s' % (subject, str(contentDict).replace('\n', ' ')))
             self.event_bus.send(msg)
         except Exception as e:
             logger.error(str(e))
diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/add_notifications.sql b/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/add_notifications.sql
index 05fec7a48a4..05906465401 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/add_notifications.sql
+++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/add_notifications.sql
@@ -6,50 +6,48 @@
 --this RADBPGListener then broadcasts the event on the lofar bus.
 
 
-DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_INSERT_with_task_view ON resource_allocation.task CASCADE;
-DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_INSERT_with_task_view();
+DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_INSERT ON resource_allocation.task CASCADE;
+DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_INSERT();
 
 
-CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_INSERT_with_task_view()
+CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_INSERT()
 RETURNS TRIGGER AS $$
-DECLARE
-new_row_from_view resource_allocation.task_view%ROWTYPE;
+DECLARE payload text;
 BEGIN
-select * into new_row_from_view from resource_allocation.task_view where id = NEW.id LIMIT 1;
-PERFORM pg_notify(CAST('task_insert_with_task_view' AS text),
-'{"old":' || 'null' || ',"new":' || row_to_json(new_row_from_view)::text || '}');
+SELECT CAST(NEW.id AS text) INTO payload;
+PERFORM pg_notify(CAST('task_insert' AS text), payload);
 RETURN NEW;
 END;
 $$ LANGUAGE plpgsql;
 
 
-CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_INSERT_with_task_view
+CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_INSERT
 AFTER INSERT ON resource_allocation.task
 FOR EACH ROW
-EXECUTE PROCEDURE resource_allocation.NOTIFY_task_INSERT_with_task_view();
+EXECUTE PROCEDURE resource_allocation.NOTIFY_task_INSERT();
 
 
-DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_UPDATE_with_task_view ON resource_allocation.task CASCADE;
-DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_UPDATE_with_task_view();
+DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_UPDATE ON resource_allocation.task CASCADE;
+DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_UPDATE();
 
 
-CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_UPDATE_with_task_view()
+CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_UPDATE()
 RETURNS TRIGGER AS $$
-DECLARE
-new_row_from_view resource_allocation.task_view%ROWTYPE;
+DECLARE payload text;
 BEGIN
-select * into new_row_from_view from resource_allocation.task_view where id = NEW.id LIMIT 1;
-PERFORM pg_notify(CAST('task_update_with_task_view' AS text),
-'{"old":' || row_to_json(OLD)::text || ',"new":' || row_to_json(new_row_from_view)::text || '}');
+IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN
+SELECT CAST(NEW.id AS text) INTO payload;
+PERFORM pg_notify(CAST('task_update' AS text), payload);
+END IF;
 RETURN NEW;
 END;
 $$ LANGUAGE plpgsql;
 
 
-CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_UPDATE_with_task_view
+CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_UPDATE
 AFTER UPDATE ON resource_allocation.task
 FOR EACH ROW
-EXECUTE PROCEDURE resource_allocation.NOTIFY_task_UPDATE_with_task_view();
+EXECUTE PROCEDURE resource_allocation.NOTIFY_task_UPDATE();
 
 
 DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_DELETE ON resource_allocation.task CASCADE;
@@ -58,9 +56,10 @@ DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_DELETE();
 
 CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_DELETE()
 RETURNS TRIGGER AS $$
+DECLARE payload text;
 BEGIN
-PERFORM pg_notify(CAST('task_delete' AS text),
-'{"old":' || row_to_json(OLD)::text || ',"new":' || 'null' || '}');
+SELECT CAST(OLD.id AS text) INTO payload;
+PERFORM pg_notify(CAST('task_delete' AS text), payload);
 RETURN OLD;
 END;
 $$ LANGUAGE plpgsql;
@@ -72,133 +71,201 @@ FOR EACH ROW
 EXECUTE PROCEDURE resource_allocation.NOTIFY_task_DELETE();
 
 
-DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_predecessor_INSERT ON resource_allocation.task_predecessor CASCADE;
-DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_INSERT();
+DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_predecessor_INSERT_column_task_id ON resource_allocation.task_predecessor CASCADE;
+DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_INSERT_column_task_id();
 
 
-CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_INSERT()
+CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_INSERT_column_task_id()
 RETURNS TRIGGER AS $$
+DECLARE payload text;
 BEGIN
-PERFORM pg_notify(CAST('task_predecessor_insert' AS text),
-'{"old":' || 'null' || ',"new":' || row_to_json(NEW)::text || '}');
+SELECT CAST(NEW.task_id AS text) INTO payload;
+PERFORM pg_notify(CAST('task_predecessor_insert_column_task_id' AS text), payload);
 RETURN NEW;
 END;
 $$ LANGUAGE plpgsql;
 
 
-CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_predecessor_INSERT
+CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_predecessor_INSERT_column_task_id
 AFTER INSERT ON resource_allocation.task_predecessor
 FOR EACH ROW
-EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_INSERT();
+EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_INSERT_column_task_id();
 
 
-DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_predecessor_UPDATE ON resource_allocation.task_predecessor CASCADE;
-DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_UPDATE();
+DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_predecessor_UPDATE_column_task_id ON resource_allocation.task_predecessor CASCADE;
+DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_UPDATE_column_task_id();
 
 
-CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_UPDATE()
+CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_UPDATE_column_task_id()
 RETURNS TRIGGER AS $$
+DECLARE payload text;
 BEGIN
-PERFORM pg_notify(CAST('task_predecessor_update' AS text),
-'{"old":' || row_to_json(OLD)::text || ',"new":' || row_to_json(NEW)::text || '}');
+IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN
+SELECT CAST(NEW.task_id AS text) INTO payload;
+PERFORM pg_notify(CAST('task_predecessor_update_column_task_id' AS text), payload);
+END IF;
 RETURN NEW;
 END;
 $$ LANGUAGE plpgsql;
 
 
-CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_predecessor_UPDATE
+CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_predecessor_UPDATE_column_task_id
 AFTER UPDATE ON resource_allocation.task_predecessor
 FOR EACH ROW
-EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_UPDATE();
+EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_UPDATE_column_task_id();
 
 
-DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_predecessor_DELETE ON resource_allocation.task_predecessor CASCADE;
-DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_DELETE();
+DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_predecessor_DELETE_column_task_id ON resource_allocation.task_predecessor CASCADE;
+DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_DELETE_column_task_id();
 
 
-CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_DELETE()
+CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_DELETE_column_task_id()
 RETURNS TRIGGER AS $$
+DECLARE payload text;
 BEGIN
-PERFORM pg_notify(CAST('task_predecessor_delete' AS text),
-'{"old":' || row_to_json(OLD)::text || ',"new":' || 'null' || '}');
+SELECT CAST(OLD.task_id AS text) INTO payload;
+PERFORM pg_notify(CAST('task_predecessor_delete_column_task_id' AS text), payload);
 RETURN OLD;
 END;
 $$ LANGUAGE plpgsql;
 
 
-CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_predecessor_DELETE
+CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_predecessor_DELETE_column_task_id
 AFTER DELETE ON resource_allocation.task_predecessor
 FOR EACH ROW
-EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_DELETE();
+EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_DELETE_column_task_id();
 
 
-DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_specification_UPDATE_with_task_view ON resource_allocation.specification CASCADE;
-DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_specification_UPDATE_with_task_view();
+DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_predecessor_INSERT_column_predecessor_id ON resource_allocation.task_predecessor CASCADE;
+DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_INSERT_column_predecessor_id();
 
 
-CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_specification_UPDATE_with_task_view()
+CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_INSERT_column_predecessor_id()
 RETURNS TRIGGER AS $$
-DECLARE
-new_row_from_view resource_allocation.task_view%ROWTYPE;
+DECLARE payload text;
 BEGIN
-select * into new_row_from_view from resource_allocation.task_view where specification_id = NEW.id LIMIT 1;
-PERFORM pg_notify(CAST('specification_update_with_task_view' AS text),
-'{"old":' || row_to_json(OLD)::text || ',"new":' || row_to_json(new_row_from_view)::text || '}');
+SELECT CAST(NEW.predecessor_id AS text) INTO payload;
+PERFORM pg_notify(CAST('task_predecessor_insert_column_predecessor_id' AS text), payload);
 RETURN NEW;
 END;
 $$ LANGUAGE plpgsql;
 
 
-CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_specification_UPDATE_with_task_view
+CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_predecessor_INSERT_column_predecessor_id
+AFTER INSERT ON resource_allocation.task_predecessor
+FOR EACH ROW
+EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_INSERT_column_predecessor_id();
+
+
+DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_predecessor_UPDATE_column_predecessor_id ON resource_allocation.task_predecessor CASCADE;
+DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_UPDATE_column_predecessor_id();
+
+
+CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_UPDATE_column_predecessor_id()
+RETURNS TRIGGER AS $$
+DECLARE payload text;
+BEGIN
+IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN
+SELECT CAST(NEW.predecessor_id AS text) INTO payload;
+PERFORM pg_notify(CAST('task_predecessor_update_column_predecessor_id' AS text), payload);
+END IF;
+RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+
+
+CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_predecessor_UPDATE_column_predecessor_id
+AFTER UPDATE ON resource_allocation.task_predecessor
+FOR EACH ROW
+EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_UPDATE_column_predecessor_id();
+
+
+DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_predecessor_DELETE_column_predecessor_id ON resource_allocation.task_predecessor CASCADE;
+DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_DELETE_column_predecessor_id();
+
+
+CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_DELETE_column_predecessor_id()
+RETURNS TRIGGER AS $$
+DECLARE payload text;
+BEGIN
+SELECT CAST(OLD.predecessor_id AS text) INTO payload;
+PERFORM pg_notify(CAST('task_predecessor_delete_column_predecessor_id' AS text), payload);
+RETURN OLD;
+END;
+$$ LANGUAGE plpgsql;
+
+
+CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_predecessor_DELETE_column_predecessor_id
+AFTER DELETE ON resource_allocation.task_predecessor
+FOR EACH ROW
+EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_DELETE_column_predecessor_id();
+
+
+DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_specification_UPDATE ON resource_allocation.specification CASCADE;
+DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_specification_UPDATE();
+
+
+CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_specification_UPDATE()
+RETURNS TRIGGER AS $$
+DECLARE payload text;
+BEGIN
+IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN
+SELECT CAST(NEW.id AS text) INTO payload;
+PERFORM pg_notify(CAST('specification_update' AS text), payload);
+END IF;
+RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+
+
+CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_specification_UPDATE
 AFTER UPDATE ON resource_allocation.specification
 FOR EACH ROW
-EXECUTE PROCEDURE resource_allocation.NOTIFY_specification_UPDATE_with_task_view();
+EXECUTE PROCEDURE resource_allocation.NOTIFY_specification_UPDATE();
 
 
-DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_resource_claim_INSERT_with_resource_claim_view ON resource_allocation.resource_claim CASCADE;
-DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_resource_claim_INSERT_with_resource_claim_view();
+DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_resource_claim_INSERT ON resource_allocation.resource_claim CASCADE;
+DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_resource_claim_INSERT();
 
 
-CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_resource_claim_INSERT_with_resource_claim_view()
+CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_resource_claim_INSERT()
 RETURNS TRIGGER AS $$
-DECLARE
-new_row_from_view resource_allocation.resource_claim_view%ROWTYPE;
+DECLARE payload text;
 BEGIN
-select * into new_row_from_view from resource_allocation.resource_claim_view where id = NEW.id LIMIT 1;
-PERFORM pg_notify(CAST('resource_claim_insert_with_resource_claim_view' AS text),
-'{"old":' || 'null' || ',"new":' || row_to_json(new_row_from_view)::text || '}');
+SELECT CAST(NEW.id AS text) INTO payload;
+PERFORM pg_notify(CAST('resource_claim_insert' AS text), payload);
 RETURN NEW;
 END;
 $$ LANGUAGE plpgsql;
 
 
-CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_resource_claim_INSERT_with_resource_claim_view
+CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_resource_claim_INSERT
 AFTER INSERT ON resource_allocation.resource_claim
 FOR EACH ROW
-EXECUTE PROCEDURE resource_allocation.NOTIFY_resource_claim_INSERT_with_resource_claim_view();
+EXECUTE PROCEDURE resource_allocation.NOTIFY_resource_claim_INSERT();
 
 
-DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_resource_claim_UPDATE_with_resource_claim_view ON resource_allocation.resource_claim CASCADE;
-DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_resource_claim_UPDATE_with_resource_claim_view();
+DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_resource_claim_UPDATE ON resource_allocation.resource_claim CASCADE;
+DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_resource_claim_UPDATE();
 
 
-CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_resource_claim_UPDATE_with_resource_claim_view()
+CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_resource_claim_UPDATE()
 RETURNS TRIGGER AS $$
-DECLARE
-new_row_from_view resource_allocation.resource_claim_view%ROWTYPE;
+DECLARE payload text;
 BEGIN
-select * into new_row_from_view from resource_allocation.resource_claim_view where id = NEW.id LIMIT 1;
-PERFORM pg_notify(CAST('resource_claim_update_with_resource_claim_view' AS text),
-'{"old":' || row_to_json(OLD)::text || ',"new":' || row_to_json(new_row_from_view)::text || '}');
+IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN
+SELECT CAST(NEW.id AS text) INTO payload;
+PERFORM pg_notify(CAST('resource_claim_update' AS text), payload);
+END IF;
 RETURN NEW;
 END;
 $$ LANGUAGE plpgsql;
 
 
-CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_resource_claim_UPDATE_with_resource_claim_view
+CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_resource_claim_UPDATE
 AFTER UPDATE ON resource_allocation.resource_claim
 FOR EACH ROW
-EXECUTE PROCEDURE resource_allocation.NOTIFY_resource_claim_UPDATE_with_resource_claim_view();
+EXECUTE PROCEDURE resource_allocation.NOTIFY_resource_claim_UPDATE();
 
 
 DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_resource_claim_DELETE ON resource_allocation.resource_claim CASCADE;
@@ -207,9 +274,10 @@ DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_resource_claim_DELETE();
 
 CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_resource_claim_DELETE()
 RETURNS TRIGGER AS $$
+DECLARE payload text;
 BEGIN
-PERFORM pg_notify(CAST('resource_claim_delete' AS text),
-'{"old":' || row_to_json(OLD)::text || ',"new":' || 'null' || '}');
+SELECT CAST(OLD.id AS text) INTO payload;
+PERFORM pg_notify(CAST('resource_claim_delete' AS text), payload);
 RETURN OLD;
 END;
 $$ LANGUAGE plpgsql;
@@ -221,42 +289,48 @@ FOR EACH ROW
 EXECUTE PROCEDURE resource_allocation.NOTIFY_resource_claim_DELETE();
 
 
-DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_resource_availability_UPDATE ON resource_monitoring.resource_availability CASCADE;
-DROP FUNCTION IF EXISTS resource_monitoring.NOTIFY_resource_availability_UPDATE();
+DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_resource_availability_UPDATE_column_resource_id ON resource_monitoring.resource_availability CASCADE;
+DROP FUNCTION IF EXISTS resource_monitoring.NOTIFY_resource_availability_UPDATE_column_resource_id();
 
 
-CREATE OR REPLACE FUNCTION resource_monitoring.NOTIFY_resource_availability_UPDATE()
+CREATE OR REPLACE FUNCTION resource_monitoring.NOTIFY_resource_availability_UPDATE_column_resource_id()
 RETURNS TRIGGER AS $$
+DECLARE payload text;
 BEGIN
-PERFORM pg_notify(CAST('resource_availability_update' AS text),
-'{"old":' || row_to_json(OLD)::text || ',"new":' || row_to_json(NEW)::text || '}');
+IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN
+SELECT CAST(NEW.resource_id AS text) INTO payload;
+PERFORM pg_notify(CAST('resource_availability_update_column_resource_id' AS text), payload);
+END IF;
 RETURN NEW;
 END;
 $$ LANGUAGE plpgsql;
 
 
-CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_resource_availability_UPDATE
+CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_resource_availability_UPDATE_column_resource_id
 AFTER UPDATE ON resource_monitoring.resource_availability
 FOR EACH ROW
-EXECUTE PROCEDURE resource_monitoring.NOTIFY_resource_availability_UPDATE();
+EXECUTE PROCEDURE resource_monitoring.NOTIFY_resource_availability_UPDATE_column_resource_id();
 
 
-DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_resource_capacity_UPDATE ON resource_monitoring.resource_capacity CASCADE;
-DROP FUNCTION IF EXISTS resource_monitoring.NOTIFY_resource_capacity_UPDATE();
+DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_resource_capacity_UPDATE_column_resource_id ON resource_monitoring.resource_capacity CASCADE;
+DROP FUNCTION IF EXISTS resource_monitoring.NOTIFY_resource_capacity_UPDATE_column_resource_id();
 
 
-CREATE OR REPLACE FUNCTION resource_monitoring.NOTIFY_resource_capacity_UPDATE()
+CREATE OR REPLACE FUNCTION resource_monitoring.NOTIFY_resource_capacity_UPDATE_column_resource_id()
 RETURNS TRIGGER AS $$
+DECLARE payload text;
 BEGIN
-PERFORM pg_notify(CAST('resource_capacity_update' AS text),
-'{"old":' || row_to_json(OLD)::text || ',"new":' || row_to_json(NEW)::text || '}');
+IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN
+SELECT CAST(NEW.resource_id AS text) INTO payload;
+PERFORM pg_notify(CAST('resource_capacity_update_column_resource_id' AS text), payload);
+END IF;
 RETURN NEW;
 END;
 $$ LANGUAGE plpgsql;
 
 
-CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_resource_capacity_UPDATE
+CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_resource_capacity_UPDATE_column_resource_id
 AFTER UPDATE ON resource_monitoring.resource_capacity
 FOR EACH ROW
-EXECUTE PROCEDURE resource_monitoring.NOTIFY_resource_capacity_UPDATE();
+EXECUTE PROCEDURE resource_monitoring.NOTIFY_resource_capacity_UPDATE_column_resource_id();
 
diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/create_add_notifications.sql.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/create_add_notifications.sql.py
index 750b26c4871..07860f3b37c 100755
--- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/create_add_notifications.sql.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/create_add_notifications.sql.py
@@ -40,15 +40,18 @@ if __name__ == '__main__':
         f.write('--this RADBPGListener then broadcasts the event on the lofar bus.\n')
         f.write('\n')
 
-        f.writelines(makePostgresNotificationQueries('resource_allocation', 'task', 'INSERT', view_for_row='task_view'))
-        f.writelines(makePostgresNotificationQueries('resource_allocation', 'task', 'UPDATE', view_for_row='task_view'))
+        f.writelines(makePostgresNotificationQueries('resource_allocation', 'task', 'INSERT'))
+        f.writelines(makePostgresNotificationQueries('resource_allocation', 'task', 'UPDATE'))
         f.writelines(makePostgresNotificationQueries('resource_allocation', 'task', 'DELETE'))
-        f.writelines(makePostgresNotificationQueries('resource_allocation', 'task_predecessor', 'INSERT'))
-        f.writelines(makePostgresNotificationQueries('resource_allocation', 'task_predecessor', 'UPDATE'))
-        f.writelines(makePostgresNotificationQueries('resource_allocation', 'task_predecessor', 'DELETE'))
-        f.writelines(makePostgresNotificationQueries('resource_allocation', 'specification', 'UPDATE', view_for_row='task_view', view_selection_id='specification_id'))
-        f.writelines(makePostgresNotificationQueries('resource_allocation', 'resource_claim', 'INSERT', view_for_row='resource_claim_view'))
-        f.writelines(makePostgresNotificationQueries('resource_allocation', 'resource_claim', 'UPDATE', view_for_row='resource_claim_view'))
+        f.writelines(makePostgresNotificationQueries('resource_allocation', 'task_predecessor', 'INSERT', column_name='task_id'))
+        f.writelines(makePostgresNotificationQueries('resource_allocation', 'task_predecessor', 'UPDATE', column_name='task_id'))
+        f.writelines(makePostgresNotificationQueries('resource_allocation', 'task_predecessor', 'DELETE', column_name='task_id'))
+        f.writelines(makePostgresNotificationQueries('resource_allocation', 'task_predecessor', 'INSERT', column_name='predecessor_id'))
+        f.writelines(makePostgresNotificationQueries('resource_allocation', 'task_predecessor', 'UPDATE', column_name='predecessor_id'))
+        f.writelines(makePostgresNotificationQueries('resource_allocation', 'task_predecessor', 'DELETE', column_name='predecessor_id'))
+        f.writelines(makePostgresNotificationQueries('resource_allocation', 'specification', 'UPDATE'))
+        f.writelines(makePostgresNotificationQueries('resource_allocation', 'resource_claim', 'INSERT'))
+        f.writelines(makePostgresNotificationQueries('resource_allocation', 'resource_claim', 'UPDATE'))
         f.writelines(makePostgresNotificationQueries('resource_allocation', 'resource_claim', 'DELETE'))
-        f.writelines(makePostgresNotificationQueries('resource_monitoring', 'resource_availability', 'UPDATE'))
-        f.writelines(makePostgresNotificationQueries('resource_monitoring', 'resource_capacity', 'UPDATE'))
+        f.writelines(makePostgresNotificationQueries('resource_monitoring', 'resource_availability', 'UPDATE', column_name='resource_id'))
+        f.writelines(makePostgresNotificationQueries('resource_monitoring', 'resource_capacity', 'UPDATE', column_name='resource_id'))
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/radbchangeshandler.py b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/radbchangeshandler.py
index 0df9611f2f8..16109932eeb 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/radbchangeshandler.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/radbchangeshandler.py
@@ -77,58 +77,56 @@ class RADBChangesHandler(RADBBusListener):
         with self._changedCondition:
             self._changedCondition.notifyAll()
 
-    def onTaskUpdated(self, old_task, new_task):
+    def onTaskUpdated(self, updated_task):
         '''onTaskUpdated is called upon receiving a TaskUpdated message.'''
-        #ignore old_task and new_task, which miss some properties via this update mechanism
-        #get task with all expected properties via radbrpc
-        task = self._radbrpc.getTask(new_task['id'])
-        task_change = {'changeType':CHANGE_UPDATE_TYPE, 'objectType':'task', 'value':task}
+        updated_task['starttime'] = updated_task['starttime'].datetime()
+        updated_task['endtime'] = updated_task['endtime'].datetime()
+        task_change = {'changeType':CHANGE_UPDATE_TYPE, 'objectType':'task', 'value':updated_task}
         self._handleChange(task_change)
 
-    def onTaskInserted(self, task):
+    def onTaskInserted(self, new_task):
         '''onTaskInserted is called upon receiving a TaskInserted message.
-        :param task: dictionary with the inserted task'''
-        #ignore old_task and new_task, which miss some properties via this update mechanism
-        #get task with all expected properties via radbrpc
-        task = self._radbrpc.getTask(task['id'])
-        updateTaskMomDetails(task, self._momqueryrpc)
-        task_change = {'changeType':CHANGE_INSERT_TYPE, 'objectType':'task', 'value':task}
+        :param new_task: dictionary with the inserted task'''
+        new_task['starttime'] = new_task['starttime'].datetime()
+        new_task['endtime'] = new_task['endtime'].datetime()
+        updateTaskMomDetails(new_task, self._momqueryrpc)
+        task_change = {'changeType':CHANGE_INSERT_TYPE, 'objectType':'task', 'value':new_task}
         self._handleChange(task_change)
 
-    def onTaskDeleted(self, task):
+    def onTaskDeleted(self, old_task_id):
         '''onTaskDeleted is called upon receiving a TaskDeleted message.
-        :param task: dictionary with the deleted task'''
-        task_change = {'changeType':CHANGE_DELETE_TYPE, 'objectType':'task', 'value':task}
+        :param old_task_id: id of the deleted task'''
+        task_change = {'changeType':CHANGE_DELETE_TYPE, 'objectType':'task', 'value':{'id':old_task_id}}
         self._handleChange(task_change)
 
-    def onResourceClaimUpdated(self, old_claim, new_claim):
+    def onResourceClaimUpdated(self, updated_claim):
         '''onResourceClaimUpdated is called upon receiving a ResourceClaimUpdated message.
-        :param task: dictionary with the updated claim'''
-        new_claim['starttime'] = new_claim['starttime'].datetime()
-        new_claim['endtime'] = new_claim['endtime'].datetime()
-        claim_change = {'changeType':CHANGE_UPDATE_TYPE, 'objectType':'resourceClaim', 'value':new_claim}
+        :param updated_claim: dictionary with the updated claim'''
+        updated_claim['starttime'] = updated_claim['starttime'].datetime()
+        updated_claim['endtime'] = updated_claim['endtime'].datetime()
+        claim_change = {'changeType':CHANGE_UPDATE_TYPE, 'objectType':'resourceClaim', 'value':updated_claim}
         self._handleChange(claim_change)
 
-    def onResourceClaimInserted(self, claim):
+    def onResourceClaimInserted(self, new_claim):
         '''onResourceClaimInserted is called upon receiving a ResourceClaimInserted message.
-        :param claim: dictionary with the inserted claim'''
-        claim['starttime'] = claim['starttime'].datetime()
-        claim['endtime'] = claim['endtime'].datetime()
-        claim_change = {'changeType':CHANGE_INSERT_TYPE, 'objectType':'resourceClaim', 'value':claim}
+        :param new_claim: dictionary with the inserted claim'''
+        new_claim['starttime'] = new_claim['starttime'].datetime()
+        new_claim['endtime'] = new_claim['endtime'].datetime()
+        claim_change = {'changeType':CHANGE_INSERT_TYPE, 'objectType':'resourceClaim', 'value':new_claim}
         self._handleChange(claim_change)
 
-    def onResourceClaimDeleted(self, claim):
+    def onResourceClaimDeleted(self, old_claim_id):
         '''onResourceClaimDeleted is called upon receiving a ResourceClaimDeleted message.
-        :param claim: dictionary with the deleted claim'''
-        claim_change = {'changeType':CHANGE_DELETE_TYPE, 'objectType':'resourceClaim', 'value':claim}
+        :param old_claim_id: id of the deleted claim'''
+        claim_change = {'changeType':CHANGE_DELETE_TYPE, 'objectType':'resourceClaim', 'value':{'id': old_claim_id}}
         self._handleChange(claim_change)
 
-    def onResourceAvailabilityUpdated(self, old_availability, new_availability):
-        claim_change = {'changeType':CHANGE_UPDATE_TYPE, 'objectType':'resourceAvailability', 'value':new_availability}
+    def onResourceAvailabilityUpdated(self, old_availability, updated_availability):
+        claim_change = {'changeType':CHANGE_UPDATE_TYPE, 'objectType':'resourceAvailability', 'value':updated_availability}
         self._handleChange(claim_change)
 
-    def onResourceCapacityUpdated(self, old_capacity, new_capacity):
-        claim_change = {'changeType':CHANGE_UPDATE_TYPE, 'objectType':'resourceCapacity', 'value':new_capacity}
+    def onResourceCapacityUpdated(self, old_capacity, updated_capacity):
+        claim_change = {'changeType':CHANGE_UPDATE_TYPE, 'objectType':'resourceCapacity', 'value':updated_capacity}
         self._handleChange(claim_change)
 
     def getMostRecentChangeNumber(self):
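
With the smaller notification payloads, the handlers above receive the affected row directly (inserts and updates) or only its id (deletes), instead of fetching the full object via radbrpc. A self-contained sketch of the change dicts that _handleChange is fed and how a consumer-side cache could apply them; the constant values and the cache are illustrative, not taken from the patch:

```python
# Illustrative values; the real CHANGE_*_TYPE constants live in
# radbchangeshandler.py.
CHANGE_UPDATE_TYPE = 'update'
CHANGE_DELETE_TYPE = 'delete'

def apply_change(cache, change):
    '''Apply a single change dict to a simple {id: object} cache.'''
    value = change['value']
    if change['changeType'] == CHANGE_DELETE_TYPE:
        # a delete notification only carries the id
        cache.pop(value['id'], None)
    else:
        # inserts and updates carry the (converted) row itself
        cache[value['id']] = value

tasks = {}
apply_change(tasks, {'changeType': CHANGE_UPDATE_TYPE, 'objectType': 'task',
                     'value': {'id': 42, 'status': 'scheduled'}})
apply_change(tasks, {'changeType': CHANGE_DELETE_TYPE, 'objectType': 'task',
                     'value': {'id': 42}})
print(tasks)  # {}
```
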
diff --git a/SAS/ResourceAssignment/ResourceAssignmentService/rpc.py b/SAS/ResourceAssignment/ResourceAssignmentService/rpc.py
index 7b4496abd62..29733fc1c6d 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentService/rpc.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentService/rpc.py
@@ -147,9 +147,9 @@ class RARPC(RPCWrapper):
                         available_capacity=available_capacity,
                         total_capacity=total_capacity)
 
-    def getTask(self, id=None, mom_id=None, otdb_id=None):
-        '''get a task for either the given (task)id, or for the given mom_id, or for the given otdb_id'''
-        task = self.rpc('GetTask', id=id, mom_id=mom_id, otdb_id=otdb_id)
+    def getTask(self, id=None, mom_id=None, otdb_id=None, specification_id=None):
+        '''get a task by the given (task) id, mom_id, otdb_id, or specification_id'''
+        task = self.rpc('GetTask', id=id, mom_id=mom_id, otdb_id=otdb_id, specification_id=specification_id)
         if task:
             task['starttime'] = task['starttime'].datetime()
             task['endtime'] = task['endtime'].datetime()
@@ -186,6 +186,12 @@ class RARPC(RPCWrapper):
             task['endtime'] = task['endtime'].datetime()
         return tasks
 
+    def getTaskPredecessorIds(self, id=None):
+        return self.rpc('GetTaskPredecessorIds', id=id)
+
+    def getTaskSuccessorIds(self, id=None):
+        return self.rpc('GetTaskSuccessorIds', id=id)
+
     def insertTaskPredecessor(self, task_id, predecessor_id):
         return self.rpc('InsertTaskPredecessor', task_id=task_id, predecessor_id=predecessor_id)
 
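
A hedged usage sketch of the extended RARPC client; the module path, the context-manager usage, and the shape of the returned mappings are assumptions (the service handlers below pass their results through convertIntKeysToString, which suggests dicts keyed by task id):

```python
# Assumptions: import path, default broker/busname, and return shapes.
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC

with RARPC() as rarpc:
    # specification_id is a new lookup key for getTask in this patch
    task = rarpc.getTask(specification_id=1234)

    if task:
        # assumed to return {task_id: [related_task_ids]} mappings
        predecessors = rarpc.getTaskPredecessorIds(id=task['id'])
        successors = rarpc.getTaskSuccessorIds(id=task['id'])
        print(task['id'], predecessors, successors)
```
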
diff --git a/SAS/ResourceAssignment/ResourceAssignmentService/service.py b/SAS/ResourceAssignment/ResourceAssignmentService/service.py
index b205c500b86..10afe86da26 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentService/service.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentService/service.py
@@ -62,8 +62,10 @@ class RADBHandler(MessageHandlerInterface):
             'DeleteTask': self._deleteTask,
             'UpdateTask': self._updateTask,
             'UpdateTaskStatusForOtdbId': self._updateTaskStatusForOtdbId,
+            'GetTaskPredecessorIds': self._getTaskPredecessorIds,
+            'GetTaskSuccessorIds': self._getTaskSuccessorIds,
             'InsertTaskPredecessor': self._insertTaskPredecessor,
-            'insertTaskPredecessors': self._insertTaskPredecessors,
+            'InsertTaskPredecessors': self._insertTaskPredecessors,
             'GetTaskStatuses': self._getTaskStatuses,
             'GetTaskTypes': self._getTaskTypes,
             'GetSpecifications': self._getSpecifications,
@@ -222,7 +224,7 @@ class RADBHandler(MessageHandlerInterface):
 
     def _getTask(self, **kwargs):
         logger.info('GetTask: %s' % dict({k:v for k,v in kwargs.items() if v != None}))
-        task = self.radb.getTask(id=kwargs.get('id'), mom_id=kwargs.get('mom_id'), otdb_id=kwargs.get('otdb_id'))
+        task = self.radb.getTask(id=kwargs.get('id'), mom_id=kwargs.get('mom_id'), otdb_id=kwargs.get('otdb_id'), specification_id=kwargs.get('specification_id'))
         return task
 
     def _insertTask(self, **kwargs):
@@ -258,6 +260,14 @@ class RADBHandler(MessageHandlerInterface):
                                        specification_id=kwargs.get('specification_id'))
         return {'id': id, 'updated': updated}
 
+    def _getTaskPredecessorIds(self, **kwargs):
+        logger.info('GetTaskPredecessorIds: %s' % dict({k:v for k,v in kwargs.items() if v != None}))
+        return convertIntKeysToString(self.radb.getTaskPredecessorIds(kwargs.get('id')))
+
+    def _getTaskSuccessorIds(self, **kwargs):
+        logger.info('GetTaskSuccessorIds: %s' % dict({k:v for k,v in kwargs.items() if v != None}))
+        return convertIntKeysToString(self.radb.getTaskSuccessorIds(kwargs.get('id')))
+
     def _insertTaskPredecessor(self, **kwargs):
         id = self.radb.insertTaskPredecessor(kwargs['task_id'],
                                              kwargs['predecessor_id'])
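
The two new handlers wrap their result in convertIntKeysToString before it is returned over the bus, presumably because the serialized message content cannot carry integer dictionary keys. A sketch of the assumed behaviour of that helper (the real implementation lives elsewhere and may be recursive):

```python
# Assumed behaviour of convertIntKeysToString; shown only to explain why the
# new GetTask*Ids handlers wrap their results in it.
def convertIntKeysToString(dct):
    '''Return a copy of dct with every key converted to a string.'''
    return {str(k): v for k, v in dct.items()}

print(convertIntKeysToString({1: [2, 3], 4: []}))  # {'1': [2, 3], '4': []}
```
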
-- 
GitLab