From 658ec7ef54caa8cfc283a8dc50feac6c0b92fa2c Mon Sep 17 00:00:00 2001
From: Jorrit Schaap <schaap@astron.nl>
Date: Fri, 19 Apr 2019 07:01:05 +0000
Subject: [PATCH] SW-658: less verbose connect/disconnect logging

---
 LCS/Messaging/python/messaging/RPC.py        |  2 +-
 LCS/Messaging/python/messaging/Service.py    | 32 +-----------
 LCS/Messaging/python/messaging/messagebus.py | 53 +++++++++++---------
 3 files changed, 32 insertions(+), 55 deletions(-)

diff --git a/LCS/Messaging/python/messaging/RPC.py b/LCS/Messaging/python/messaging/RPC.py
index b56ed46ea57..4f142cece33 100644
--- a/LCS/Messaging/python/messaging/RPC.py
+++ b/LCS/Messaging/python/messaging/RPC.py
@@ -151,7 +151,7 @@ class RPC():
         HasArgs, HasKwArgs = _analyze_args(args, kwargs)
 
         with TemporaryQueue(self.broker) as tmp_queue:
-            with tmp_queue.create_frombus() as reply_receiver:
+            with tmp_queue.create_frombus(connection_log_level=logging.DEBUG) as reply_receiver:
                 request_msg = RequestMessage(content=Content, reply_to=reply_receiver.address,
                                              has_args=HasArgs, has_kwargs=HasKwArgs)
                 if timeout:
diff --git a/LCS/Messaging/python/messaging/Service.py b/LCS/Messaging/python/messaging/Service.py
index 94805154b09..94f9d165736 100644
--- a/LCS/Messaging/python/messaging/Service.py
+++ b/LCS/Messaging/python/messaging/Service.py
@@ -20,7 +20,7 @@
 # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
 #
 
-from .messagebus import ToBus, FromBus, AbstractBusListener
+from .messagebus import ToBus, AbstractBusListener
 from .messages import ReplyMessage, RequestMessage
 from .exceptions import MessageBusError, MessageFactoryError
 import threading
@@ -124,34 +124,6 @@ class Service(AbstractBusListener):
 
         super(Service, self).__init__(address, broker, **kwargs)
 
-    def start_listening(self, numthreads=None):
-        """
-        Start the background threads and process incoming messages.
-        """
-        if self.isListening():
-            return
-
-        # only on a 'bus' we already connect the reply_bus
-        #if self.busname:
-        #    self.reply_bus = ToBus(self.busname, broker=self.broker)
-        #    self.reply_bus.open()
-        #else:
-        #    self.reply_bus=None
-
-        # create listener FromBus in super class
-        super(Service, self).start_listening(numthreads=numthreads)
-
-    def stop_listening(self):
-        """
-        Stop the background threads that listen to incoming messages.
-        """
-        #if isinstance(self.reply_bus, ToBus):
-        #    self.reply_bus.close()
-        #    self.reply_bus=None
-
-        # close the listeners
-        super(Service, self).stop_listening()
-
     def _create_thread_args(self, index):
         # set up service_handler
         if str(type(self.service_handler)) == "<class 'instancemethod'>" or \
@@ -188,7 +160,7 @@ class Service(AbstractBusListener):
 
         # send the result to the RPC client
         try:
-            with ToBus(reply_to, broker=self.broker) as dest:
+            with ToBus(reply_to, broker=self.broker, connection_log_level=logging.DEBUG) as dest:
                 dest.send(reply_msg)
         except MessageBusError as e:
             logger.error("Failed to send reply messgage to reply address %s. Error: %s", reply_to, e)
diff --git a/LCS/Messaging/python/messaging/messagebus.py b/LCS/Messaging/python/messaging/messagebus.py
index afe09722c8d..978759c13ba 100644
--- a/LCS/Messaging/python/messaging/messagebus.py
+++ b/LCS/Messaging/python/messaging/messagebus.py
@@ -75,7 +75,7 @@ class _AbstractBus():
     but that of __new__().
     """
 
-    def __init__(self, address, broker=None):
+    def __init__(self, address, broker=None, connection_log_level=logging.INFO):
         """
         Initializer.
         :param address: valid Qpid address
@@ -84,6 +84,7 @@ class _AbstractBus():
         self.address, self.subject = _AbstractBus._split_address_and_subject(address)
         self.broker = broker if broker else DEFAULT_BROKER
         self._connected = False
+        self._connection_log_level = connection_log_level
 
     @staticmethod
     def _split_address_and_subject(address):
