From 54f48b64a761e422653c8a73a3e516cefcd3b87c Mon Sep 17 00:00:00 2001 From: Jan David Mol <mol@astron.nl> Date: Thu, 16 May 2019 13:44:26 +0000 Subject: [PATCH] SW-699: Use exchange,routing_key syntax for AbstractBusListener constructor, drop use of **kwargs, fix IngestEventHandler as it was still propagating subjects to listen for --- .../LTAIngestClient/lib/ingestbuslistener.py | 17 ++++++----------- .../lib/ingestmomadapter.py | 4 ++-- .../lib/ingesteventhandler.py | 6 ++---- .../TBB/TBBClient/lib/tbbbuslistener.py | 3 +-- QA/QA_Service/lib/QABusListener.py | 13 ++++--------- .../datamanagementbuslistener.py | 4 ++-- SAS/OTDB_Services/OTDBBusListener.py | 9 ++------- .../RATaskSpecifiedService/lib/RABusListener.py | 11 +++-------- .../ResourceAssigner/lib/rabuslistener.py | 9 ++------- .../radbbuslistener.py | 11 +++-------- .../Server/lib/TriggerEmailService.py | 10 ++-------- 11 files changed, 29 insertions(+), 68 deletions(-) diff --git a/LTA/LTAIngest/LTAIngestClient/lib/ingestbuslistener.py b/LTA/LTAIngest/LTAIngestClient/lib/ingestbuslistener.py index 47bf96d13a6..14db04f2c16 100644 --- a/LTA/LTAIngest/LTAIngestClient/lib/ingestbuslistener.py +++ b/LTA/LTAIngest/LTAIngestClient/lib/ingestbuslistener.py @@ -31,19 +31,14 @@ import logging logger = logging.getLogger() class IngestBusListener(AbstractBusListener): - def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, **kwargs): + def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): """ IngestBusListener listens on the lofar notification message bus and calls (empty) on<SomeMessage> methods when such a message is received. Typical usage is to derive your own subclass from IngestBusListener and implement the specific on<SomeMessage> methods that you are interested in. - :param busname: valid Qpid address (default: lofar.lta.ingest.notification) - :param broker: valid Qpid broker host (default: None, which means localhost) - additional parameters in kwargs: - options= <dict> Dictionary of options passed to QPID - exclusive= <bool> Create an exclusive binding so no other services can consume duplicate messages (default: False) - numthreads= <int> Number of parallel threads processing messages (default: 1) - verbose= <bool> Output extra logging over stdout (default: False) + :param busname: valid Qpid address + :param broker: valid Qpid broker host """ - super(IngestBusListener, self).__init__(address=busname, subject=DEFAULT_INGEST_NOTIFICATION_PREFIX+"#", broker=broker, **kwargs) + super(IngestBusListener, self).__init__(exchange=busname, routing_key=DEFAULT_INGEST_NOTIFICATION_PREFIX+"#", broker=broker) def _handleMessage(self, msg): @@ -165,8 +160,8 @@ class IngestBusListener(AbstractBusListener): class JobsMonitor(IngestBusListener): - def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, listen_for_all_jobs=True, **kwargs): - super(JobsMonitor, self).__init__(busname=busname, broker=broker, **kwargs) + def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, listen_for_all_jobs=True): + super(JobsMonitor, self).__init__(busname=busname, broker=broker) self.__monitored_jobs = set() self.__listen_for_all_jobs = listen_for_all_jobs diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestmomadapter.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestmomadapter.py index 282a362db63..15036cf22f3 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestmomadapter.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestmomadapter.py @@ -43,13 +43,13 @@ import logging logger = logging.getLogger() class IngestBusListenerForMomAdapter(IngestBusListener): - def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, momrpc=None, **kwargs): + def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, momrpc=None): self._busname = busname self._broker = broker self._momrpc = momrpc self._removed_export_ids = set() # keep track of which export_id's were removed, so we don't have to remove them again - super(IngestBusListenerForMomAdapter, self).__init__(busname=busname, broker=broker, **kwargs) + super(IngestBusListenerForMomAdapter, self).__init__(busname=busname, broker=broker) def _handleMessage(self, msg): try: diff --git a/LTA/ltastorageoverview/lib/ingesteventhandler.py b/LTA/ltastorageoverview/lib/ingesteventhandler.py index a1402e76a14..856b6b02427 100755 --- a/LTA/ltastorageoverview/lib/ingesteventhandler.py +++ b/LTA/ltastorageoverview/lib/ingesteventhandler.py @@ -20,8 +20,7 @@ from lofar.lta.ltastorageoverview import store from lofar.lta.ingest.common.srm import * from lofar.lta.ingest.client.ingestbuslistener import IngestBusListener -from lofar.lta.ingest.common.config import DEFAULT_INGEST_NOTIFICATION_SUBJECTS -from lofar.messaging import adaptNameToEnvironment, DEFAULT_BROKER, DEFAULT_BUSNAME +from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME import logging logger = logging.getLogger(__name__) @@ -29,10 +28,9 @@ logger = logging.getLogger(__name__) class IngestEventHandler(IngestBusListener): def __init__(self, dbcreds, busname=DEFAULT_BUSNAME, - subjects=DEFAULT_INGEST_NOTIFICATION_SUBJECTS, broker=DEFAULT_BROKER): self._dbcreds = dbcreds - super(IngestEventHandler, self).__init__(busname=busname, subjects=subjects, broker=broker) + super(IngestEventHandler, self).__init__(busname=busname, broker=broker) def onJobFinished(self, job_dict): """onJobFinished is called upon receiving a JobFinished message. diff --git a/MAC/Services/TBB/TBBClient/lib/tbbbuslistener.py b/MAC/Services/TBB/TBBClient/lib/tbbbuslistener.py index 96762a03703..079fce6f0fe 100644 --- a/MAC/Services/TBB/TBBClient/lib/tbbbuslistener.py +++ b/MAC/Services/TBB/TBBClient/lib/tbbbuslistener.py @@ -33,10 +33,9 @@ class TBBBusListener(AbstractBusListener): TBBBusListener listens on the given notification message bus and calls (empty) on<SomeMessage> methods when such a message is received. Typical usage is to derive your own subclass from TBBBusListener and implement the specific on<SomeMessage> methods that you are interested in. :param busname: valid Qpid address - :param subjects: the subjects filter string to listen for. :param broker: valid Qpid broker host """ - super(TBBBusListener, self).__init__(address=busname, subject=DEFAULT_TBB_NOTIFICATION_PREFIX+"#", broker=broker, **kwargs) + super(TBBBusListener, self).__init__(exchange=busname, routing_key=DEFAULT_TBB_NOTIFICATION_PREFIX+"#", broker=broker, **kwargs) def _handleMessage(self, msg): # try to handle an incoming message, and call the associated on<SomeMessage> method diff --git a/QA/QA_Service/lib/QABusListener.py b/QA/QA_Service/lib/QABusListener.py index 82439da3bcb..d0d5db7f6d7 100644 --- a/QA/QA_Service/lib/QABusListener.py +++ b/QA/QA_Service/lib/QABusListener.py @@ -34,19 +34,14 @@ logger = logging.getLogger(__name__) class QABusListener(AbstractBusListener): - def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, **kwargs): + def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): """ QABusListener listens on the lofar qa message bus and calls (empty) on<SomeMessage> methods when such a message is received. Typical usage is to derive your own subclass from QABusListener and implement the specific on<SomeMessage> methods that you are interested in. - :param address: valid Qpid address (default: lofar.otdb.status) - :param broker: valid Qpid broker host (default: None, which means localhost) - additional parameters in kwargs: - options= <dict> Dictionary of options passed to QPID - exclusive= <bool> Create an exclusive binding so no other services can consume duplicate messages (default: False) - numthreads= <int> Number of parallel threads processing messages (default: 1) - verbose= <bool> Output extra logging over stdout (default: False) + :param address: valid Qpid address + :param broker: valid Qpid broker host """ - super(QABusListener, self).__init__(address=busname, subject=DEFAULT_QA_NOTIFICATION_SUBJECT_PREFIX+"#", broker=broker, **kwargs) + super(QABusListener, self).__init__(exchange=busname, routing_key=DEFAULT_QA_NOTIFICATION_SUBJECT_PREFIX+"#", broker=broker) def _handleMessage(self, msg): logger.debug("QABusListener.handleMessage: %s" %str(msg)) diff --git a/SAS/DataManagement/DataManagementCommon/datamanagementbuslistener.py b/SAS/DataManagement/DataManagementCommon/datamanagementbuslistener.py index dbf5b2f9ac1..94d25ac5836 100644 --- a/SAS/DataManagement/DataManagementCommon/datamanagementbuslistener.py +++ b/SAS/DataManagement/DataManagementCommon/datamanagementbuslistener.py @@ -31,8 +31,8 @@ import logging logger = logging.getLogger(__name__) class DataManagementBusListener(AbstractBusListener): - def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, **kwargs): - super(DataManagementBusListener, self).__init__(address=busname, subject=DEFAULT_DM_NOTIFICATION_PREFIX+"#", broker=broker, **kwargs) + def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): + super(DataManagementBusListener, self).__init__(exchange=busname, routing_key=DEFAULT_DM_NOTIFICATION_PREFIX+"#", broker=broker) def _handleMessage(self, msg): logger.info("on%s: %s" % (msg.subject.replace(self.subject_prefix, ''), str(msg.content).replace('\n', ' '))) diff --git a/SAS/OTDB_Services/OTDBBusListener.py b/SAS/OTDB_Services/OTDBBusListener.py index a6a501e7695..125a8374323 100644 --- a/SAS/OTDB_Services/OTDBBusListener.py +++ b/SAS/OTDB_Services/OTDBBusListener.py @@ -38,19 +38,14 @@ logger = logging.getLogger(__name__) class OTDBBusListener(AbstractBusListener): - def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, **kwargs): + def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, num_threads=1): """ OTDBBusListener listens on the lofar otdb message bus and calls (empty) on<SomeMessage> methods when such a message is received. Typical usage is to derive your own subclass from OTDBBusListener and implement the specific on<SomeMessage> methods that you are interested in. :param address: valid Qpid address (default: lofar.otdb.status) :param broker: valid Qpid broker host (default: None, which means localhost) - additional parameters in kwargs: - options= <dict> Dictionary of options passed to QPID - exclusive= <bool> Create an exclusive binding so no other services can consume duplicate messages (default: False) - numthreads= <int> Number of parallel threads processing messages (default: 1) - verbose= <bool> Output extra logging over stdout (default: False) """ - super(OTDBBusListener, self).__init__(address=busname, subject=DEFAULT_OTDB_NOTIFICATION_SUBJECT, broker=broker, **kwargs) + super(OTDBBusListener, self).__init__(exchange=busname, routing_key=DEFAULT_OTDB_NOTIFICATION_SUBJECT, num_threads=num_threads, broker=broker) def _handleMessage(self, msg): logger.debug("OTDBBusListener.handleMessage: %s" %str(msg)) diff --git a/SAS/ResourceAssignment/RATaskSpecifiedService/lib/RABusListener.py b/SAS/ResourceAssignment/RATaskSpecifiedService/lib/RABusListener.py index 3977a3175b0..64d07b6318c 100644 --- a/SAS/ResourceAssignment/RATaskSpecifiedService/lib/RABusListener.py +++ b/SAS/ResourceAssignment/RATaskSpecifiedService/lib/RABusListener.py @@ -44,15 +44,10 @@ class RATaskSpecifiedBusListener(AbstractBusListener): """ RATaskSpecifiedBusListener listens on the lofar ra message bus and calls (empty) on<SomeMessage> methods when such a message is received. Typical usage is to derive your own subclass from RATaskSpecifiedBusListener and implement the specific on<SomeMessage> methods that you are interested in. - :param address: valid Qpid address (default: lofar.otdb.status) - :param broker: valid Qpid broker host (default: None, which means localhost) - additional parameters in kwargs: - options= <dict> Dictionary of options passed to QPID - exclusive= <bool> Create an exclusive binding so no other services can consume duplicate messages (default: False) - numthreads= <int> Number of parallel threads processing messages (default: 1) - verbose= <bool> Output extra logging over stdout (default: False) + :param busname: valid Qpid address + :param broker: valid Qpid broker host """ - super(RATaskSpecifiedBusListener, self).__init__(address=busname, subject=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT, broker=broker, **kwargs) + super(RATaskSpecifiedBusListener, self).__init__(exchange=busname, routing_key=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT, broker=broker) def _handleMessage(self, msg): logger.debug("RABusListener.handleMessage: %s" %str(msg)) diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/rabuslistener.py b/SAS/ResourceAssignment/ResourceAssigner/lib/rabuslistener.py index b2b44774b77..1115408a0cc 100644 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/rabuslistener.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/rabuslistener.py @@ -39,19 +39,14 @@ logger = logging.getLogger(__name__) class RABusListener(AbstractBusListener): - def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, **kwargs): + def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): """ RABusListener listens on the lofar notification message bus and calls (empty) on<SomeMessage> methods when such a message is received. Typical usage is to derive your own subclass from RABusListener and implement the specific on<SomeMessage> methods that you are interested in. :param busname: valid Qpid address (default: lofar.ra.notification) :param broker: valid Qpid broker host (default: None, which means localhost) - additional parameters in kwargs: - options= <dict> Dictionary of options passed to QPID - exclusive= <bool> Create an exclusive binding so no other services can consume duplicate messages (default: False) - numthreads= <int> Number of parallel threads processing messages (default: 1) - verbose= <bool> Output extra logging over stdout (default: False) """ - super(RABusListener, self).__init__(address=busname, subject=DEFAULT_RA_NOTIFICATION_PREFIX+"#", broker=broker, **kwargs) + super(RABusListener, self).__init__(exchange=busname, routing_key=DEFAULT_RA_NOTIFICATION_PREFIX+"#", broker=broker) def _handleMessage(self, msg): diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbbuslistener.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbbuslistener.py index 6766435b6d8..dc3002b4919 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbbuslistener.py +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbbuslistener.py @@ -39,20 +39,15 @@ logger = logging.getLogger(__name__) class RADBBusListener(AbstractBusListener): - def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, **kwargs): + def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, num_threads=1): """ RADBBusListener listens on the lofar notification message bus and calls (empty) on<SomeMessage> methods when such a message is received. Typical usage is to derive your own subclass from RADBBusListener and implement the specific on<SomeMessage> methods that you are interested in. :param busname: valid Qpid address (default: lofar.ra.notification) :param broker: valid Qpid broker host (default: None, which means localhost) - additional parameters in kwargs: - options= <dict> Dictionary of options passed to QPID - exclusive= <bool> Create an exclusive binding so no other services can consume duplicate messages (default: False) - numthreads= <int> Number of parallel threads processing messages (default: 1) - verbose= <bool> Output extra logging over stdout (default: False) """ - super(RADBBusListener, self).__init__(address=busname, subject=DEFAULT_NOTIFICATION_PREFIX+"#", broker=broker, **kwargs) + super(RADBBusListener, self).__init__(exchange=busname, routing_key=DEFAULT_NOTIFICATION_PREFIX+"#", num_threads=num_threads, broker=broker) def _handleMessage(self, msg): @@ -119,7 +114,7 @@ class RADBBusListener(AbstractBusListener): if __name__ == '__main__': - with RADBBusListener(broker=None) as listener: + with RADBBusListener() as listener: waitForInterrupt() __all__ = ["RADBBusListener"] diff --git a/SAS/TriggerEmailService/Server/lib/TriggerEmailService.py b/SAS/TriggerEmailService/Server/lib/TriggerEmailService.py index 277b35b1164..e39fae71804 100644 --- a/SAS/TriggerEmailService/Server/lib/TriggerEmailService.py +++ b/SAS/TriggerEmailService/Server/lib/TriggerEmailService.py @@ -173,19 +173,13 @@ class OTDBTriggerListener(OTDBBusListener): class TriggerNotificationListener(AbstractBusListener): - def __init__(self, momquery_rpc=MoMQueryRPC(), busname=DEFAULT_BUSNAME, broker=None, **kwargs): + def __init__(self, momquery_rpc=MoMQueryRPC(), busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): """ TriggerNotificationListener listens on the lofar trigger message bus and emails when trigger gets submitted. :param address: valid Qpid address (default: lofar.otdb.status) :param broker: valid Qpid broker host (default: None, which means localhost) - additional parameters in kwargs: - options= <dict> Dictionary of options passed to QPID - exclusive= <bool> Create an exclusive binding so no other services can consume duplicate - messages (default: False) - numthreads= <int> Number of parallel threads processing messages (default: 1) - verbose= <bool> Output extra logging over stdout (default: False) """ - super(TriggerNotificationListener, self).__init__(address=busname, subject=DEFAULT_TRIGGER_NOTIFICATION_SUBJECT, broker=broker, **kwargs) + super(TriggerNotificationListener, self).__init__(exchange=busname, routing_key=DEFAULT_TRIGGER_NOTIFICATION_SUBJECT, broker=broker) self.mom_rpc_client = momquery_rpc -- GitLab