Skip to content
Snippets Groups Projects
Commit 658ec7ef authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-658: less verbose connect/disconnect logging

parent ce56ca67
No related branches found
No related tags found
No related merge requests found
......@@ -151,7 +151,7 @@ class RPC():
HasArgs, HasKwArgs = _analyze_args(args, kwargs)
with TemporaryQueue(self.broker) as tmp_queue:
with tmp_queue.create_frombus() as reply_receiver:
with tmp_queue.create_frombus(connection_log_level=logging.DEBUG) as reply_receiver:
request_msg = RequestMessage(content=Content, reply_to=reply_receiver.address,
has_args=HasArgs, has_kwargs=HasKwArgs)
if timeout:
......
......@@ -20,7 +20,7 @@
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
from .messagebus import ToBus, FromBus, AbstractBusListener
from .messagebus import ToBus, AbstractBusListener
from .messages import ReplyMessage, RequestMessage
from .exceptions import MessageBusError, MessageFactoryError
import threading
......@@ -124,34 +124,6 @@ class Service(AbstractBusListener):
super(Service, self).__init__(address, broker, **kwargs)
def start_listening(self, numthreads=None):
"""
Start the background threads and process incoming messages.
"""
if self.isListening():
return
# only on a 'bus' we already connect the reply_bus
#if self.busname:
# self.reply_bus = ToBus(self.busname, broker=self.broker)
# self.reply_bus.open()
#else:
# self.reply_bus=None
# create listener FromBus in super class
super(Service, self).start_listening(numthreads=numthreads)
def stop_listening(self):
"""
Stop the background threads that listen to incoming messages.
"""
#if isinstance(self.reply_bus, ToBus):
# self.reply_bus.close()
# self.reply_bus=None
# close the listeners
super(Service, self).stop_listening()
def _create_thread_args(self, index):
# set up service_handler
if str(type(self.service_handler)) == "<class 'instancemethod'>" or \
......@@ -188,7 +160,7 @@ class Service(AbstractBusListener):
# send the result to the RPC client
try:
with ToBus(reply_to, broker=self.broker) as dest:
with ToBus(reply_to, broker=self.broker, connection_log_level=logging.DEBUG) as dest:
dest.send(reply_msg)
except MessageBusError as e:
logger.error("Failed to send reply messgage to reply address %s. Error: %s", reply_to, e)
......
......@@ -75,7 +75,7 @@ class _AbstractBus():
but that of __new__().
"""
def __init__(self, address, broker=None):
def __init__(self, address, broker=None, connection_log_level=logging.INFO):
"""
Initializer.
:param address: valid Qpid address
......@@ -84,6 +84,7 @@ class _AbstractBus():
self.address, self.subject = _AbstractBus._split_address_and_subject(address)
self.broker = broker if broker else DEFAULT_BROKER
self._connected = False
self._connection_log_level = connection_log_level
@staticmethod
def _split_address_and_subject(address):
......@@ -188,44 +189,46 @@ class FromBus(_AbstractBus):
but that of __new__().
"""
def __init__(self, address, broker=None, broker_options=None):
def __init__(self, address, broker=None, broker_options=None, connection_log_level=logging.INFO):
"""
Initializer.
:param address: valid Qpid address
:param broker: valid Qpid broker URL, e.g. "localhost:5672"
:param broker_options: OBSOLETE
"""
super(FromBus, self).__init__(address=address, broker=broker)
super(FromBus, self).__init__(address=address, broker=broker, connection_log_level=connection_log_level)
if broker_options:
logger.warning("broker_options are obsolete. address=%s broker=%s broker_options=%s",
self.address, self.broker, broker_options)
def _connect_to_endpoint(self):
logger.debug("[FromBus] Creating receiver for bus: %s with subject: %s on broker: %s" % (self.address,
self.subject,
self.broker))
logger.debug("[FromBus] Connecting receiver to bus: %s with subject: %s on broker: %s" % (self.address,
self.subject,
self.broker))
self._receiver = self.connection.create_receiver(address=self.address,
credit=DEFAULT_RECEIVER_CAPACITY,
options=_ProtonSubjectFilter(self.subject) if self.subject else None)
logger.info("[FromBus] Created receiver for bus: %s with subject: %s on broker: %s" % (self.address,
logger.log(self._connection_log_level,
"[FromBus] Connected receiver to bus: %s with subject: %s on broker: %s" % (self.address,
self.subject,
self.broker))
def _disconnect_from_endpoint(self):
if self._receiver is not None:
logger.debug("[FromBus] Disconnecting receiver for bus: %s with subject: %s on broker: %s" % (self.address,
self.subject,
self.broker))
logger.debug("[FromBus] Disconnecting receiver from bus: %s with subject: %s on broker: %s" % (self.address,
self.subject,
self.broker))
self._receiver.close()
self._receiver = None
logger.info("[FromBus] Disconnecting receiver for bus: %s with subject: %s on broker: %s" % (self.address,
self.subject,
self.broker))
logger.log(self._connection_log_level,
"[FromBus] Disconnected receiver from bus: %s with subject: %s on broker: %s" % (self.address,
self.subject,
self.broker))
def receive(self, timeout=DEFAULT_TIMEOUT, logDebugMessages=False):
"""
......@@ -352,32 +355,34 @@ class ToBus(_AbstractBus):
but that of __new__().
"""
def __init__(self, address, broker=None, broker_options=None):
def __init__(self, address, broker=None, broker_options=None, connection_log_level=logging.INFO):
"""
Initializer.
:param address: valid Qpid address
:param broker: valid Qpid broker URL, e.g. "localhost:5672"
:param broker_options: OBSOLETE
"""
super(ToBus, self).__init__(address=address, broker=broker)
super(ToBus, self).__init__(address=address, broker=broker, connection_log_level=connection_log_level)
if broker_options:
logger.warning("broker_options are obsolete. address=%s broker=%s broker_options=%s",
self.address, self.broker, broker_options)
def _connect_to_endpoint(self):
logger.debug("[ToBus] Creating sender for bus: %s on broker: %s" % (self.address, self.broker))
logger.debug("[ToBus] Connecting sender to bus: %s on broker: %s" % (self.address, self.broker))
self._sender = self.connection.create_sender(address=self.address)
logger.info("[ToBus] Created sender for bus: %s on broker: %s" % (self.address, self.broker))
logger.log(self._connection_log_level,
"[ToBus] Connected sender to bus: %s on broker: %s" % (self.address, self.broker))
def _disconnect_from_endpoint(self):
if self._sender is not None:
logger.debug("[ToBus] Disconnecting sender from bus: %s on broker: %s" % (self.address, self.broker))
self._sender.close()
self._sender = None
logger.info("[ToBus] Disconnected sender from bus: %s on broker: %s" % (self.address, self.broker))
logger.log(self._connection_log_level,
"[ToBus] Disconnected sender from bus: %s on broker: %s" % (self.address, self.broker))
def send(self, message, timeout=DEFAULT_TIMEOUT):
"""
......@@ -480,21 +485,22 @@ class TemporaryQueue(object):
def __str__(self):
return "TemporaryQueue address=%s".format(self.address)
def create_frombus(self, subject=None):
def create_frombus(self, subject=None, connection_log_level=logging.INFO):
"""
Factory method to create a FromBus instance which is connected to this TemporaryQueue
:param subject: Optional subject string to filter for. Only messages which match this subject are received.
:return: FromBus
"""
return FromBus(broker=self.broker,
address="%s/%s" % (self.address, subject) if subject else self.address)
address="%s/%s" % (self.address, subject) if subject else self.address,
connection_log_level=connection_log_level)
def create_tobus(self):
def create_tobus(self, connection_log_level=logging.INFO):
"""
Factory method to create a ToBus instance which is connected to this TemporaryQueue
:return: ToBus
"""
return ToBus(broker=self.broker, address=self.address)
return ToBus(broker=self.broker, address=self.address, connection_log_level=connection_log_level)
class AbstractBusListener(object):
......@@ -517,7 +523,6 @@ class AbstractBusListener(object):
self._listening = False
self._numthreads = kwargs.pop("numthreads", 1)
self.verbose = kwargs.pop("verbose", False)
self.frombus_options = {}
if len(kwargs):
raise AttributeError("Unexpected argument passed to AbstractBusListener constructor: %s", kwargs)
......@@ -542,7 +547,7 @@ class AbstractBusListener(object):
if self._listening == True:
return
self._bus_listener = FromBus(self.address, broker=self.broker, broker_options=self.frombus_options)
self._bus_listener = FromBus(self.address, broker=self.broker, connection_log_level=logging.INFO)
self._bus_listener.open()
if numthreads != None:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment