diff --git a/LCS/Messaging/python/messaging/messagebus.py b/LCS/Messaging/python/messaging/messagebus.py index fcc0cca1cece4d39bbec10dc3ba2e14365ba668c..c994f3dd10478934577557d14412e9ca75138889 100644 --- a/LCS/Messaging/python/messaging/messagebus.py +++ b/LCS/Messaging/python/messaging/messagebus.py @@ -406,11 +406,36 @@ class _AbstractBus: self._connection = None self._lock = TimeoutLock() + @property def is_connected(self) -> bool: """Is this instance connected to the bus? """ with self._lock: return (self._connection is not None) and self._connection.connected + @property + def local_address(self) -> (str, int): + """get a ip,port tuple for the local socket of the connection""" + with self._lock: + if not self.is_connected: + raise MessageBusError("cannot get local socket address for an unconnected bus") + return self._connection._connection.sock.getsockname() + + @property + def remote_address(self) -> (str, int): + """get a ip,port tuple for the remote socket of the connection""" + with self._lock: + if not self.is_connected: + raise MessageBusError("cannot get remote socket address for an unconnected bus") + parts = self._connection.host.partition(':') + return (parts[0], int(parts[2])) + + @property + def connection_name(self) -> str: + """returns the connection name in rabbitmq format: local socket's ip:port -> remote socket's ip:port""" + local = self.local_address + remote = self.remote_address + return "%s:%d -> %s:%d" % (local[0], local[1], remote[0], remote[1]) + def open(self): """ Open a connection to the broker, and connect to the endpoint (a receiver for a FromBus, a sender for a ToBus) @@ -419,7 +444,7 @@ class _AbstractBus: """ try: with self._lock: - if self.is_connected(): + if self.is_connected: return logger.debug("[%s] Connecting to broker: %s", self.__class__.__name__, self.broker) @@ -454,7 +479,7 @@ class _AbstractBus: :raise MessagingError: in case disconnecting from the broker or the address failed. """ with self._lock: - if not self.is_connected(): + if not self.is_connected: return try: @@ -521,14 +546,14 @@ class FromBus(_AbstractBus): ... with TemporaryQueue() as tmp_queue: ... # create a new FromBus, use it in a context. ... with FromBus(queue=tmp_queue.address) as frombus: - ... print("connected =", frombus.is_connected()) + ... print("connected =", frombus.is_connected) ... ... # try to receive a message (there is None, cause nobody sent any) ... msg = frombus.receive(timeout=0.1) ... print("msg =", msg) ... ... # left context, so is_connected should be false now. - ... print("connected =", frombus.is_connected()) + ... print("connected =", frombus.is_connected) ... connected = True msg = None @@ -602,7 +627,7 @@ class FromBus(_AbstractBus): :param acknowledge: if True, then automatically acknowledge the received message :return: received message, or None if timeout occurred. """ - if not self.is_connected(): + if not self.is_connected: raise MessageBusError("FromBus is not connected to queue %s at broker %s. Cannot receive messages." % (self.queue, self.broker)) kombu_msg = None @@ -705,13 +730,13 @@ class ToBus(_AbstractBus): ... with TemporaryExchange() as tmp_exchange: ... # create a new ToBus, use it in a context. ... with ToBus(exchange=tmp_exchange.address) as tobus: - ... print("connected =", tobus.is_connected()) + ... print("connected =", tobus.is_connected) ... ... # send a message to the exchange on the broker ... tobus.send(EventMessage(content='foo')) ... ... # left context, so is_connected should be false now. - ... print("connected =", tobus.is_connected()) + ... print("connected =", tobus.is_connected) ... connected = True connected = False @@ -774,7 +799,7 @@ class ToBus(_AbstractBus): Send a message to the exchange we're connected to. :param message: message to be sent """ - if not self.is_connected(): + if not self.is_connected: raise MessageBusError("ToBus is not connected to exchange %s at broker %s. Cannot send message (subject=%s)." % ( self.exchange, self.broker, message.subject)) @@ -1479,7 +1504,7 @@ class BusListener: except MessageBusError as mbce: logger.error(mbce) # handle any MessageBusErrors (failing connections and such) by endlessly retry to reconnect - while not receiver.is_connected(): + while not receiver.is_connected: try: receiver.reconnect() except MessagingError as e: diff --git a/LCS/Messaging/python/messaging/test/t_messagebus.py b/LCS/Messaging/python/messaging/test/t_messagebus.py index c6d818b8b6ee761992e28beaa0464dd0fcf3dbd5..3b093cf881f13a2c5969bcebfb81eeb3be7397b8 100644 --- a/LCS/Messaging/python/messaging/test/t_messagebus.py +++ b/LCS/Messaging/python/messaging/test/t_messagebus.py @@ -26,18 +26,21 @@ Test program for the module lofar.messaging.messagebus import uuid import unittest +import requests import logging logger = logging.getLogger(__name__) -logging.basicConfig(format='%(asctime)s %(thread)d %(threadName)s %(levelname)s %(message)s', level=logging.INFO) +logging.basicConfig(format='%(asctime)s %(thread)d %(threadName)s %(levelname)s %(message)s', level=logging.DEBUG) from datetime import datetime from lofar.messaging.messages import * from lofar.messaging.messagebus import * +from lofar.messaging.messagebus import _AbstractBus from lofar.messaging.messagebus import create_queue, create_exchange, create_binding, create_bound_queue, delete_exchange, delete_queue +from lofar.messaging.config import DEFAULT_USER, DEFAULT_PASSWORD from lofar.messaging.rpc import RequestMessage -from lofar.messaging.exceptions import MessageBusError, MessagingRuntimeError +from lofar.messaging.exceptions import MessageBusError, MessagingRuntimeError, MessagingError from lofar.common.datetimeutils import round_to_millisecond_precision from time import sleep from threading import Lock @@ -745,6 +748,40 @@ class MessageHandlerTester(unittest.TestCase): exchange=tmp_exchange.address)) as listener: pass +class ReconnectOnConnectionLossTests(unittest.TestCase): + def setUp(self): + self.tmp_exchange = TemporaryExchange() + self.addCleanup(self.tmp_exchange.close) + + self.tmp_queue = self.tmp_exchange.create_temporary_queue() + self.addCleanup(self.tmp_queue.close) + + self.tmp_exchange.open() + self.tmp_queue.open() + + def _close_connection_of_bus_on_broker(self, bus: _AbstractBus): + # use the http REST API using request to forcefully close the connection on the broker-side + url = "http://%s:15672/api/connections/%s" % (bus.broker, bus.connection_name) + + # rabbitmq http api is sometimes lagging a bit behind... + # wait until the connection url responds with 200-ok. + while True: + response = requests.get(url, auth=(DEFAULT_USER, DEFAULT_PASSWORD)) + if response.status_code == 200: + break + sleep(0.1) + + # now we can delete it. + response = requests.delete(url, auth=(DEFAULT_USER, DEFAULT_PASSWORD)) + self.assertEqual(204, response.status_code) + + def test_tobus_send_raises_on_connection_loss(self): + with ToBus(self.tmp_exchange.address) as tobus: + tobus.send(EventMessage()) + self._close_connection_of_bus_on_broker(tobus) + with self.assertRaises(MessagingError): + tobus.send(EventMessage()) + def load_tests(loader, tests, ignore): """add the doctests from lofar.messaging.messagebus to the unittest tests"""