@@ -188,44 +189,46 @@ class FromBus(_AbstractBus):
     but that of __new__().
     """
 
-    def __init__(self, address, broker=None, broker_options=None):
+    def __init__(self, address, broker=None, broker_options=None, connection_log_level=logging.INFO):
         """
         Initializer.
         :param address: valid Qpid address
         :param broker: valid Qpid broker URL, e.g. "localhost:5672"
         :param broker_options: OBSOLETE
         """
-        super(FromBus, self).__init__(address=address, broker=broker)
+        super(FromBus, self).__init__(address=address, broker=broker, connection_log_level=connection_log_level)
 
         if broker_options:
             logger.warning("broker_options are obsolete. address=%s broker=%s broker_options=%s",
                            self.address, self.broker, broker_options)
 
     def _connect_to_endpoint(self):
-        logger.debug("[FromBus] Creating receiver for bus: %s with subject: %s on broker: %s" % (self.address,
-                                                                                                 self.subject,
-                                                                                                 self.broker))
+        logger.debug("[FromBus] Connecting receiver to bus: %s with subject: %s on broker: %s" % (self.address,
+                                                                                                  self.subject,
+                                                                                                  self.broker))
 
         self._receiver = self.connection.create_receiver(address=self.address,
                                                          credit=DEFAULT_RECEIVER_CAPACITY,
                                                          options=_ProtonSubjectFilter(self.subject) if self.subject else None)
 
-        logger.info("[FromBus] Created receiver for bus: %s with subject: %s on broker: %s" % (self.address,
+        logger.log(self._connection_log_level,
+                   "[FromBus] Connected receiver to bus: %s with subject: %s on broker: %s" % (self.address,
                                                                                                self.subject,
                                                                                                self.broker))
 
     def _disconnect_from_endpoint(self):
         if self._receiver is not None:
-            logger.debug("[FromBus] Disconnecting receiver for bus: %s with subject: %s on broker: %s" % (self.address,
-                                                                                                          self.subject,
-                                                                                                          self.broker))
+            logger.debug("[FromBus] Disconnecting receiver from bus: %s with subject: %s on broker: %s" % (self.address,
+                                                                                                           self.subject,
+                                                                                                           self.broker))
 
             self._receiver.close()
             self._receiver = None
 
-            logger.info("[FromBus] Disconnecting receiver for bus: %s with subject: %s on broker: %s" % (self.address,
-                                                                                                         self.subject,
-                                                                                                         self.broker))
+            logger.log(self._connection_log_level,
+                       "[FromBus] Disconnected receiver from bus: %s with subject: %s on broker: %s" % (self.address,
+                                                                                                        self.subject,
+                                                                                                        self.broker))
 
     def receive(self, timeout=DEFAULT_TIMEOUT, logDebugMessages=False):
         """
@@ -352,32 +355,34 @@ class ToBus(_AbstractBus):
     but that of __new__().
     """
 
-    def __init__(self, address, broker=None, broker_options=None):
+    def __init__(self, address, broker=None, broker_options=None, connection_log_level=logging.INFO):
         """
         Initializer.
         :param address: valid Qpid address
         :param broker: valid Qpid broker URL, e.g. "localhost:5672"
         :param broker_options: OBSOLETE
         """
-        super(ToBus, self).__init__(address=address, broker=broker)
+        super(ToBus, self).__init__(address=address, broker=broker, connection_log_level=connection_log_level)
 
         if broker_options:
             logger.warning("broker_options are obsolete. address=%s broker=%s broker_options=%s",
                            self.address, self.broker, broker_options)
 
     def _connect_to_endpoint(self):
-        logger.debug("[ToBus] Creating sender for bus: %s on broker: %s" % (self.address, self.broker))
+        logger.debug("[ToBus] Connecting sender to bus: %s on broker: %s" % (self.address, self.broker))
 
         self._sender = self.connection.create_sender(address=self.address)
 
-        logger.info("[ToBus] Created sender for bus: %s on broker: %s" % (self.address, self.broker))
+        logger.log(self._connection_log_level,
+                   "[ToBus] Connected sender to bus: %s on broker: %s" % (self.address, self.broker))
 
     def _disconnect_from_endpoint(self):
         if self._sender is not None:
             logger.debug("[ToBus] Disconnecting sender from bus: %s on broker: %s" % (self.address, self.broker))
             self._sender.close()
             self._sender = None
-            logger.info("[ToBus] Disconnected sender from bus: %s on broker: %s" % (self.address, self.broker))
+            logger.log(self._connection_log_level,
+                       "[ToBus] Disconnected sender from bus: %s on broker: %s" % (self.address, self.broker))
 
     def send(self, message, timeout=DEFAULT_TIMEOUT):
         """
@@ -480,21 +485,22 @@ class TemporaryQueue(object):
     def __str__(self):
         return "TemporaryQueue address=%s".format(self.address)
 
-    def create_frombus(self, subject=None):
+    def create_frombus(self, subject=None, connection_log_level=logging.INFO):
         """
         Factory method to create a FromBus instance which is connected to this TemporaryQueue
         :param subject: Optional subject string to filter for. Only messages which match this subject are received.
         :return: FromBus
         """
         return FromBus(broker=self.broker,
-                       address="%s/%s" % (self.address, subject) if subject else self.address)
+                       address="%s/%s" % (self.address, subject) if subject else self.address,
+                       connection_log_level=connection_log_level)
 
-    def create_tobus(self):
+    def create_tobus(self, connection_log_level=logging.INFO):
         """
         Factory method to create a ToBus instance which is connected to this TemporaryQueue
         :return: ToBus
         """
-        return ToBus(broker=self.broker, address=self.address)
+        return ToBus(broker=self.broker, address=self.address, connection_log_level=connection_log_level)
 
 
 class AbstractBusListener(object):
@@ -517,7 +523,6 @@ class AbstractBusListener(object):
         self._listening       = False
         self._numthreads      = kwargs.pop("numthreads", 1)
         self.verbose          = kwargs.pop("verbose", False)
-        self.frombus_options  = {}
 
         if len(kwargs):
             raise AttributeError("Unexpected argument passed to AbstractBusListener constructor: %s", kwargs)
@@ -542,7 +547,7 @@ class AbstractBusListener(object):
         if self._listening == True:
             return
 
-        self._bus_listener  = FromBus(self.address, broker=self.broker, broker_options=self.frombus_options)
+        self._bus_listener  = FromBus(self.address, broker=self.broker, connection_log_level=logging.INFO)
         self._bus_listener.open()
 
         if numthreads != None:
-- 
GitLab