diff --git a/LCS/Messaging/python/messaging/RPC.py b/LCS/Messaging/python/messaging/RPC.py index b56ed46ea578b5132cef6d5ab1ce5ba90add6455..4f142cece330bf20c2b2c8f36fabd7eeca33eeb9 100644 --- a/LCS/Messaging/python/messaging/RPC.py +++ b/LCS/Messaging/python/messaging/RPC.py @@ -151,7 +151,7 @@ class RPC(): HasArgs, HasKwArgs = _analyze_args(args, kwargs) with TemporaryQueue(self.broker) as tmp_queue: - with tmp_queue.create_frombus() as reply_receiver: + with tmp_queue.create_frombus(connection_log_level=logging.DEBUG) as reply_receiver: request_msg = RequestMessage(content=Content, reply_to=reply_receiver.address, has_args=HasArgs, has_kwargs=HasKwArgs) if timeout: diff --git a/LCS/Messaging/python/messaging/Service.py b/LCS/Messaging/python/messaging/Service.py index 94805154b0962b272c736d59f23a93b80a9d1317..94f9d1657369296fb91f2c68cda63dcc3eefdeac 100644 --- a/LCS/Messaging/python/messaging/Service.py +++ b/LCS/Messaging/python/messaging/Service.py @@ -20,7 +20,7 @@ # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. # -from .messagebus import ToBus, FromBus, AbstractBusListener +from .messagebus import ToBus, AbstractBusListener from .messages import ReplyMessage, RequestMessage from .exceptions import MessageBusError, MessageFactoryError import threading @@ -124,34 +124,6 @@ class Service(AbstractBusListener): super(Service, self).__init__(address, broker, **kwargs) - def start_listening(self, numthreads=None): - """ - Start the background threads and process incoming messages. - """ - if self.isListening(): - return - - # only on a 'bus' we already connect the reply_bus - #if self.busname: - # self.reply_bus = ToBus(self.busname, broker=self.broker) - # self.reply_bus.open() - #else: - # self.reply_bus=None - - # create listener FromBus in super class - super(Service, self).start_listening(numthreads=numthreads) - - def stop_listening(self): - """ - Stop the background threads that listen to incoming messages. - """ - #if isinstance(self.reply_bus, ToBus): - # self.reply_bus.close() - # self.reply_bus=None - - # close the listeners - super(Service, self).stop_listening() - def _create_thread_args(self, index): # set up service_handler if str(type(self.service_handler)) == "<class 'instancemethod'>" or \ @@ -188,7 +160,7 @@ class Service(AbstractBusListener): # send the result to the RPC client try: - with ToBus(reply_to, broker=self.broker) as dest: + with ToBus(reply_to, broker=self.broker, connection_log_level=logging.DEBUG) as dest: dest.send(reply_msg) except MessageBusError as e: logger.error("Failed to send reply messgage to reply address %s. Error: %s", reply_to, e) diff --git a/LCS/Messaging/python/messaging/messagebus.py b/LCS/Messaging/python/messaging/messagebus.py index afe09722c8d00c3a842de5c6c129830412d36660..978759c13ba3ccbb46bf552d5b34523184531efb 100644 --- a/LCS/Messaging/python/messaging/messagebus.py +++ b/LCS/Messaging/python/messaging/messagebus.py @@ -75,7 +75,7 @@ class _AbstractBus(): but that of __new__(). """ - def __init__(self, address, broker=None): + def __init__(self, address, broker=None, connection_log_level=logging.INFO): """ Initializer. :param address: valid Qpid address @@ -84,6 +84,7 @@ class _AbstractBus(): self.address, self.subject = _AbstractBus._split_address_and_subject(address) self.broker = broker if broker else DEFAULT_BROKER self._connected = False + self._connection_log_level = connection_log_level @staticmethod def _split_address_and_subject(address): @@ -188,44 +189,46 @@ class FromBus(_AbstractBus): but that of __new__(). """ - def __init__(self, address, broker=None, broker_options=None): + def __init__(self, address, broker=None, broker_options=None, connection_log_level=logging.INFO): """ Initializer. :param address: valid Qpid address :param broker: valid Qpid broker URL, e.g. "localhost:5672" :param broker_options: OBSOLETE """ - super(FromBus, self).__init__(address=address, broker=broker) + super(FromBus, self).__init__(address=address, broker=broker, connection_log_level=connection_log_level) if broker_options: logger.warning("broker_options are obsolete. address=%s broker=%s broker_options=%s", self.address, self.broker, broker_options) def _connect_to_endpoint(self): - logger.debug("[FromBus] Creating receiver for bus: %s with subject: %s on broker: %s" % (self.address, - self.subject, - self.broker)) + logger.debug("[FromBus] Connecting receiver to bus: %s with subject: %s on broker: %s" % (self.address, + self.subject, + self.broker)) self._receiver = self.connection.create_receiver(address=self.address, credit=DEFAULT_RECEIVER_CAPACITY, options=_ProtonSubjectFilter(self.subject) if self.subject else None) - logger.info("[FromBus] Created receiver for bus: %s with subject: %s on broker: %s" % (self.address, + logger.log(self._connection_log_level, + "[FromBus] Connected receiver to bus: %s with subject: %s on broker: %s" % (self.address, self.subject, self.broker)) def _disconnect_from_endpoint(self): if self._receiver is not None: - logger.debug("[FromBus] Disconnecting receiver for bus: %s with subject: %s on broker: %s" % (self.address, - self.subject, - self.broker)) + logger.debug("[FromBus] Disconnecting receiver from bus: %s with subject: %s on broker: %s" % (self.address, + self.subject, + self.broker)) self._receiver.close() self._receiver = None - logger.info("[FromBus] Disconnecting receiver for bus: %s with subject: %s on broker: %s" % (self.address, - self.subject, - self.broker)) + logger.log(self._connection_log_level, + "[FromBus] Disconnected receiver from bus: %s with subject: %s on broker: %s" % (self.address, + self.subject, + self.broker)) def receive(self, timeout=DEFAULT_TIMEOUT, logDebugMessages=False): """ @@ -352,32 +355,34 @@ class ToBus(_AbstractBus): but that of __new__(). """ - def __init__(self, address, broker=None, broker_options=None): + def __init__(self, address, broker=None, broker_options=None, connection_log_level=logging.INFO): """ Initializer. :param address: valid Qpid address :param broker: valid Qpid broker URL, e.g. "localhost:5672" :param broker_options: OBSOLETE """ - super(ToBus, self).__init__(address=address, broker=broker) + super(ToBus, self).__init__(address=address, broker=broker, connection_log_level=connection_log_level) if broker_options: logger.warning("broker_options are obsolete. address=%s broker=%s broker_options=%s", self.address, self.broker, broker_options) def _connect_to_endpoint(self): - logger.debug("[ToBus] Creating sender for bus: %s on broker: %s" % (self.address, self.broker)) + logger.debug("[ToBus] Connecting sender to bus: %s on broker: %s" % (self.address, self.broker)) self._sender = self.connection.create_sender(address=self.address) - logger.info("[ToBus] Created sender for bus: %s on broker: %s" % (self.address, self.broker)) + logger.log(self._connection_log_level, + "[ToBus] Connected sender to bus: %s on broker: %s" % (self.address, self.broker)) def _disconnect_from_endpoint(self): if self._sender is not None: logger.debug("[ToBus] Disconnecting sender from bus: %s on broker: %s" % (self.address, self.broker)) self._sender.close() self._sender = None - logger.info("[ToBus] Disconnected sender from bus: %s on broker: %s" % (self.address, self.broker)) + logger.log(self._connection_log_level, + "[ToBus] Disconnected sender from bus: %s on broker: %s" % (self.address, self.broker)) def send(self, message, timeout=DEFAULT_TIMEOUT): """ @@ -480,21 +485,22 @@ class TemporaryQueue(object): def __str__(self): return "TemporaryQueue address=%s".format(self.address) - def create_frombus(self, subject=None): + def create_frombus(self, subject=None, connection_log_level=logging.INFO): """ Factory method to create a FromBus instance which is connected to this TemporaryQueue :param subject: Optional subject string to filter for. Only messages which match this subject are received. :return: FromBus """ return FromBus(broker=self.broker, - address="%s/%s" % (self.address, subject) if subject else self.address) + address="%s/%s" % (self.address, subject) if subject else self.address, + connection_log_level=connection_log_level) - def create_tobus(self): + def create_tobus(self, connection_log_level=logging.INFO): """ Factory method to create a ToBus instance which is connected to this TemporaryQueue :return: ToBus """ - return ToBus(broker=self.broker, address=self.address) + return ToBus(broker=self.broker, address=self.address, connection_log_level=connection_log_level) class AbstractBusListener(object): @@ -517,7 +523,6 @@ class AbstractBusListener(object): self._listening = False self._numthreads = kwargs.pop("numthreads", 1) self.verbose = kwargs.pop("verbose", False) - self.frombus_options = {} if len(kwargs): raise AttributeError("Unexpected argument passed to AbstractBusListener constructor: %s", kwargs) @@ -542,7 +547,7 @@ class AbstractBusListener(object): if self._listening == True: return - self._bus_listener = FromBus(self.address, broker=self.broker, broker_options=self.frombus_options) + self._bus_listener = FromBus(self.address, broker=self.broker, connection_log_level=logging.INFO) self._bus_listener.open() if numthreads != None: