From 7fa71bafba8df5fd94fd76a97aaa7ef425e661ee Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Fri, 8 Apr 2016 08:47:38 +0000 Subject: [PATCH] Task #8887: handle resource availability and capacity update notifications --- .../radbbuslistener.py | 40 ++++++++++++++----- .../radbpglistener.py | 22 +++++----- .../lib/radbchangeshandler.py | 14 +++++-- 3 files changed, 51 insertions(+), 25 deletions(-) diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbbuslistener.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbbuslistener.py index 7e5f2285a6d..fcf3ff0cf91 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbbuslistener.py +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbbuslistener.py @@ -39,7 +39,7 @@ logger = logging.getLogger(__name__) class RADBBusListener(AbstractBusListener): - def __init__(self, busname=DEFAULT_NOTIFICATION_BUSNAME, subject=DEFAULT_NOTIFICATION_SUBJECTS, broker=None, **kwargs): + def __init__(self, busname=DEFAULT_NOTIFICATION_BUSNAME, subjects=DEFAULT_NOTIFICATION_SUBJECTS, broker=None, **kwargs): """ RADBBusListener listens on the lofar notification message bus and calls (empty) on<SomeMessage> methods when such a message is received. Typical usage is to derive your own subclass from RADBBusListener and implement the specific on<SomeMessage> methods that you are interested in. @@ -51,30 +51,37 @@ class RADBBusListener(AbstractBusListener): numthreads= <int> Number of parallel threads processing messages (default: 1) verbose= <bool> Output extra logging over stdout (default: False) """ - address = "%s/%s" % (busname, subject) + self.subject_prefix = (subjects.split('.')[0]+'.') if '.' in subjects else '' + + address = "%s/%s" % (busname, subjects) super(RADBBusListener, self).__init__(address, broker, **kwargs) + def _handleMessage(self, msg): - logger.debug("RADBBusListener.handleMessage: %s" %str(msg)) + logger.info("on%s: %s" % (msg.subject.replace(self.subject_prefix, ''), str(msg.content).replace('\n', ' '))) - if msg.subject == 'RADB.TaskUpdated': + if msg.subject == '%sTaskUpdated' % self.subject_prefix: self.onTaskUpdated(msg.content.get('old'), msg.content.get('new')) - elif msg.subject == 'RADB.TaskInserted': + elif msg.subject == '%sTaskInserted' % self.subject_prefix: self.onTaskInserted(msg.content.get('new')) - elif msg.subject == 'RADB.TaskDeleted': + elif msg.subject == '%sTaskDeleted' % self.subject_prefix: self.onTaskDeleted(msg.content.get('old')) - elif msg.subject == 'RADB.ResourceClaimUpdated': + elif msg.subject == '%sResourceClaimUpdated' % self.subject_prefix: self.onResourceClaimUpdated(msg.content.get('old'), msg.content.get('new')) - elif msg.subject == 'RADB.ResourceClaimInserted': + elif msg.subject == '%sResourceClaimInserted' % self.subject_prefix: self.onResourceClaimInserted(msg.content.get('new')) - elif msg.subject == 'RADB.ResourceClaimDeleted': + elif msg.subject == '%sResourceClaimDeleted' % self.subject_prefix: self.onResourceClaimDeleted(msg.content.get('old')) + elif msg.subject == '%sResourceAvailabilityUpdated' % self.subject_prefix: + self.onResourceAvailabilityUpdated(msg.content.get('old'), msg.content.get('new')) + elif msg.subject == '%sResourceCapacityUpdated' % self.subject_prefix: + self.onResourceCapacityUpdated(msg.content.get('old'), msg.content.get('new')) else: logger.error("RADBBusListener.handleMessage: unknown subject: %s" %str(msg.subject)) def onTaskUpdated(self, old_task, new_task): '''onTaskUpdated is called upon receiving a TaskUpdated message. - :param old_task: dictionary with the updated task + :param old_task: dictionary with the task before the update :param new_task: dictionary with the updated task''' pass @@ -104,6 +111,19 @@ class RADBBusListener(AbstractBusListener): :param old_claim: dictionary with the deleted claim''' pass + def onResourceAvailabilityUpdated(self, old_availability, new_availability): + '''onResourceAvailabilityUpdated is called upon receiving a ResourceAvailabilityUpdated message. + :param old_availability: dictionary with the resource availability before the update + :param new_availability: dictionary with the updated availability''' + pass + + def onResourceCapacityUpdated(self, old_capacity, new_capacity): + '''onResourceCapacityUpdated is called upon receiving a ResourceCapacityUpdated message. + :param old_capacity: dictionary with the resource capacity before the update + :param new_capacity: dictionary with the updated capacity''' + pass + + if __name__ == '__main__': with RADBBusListener(broker=None) as listener: waitForInterrupt() diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbpglistener.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbpglistener.py index bc4b99a1c6e..eb646347c81 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbpglistener.py +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbpglistener.py @@ -30,7 +30,7 @@ import time import json from optparse import OptionParser -from lofar.common.postgres import PostgresListener, makePostgresNotificationQueries +from lofar.common.postgres import PostgresListener from lofar.messaging import EventMessage, ToBus from lofar.sas.resourceassignment.database.config import DEFAULT_NOTIFICATION_BUSNAME, DEFAULT_NOTIFICATION_PREFIX from lofar.common import dbcredentials @@ -60,6 +60,9 @@ class RADBPGListener(PostgresListener): self.subscribe('resource_claim_insert_with_resource_claim_view', self.onResourceClaimInserted) self.subscribe('resource_claim_delete', self.onResourceClaimDeleted) + self.subscribe('resource_availability_update', self.onResourceAvailabilityUpdated) + self.subscribe('resource_capacity_update', self.onResourceCapacityUpdated) + def onTaskUpdated(self, payload = None): self._sendNotification('TaskUpdated', payload, ['starttime', 'endtime']) @@ -83,6 +86,12 @@ class RADBPGListener(PostgresListener): def onResourceClaimDeleted(self, payload = None): self._sendNotification('ResourceClaimDeleted', payload) + def onResourceAvailabilityUpdated(self, payload = None): + self._sendNotification('ResourceAvailabilityUpdated', payload) + + def onResourceCapacityUpdated(self, payload = None): + self._sendNotification('ResourceCapacityUpdated', payload) + def __enter__(self): super(RADBPGListener, self).__enter__() self.event_bus.open() @@ -144,7 +153,7 @@ class RADBPGListener(PostgresListener): try: msg = EventMessage(context=self.notification_prefix + subject, content=content) - logger.info('Sending notification: ' + str(msg).replace('\n', ' ')) + logger.info('Sending notification %s: %s' % (subject, str(content).replace('\n', ' '))) self.event_bus.send(msg) except Exception as e: logger.error(str(e)) @@ -173,14 +182,5 @@ def main(): broker=options.broker) as listener: listener.waitWhileListening() -def make_radb_postgres_notification_queries(): - print makePostgresNotificationQueries('resource_allocation', 'task', 'INSERT', view_for_row='task_view') - print makePostgresNotificationQueries('resource_allocation', 'task', 'UPDATE', view_for_row='task_view') - print makePostgresNotificationQueries('resource_allocation', 'task', 'DELETE') - print makePostgresNotificationQueries('resource_allocation', 'specification', 'UPDATE', view_for_row='task_view', view_selection_id='specification_id') - print makePostgresNotificationQueries('resource_allocation', 'resource_claim', 'INSERT', view_for_row='resource_claim_view') - print makePostgresNotificationQueries('resource_allocation', 'resource_claim', 'UPDATE', view_for_row='resource_claim_view') - print makePostgresNotificationQueries('resource_allocation', 'resource_claim', 'DELETE') - if __name__ == '__main__': main() diff --git a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/radbchangeshandler.py b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/radbchangeshandler.py index 3dc1a063ee4..46aad3a5bd5 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/radbchangeshandler.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/radbchangeshandler.py @@ -47,10 +47,9 @@ CHANGE_INSERT_TYPE = 'insert' CHANGE_DELETE_TYPE = 'delete' class RADBChangesHandler(RADBBusListener): - def __init__(self, busname=DEFAULT_NOTIFICATION_BUSNAME, subject=DEFAULT_NOTIFICATION_SUBJECTS, broker=None, momrpc=None, **kwargs): + def __init__(self, busname=DEFAULT_NOTIFICATION_BUSNAME, subjects=DEFAULT_NOTIFICATION_SUBJECTS, broker=None, momrpc=None, **kwargs): """ RADBChangesHandler listens on the lofar notification message bus and keeps track of all the change notifications. - :param busname: valid Qpid address (default: lofar.ra.notification) :param broker: valid Qpid broker host (default: None, which means localhost) additional parameters in kwargs: options= <dict> Dictionary of options passed to QPID @@ -58,8 +57,7 @@ class RADBChangesHandler(RADBBusListener): numthreads= <int> Number of parallel threads processing messages (default: 1) verbose= <bool> Output extra logging over stdout (default: False) """ - address = "%s/%s" % (busname, subject) - super(RADBChangesHandler, self).__init__(busname=busname, subject=subject, broker=broker, **kwargs) + super(RADBChangesHandler, self).__init__(busname=busname, subjects=subjects, broker=broker, **kwargs) self._changes = [] self._lock = Lock() @@ -127,6 +125,14 @@ class RADBChangesHandler(RADBBusListener): claim_change = {'changeType':CHANGE_DELETE_TYPE, 'objectType':'resourceClaim', 'value':claim} self._handleChange(claim_change) + def onResourceAvailabilityUpdated(self, old_availability, new_availability): + claim_change = {'changeType':CHANGE_UPDATE_TYPE, 'objectType':'resourceAvailability', 'value':new_availability} + self._handleChange(claim_change) + + def onResourceCapacityUpdated(self, old_capacity, new_capacity): + claim_change = {'changeType':CHANGE_UPDATE_TYPE, 'objectType':'resourceCapacity', 'value':new_capacity} + self._handleChange(claim_change) + def getMostRecentChangeNumber(self): with self._lock: if self._changes: -- GitLab