diff --git a/LTA/LTAIngest/LTAIngestClient/lib/ingestbuslistener.py b/LTA/LTAIngest/LTAIngestClient/lib/ingestbuslistener.py index 47bf96d13a6975a0c5059d22ac71ec0465f5f85b..14db04f2c1646e50ea8779be3b39989b42232606 100644 --- a/LTA/LTAIngest/LTAIngestClient/lib/ingestbuslistener.py +++ b/LTA/LTAIngest/LTAIngestClient/lib/ingestbuslistener.py @@ -31,19 +31,14 @@ import logging logger = logging.getLogger() class IngestBusListener(AbstractBusListener): - def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, **kwargs): + def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): """ IngestBusListener listens on the lofar notification message bus and calls (empty) on<SomeMessage> methods when such a message is received. Typical usage is to derive your own subclass from IngestBusListener and implement the specific on<SomeMessage> methods that you are interested in. - :param busname: valid Qpid address (default: lofar.lta.ingest.notification) - :param broker: valid Qpid broker host (default: None, which means localhost) - additional parameters in kwargs: - options= <dict> Dictionary of options passed to QPID - exclusive= <bool> Create an exclusive binding so no other services can consume duplicate messages (default: False) - numthreads= <int> Number of parallel threads processing messages (default: 1) - verbose= <bool> Output extra logging over stdout (default: False) + :param busname: valid Qpid address + :param broker: valid Qpid broker host """ - super(IngestBusListener, self).__init__(address=busname, subject=DEFAULT_INGEST_NOTIFICATION_PREFIX+"#", broker=broker, **kwargs) + super(IngestBusListener, self).__init__(exchange=busname, routing_key=DEFAULT_INGEST_NOTIFICATION_PREFIX+"#", broker=broker) def _handleMessage(self, msg): @@ -165,8 +160,8 @@ class IngestBusListener(AbstractBusListener): class JobsMonitor(IngestBusListener): - def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, listen_for_all_jobs=True, **kwargs): - super(JobsMonitor, self).__init__(busname=busname, broker=broker, **kwargs) + def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, listen_for_all_jobs=True): + super(JobsMonitor, self).__init__(busname=busname, broker=broker) self.__monitored_jobs = set() self.__listen_for_all_jobs = listen_for_all_jobs diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestmomadapter.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestmomadapter.py index 282a362db63840354087d8394b1d18ade3545ef1..15036cf22f36dbf717a6cca0097bca7fe5961021 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestmomadapter.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestmomadapter.py @@ -43,13 +43,13 @@ import logging logger = logging.getLogger() class IngestBusListenerForMomAdapter(IngestBusListener): - def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, momrpc=None, **kwargs): + def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, momrpc=None): self._busname = busname self._broker = broker self._momrpc = momrpc self._removed_export_ids = set() # keep track of which export_id's were removed, so we don't have to remove them again - super(IngestBusListenerForMomAdapter, self).__init__(busname=busname, broker=broker, **kwargs) + super(IngestBusListenerForMomAdapter, self).__init__(busname=busname, broker=broker) def _handleMessage(self, msg): try: diff --git a/LTA/ltastorageoverview/lib/ingesteventhandler.py b/LTA/ltastorageoverview/lib/ingesteventhandler.py index a1402e76a14bc84c3dc6383075ac21fbc2160707..856b6b02427fa3ac117a10411328aaa453faeff4 100755 --- a/LTA/ltastorageoverview/lib/ingesteventhandler.py +++ b/LTA/ltastorageoverview/lib/ingesteventhandler.py @@ -20,8 +20,7 @@ from lofar.lta.ltastorageoverview import store from lofar.lta.ingest.common.srm import * from lofar.lta.ingest.client.ingestbuslistener import IngestBusListener -from lofar.lta.ingest.common.config import DEFAULT_INGEST_NOTIFICATION_SUBJECTS -from lofar.messaging import adaptNameToEnvironment, DEFAULT_BROKER, DEFAULT_BUSNAME +from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME import logging logger = logging.getLogger(__name__) @@ -29,10 +28,9 @@ logger = logging.getLogger(__name__) class IngestEventHandler(IngestBusListener): def __init__(self, dbcreds, busname=DEFAULT_BUSNAME, - subjects=DEFAULT_INGEST_NOTIFICATION_SUBJECTS, broker=DEFAULT_BROKER): self._dbcreds = dbcreds - super(IngestEventHandler, self).__init__(busname=busname, subjects=subjects, broker=broker) + super(IngestEventHandler, self).__init__(busname=busname, broker=broker) def onJobFinished(self, job_dict): """onJobFinished is called upon receiving a JobFinished message. diff --git a/MAC/Services/TBB/TBBClient/lib/tbbbuslistener.py b/MAC/Services/TBB/TBBClient/lib/tbbbuslistener.py index 96762a0370337e80c8def8a811b3fc8f0be51dd0..079fce6f0fecf45823351a1700a466dbd8a4dfa1 100644 --- a/MAC/Services/TBB/TBBClient/lib/tbbbuslistener.py +++ b/MAC/Services/TBB/TBBClient/lib/tbbbuslistener.py @@ -33,10 +33,9 @@ class TBBBusListener(AbstractBusListener): TBBBusListener listens on the given notification message bus and calls (empty) on<SomeMessage> methods when such a message is received. Typical usage is to derive your own subclass from TBBBusListener and implement the specific on<SomeMessage> methods that you are interested in. :param busname: valid Qpid address - :param subjects: the subjects filter string to listen for. :param broker: valid Qpid broker host """ - super(TBBBusListener, self).__init__(address=busname, subject=DEFAULT_TBB_NOTIFICATION_PREFIX+"#", broker=broker, **kwargs) + super(TBBBusListener, self).__init__(exchange=busname, routing_key=DEFAULT_TBB_NOTIFICATION_PREFIX+"#", broker=broker, **kwargs) def _handleMessage(self, msg): # try to handle an incoming message, and call the associated on<SomeMessage> method diff --git a/QA/QA_Service/lib/QABusListener.py b/QA/QA_Service/lib/QABusListener.py index 82439da3bcb8e14500b4dd69f0c91fbde24749cd..d0d5db7f6d751f85cc4e1b44260d1476cf645285 100644 --- a/QA/QA_Service/lib/QABusListener.py +++ b/QA/QA_Service/lib/QABusListener.py @@ -34,19 +34,14 @@ logger = logging.getLogger(__name__) class QABusListener(AbstractBusListener): - def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, **kwargs): + def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): """ QABusListener listens on the lofar qa message bus and calls (empty) on<SomeMessage> methods when such a message is received. Typical usage is to derive your own subclass from QABusListener and implement the specific on<SomeMessage> methods that you are interested in. - :param address: valid Qpid address (default: lofar.otdb.status) - :param broker: valid Qpid broker host (default: None, which means localhost) - additional parameters in kwargs: - options= <dict> Dictionary of options passed to QPID - exclusive= <bool> Create an exclusive binding so no other services can consume duplicate messages (default: False) - numthreads= <int> Number of parallel threads processing messages (default: 1) - verbose= <bool> Output extra logging over stdout (default: False) + :param address: valid Qpid address + :param broker: valid Qpid broker host """ - super(QABusListener, self).__init__(address=busname, subject=DEFAULT_QA_NOTIFICATION_SUBJECT_PREFIX+"#", broker=broker, **kwargs) + super(QABusListener, self).__init__(exchange=busname, routing_key=DEFAULT_QA_NOTIFICATION_SUBJECT_PREFIX+"#", broker=broker) def _handleMessage(self, msg): logger.debug("QABusListener.handleMessage: %s" %str(msg)) diff --git a/SAS/DataManagement/DataManagementCommon/datamanagementbuslistener.py b/SAS/DataManagement/DataManagementCommon/datamanagementbuslistener.py index dbf5b2f9ac1a83169fa83d5664dfb68d41460091..94d25ac583628468fe5c700a62608ecda10a562c 100644 --- a/SAS/DataManagement/DataManagementCommon/datamanagementbuslistener.py +++ b/SAS/DataManagement/DataManagementCommon/datamanagementbuslistener.py @@ -31,8 +31,8 @@ import logging logger = logging.getLogger(__name__) class DataManagementBusListener(AbstractBusListener): - def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, **kwargs): - super(DataManagementBusListener, self).__init__(address=busname, subject=DEFAULT_DM_NOTIFICATION_PREFIX+"#", broker=broker, **kwargs) + def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): + super(DataManagementBusListener, self).__init__(exchange=busname, routing_key=DEFAULT_DM_NOTIFICATION_PREFIX+"#", broker=broker) def _handleMessage(self, msg): logger.info("on%s: %s" % (msg.subject.replace(self.subject_prefix, ''), str(msg.content).replace('\n', ' '))) diff --git a/SAS/OTDB_Services/OTDBBusListener.py b/SAS/OTDB_Services/OTDBBusListener.py index a6a501e76950476ffc95b99338f1f90be0b5c6ec..125a8374323983edbedee26b1c86db923edd18f5 100644 --- a/SAS/OTDB_Services/OTDBBusListener.py +++ b/SAS/OTDB_Services/OTDBBusListener.py @@ -38,19 +38,14 @@ logger = logging.getLogger(__name__) class OTDBBusListener(AbstractBusListener): - def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, **kwargs): + def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, num_threads=1): """ OTDBBusListener listens on the lofar otdb message bus and calls (empty) on<SomeMessage> methods when such a message is received. Typical usage is to derive your own subclass from OTDBBusListener and implement the specific on<SomeMessage> methods that you are interested in. :param address: valid Qpid address (default: lofar.otdb.status) :param broker: valid Qpid broker host (default: None, which means localhost) - additional parameters in kwargs: - options= <dict> Dictionary of options passed to QPID - exclusive= <bool> Create an exclusive binding so no other services can consume duplicate messages (default: False) - numthreads= <int> Number of parallel threads processing messages (default: 1) - verbose= <bool> Output extra logging over stdout (default: False) """ - super(OTDBBusListener, self).__init__(address=busname, subject=DEFAULT_OTDB_NOTIFICATION_SUBJECT, broker=broker, **kwargs) + super(OTDBBusListener, self).__init__(exchange=busname, routing_key=DEFAULT_OTDB_NOTIFICATION_SUBJECT, num_threads=num_threads, broker=broker) def _handleMessage(self, msg): logger.debug("OTDBBusListener.handleMessage: %s" %str(msg)) diff --git a/SAS/ResourceAssignment/RATaskSpecifiedService/lib/RABusListener.py b/SAS/ResourceAssignment/RATaskSpecifiedService/lib/RABusListener.py index 3977a3175b0f861df6944927deb24067ed8a66ed..64d07b6318c92a623423fb1423bd219af5267f96 100644 --- a/SAS/ResourceAssignment/RATaskSpecifiedService/lib/RABusListener.py +++ b/SAS/ResourceAssignment/RATaskSpecifiedService/lib/RABusListener.py @@ -44,15 +44,10 @@ class RATaskSpecifiedBusListener(AbstractBusListener): """ RATaskSpecifiedBusListener listens on the lofar ra message bus and calls (empty) on<SomeMessage> methods when such a message is received. Typical usage is to derive your own subclass from RATaskSpecifiedBusListener and implement the specific on<SomeMessage> methods that you are interested in. - :param address: valid Qpid address (default: lofar.otdb.status) - :param broker: valid Qpid broker host (default: None, which means localhost) - additional parameters in kwargs: - options= <dict> Dictionary of options passed to QPID - exclusive= <bool> Create an exclusive binding so no other services can consume duplicate messages (default: False) - numthreads= <int> Number of parallel threads processing messages (default: 1) - verbose= <bool> Output extra logging over stdout (default: False) + :param busname: valid Qpid address + :param broker: valid Qpid broker host """ - super(RATaskSpecifiedBusListener, self).__init__(address=busname, subject=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT, broker=broker, **kwargs) + super(RATaskSpecifiedBusListener, self).__init__(exchange=busname, routing_key=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT, broker=broker) def _handleMessage(self, msg): logger.debug("RABusListener.handleMessage: %s" %str(msg)) diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/rabuslistener.py b/SAS/ResourceAssignment/ResourceAssigner/lib/rabuslistener.py index b2b44774b77ee57bd2be3619a5e6a826db7b0db9..1115408a0cc44196e8afd18d728c56bea7ba5cf1 100644 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/rabuslistener.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/rabuslistener.py @@ -39,19 +39,14 @@ logger = logging.getLogger(__name__) class RABusListener(AbstractBusListener): - def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, **kwargs): + def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): """ RABusListener listens on the lofar notification message bus and calls (empty) on<SomeMessage> methods when such a message is received. Typical usage is to derive your own subclass from RABusListener and implement the specific on<SomeMessage> methods that you are interested in. :param busname: valid Qpid address (default: lofar.ra.notification) :param broker: valid Qpid broker host (default: None, which means localhost) - additional parameters in kwargs: - options= <dict> Dictionary of options passed to QPID - exclusive= <bool> Create an exclusive binding so no other services can consume duplicate messages (default: False) - numthreads= <int> Number of parallel threads processing messages (default: 1) - verbose= <bool> Output extra logging over stdout (default: False) """ - super(RABusListener, self).__init__(address=busname, subject=DEFAULT_RA_NOTIFICATION_PREFIX+"#", broker=broker, **kwargs) + super(RABusListener, self).__init__(exchange=busname, routing_key=DEFAULT_RA_NOTIFICATION_PREFIX+"#", broker=broker) def _handleMessage(self, msg): diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbbuslistener.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbbuslistener.py index 6766435b6d85d20345922caad81b03085065a514..dc3002b49199ce19d741591f72b8258a71554de8 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbbuslistener.py +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbbuslistener.py @@ -39,20 +39,15 @@ logger = logging.getLogger(__name__) class RADBBusListener(AbstractBusListener): - def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, **kwargs): + def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, num_threads=1): """ RADBBusListener listens on the lofar notification message bus and calls (empty) on<SomeMessage> methods when such a message is received. Typical usage is to derive your own subclass from RADBBusListener and implement the specific on<SomeMessage> methods that you are interested in. :param busname: valid Qpid address (default: lofar.ra.notification) :param broker: valid Qpid broker host (default: None, which means localhost) - additional parameters in kwargs: - options= <dict> Dictionary of options passed to QPID - exclusive= <bool> Create an exclusive binding so no other services can consume duplicate messages (default: False) - numthreads= <int> Number of parallel threads processing messages (default: 1) - verbose= <bool> Output extra logging over stdout (default: False) """ - super(RADBBusListener, self).__init__(address=busname, subject=DEFAULT_NOTIFICATION_PREFIX+"#", broker=broker, **kwargs) + super(RADBBusListener, self).__init__(exchange=busname, routing_key=DEFAULT_NOTIFICATION_PREFIX+"#", num_threads=num_threads, broker=broker) def _handleMessage(self, msg): @@ -119,7 +114,7 @@ class RADBBusListener(AbstractBusListener): if __name__ == '__main__': - with RADBBusListener(broker=None) as listener: + with RADBBusListener() as listener: waitForInterrupt() __all__ = ["RADBBusListener"] diff --git a/SAS/TriggerEmailService/Server/lib/TriggerEmailService.py b/SAS/TriggerEmailService/Server/lib/TriggerEmailService.py index 277b35b116478d09f63061e442ebeecc046f9103..e39fae718043373e98d4f103eb1921d5c420ddf3 100644 --- a/SAS/TriggerEmailService/Server/lib/TriggerEmailService.py +++ b/SAS/TriggerEmailService/Server/lib/TriggerEmailService.py @@ -173,19 +173,13 @@ class OTDBTriggerListener(OTDBBusListener): class TriggerNotificationListener(AbstractBusListener): - def __init__(self, momquery_rpc=MoMQueryRPC(), busname=DEFAULT_BUSNAME, broker=None, **kwargs): + def __init__(self, momquery_rpc=MoMQueryRPC(), busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): """ TriggerNotificationListener listens on the lofar trigger message bus and emails when trigger gets submitted. :param address: valid Qpid address (default: lofar.otdb.status) :param broker: valid Qpid broker host (default: None, which means localhost) - additional parameters in kwargs: - options= <dict> Dictionary of options passed to QPID - exclusive= <bool> Create an exclusive binding so no other services can consume duplicate - messages (default: False) - numthreads= <int> Number of parallel threads processing messages (default: 1) - verbose= <bool> Output extra logging over stdout (default: False) """ - super(TriggerNotificationListener, self).__init__(address=busname, subject=DEFAULT_TRIGGER_NOTIFICATION_SUBJECT, broker=broker, **kwargs) + super(TriggerNotificationListener, self).__init__(exchange=busname, routing_key=DEFAULT_TRIGGER_NOTIFICATION_SUBJECT, broker=broker) self.mom_rpc_client = momquery_rpc