Skip to content
Snippets Groups Projects
Commit 54f48b64 authored by Jan David Mol's avatar Jan David Mol
Browse files

SW-699: Use exchange,routing_key syntax for AbstractBusListener constructor,...

SW-699: Use exchange,routing_key syntax for AbstractBusListener constructor, drop use of **kwargs, fix IngestEventHandler as it was still propagating subjects to listen for
parent a8adaf93
No related branches found
No related tags found
No related merge requests found
Showing
with 29 additions and 68 deletions
......@@ -31,19 +31,14 @@ import logging
logger = logging.getLogger()
class IngestBusListener(AbstractBusListener):
def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, **kwargs):
def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
"""
IngestBusListener listens on the lofar notification message bus and calls (empty) on<SomeMessage> methods when such a message is received.
Typical usage is to derive your own subclass from IngestBusListener and implement the specific on<SomeMessage> methods that you are interested in.
:param busname: valid Qpid address (default: lofar.lta.ingest.notification)
:param broker: valid Qpid broker host (default: None, which means localhost)
additional parameters in kwargs:
options= <dict> Dictionary of options passed to QPID
exclusive= <bool> Create an exclusive binding so no other services can consume duplicate messages (default: False)
numthreads= <int> Number of parallel threads processing messages (default: 1)
verbose= <bool> Output extra logging over stdout (default: False)
:param busname: valid Qpid address
:param broker: valid Qpid broker host
"""
super(IngestBusListener, self).__init__(address=busname, subject=DEFAULT_INGEST_NOTIFICATION_PREFIX+"#", broker=broker, **kwargs)
super(IngestBusListener, self).__init__(exchange=busname, routing_key=DEFAULT_INGEST_NOTIFICATION_PREFIX+"#", broker=broker)
def _handleMessage(self, msg):
......@@ -165,8 +160,8 @@ class IngestBusListener(AbstractBusListener):
class JobsMonitor(IngestBusListener):
def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, listen_for_all_jobs=True, **kwargs):
super(JobsMonitor, self).__init__(busname=busname, broker=broker, **kwargs)
def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, listen_for_all_jobs=True):
super(JobsMonitor, self).__init__(busname=busname, broker=broker)
self.__monitored_jobs = set()
self.__listen_for_all_jobs = listen_for_all_jobs
......
......@@ -43,13 +43,13 @@ import logging
logger = logging.getLogger()
class IngestBusListenerForMomAdapter(IngestBusListener):
def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, momrpc=None, **kwargs):
def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, momrpc=None):
self._busname = busname
self._broker = broker
self._momrpc = momrpc
self._removed_export_ids = set() # keep track of which export_id's were removed, so we don't have to remove them again
super(IngestBusListenerForMomAdapter, self).__init__(busname=busname, broker=broker, **kwargs)
super(IngestBusListenerForMomAdapter, self).__init__(busname=busname, broker=broker)
def _handleMessage(self, msg):
try:
......
......@@ -20,8 +20,7 @@
from lofar.lta.ltastorageoverview import store
from lofar.lta.ingest.common.srm import *
from lofar.lta.ingest.client.ingestbuslistener import IngestBusListener
from lofar.lta.ingest.common.config import DEFAULT_INGEST_NOTIFICATION_SUBJECTS
from lofar.messaging import adaptNameToEnvironment, DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME
import logging
logger = logging.getLogger(__name__)
......@@ -29,10 +28,9 @@ logger = logging.getLogger(__name__)
class IngestEventHandler(IngestBusListener):
def __init__(self, dbcreds,
busname=DEFAULT_BUSNAME,
subjects=DEFAULT_INGEST_NOTIFICATION_SUBJECTS,
broker=DEFAULT_BROKER):
self._dbcreds = dbcreds
super(IngestEventHandler, self).__init__(busname=busname, subjects=subjects, broker=broker)
super(IngestEventHandler, self).__init__(busname=busname, broker=broker)
def onJobFinished(self, job_dict):
"""onJobFinished is called upon receiving a JobFinished message.
......
......@@ -33,10 +33,9 @@ class TBBBusListener(AbstractBusListener):
TBBBusListener listens on the given notification message bus and calls (empty) on<SomeMessage> methods when such a message is received.
Typical usage is to derive your own subclass from TBBBusListener and implement the specific on<SomeMessage> methods that you are interested in.
:param busname: valid Qpid address
:param subjects: the subjects filter string to listen for.
:param broker: valid Qpid broker host
"""
super(TBBBusListener, self).__init__(address=busname, subject=DEFAULT_TBB_NOTIFICATION_PREFIX+"#", broker=broker, **kwargs)
super(TBBBusListener, self).__init__(exchange=busname, routing_key=DEFAULT_TBB_NOTIFICATION_PREFIX+"#", broker=broker, **kwargs)
def _handleMessage(self, msg):
# try to handle an incoming message, and call the associated on<SomeMessage> method
......
......@@ -34,19 +34,14 @@ logger = logging.getLogger(__name__)
class QABusListener(AbstractBusListener):
def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, **kwargs):
def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
"""
QABusListener listens on the lofar qa message bus and calls (empty) on<SomeMessage> methods when such a message is received.
Typical usage is to derive your own subclass from QABusListener and implement the specific on<SomeMessage> methods that you are interested in.
:param address: valid Qpid address (default: lofar.otdb.status)
:param broker: valid Qpid broker host (default: None, which means localhost)
additional parameters in kwargs:
options= <dict> Dictionary of options passed to QPID
exclusive= <bool> Create an exclusive binding so no other services can consume duplicate messages (default: False)
numthreads= <int> Number of parallel threads processing messages (default: 1)
verbose= <bool> Output extra logging over stdout (default: False)
:param address: valid Qpid address
:param broker: valid Qpid broker host
"""
super(QABusListener, self).__init__(address=busname, subject=DEFAULT_QA_NOTIFICATION_SUBJECT_PREFIX+"#", broker=broker, **kwargs)
super(QABusListener, self).__init__(exchange=busname, routing_key=DEFAULT_QA_NOTIFICATION_SUBJECT_PREFIX+"#", broker=broker)
def _handleMessage(self, msg):
logger.debug("QABusListener.handleMessage: %s" %str(msg))
......
......@@ -31,8 +31,8 @@ import logging
logger = logging.getLogger(__name__)
class DataManagementBusListener(AbstractBusListener):
def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, **kwargs):
super(DataManagementBusListener, self).__init__(address=busname, subject=DEFAULT_DM_NOTIFICATION_PREFIX+"#", broker=broker, **kwargs)
def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
super(DataManagementBusListener, self).__init__(exchange=busname, routing_key=DEFAULT_DM_NOTIFICATION_PREFIX+"#", broker=broker)
def _handleMessage(self, msg):
logger.info("on%s: %s" % (msg.subject.replace(self.subject_prefix, ''), str(msg.content).replace('\n', ' ')))
......
......@@ -38,19 +38,14 @@ logger = logging.getLogger(__name__)
class OTDBBusListener(AbstractBusListener):
def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, **kwargs):
def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, num_threads=1):
"""
OTDBBusListener listens on the lofar otdb message bus and calls (empty) on<SomeMessage> methods when such a message is received.
Typical usage is to derive your own subclass from OTDBBusListener and implement the specific on<SomeMessage> methods that you are interested in.
:param address: valid Qpid address (default: lofar.otdb.status)
:param broker: valid Qpid broker host (default: None, which means localhost)
additional parameters in kwargs:
options= <dict> Dictionary of options passed to QPID
exclusive= <bool> Create an exclusive binding so no other services can consume duplicate messages (default: False)
numthreads= <int> Number of parallel threads processing messages (default: 1)
verbose= <bool> Output extra logging over stdout (default: False)
"""
super(OTDBBusListener, self).__init__(address=busname, subject=DEFAULT_OTDB_NOTIFICATION_SUBJECT, broker=broker, **kwargs)
super(OTDBBusListener, self).__init__(exchange=busname, routing_key=DEFAULT_OTDB_NOTIFICATION_SUBJECT, num_threads=num_threads, broker=broker)
def _handleMessage(self, msg):
logger.debug("OTDBBusListener.handleMessage: %s" %str(msg))
......
......@@ -44,15 +44,10 @@ class RATaskSpecifiedBusListener(AbstractBusListener):
"""
RATaskSpecifiedBusListener listens on the lofar ra message bus and calls (empty) on<SomeMessage> methods when such a message is received.
Typical usage is to derive your own subclass from RATaskSpecifiedBusListener and implement the specific on<SomeMessage> methods that you are interested in.
:param address: valid Qpid address (default: lofar.otdb.status)
:param broker: valid Qpid broker host (default: None, which means localhost)
additional parameters in kwargs:
options= <dict> Dictionary of options passed to QPID
exclusive= <bool> Create an exclusive binding so no other services can consume duplicate messages (default: False)
numthreads= <int> Number of parallel threads processing messages (default: 1)
verbose= <bool> Output extra logging over stdout (default: False)
:param busname: valid Qpid address
:param broker: valid Qpid broker host
"""
super(RATaskSpecifiedBusListener, self).__init__(address=busname, subject=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT, broker=broker, **kwargs)
super(RATaskSpecifiedBusListener, self).__init__(exchange=busname, routing_key=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT, broker=broker)
def _handleMessage(self, msg):
logger.debug("RABusListener.handleMessage: %s" %str(msg))
......
......@@ -39,19 +39,14 @@ logger = logging.getLogger(__name__)
class RABusListener(AbstractBusListener):
def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, **kwargs):
def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
"""
RABusListener listens on the lofar notification message bus and calls (empty) on<SomeMessage> methods when such a message is received.
Typical usage is to derive your own subclass from RABusListener and implement the specific on<SomeMessage> methods that you are interested in.
:param busname: valid Qpid address (default: lofar.ra.notification)
:param broker: valid Qpid broker host (default: None, which means localhost)
additional parameters in kwargs:
options= <dict> Dictionary of options passed to QPID
exclusive= <bool> Create an exclusive binding so no other services can consume duplicate messages (default: False)
numthreads= <int> Number of parallel threads processing messages (default: 1)
verbose= <bool> Output extra logging over stdout (default: False)
"""
super(RABusListener, self).__init__(address=busname, subject=DEFAULT_RA_NOTIFICATION_PREFIX+"#", broker=broker, **kwargs)
super(RABusListener, self).__init__(exchange=busname, routing_key=DEFAULT_RA_NOTIFICATION_PREFIX+"#", broker=broker)
def _handleMessage(self, msg):
......
......@@ -39,20 +39,15 @@ logger = logging.getLogger(__name__)
class RADBBusListener(AbstractBusListener):
def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, **kwargs):
def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, num_threads=1):
"""
RADBBusListener listens on the lofar notification message bus and calls (empty) on<SomeMessage> methods when such a message is received.
Typical usage is to derive your own subclass from RADBBusListener and implement the specific on<SomeMessage> methods that you are interested in.
:param busname: valid Qpid address (default: lofar.ra.notification)
:param broker: valid Qpid broker host (default: None, which means localhost)
additional parameters in kwargs:
options= <dict> Dictionary of options passed to QPID
exclusive= <bool> Create an exclusive binding so no other services can consume duplicate messages (default: False)
numthreads= <int> Number of parallel threads processing messages (default: 1)
verbose= <bool> Output extra logging over stdout (default: False)
"""
super(RADBBusListener, self).__init__(address=busname, subject=DEFAULT_NOTIFICATION_PREFIX+"#", broker=broker, **kwargs)
super(RADBBusListener, self).__init__(exchange=busname, routing_key=DEFAULT_NOTIFICATION_PREFIX+"#", num_threads=num_threads, broker=broker)
def _handleMessage(self, msg):
......@@ -119,7 +114,7 @@ class RADBBusListener(AbstractBusListener):
if __name__ == '__main__':
with RADBBusListener(broker=None) as listener:
with RADBBusListener() as listener:
waitForInterrupt()
__all__ = ["RADBBusListener"]
......@@ -173,19 +173,13 @@ class OTDBTriggerListener(OTDBBusListener):
class TriggerNotificationListener(AbstractBusListener):
def __init__(self, momquery_rpc=MoMQueryRPC(), busname=DEFAULT_BUSNAME, broker=None, **kwargs):
def __init__(self, momquery_rpc=MoMQueryRPC(), busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
"""
TriggerNotificationListener listens on the lofar trigger message bus and emails when trigger gets submitted.
:param address: valid Qpid address (default: lofar.otdb.status)
:param broker: valid Qpid broker host (default: None, which means localhost)
additional parameters in kwargs:
options= <dict> Dictionary of options passed to QPID
exclusive= <bool> Create an exclusive binding so no other services can consume duplicate
messages (default: False)
numthreads= <int> Number of parallel threads processing messages (default: 1)
verbose= <bool> Output extra logging over stdout (default: False)
"""
super(TriggerNotificationListener, self).__init__(address=busname, subject=DEFAULT_TRIGGER_NOTIFICATION_SUBJECT, broker=broker, **kwargs)
super(TriggerNotificationListener, self).__init__(exchange=busname, routing_key=DEFAULT_TRIGGER_NOTIFICATION_SUBJECT, broker=broker)
self.mom_rpc_client = momquery_rpc
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment