diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbpglistener.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbpglistener.py index 9ab46514d73afc5f13fed049e05109a999114b29..03065ae1c7211fcb8ac0eab101967dc50a6bb8cc 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbpglistener.py +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbpglistener.py @@ -35,23 +35,35 @@ from lofar.messaging import EventMessage, ToBus from lofar.sas.resourceassignment.database.config import DEFAULT_NOTIFICATION_BUSNAME, DEFAULT_NOTIFICATION_PREFIX from lofar.common import dbcredentials +from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC +from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as DEFAULT_RADB_BUSNAME +from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as DEFAULT_RADB_SERVICENAME + logger = logging.getLogger(__name__) class RADBPGListener(PostgresListener): def __init__(self, - busname=DEFAULT_NOTIFICATION_BUSNAME, + notification_busname=DEFAULT_NOTIFICATION_BUSNAME, notification_prefix=DEFAULT_NOTIFICATION_PREFIX, + radb_busname=DEFAULT_RADB_BUSNAME, + radb_servicename=DEFAULT_RADB_SERVICENAME, dbcreds=None, broker=None): super(RADBPGListener, self).__init__(dbcreds.host, dbcreds.database, dbcreds.user, dbcreds.password) self.notification_prefix = notification_prefix - self.event_bus = ToBus(busname, broker=broker) + self.event_bus = ToBus(notification_busname, broker=broker) + + self.rarpc = RARPC(busname=radb_busname, servicename=radb_servicename, broker=broker) self.subscribe('task_update_with_task_view', self.onTaskUpdated) self.subscribe('task_insert_with_task_view', self.onTaskInserted) self.subscribe('task_delete', self.onTaskDeleted) + self.subscribe('task_predecessor_insert', self.onTaskPredecessorInserted) + self.subscribe('task_predecessor_update', self.onTaskPredecessorUpdated) + self.subscribe('task_predecessor_delete', self.onTaskPredecessorDeleted) + # when the specification starttime and endtime are updated, then that effects the task as well # so subscribe to specification_update, and use task_view as view_for_row self.subscribe('specification_update_with_task_view', self.onSpecificationUpdated) @@ -64,42 +76,76 @@ class RADBPGListener(PostgresListener): self.subscribe('resource_capacity_update', self.onResourceCapacityUpdated) def onTaskUpdated(self, payload = None): - self._sendNotification('TaskUpdated', payload, ['starttime', 'endtime']) + self._convertPayloadAndSendNotification('TaskUpdated', payload, ['starttime', 'endtime']) def onTaskInserted(self, payload = None): - self._sendNotification('TaskInserted', payload, ['starttime', 'endtime']) + self._convertPayloadAndSendNotification('TaskInserted', payload, ['starttime', 'endtime']) def onTaskDeleted(self, payload = None): - self._sendNotification('TaskDeleted', payload) + self._convertPayloadAndSendNotification('TaskDeleted', payload) + + def _onTaskPredecessorChanged(self, change_type, payload = None): + logger.info(payload) + try: + content = json.loads(payload) + except Exception as e: + logger.error('Could not parse payload: %s\n%s' % (payload, e)) + return + + try: + task_content = {} + + if change_type == 'Inserted' or change_type == 'Updated': + task_id = content['new']['task_id'] + task = self.rarpc.getTask(task_id) + task_content = {'new': task } + elif change_type == 'Deleted': + task_id = content['old']['task_id'] + task_content = {'old': {'id':task_id} } + + self._sendNotification('Task%s' % change_type, task_content) + except Exception as e: + logger.error('Could not parse payload: %s\n%s' % (payload, e)) + + def onTaskPredecessorUpdated(self, payload = None): + self._onTaskPredecessorChanged('Updated', payload) + + def onTaskPredecessorInserted(self, payload = None): + self._onTaskPredecessorChanged('Inserted', payload) + + def onTaskPredecessorDeleted(self, payload = None): + self._onTaskPredecessorChanged('Deleted', payload) def onSpecificationUpdated(self, payload = None): # when the specification starttime and endtime are updated, then that effects the task as well # so send a TaskUpdated notification - self._sendNotification('TaskUpdated', payload, ['starttime', 'endtime']) + self._convertPayloadAndSendNotification('TaskUpdated', payload, ['starttime', 'endtime']) def onResourceClaimUpdated(self, payload = None): - self._sendNotification('ResourceClaimUpdated', payload, ['starttime', 'endtime']) + self._convertPayloadAndSendNotification('ResourceClaimUpdated', payload, ['starttime', 'endtime']) def onResourceClaimInserted(self, payload = None): - self._sendNotification('ResourceClaimInserted', payload, ['starttime', 'endtime']) + self._convertPayloadAndSendNotification('ResourceClaimInserted', payload, ['starttime', 'endtime']) def onResourceClaimDeleted(self, payload = None): - self._sendNotification('ResourceClaimDeleted', payload) + self._convertPayloadAndSendNotification('ResourceClaimDeleted', payload) def onResourceAvailabilityUpdated(self, payload = None): - self._sendNotification('ResourceAvailabilityUpdated', payload) + self._convertPayloadAndSendNotification('ResourceAvailabilityUpdated', payload) def onResourceCapacityUpdated(self, payload = None): - self._sendNotification('ResourceCapacityUpdated', payload) + self._convertPayloadAndSendNotification('ResourceCapacityUpdated', payload) def __enter__(self): super(RADBPGListener, self).__enter__() self.event_bus.open() + self.rarpc.open() return self def __exit__(self, exc_type, exc_val, exc_tb): super(RADBPGListener, self).__exit__(exc_type, exc_val, exc_tb) self.event_bus.close() + self.rarpc.close() def _formatTimestampsAsIso(self, fields, contentDict): '''convert all requested fields in the contentDict to proper isoformat datetime strings. @@ -127,13 +173,20 @@ class RADBPGListener(PostgresListener): return contentDict except Exception as e: - logger.error('Could not parse payload: %s\n%s' % (contentDict, e)) + logger.error('Error while convering timestamp fields \'%s\'in %s\n%s' % (fields, contentDict, e)) - def _sendNotification(self, subject, payload, timestampFields = None): + def _convertPayloadAndSendNotification(self, subject, payload, timestampFields = None): try: content = json.loads(payload) + except Exception as e: + logger.error('Could not parse payload: %s\n%s' % (payload, e)) + content=None + + return self._sendNotification(subject, content, timestampFields) + def _sendNotification(self, subject, content, timestampFields = None): + try: if 'new' in content and content['new'] and 'old' in content and content['old']: # check if new and old are equal. # however, new and old can be based on different views, @@ -149,8 +202,7 @@ class RADBPGListener(PostgresListener): if timestampFields: content = self._formatTimestampsAsIso(timestampFields, content) except Exception as e: - logger.error('Could not parse payload: %s\n%s' % (payload, e)) - content=None + logger.error('Exception while anayzing content: %s\n%s' % (content, e)) try: msg = EventMessage(context=self.notification_prefix + subject, content=content) @@ -164,8 +216,11 @@ def main(): parser = OptionParser("%prog [options]", description='runs the radb postgres listener which listens to changes on some tables in the radb and publishes the changes as notifications on the bus.') parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost') - parser.add_option("-b", "--busname", dest="busname", type="string", default=DEFAULT_NOTIFICATION_BUSNAME, help="Name of the publication bus on the qpid broker, [default: %default]") - parser.add_option("-n", "--notification_prefix", dest="notification_prefix", type="string", default=DEFAULT_NOTIFICATION_PREFIX, help="The prefix for all notifications of this publisher, [default: %default]") + parser.add_option('--radb_busname', dest='radb_busname', type='string', default=DEFAULT_RADB_BUSNAME, help='Name of the bus exchange on the qpid broker on which the radbservice listens, default: %default') + parser.add_option('--radb_servicename', dest='radb_servicename', type='string', default=DEFAULT_RADB_SERVICENAME, help='Name of the radbservice, default: %default') + parser.add_option('--radb_notification_busname', dest='radb_notification_busname', type='string', default=DEFAULT_NOTIFICATION_BUSNAME, help='Name of the notification bus exchange on the qpid broker on which the radb notifications are published, default: %default') + parser.add_option("--radb_notification_prefix", dest="radb_notification_prefix", type="string", default=DEFAULT_NOTIFICATION_PREFIX, help="The prefix for all notifications of this publisher, [default: %default]") + parser.add_option_group(dbcredentials.options_group(parser)) parser.set_defaults(dbcredentials="RADB") (options, args) = parser.parse_args() @@ -177,8 +232,10 @@ def main(): logger.info("Using dbcreds: %s" % dbcreds.stringWithHiddenPassword()) - with RADBPGListener(busname=options.busname, - notification_prefix=options.notification_prefix, + with RADBPGListener(notification_busname=options.radb_notification_busname, + notification_prefix=options.radb_notification_prefix, + radb_busname=options.radb_busname, + radb_servicename=options.radb_servicename, dbcreds=dbcreds, broker=options.broker) as listener: listener.waitWhileListening() diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/add_notifications.sql b/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/add_notifications.sql index afc072544f1f1edb1f6f9221a1d49afdcbb5f9f6..05fec7a48a4bcd9e0dbd43065ba21fa0b2d84720 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/add_notifications.sql +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/add_notifications.sql @@ -72,6 +72,66 @@ FOR EACH ROW EXECUTE PROCEDURE resource_allocation.NOTIFY_task_DELETE(); +DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_predecessor_INSERT ON resource_allocation.task_predecessor CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_INSERT(); + + +CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_INSERT() +RETURNS TRIGGER AS $$ +BEGIN +PERFORM pg_notify(CAST('task_predecessor_insert' AS text), +'{"old":' || 'null' || ',"new":' || row_to_json(NEW)::text || '}'); +RETURN NEW; +END; +$$ LANGUAGE plpgsql; + + +CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_predecessor_INSERT +AFTER INSERT ON resource_allocation.task_predecessor +FOR EACH ROW +EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_INSERT(); + + +DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_predecessor_UPDATE ON resource_allocation.task_predecessor CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_UPDATE(); + + +CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_UPDATE() +RETURNS TRIGGER AS $$ +BEGIN +PERFORM pg_notify(CAST('task_predecessor_update' AS text), +'{"old":' || row_to_json(OLD)::text || ',"new":' || row_to_json(NEW)::text || '}'); +RETURN NEW; +END; +$$ LANGUAGE plpgsql; + + +CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_predecessor_UPDATE +AFTER UPDATE ON resource_allocation.task_predecessor +FOR EACH ROW +EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_UPDATE(); + + +DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_predecessor_DELETE ON resource_allocation.task_predecessor CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_DELETE(); + + +CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_DELETE() +RETURNS TRIGGER AS $$ +BEGIN +PERFORM pg_notify(CAST('task_predecessor_delete' AS text), +'{"old":' || row_to_json(OLD)::text || ',"new":' || 'null' || '}'); +RETURN OLD; +END; +$$ LANGUAGE plpgsql; + + +CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_predecessor_DELETE +AFTER DELETE ON resource_allocation.task_predecessor +FOR EACH ROW +EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_DELETE(); + + DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_specification_UPDATE_with_task_view ON resource_allocation.specification CASCADE; DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_specification_UPDATE_with_task_view(); diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/create_add_notifications.sql.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/create_add_notifications.sql.py index 3c5234398c9dda815f1ed213231925041dedb56b..750b26c4871f7880424bd98460337702b501af9f 100755 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/create_add_notifications.sql.py +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/create_add_notifications.sql.py @@ -43,6 +43,9 @@ if __name__ == '__main__': f.writelines(makePostgresNotificationQueries('resource_allocation', 'task', 'INSERT', view_for_row='task_view')) f.writelines(makePostgresNotificationQueries('resource_allocation', 'task', 'UPDATE', view_for_row='task_view')) f.writelines(makePostgresNotificationQueries('resource_allocation', 'task', 'DELETE')) + f.writelines(makePostgresNotificationQueries('resource_allocation', 'task_predecessor', 'INSERT')) + f.writelines(makePostgresNotificationQueries('resource_allocation', 'task_predecessor', 'UPDATE')) + f.writelines(makePostgresNotificationQueries('resource_allocation', 'task_predecessor', 'DELETE')) f.writelines(makePostgresNotificationQueries('resource_allocation', 'specification', 'UPDATE', view_for_row='task_view', view_selection_id='specification_id')) f.writelines(makePostgresNotificationQueries('resource_allocation', 'resource_claim', 'INSERT', view_for_row='resource_claim_view')) f.writelines(makePostgresNotificationQueries('resource_allocation', 'resource_claim', 'UPDATE', view_for_row='resource_claim_view'))