Skip to content
Snippets Groups Projects
Commit 8b7e081c authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #8887: also send task inserted/updated/deleted notifications on task_predecessor changes

parent 35b26f51
No related branches found
No related tags found
No related merge requests found
...@@ -35,23 +35,35 @@ from lofar.messaging import EventMessage, ToBus ...@@ -35,23 +35,35 @@ from lofar.messaging import EventMessage, ToBus
from lofar.sas.resourceassignment.database.config import DEFAULT_NOTIFICATION_BUSNAME, DEFAULT_NOTIFICATION_PREFIX from lofar.sas.resourceassignment.database.config import DEFAULT_NOTIFICATION_BUSNAME, DEFAULT_NOTIFICATION_PREFIX
from lofar.common import dbcredentials from lofar.common import dbcredentials
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as DEFAULT_RADB_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as DEFAULT_RADB_SERVICENAME
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class RADBPGListener(PostgresListener): class RADBPGListener(PostgresListener):
def __init__(self, def __init__(self,
busname=DEFAULT_NOTIFICATION_BUSNAME, notification_busname=DEFAULT_NOTIFICATION_BUSNAME,
notification_prefix=DEFAULT_NOTIFICATION_PREFIX, notification_prefix=DEFAULT_NOTIFICATION_PREFIX,
radb_busname=DEFAULT_RADB_BUSNAME,
radb_servicename=DEFAULT_RADB_SERVICENAME,
dbcreds=None, dbcreds=None,
broker=None): broker=None):
super(RADBPGListener, self).__init__(dbcreds.host, dbcreds.database, dbcreds.user, dbcreds.password) super(RADBPGListener, self).__init__(dbcreds.host, dbcreds.database, dbcreds.user, dbcreds.password)
self.notification_prefix = notification_prefix self.notification_prefix = notification_prefix
self.event_bus = ToBus(busname, broker=broker) self.event_bus = ToBus(notification_busname, broker=broker)
self.rarpc = RARPC(busname=radb_busname, servicename=radb_servicename, broker=broker)
self.subscribe('task_update_with_task_view', self.onTaskUpdated) self.subscribe('task_update_with_task_view', self.onTaskUpdated)
self.subscribe('task_insert_with_task_view', self.onTaskInserted) self.subscribe('task_insert_with_task_view', self.onTaskInserted)
self.subscribe('task_delete', self.onTaskDeleted) self.subscribe('task_delete', self.onTaskDeleted)
self.subscribe('task_predecessor_insert', self.onTaskPredecessorInserted)
self.subscribe('task_predecessor_update', self.onTaskPredecessorUpdated)
self.subscribe('task_predecessor_delete', self.onTaskPredecessorDeleted)
# when the specification starttime and endtime are updated, then that effects the task as well # when the specification starttime and endtime are updated, then that effects the task as well
# so subscribe to specification_update, and use task_view as view_for_row # so subscribe to specification_update, and use task_view as view_for_row
self.subscribe('specification_update_with_task_view', self.onSpecificationUpdated) self.subscribe('specification_update_with_task_view', self.onSpecificationUpdated)
...@@ -64,42 +76,76 @@ class RADBPGListener(PostgresListener): ...@@ -64,42 +76,76 @@ class RADBPGListener(PostgresListener):
self.subscribe('resource_capacity_update', self.onResourceCapacityUpdated) self.subscribe('resource_capacity_update', self.onResourceCapacityUpdated)
def onTaskUpdated(self, payload = None): def onTaskUpdated(self, payload = None):
self._sendNotification('TaskUpdated', payload, ['starttime', 'endtime']) self._convertPayloadAndSendNotification('TaskUpdated', payload, ['starttime', 'endtime'])
def onTaskInserted(self, payload = None): def onTaskInserted(self, payload = None):
self._sendNotification('TaskInserted', payload, ['starttime', 'endtime']) self._convertPayloadAndSendNotification('TaskInserted', payload, ['starttime', 'endtime'])
def onTaskDeleted(self, payload = None): def onTaskDeleted(self, payload = None):
self._sendNotification('TaskDeleted', payload) self._convertPayloadAndSendNotification('TaskDeleted', payload)
def _onTaskPredecessorChanged(self, change_type, payload = None):
logger.info(payload)
try:
content = json.loads(payload)
except Exception as e:
logger.error('Could not parse payload: %s\n%s' % (payload, e))
return
try:
task_content = {}
if change_type == 'Inserted' or change_type == 'Updated':
task_id = content['new']['task_id']
task = self.rarpc.getTask(task_id)
task_content = {'new': task }
elif change_type == 'Deleted':
task_id = content['old']['task_id']
task_content = {'old': {'id':task_id} }
self._sendNotification('Task%s' % change_type, task_content)
except Exception as e:
logger.error('Could not parse payload: %s\n%s' % (payload, e))
def onTaskPredecessorUpdated(self, payload = None):
self._onTaskPredecessorChanged('Updated', payload)
def onTaskPredecessorInserted(self, payload = None):
self._onTaskPredecessorChanged('Inserted', payload)
def onTaskPredecessorDeleted(self, payload = None):
self._onTaskPredecessorChanged('Deleted', payload)
def onSpecificationUpdated(self, payload = None): def onSpecificationUpdated(self, payload = None):
# when the specification starttime and endtime are updated, then that effects the task as well # when the specification starttime and endtime are updated, then that effects the task as well
# so send a TaskUpdated notification # so send a TaskUpdated notification
self._sendNotification('TaskUpdated', payload, ['starttime', 'endtime']) self._convertPayloadAndSendNotification('TaskUpdated', payload, ['starttime', 'endtime'])
def onResourceClaimUpdated(self, payload = None): def onResourceClaimUpdated(self, payload = None):
self._sendNotification('ResourceClaimUpdated', payload, ['starttime', 'endtime']) self._convertPayloadAndSendNotification('ResourceClaimUpdated', payload, ['starttime', 'endtime'])
def onResourceClaimInserted(self, payload = None): def onResourceClaimInserted(self, payload = None):
self._sendNotification('ResourceClaimInserted', payload, ['starttime', 'endtime']) self._convertPayloadAndSendNotification('ResourceClaimInserted', payload, ['starttime', 'endtime'])
def onResourceClaimDeleted(self, payload = None): def onResourceClaimDeleted(self, payload = None):
self._sendNotification('ResourceClaimDeleted', payload) self._convertPayloadAndSendNotification('ResourceClaimDeleted', payload)
def onResourceAvailabilityUpdated(self, payload = None): def onResourceAvailabilityUpdated(self, payload = None):
self._sendNotification('ResourceAvailabilityUpdated', payload) self._convertPayloadAndSendNotification('ResourceAvailabilityUpdated', payload)
def onResourceCapacityUpdated(self, payload = None): def onResourceCapacityUpdated(self, payload = None):
self._sendNotification('ResourceCapacityUpdated', payload) self._convertPayloadAndSendNotification('ResourceCapacityUpdated', payload)
def __enter__(self): def __enter__(self):
super(RADBPGListener, self).__enter__() super(RADBPGListener, self).__enter__()
self.event_bus.open() self.event_bus.open()
self.rarpc.open()
return self return self
def __exit__(self, exc_type, exc_val, exc_tb): def __exit__(self, exc_type, exc_val, exc_tb):
super(RADBPGListener, self).__exit__(exc_type, exc_val, exc_tb) super(RADBPGListener, self).__exit__(exc_type, exc_val, exc_tb)
self.event_bus.close() self.event_bus.close()
self.rarpc.close()
def _formatTimestampsAsIso(self, fields, contentDict): def _formatTimestampsAsIso(self, fields, contentDict):
'''convert all requested fields in the contentDict to proper isoformat datetime strings. '''convert all requested fields in the contentDict to proper isoformat datetime strings.
...@@ -127,13 +173,20 @@ class RADBPGListener(PostgresListener): ...@@ -127,13 +173,20 @@ class RADBPGListener(PostgresListener):
return contentDict return contentDict
except Exception as e: except Exception as e:
logger.error('Could not parse payload: %s\n%s' % (contentDict, e)) logger.error('Error while convering timestamp fields \'%s\'in %s\n%s' % (fields, contentDict, e))
def _sendNotification(self, subject, payload, timestampFields = None): def _convertPayloadAndSendNotification(self, subject, payload, timestampFields = None):
try: try:
content = json.loads(payload) content = json.loads(payload)
except Exception as e:
logger.error('Could not parse payload: %s\n%s' % (payload, e))
content=None
return self._sendNotification(subject, content, timestampFields)
def _sendNotification(self, subject, content, timestampFields = None):
try:
if 'new' in content and content['new'] and 'old' in content and content['old']: if 'new' in content and content['new'] and 'old' in content and content['old']:
# check if new and old are equal. # check if new and old are equal.
# however, new and old can be based on different views, # however, new and old can be based on different views,
...@@ -149,8 +202,7 @@ class RADBPGListener(PostgresListener): ...@@ -149,8 +202,7 @@ class RADBPGListener(PostgresListener):
if timestampFields: if timestampFields:
content = self._formatTimestampsAsIso(timestampFields, content) content = self._formatTimestampsAsIso(timestampFields, content)
except Exception as e: except Exception as e:
logger.error('Could not parse payload: %s\n%s' % (payload, e)) logger.error('Exception while anayzing content: %s\n%s' % (content, e))
content=None
try: try:
msg = EventMessage(context=self.notification_prefix + subject, content=content) msg = EventMessage(context=self.notification_prefix + subject, content=content)
...@@ -164,8 +216,11 @@ def main(): ...@@ -164,8 +216,11 @@ def main():
parser = OptionParser("%prog [options]", parser = OptionParser("%prog [options]",
description='runs the radb postgres listener which listens to changes on some tables in the radb and publishes the changes as notifications on the bus.') description='runs the radb postgres listener which listens to changes on some tables in the radb and publishes the changes as notifications on the bus.')
parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost') parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost')
parser.add_option("-b", "--busname", dest="busname", type="string", default=DEFAULT_NOTIFICATION_BUSNAME, help="Name of the publication bus on the qpid broker, [default: %default]") parser.add_option('--radb_busname', dest='radb_busname', type='string', default=DEFAULT_RADB_BUSNAME, help='Name of the bus exchange on the qpid broker on which the radbservice listens, default: %default')
parser.add_option("-n", "--notification_prefix", dest="notification_prefix", type="string", default=DEFAULT_NOTIFICATION_PREFIX, help="The prefix for all notifications of this publisher, [default: %default]") parser.add_option('--radb_servicename', dest='radb_servicename', type='string', default=DEFAULT_RADB_SERVICENAME, help='Name of the radbservice, default: %default')
parser.add_option('--radb_notification_busname', dest='radb_notification_busname', type='string', default=DEFAULT_NOTIFICATION_BUSNAME, help='Name of the notification bus exchange on the qpid broker on which the radb notifications are published, default: %default')
parser.add_option("--radb_notification_prefix", dest="radb_notification_prefix", type="string", default=DEFAULT_NOTIFICATION_PREFIX, help="The prefix for all notifications of this publisher, [default: %default]")
parser.add_option_group(dbcredentials.options_group(parser)) parser.add_option_group(dbcredentials.options_group(parser))
parser.set_defaults(dbcredentials="RADB") parser.set_defaults(dbcredentials="RADB")
(options, args) = parser.parse_args() (options, args) = parser.parse_args()
...@@ -177,8 +232,10 @@ def main(): ...@@ -177,8 +232,10 @@ def main():
logger.info("Using dbcreds: %s" % dbcreds.stringWithHiddenPassword()) logger.info("Using dbcreds: %s" % dbcreds.stringWithHiddenPassword())
with RADBPGListener(busname=options.busname, with RADBPGListener(notification_busname=options.radb_notification_busname,
notification_prefix=options.notification_prefix, notification_prefix=options.radb_notification_prefix,
radb_busname=options.radb_busname,
radb_servicename=options.radb_servicename,
dbcreds=dbcreds, dbcreds=dbcreds,
broker=options.broker) as listener: broker=options.broker) as listener:
listener.waitWhileListening() listener.waitWhileListening()
......
...@@ -72,6 +72,66 @@ FOR EACH ROW ...@@ -72,6 +72,66 @@ FOR EACH ROW
EXECUTE PROCEDURE resource_allocation.NOTIFY_task_DELETE(); EXECUTE PROCEDURE resource_allocation.NOTIFY_task_DELETE();
DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_predecessor_INSERT ON resource_allocation.task_predecessor CASCADE;
DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_INSERT();
CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_INSERT()
RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify(CAST('task_predecessor_insert' AS text),
'{"old":' || 'null' || ',"new":' || row_to_json(NEW)::text || '}');
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_predecessor_INSERT
AFTER INSERT ON resource_allocation.task_predecessor
FOR EACH ROW
EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_INSERT();
DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_predecessor_UPDATE ON resource_allocation.task_predecessor CASCADE;
DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_UPDATE();
CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_UPDATE()
RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify(CAST('task_predecessor_update' AS text),
'{"old":' || row_to_json(OLD)::text || ',"new":' || row_to_json(NEW)::text || '}');
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_predecessor_UPDATE
AFTER UPDATE ON resource_allocation.task_predecessor
FOR EACH ROW
EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_UPDATE();
DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_predecessor_DELETE ON resource_allocation.task_predecessor CASCADE;
DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_DELETE();
CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_DELETE()
RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify(CAST('task_predecessor_delete' AS text),
'{"old":' || row_to_json(OLD)::text || ',"new":' || 'null' || '}');
RETURN OLD;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_predecessor_DELETE
AFTER DELETE ON resource_allocation.task_predecessor
FOR EACH ROW
EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_DELETE();
DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_specification_UPDATE_with_task_view ON resource_allocation.specification CASCADE; DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_specification_UPDATE_with_task_view ON resource_allocation.specification CASCADE;
DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_specification_UPDATE_with_task_view(); DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_specification_UPDATE_with_task_view();
......
...@@ -43,6 +43,9 @@ if __name__ == '__main__': ...@@ -43,6 +43,9 @@ if __name__ == '__main__':
f.writelines(makePostgresNotificationQueries('resource_allocation', 'task', 'INSERT', view_for_row='task_view')) f.writelines(makePostgresNotificationQueries('resource_allocation', 'task', 'INSERT', view_for_row='task_view'))
f.writelines(makePostgresNotificationQueries('resource_allocation', 'task', 'UPDATE', view_for_row='task_view')) f.writelines(makePostgresNotificationQueries('resource_allocation', 'task', 'UPDATE', view_for_row='task_view'))
f.writelines(makePostgresNotificationQueries('resource_allocation', 'task', 'DELETE')) f.writelines(makePostgresNotificationQueries('resource_allocation', 'task', 'DELETE'))
f.writelines(makePostgresNotificationQueries('resource_allocation', 'task_predecessor', 'INSERT'))
f.writelines(makePostgresNotificationQueries('resource_allocation', 'task_predecessor', 'UPDATE'))
f.writelines(makePostgresNotificationQueries('resource_allocation', 'task_predecessor', 'DELETE'))
f.writelines(makePostgresNotificationQueries('resource_allocation', 'specification', 'UPDATE', view_for_row='task_view', view_selection_id='specification_id')) f.writelines(makePostgresNotificationQueries('resource_allocation', 'specification', 'UPDATE', view_for_row='task_view', view_selection_id='specification_id'))
f.writelines(makePostgresNotificationQueries('resource_allocation', 'resource_claim', 'INSERT', view_for_row='resource_claim_view')) f.writelines(makePostgresNotificationQueries('resource_allocation', 'resource_claim', 'INSERT', view_for_row='resource_claim_view'))
f.writelines(makePostgresNotificationQueries('resource_allocation', 'resource_claim', 'UPDATE', view_for_row='resource_claim_view')) f.writelines(makePostgresNotificationQueries('resource_allocation', 'resource_claim', 'UPDATE', view_for_row='resource_claim_view'))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment