Skip to content
Snippets Groups Projects
Commit 7fa71baf authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #8887: handle resource availability and capacity update notifications

parent d60ad2bc
No related branches found
No related tags found
No related merge requests found
...@@ -39,7 +39,7 @@ logger = logging.getLogger(__name__) ...@@ -39,7 +39,7 @@ logger = logging.getLogger(__name__)
class RADBBusListener(AbstractBusListener): class RADBBusListener(AbstractBusListener):
def __init__(self, busname=DEFAULT_NOTIFICATION_BUSNAME, subject=DEFAULT_NOTIFICATION_SUBJECTS, broker=None, **kwargs): def __init__(self, busname=DEFAULT_NOTIFICATION_BUSNAME, subjects=DEFAULT_NOTIFICATION_SUBJECTS, broker=None, **kwargs):
""" """
RADBBusListener listens on the lofar notification message bus and calls (empty) on<SomeMessage> methods when such a message is received. RADBBusListener listens on the lofar notification message bus and calls (empty) on<SomeMessage> methods when such a message is received.
Typical usage is to derive your own subclass from RADBBusListener and implement the specific on<SomeMessage> methods that you are interested in. Typical usage is to derive your own subclass from RADBBusListener and implement the specific on<SomeMessage> methods that you are interested in.
...@@ -51,30 +51,37 @@ class RADBBusListener(AbstractBusListener): ...@@ -51,30 +51,37 @@ class RADBBusListener(AbstractBusListener):
numthreads= <int> Number of parallel threads processing messages (default: 1) numthreads= <int> Number of parallel threads processing messages (default: 1)
verbose= <bool> Output extra logging over stdout (default: False) verbose= <bool> Output extra logging over stdout (default: False)
""" """
address = "%s/%s" % (busname, subject) self.subject_prefix = (subjects.split('.')[0]+'.') if '.' in subjects else ''
address = "%s/%s" % (busname, subjects)
super(RADBBusListener, self).__init__(address, broker, **kwargs) super(RADBBusListener, self).__init__(address, broker, **kwargs)
def _handleMessage(self, msg): def _handleMessage(self, msg):
logger.debug("RADBBusListener.handleMessage: %s" %str(msg)) logger.info("on%s: %s" % (msg.subject.replace(self.subject_prefix, ''), str(msg.content).replace('\n', ' ')))
if msg.subject == 'RADB.TaskUpdated': if msg.subject == '%sTaskUpdated' % self.subject_prefix:
self.onTaskUpdated(msg.content.get('old'), msg.content.get('new')) self.onTaskUpdated(msg.content.get('old'), msg.content.get('new'))
elif msg.subject == 'RADB.TaskInserted': elif msg.subject == '%sTaskInserted' % self.subject_prefix:
self.onTaskInserted(msg.content.get('new')) self.onTaskInserted(msg.content.get('new'))
elif msg.subject == 'RADB.TaskDeleted': elif msg.subject == '%sTaskDeleted' % self.subject_prefix:
self.onTaskDeleted(msg.content.get('old')) self.onTaskDeleted(msg.content.get('old'))
elif msg.subject == 'RADB.ResourceClaimUpdated': elif msg.subject == '%sResourceClaimUpdated' % self.subject_prefix:
self.onResourceClaimUpdated(msg.content.get('old'), msg.content.get('new')) self.onResourceClaimUpdated(msg.content.get('old'), msg.content.get('new'))
elif msg.subject == 'RADB.ResourceClaimInserted': elif msg.subject == '%sResourceClaimInserted' % self.subject_prefix:
self.onResourceClaimInserted(msg.content.get('new')) self.onResourceClaimInserted(msg.content.get('new'))
elif msg.subject == 'RADB.ResourceClaimDeleted': elif msg.subject == '%sResourceClaimDeleted' % self.subject_prefix:
self.onResourceClaimDeleted(msg.content.get('old')) self.onResourceClaimDeleted(msg.content.get('old'))
elif msg.subject == '%sResourceAvailabilityUpdated' % self.subject_prefix:
self.onResourceAvailabilityUpdated(msg.content.get('old'), msg.content.get('new'))
elif msg.subject == '%sResourceCapacityUpdated' % self.subject_prefix:
self.onResourceCapacityUpdated(msg.content.get('old'), msg.content.get('new'))
else: else:
logger.error("RADBBusListener.handleMessage: unknown subject: %s" %str(msg.subject)) logger.error("RADBBusListener.handleMessage: unknown subject: %s" %str(msg.subject))
def onTaskUpdated(self, old_task, new_task): def onTaskUpdated(self, old_task, new_task):
'''onTaskUpdated is called upon receiving a TaskUpdated message. '''onTaskUpdated is called upon receiving a TaskUpdated message.
:param old_task: dictionary with the updated task :param old_task: dictionary with the task before the update
:param new_task: dictionary with the updated task''' :param new_task: dictionary with the updated task'''
pass pass
...@@ -104,6 +111,19 @@ class RADBBusListener(AbstractBusListener): ...@@ -104,6 +111,19 @@ class RADBBusListener(AbstractBusListener):
:param old_claim: dictionary with the deleted claim''' :param old_claim: dictionary with the deleted claim'''
pass pass
def onResourceAvailabilityUpdated(self, old_availability, new_availability):
'''onResourceAvailabilityUpdated is called upon receiving a ResourceAvailabilityUpdated message.
:param old_availability: dictionary with the resource availability before the update
:param new_availability: dictionary with the updated availability'''
pass
def onResourceCapacityUpdated(self, old_capacity, new_capacity):
'''onResourceCapacityUpdated is called upon receiving a ResourceCapacityUpdated message.
:param old_capacity: dictionary with the resource capacity before the update
:param new_capacity: dictionary with the updated capacity'''
pass
if __name__ == '__main__': if __name__ == '__main__':
with RADBBusListener(broker=None) as listener: with RADBBusListener(broker=None) as listener:
waitForInterrupt() waitForInterrupt()
......
...@@ -30,7 +30,7 @@ import time ...@@ -30,7 +30,7 @@ import time
import json import json
from optparse import OptionParser from optparse import OptionParser
from lofar.common.postgres import PostgresListener, makePostgresNotificationQueries from lofar.common.postgres import PostgresListener
from lofar.messaging import EventMessage, ToBus from lofar.messaging import EventMessage, ToBus
from lofar.sas.resourceassignment.database.config import DEFAULT_NOTIFICATION_BUSNAME, DEFAULT_NOTIFICATION_PREFIX from lofar.sas.resourceassignment.database.config import DEFAULT_NOTIFICATION_BUSNAME, DEFAULT_NOTIFICATION_PREFIX
from lofar.common import dbcredentials from lofar.common import dbcredentials
...@@ -60,6 +60,9 @@ class RADBPGListener(PostgresListener): ...@@ -60,6 +60,9 @@ class RADBPGListener(PostgresListener):
self.subscribe('resource_claim_insert_with_resource_claim_view', self.onResourceClaimInserted) self.subscribe('resource_claim_insert_with_resource_claim_view', self.onResourceClaimInserted)
self.subscribe('resource_claim_delete', self.onResourceClaimDeleted) self.subscribe('resource_claim_delete', self.onResourceClaimDeleted)
self.subscribe('resource_availability_update', self.onResourceAvailabilityUpdated)
self.subscribe('resource_capacity_update', self.onResourceCapacityUpdated)
def onTaskUpdated(self, payload = None): def onTaskUpdated(self, payload = None):
self._sendNotification('TaskUpdated', payload, ['starttime', 'endtime']) self._sendNotification('TaskUpdated', payload, ['starttime', 'endtime'])
...@@ -83,6 +86,12 @@ class RADBPGListener(PostgresListener): ...@@ -83,6 +86,12 @@ class RADBPGListener(PostgresListener):
def onResourceClaimDeleted(self, payload = None): def onResourceClaimDeleted(self, payload = None):
self._sendNotification('ResourceClaimDeleted', payload) self._sendNotification('ResourceClaimDeleted', payload)
def onResourceAvailabilityUpdated(self, payload = None):
self._sendNotification('ResourceAvailabilityUpdated', payload)
def onResourceCapacityUpdated(self, payload = None):
self._sendNotification('ResourceCapacityUpdated', payload)
def __enter__(self): def __enter__(self):
super(RADBPGListener, self).__enter__() super(RADBPGListener, self).__enter__()
self.event_bus.open() self.event_bus.open()
...@@ -144,7 +153,7 @@ class RADBPGListener(PostgresListener): ...@@ -144,7 +153,7 @@ class RADBPGListener(PostgresListener):
try: try:
msg = EventMessage(context=self.notification_prefix + subject, content=content) msg = EventMessage(context=self.notification_prefix + subject, content=content)
logger.info('Sending notification: ' + str(msg).replace('\n', ' ')) logger.info('Sending notification %s: %s' % (subject, str(content).replace('\n', ' ')))
self.event_bus.send(msg) self.event_bus.send(msg)
except Exception as e: except Exception as e:
logger.error(str(e)) logger.error(str(e))
...@@ -173,14 +182,5 @@ def main(): ...@@ -173,14 +182,5 @@ def main():
broker=options.broker) as listener: broker=options.broker) as listener:
listener.waitWhileListening() listener.waitWhileListening()
def make_radb_postgres_notification_queries():
print makePostgresNotificationQueries('resource_allocation', 'task', 'INSERT', view_for_row='task_view')
print makePostgresNotificationQueries('resource_allocation', 'task', 'UPDATE', view_for_row='task_view')
print makePostgresNotificationQueries('resource_allocation', 'task', 'DELETE')
print makePostgresNotificationQueries('resource_allocation', 'specification', 'UPDATE', view_for_row='task_view', view_selection_id='specification_id')
print makePostgresNotificationQueries('resource_allocation', 'resource_claim', 'INSERT', view_for_row='resource_claim_view')
print makePostgresNotificationQueries('resource_allocation', 'resource_claim', 'UPDATE', view_for_row='resource_claim_view')
print makePostgresNotificationQueries('resource_allocation', 'resource_claim', 'DELETE')
if __name__ == '__main__': if __name__ == '__main__':
main() main()
...@@ -47,10 +47,9 @@ CHANGE_INSERT_TYPE = 'insert' ...@@ -47,10 +47,9 @@ CHANGE_INSERT_TYPE = 'insert'
CHANGE_DELETE_TYPE = 'delete' CHANGE_DELETE_TYPE = 'delete'
class RADBChangesHandler(RADBBusListener): class RADBChangesHandler(RADBBusListener):
def __init__(self, busname=DEFAULT_NOTIFICATION_BUSNAME, subject=DEFAULT_NOTIFICATION_SUBJECTS, broker=None, momrpc=None, **kwargs): def __init__(self, busname=DEFAULT_NOTIFICATION_BUSNAME, subjects=DEFAULT_NOTIFICATION_SUBJECTS, broker=None, momrpc=None, **kwargs):
""" """
RADBChangesHandler listens on the lofar notification message bus and keeps track of all the change notifications. RADBChangesHandler listens on the lofar notification message bus and keeps track of all the change notifications.
:param busname: valid Qpid address (default: lofar.ra.notification)
:param broker: valid Qpid broker host (default: None, which means localhost) :param broker: valid Qpid broker host (default: None, which means localhost)
additional parameters in kwargs: additional parameters in kwargs:
options= <dict> Dictionary of options passed to QPID options= <dict> Dictionary of options passed to QPID
...@@ -58,8 +57,7 @@ class RADBChangesHandler(RADBBusListener): ...@@ -58,8 +57,7 @@ class RADBChangesHandler(RADBBusListener):
numthreads= <int> Number of parallel threads processing messages (default: 1) numthreads= <int> Number of parallel threads processing messages (default: 1)
verbose= <bool> Output extra logging over stdout (default: False) verbose= <bool> Output extra logging over stdout (default: False)
""" """
address = "%s/%s" % (busname, subject) super(RADBChangesHandler, self).__init__(busname=busname, subjects=subjects, broker=broker, **kwargs)
super(RADBChangesHandler, self).__init__(busname=busname, subject=subject, broker=broker, **kwargs)
self._changes = [] self._changes = []
self._lock = Lock() self._lock = Lock()
...@@ -127,6 +125,14 @@ class RADBChangesHandler(RADBBusListener): ...@@ -127,6 +125,14 @@ class RADBChangesHandler(RADBBusListener):
claim_change = {'changeType':CHANGE_DELETE_TYPE, 'objectType':'resourceClaim', 'value':claim} claim_change = {'changeType':CHANGE_DELETE_TYPE, 'objectType':'resourceClaim', 'value':claim}
self._handleChange(claim_change) self._handleChange(claim_change)
def onResourceAvailabilityUpdated(self, old_availability, new_availability):
claim_change = {'changeType':CHANGE_UPDATE_TYPE, 'objectType':'resourceAvailability', 'value':new_availability}
self._handleChange(claim_change)
def onResourceCapacityUpdated(self, old_capacity, new_capacity):
claim_change = {'changeType':CHANGE_UPDATE_TYPE, 'objectType':'resourceCapacity', 'value':new_capacity}
self._handleChange(claim_change)
def getMostRecentChangeNumber(self): def getMostRecentChangeNumber(self):
with self._lock: with self._lock:
if self._changes: if self._changes:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment