diff --git a/LCS/Messaging/python/messaging/exceptions.py b/LCS/Messaging/python/messaging/exceptions.py index dccad46686706943f94e7d805d5308e1a9994836..52e023d2145e5775f05296c9a1cef9b48b67654c 100644 --- a/LCS/Messaging/python/messaging/exceptions.py +++ b/LCS/Messaging/python/messaging/exceptions.py @@ -58,3 +58,10 @@ class MessageFactoryError(MessagingError): """ pass + +class MessagingTimeoutError(MessagingError, TimeoutError): + """ + raise upon timeouts + """ + pass + diff --git a/LCS/Messaging/python/messaging/messagebus.py b/LCS/Messaging/python/messaging/messagebus.py index fcc0cca1cece4d39bbec10dc3ba2e14365ba668c..9ae0075280e16ff020186ef7975a490b7cdd3897 100644 --- a/LCS/Messaging/python/messaging/messagebus.py +++ b/LCS/Messaging/python/messaging/messagebus.py @@ -192,7 +192,7 @@ import re import uuid import threading from typing import Optional -from datetime import datetime +from datetime import datetime, timedelta from queue import Empty as EmptyQueueError from socket import gaierror import json @@ -406,11 +406,36 @@ class _AbstractBus: self._connection = None self._lock = TimeoutLock() + @property def is_connected(self) -> bool: """Is this instance connected to the bus? 
""" with self._lock: return (self._connection is not None) and self._connection.connected + @property + def local_address(self) -> (str, int): + """get a ip,port tuple for the local socket of the connection""" + with self._lock: + if not self.is_connected: + raise MessageBusError("cannot get local socket address for an unconnected bus") + return self._connection._connection.sock.getsockname() + + @property + def remote_address(self) -> (str, int): + """get a ip,port tuple for the remote socket of the connection""" + with self._lock: + if not self.is_connected: + raise MessageBusError("cannot get remote socket address for an unconnected bus") + parts = self._connection.host.partition(':') + return (parts[0], int(parts[2])) + + @property + def connection_name(self) -> str: + """returns the connection name in rabbitmq format: local socket's ip:port -> remote socket's ip:port""" + local = self.local_address + remote = self.remote_address + return "%s:%d -> %s:%d" % (local[0], local[1], remote[0], remote[1]) + def open(self): """ Open a connection to the broker, and connect to the endpoint (a receiver for a FromBus, a sender for a ToBus) @@ -419,13 +444,13 @@ class _AbstractBus: """ try: with self._lock: - if self.is_connected(): + if self.is_connected: return logger.debug("[%s] Connecting to broker: %s", self.__class__.__name__, self.broker) self._connection = kombu.Connection(hostname=self.broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD) self._connection.connect() - logger.debug("[%s] Connected to broker: %s", self.__class__.__name__, self.broker) + logger.debug("[%s] Connected to broker: %s (%s)", self.__class__.__name__, self.broker, self.connection_name) # let the subclass (FromBus or ToBus) create a receiver of sender self._connect_to_endpoint() @@ -454,7 +479,7 @@ class _AbstractBus: :raise MessagingError: in case disconnecting from the broker or the address failed. 
""" with self._lock: - if not self.is_connected(): + if not self.is_connected: return try: @@ -492,7 +517,7 @@ class _AbstractBus: logging.error(ex) # open and allow any open/connect exceptions to be raised and bubbled upwards. self.open() - logger.info("[%s] Reconnected to broker: %s", self.__class__.__name__, self.broker) + logger.info("[%s] Reconnected to broker: %s (%s)", self.__class__.__name__, self.broker, self.connection_name) def __enter__(self) -> '_AbstractBus': """Open the connection when entering a 'with' context.""" @@ -511,6 +536,13 @@ class _AbstractBus: # implement in subclass raise NotImplementedError() + def _is_connection_error(self, error: Exception) -> bool: + if isinstance(error, amqp.exceptions.ConnectionError): + return True + if isinstance(error, kombu.exceptions.ConnectionError): + return True + return False + class FromBus(_AbstractBus): """ @@ -521,14 +553,14 @@ class FromBus(_AbstractBus): ... with TemporaryQueue() as tmp_queue: ... # create a new FromBus, use it in a context. ... with FromBus(queue=tmp_queue.address) as frombus: - ... print("connected =", frombus.is_connected()) + ... print("connected =", frombus.is_connected) ... ... # try to receive a message (there is None, cause nobody sent any) ... msg = frombus.receive(timeout=0.1) ... print("msg =", msg) ... ... # left context, so is_connected should be false now. - ... print("connected =", frombus.is_connected()) + ... print("connected =", frombus.is_connected) ... connected = True msg = None @@ -595,6 +627,14 @@ class FromBus(_AbstractBus): logger.exception(error_msg) raise MessagingError(error_msg) + def _is_connection_error(self, error: Exception) -> bool: + if isinstance(error, TypeError): + # special exception case for kombu not handling connection-loss very well... 
+ if str(error) == "'NoneType' object is not subscriptable": + return True + + return super()._is_connection_error(error) + def receive(self, timeout: float=DEFAULT_BUS_TIMEOUT, acknowledge: bool = True) -> Optional[LofarMessage]: """ Receive the next message from the queue we're listening on. @@ -602,7 +642,7 @@ class FromBus(_AbstractBus): :param acknowledge: if True, then automatically acknowledge the received message :return: received message, or None if timeout occurred. """ - if not self.is_connected(): + if not self.is_connected: raise MessageBusError("FromBus is not connected to queue %s at broker %s. Cannot receive messages." % (self.queue, self.broker)) kombu_msg = None @@ -612,33 +652,44 @@ class FromBus(_AbstractBus): if not have_lock: return None - try: - elapsed_sec = (datetime.utcnow() - start).total_seconds() - kombu_msg = self._receiver.get(timeout=max(timeout-elapsed_sec, 0.001)) - logger.debug("[FromBus] Message received on: %s mgs: %s" % (self.queue, kombu_msg)) + while True: + try: + elapsed_sec = (datetime.utcnow() - start).total_seconds() + if elapsed_sec > timeout: + raise MessagingTimeoutError("[FromBus] Timeout while trying to receive message from: %s" % (self.queue,)) - # convert kombu msg to lofarmessage - lofar_msg = MessageFactory.create_lofar_message_from_kombu_message(kombu_msg) + kombu_msg = self._receiver.get(timeout=max(timeout-elapsed_sec, 0.001)) + logger.debug("[FromBus] Message received on: %s mgs: %s" % (self.queue, kombu_msg)) - # keep track of unacked messages - # the outside world only knows about lofar messages, so track them based on the lofar_message id. - # also keep track of thread id, because ack'ing/rejecting messages across threads is a bad idea! 
- self._unacked_messages[lofar_msg.id] = (kombu_msg, threading.current_thread().ident) + # convert kombu msg to lofarmessage + lofar_msg = MessageFactory.create_lofar_message_from_kombu_message(kombu_msg) - if acknowledge: - self.ack(lofar_msg) + # keep track of unacked messages + # the outside world only knows about lofar messages, so track them based on the lofar_message id. + # also keep track of thread id, because ack'ing/rejecting messages across threads is a bad idea! + self._unacked_messages[lofar_msg.id] = (kombu_msg, threading.current_thread().ident) - return lofar_msg + if acknowledge: + self.ack(lofar_msg) - except kombu.exceptions.TimeoutError: - return None - except EmptyQueueError: - return None - except Exception as e: - logger.exception(e) - if kombu_msg: - kombu_msg.reject() - raise MessagingError("[FromBus] unknown exception while receiving message on %s: %s" % (self.queue, e)) + return lofar_msg + + except kombu.exceptions.TimeoutError: + return None + except EmptyQueueError: + return None + except MessagingError: + # just reraise our own errors + raise + except Exception as e: + if self._is_connection_error(e): + logger.warning("Could not receive message due to connection problems: %s", e) + self.reconnect() + else: + logger.exception(e) + if kombu_msg: + kombu_msg.reject() + raise MessagingError("[FromBus] unknown exception while receiving message on %s: %s" % (self.queue, e)) def ack(self, lofar_msg: LofarMessage): """ @@ -693,7 +744,7 @@ class FromBus(_AbstractBus): return nr_of_messages_in_queue(self.queue, self.broker) def __str__(self): - return "[FromBus] queue: %s on broker: %s" % (self.queue, self.broker) + return "[FromBus] queue: %s on broker: %s (%s)" % (self.queue, self.broker, self.connection_name if self.is_connected else "not connected") class ToBus(_AbstractBus): @@ -705,13 +756,13 @@ class ToBus(_AbstractBus): ... with TemporaryExchange() as tmp_exchange: ... # create a new ToBus, use it in a context. ... with ToBus(exchange=tmp_exchange.address) as tobus: - ... 
print("connected =", tobus.is_connected()) + ... print("connected =", tobus.is_connected) ... ... # send a message to the exchange on the broker ... tobus.send(EventMessage(content='foo')) ... ... # left context, so is_connected should be false now. - ... print("connected =", tobus.is_connected()) + ... print("connected =", tobus.is_connected) ... connected = True connected = False @@ -769,34 +820,39 @@ class ToBus(_AbstractBus): raise MessagingError(error_msg) - def send(self, message: LofarMessage): + def send(self, message: LofarMessage, timeout: int=DEFAULT_BUS_TIMEOUT): """ Send a message to the exchange we're connected to. :param message: message to be sent """ - if not self.is_connected(): - raise MessageBusError("ToBus is not connected to exchange %s at broker %s. Cannot send message (subject=%s)." % ( - self.exchange, self.broker, message.subject)) + start = datetime.utcnow() + while True: + try: + logger.debug("[ToBus] Sending message to: %s (%s)", self.exchange, message) - try: - logger.debug("[ToBus] Sending message to: %s (%s)", self.exchange, message) + kwargs_dict = message.as_kombu_publish_kwargs() - kwargs_dict = message.as_kombu_publish_kwargs() + # every message is sent the connected exchange, and then routed to zero or more queues using the subject. + kwargs_dict['exchange'] = self.exchange + kwargs_dict['routing_key'] = message.subject - # every message is sent the connected exchange, and then routed to zero or more queues using the subject. 
- kwargs_dict['exchange'] = self.exchange - kwargs_dict['routing_key'] = message.subject + with self._lock: + self._sender.publish(serializer='pickle', **kwargs_dict) - with self._lock: - self._sender.publish(serializer='pickle', **kwargs_dict) - - logger.debug("[ToBus] Sent message to: %s", self.exchange) + logger.debug("[ToBus] Sent message to: %s", self.exchange) + return + except Exception as e: + if self._is_connection_error(e): + logger.warning("Could not send message due to connection problems: %s", e) + self.reconnect() + else: + raise MessagingError("[ToBus] Failed to send message to: %s error=%s" % (self.exchange, e)) - except Exception as e: - raise MessagingError("[ToBus] Failed to send message to: %s error=%s" % (self.exchange, e)) + if datetime.utcnow() - start > timedelta(seconds=timeout): + raise MessagingTimeoutError("[ToBus] Timeout while trying to send message to: %s" % (self.exchange,)) def __str__(self): - return "[ToBus] exchange: %s on broker: %s" % (self.exchange, self.broker) + return "[ToBus] exchange: %s on broker: %s (%s)" % (self.exchange, self.broker, self.connection_name if self.is_connected else "not connected") class TemporaryExchange: """ @@ -868,12 +924,13 @@ class TemporaryExchange: """ return ToBus(broker=self.broker, exchange=self.address) - def create_temporary_queue(self) -> 'TemporaryQueue': + def create_temporary_queue(self, auto_delete_on_last_disconnect: bool=True) -> 'TemporaryQueue': """ Factory method to create a TemporaryQueue instance which is connected to this TemporaryExchange + :param auto_delete_on_last_disconnect: If True auto-delete the queue on the broker when the last consumer disconnects. 
:return: TemporaryQueue """ - return TemporaryQueue(broker=self.broker, exchange=self.address) + return TemporaryQueue(broker=self.broker, exchange=self.address, auto_delete_on_last_disconnect=auto_delete_on_last_disconnect) class TemporaryQueue: @@ -927,6 +984,7 @@ class TemporaryQueue: def __init__(self, name_prefix: str=None, exchange: str=None, routing_key: str="#", addressed_to_me_only: bool = False, + auto_delete_on_last_disconnect: bool=True, broker=DEFAULT_BROKER): """ Create a TemporaryQueue instance with an optional name on the given broker. @@ -939,6 +997,7 @@ class TemporaryQueue: :param addressed_to_me_only: If True then apply the tmp-queue's address as binding routing key, so only messages for this queue are routed to this queue. This overrules the given routing_key parameter. + :param auto_delete_on_last_disconnect: If True auto-delete the queue on the broker when the last consumer disconnects. :param broker: the messaging broker to connect to. """ self._name_prefix = name_prefix @@ -946,6 +1005,7 @@ class TemporaryQueue: self._bound_exchange = exchange self._routing_key = routing_key self._addressed_to_me_only = addressed_to_me_only + self._auto_delete_on_last_disconnect = auto_delete_on_last_disconnect self._created_exchange = False self.address = None @@ -981,7 +1041,7 @@ class TemporaryQueue: self._bound_exchange = "exchange-for-" + self.address # create the tmp queue... 
- create_queue(self.address, broker=self.broker, durable=False, auto_delete=True) + create_queue(self.address, broker=self.broker, durable=False, auto_delete=self._auto_delete_on_last_disconnect) # create the exchange (if needed), and remember if we need to destroy it (if it was created) self._created_exchange = create_exchange(self._bound_exchange, broker=self.broker, @@ -1253,17 +1313,13 @@ class BusListener: def __init__(self, handler_type: AbstractMessageHandler.__class__, handler_kwargs: dict = None, exchange: str = None, routing_key: str = "#", - queue: str = None, num_threads: int = 1, broker: str = DEFAULT_BROKER): """ - Create a buslistener instance. Either specify exchange and routing_key, or a queue. Not both. + Create a buslistener instance. - If you specify a queue to listen on, then it is assumed the queue exists and you've taken care in the queue configuration - on the broker that the correct messages are routed to this queue. - - The recommended way though is to specify an exchange and routing_key. Then this buslistener creates a designated queue - on the broker, specifically for this listener with the following constructed name: <exchange>.for.<program_name>.<routing_key> + Specify an exchange and routing_key. Then this buslistener creates a designated queue on the broker, + specifically for this listener with the following constructed name: <exchange>.for.<program_name>.<routing_key> The designated queue is bound to the given exchange with the given routing_key. The rational behind this is that: @@ -1280,8 +1336,8 @@ class BusListener: :param handler_type: TODO!!!!!!!! :param handler_kwargs: TODO!!!!!!!! - :param exchange: Bind the listener to this given exchange with the given routing key via an auto-generated designated queue. Ignored if a the 'queue' parameter is specified. - :param routing_key: Bind the listener to this given exchange with the given routing key via an auto-generated designated queue. 
Ignored if a the 'queue' parameter is specified. + :param exchange: Bind the listener to this given exchange with the given routing key via an auto-generated designated queue. + :param routing_key: Bind the listener to this given exchange with the given routing key via an auto-generated designated queue. :param num_threads: the number of receiver/handler threads. default=1, use higher number only if it makes sense, for example when you are waiting for a slow database while handling the message. @@ -1300,7 +1356,8 @@ class BusListener: self.exchange = exchange self.broker = broker self._num_threads = num_threads - self._threads = [] + self._threads = {} + self._lock = threading.Lock() self._running = threading.Event() self._listening = False self.address = self.designated_queue_name(exchange, routing_key) @@ -1350,7 +1407,8 @@ class BusListener: thread = threading.Thread(target=self._listen_loop, name=thread_name, kwargs={'thread_started_event':thread_started_event}) - self._threads.append(thread) + with self._lock: + self._threads[thread] = {} # bookkeeping dict per thread thread.start() # check if the _listen_loop was started successfully @@ -1374,11 +1432,12 @@ class BusListener: if self.is_running(): self._running.clear() - for thread in self._threads: + for thread in list(self._threads.keys()): try: logger.debug("STOPPING %s on thread '%s'", self, thread.name) thread.join() - self._threads.remove(thread) + with self._lock: + del self._threads[thread] logger.info("STOPPED %s on thread '%s'", self, thread.name) except Exception as e: logger.exception("Could not stop thread %s: %s", thread.name, e) @@ -1428,13 +1487,21 @@ class BusListener: """ Internal use only. Message listener loop that receives messages and starts the attached function with the message content as argument. 
""" - logger.debug( "STARTING %s on thread '%s' ", self, threading.currentThread().name) + current_thread = threading.currentThread() + # dict for thread specific bookkeeping + thread_bookkeeping = self._threads[current_thread] + + logger.debug( "STARTING %s on thread '%s' ", self, current_thread.name) # create an instance of the given handler for this background thread # (to keep the internals of the handler thread agnostic) with self._create_handler() as thread_handler: with FromBus(self.address, broker=self.broker) as receiver: - logger.info("STARTED %s on thread '%s' ", self, threading.currentThread().name) + logger.info("STARTED %s on thread '%s' ", self, current_thread.name) + + with self._lock: + thread_bookkeeping['handler'] = thread_handler + thread_bookkeeping['receiver'] = receiver # notify the thread starter that we successfully started the listen loop thread_started_event.set() @@ -1469,22 +1536,6 @@ class BusListener: except Exception as e: logger.exception("after_receive_message() failed: %s", e) - # try: - # num_messages_remaining = receiver.nr_of_messages_in_queue() - # if num_messages_remaining > 0: - # logger.info("%d messages remaining in %s on %s", num_messages_remaining, self.address, self.broker) - # except Exception as e: - # logger.exception("after_receive_message() failed: %s", e) - - except MessageBusError as mbce: - logger.error(mbce) - # handle any MessageBusErrors (failing connections and such) by endlessly retry to reconnect - while not receiver.is_connected(): - try: - receiver.reconnect() - except MessagingError as e: - logger.error(e) - threading.sleep(1) except MessagingError as me: # just log any own MessagingError, and continue loop. 
logger.error(me) diff --git a/LCS/Messaging/python/messaging/test/t_messagebus.py b/LCS/Messaging/python/messaging/test/t_messagebus.py index c6d818b8b6ee761992e28beaa0464dd0fcf3dbd5..b7db6e25d1580e15090f5ba4025bb77cca3592d6 100644 --- a/LCS/Messaging/python/messaging/test/t_messagebus.py +++ b/LCS/Messaging/python/messaging/test/t_messagebus.py @@ -26,21 +26,24 @@ Test program for the module lofar.messaging.messagebus import uuid import unittest +import requests import logging logger = logging.getLogger(__name__) -logging.basicConfig(format='%(asctime)s %(thread)d %(threadName)s %(levelname)s %(message)s', level=logging.INFO) +logging.basicConfig(format='%(asctime)s %(thread)d %(threadName)s %(levelname)s %(message)s', level=logging.DEBUG) from datetime import datetime from lofar.messaging.messages import * from lofar.messaging.messagebus import * +from lofar.messaging.messagebus import _AbstractBus, can_connect_to_broker from lofar.messaging.messagebus import create_queue, create_exchange, create_binding, create_bound_queue, delete_exchange, delete_queue +from lofar.messaging.config import DEFAULT_USER, DEFAULT_PASSWORD from lofar.messaging.rpc import RequestMessage -from lofar.messaging.exceptions import MessageBusError, MessagingRuntimeError +from lofar.messaging.exceptions import MessageBusError, MessagingRuntimeError, MessagingTimeoutError from lofar.common.datetimeutils import round_to_millisecond_precision from time import sleep -from threading import Lock +from threading import Lock, Event as ThreadingEvent TIMEOUT = 1.0 @@ -745,6 +748,93 @@ class MessageHandlerTester(unittest.TestCase): exchange=tmp_exchange.address)) as listener: pass +class ReconnectOnConnectionLossTests(unittest.TestCase): + def setUp(self): + self.tmp_exchange = TemporaryExchange() + self.addCleanup(self.tmp_exchange.close) + + self.tmp_queue = self.tmp_exchange.create_temporary_queue() + self.addCleanup(self.tmp_queue.close) + + self.tmp_exchange.open() + self.tmp_queue.open() + + 
def _close_connection_of_bus_on_broker(self, bus: _AbstractBus): + # use the http REST API using requests to forcefully close the connection on the broker-side + url = "http://%s:15672/api/connections/%s" % (bus.broker, bus.connection_name) + + # rabbitmq http api is sometimes lagging a bit behind... + # wait (at most ~10 seconds, so a missing connection cannot hang the test run) until the connection url responds with 200-ok. + for _ in range(40): + response = requests.get(url, auth=(DEFAULT_USER, DEFAULT_PASSWORD)) + if response.status_code == 200: + break + sleep(0.25) + + # now we can delete it. + response = requests.delete(url, auth=(DEFAULT_USER, DEFAULT_PASSWORD)) + self.assertEqual(204, response.status_code) + + def test_tobus_send_handling_connection_loss(self): + with ToBus(self.tmp_exchange.address) as tobus: + tobus.send(EventMessage()) + + # force server-side connection loss + self._close_connection_of_bus_on_broker(tobus) + + # try to send with timeout of 0 (so there is no opportunity for reconnection) -> MessagingTimeoutError + with self.assertRaises(MessagingTimeoutError): + tobus.send(EventMessage(), timeout=0) + + # send with normal timeout, should just succeed (and not raise) + tobus.send(EventMessage(), timeout=5) + + def test_frombus_send_handling_connection_loss(self): + with ToBus(self.tmp_exchange.address) as tobus: + with self.tmp_exchange.create_temporary_queue(auto_delete_on_last_disconnect=False) as tmp_queue: + with tmp_queue.create_frombus() as frombus: + # test normal send/receive -> should work + tobus.send(EventMessage()) + self.assertIsNotNone(frombus.receive()) + + # force server-side connection loss for the receiving frombus connection + self._close_connection_of_bus_on_broker(frombus) + + # test normal send/receive -> should work + tobus.send(EventMessage()) + self.assertIsNotNone(frombus.receive()) + + + def test_buslistener_handling_connection_loss(self): + msg_handled_event = ThreadingEvent() + + class SynchonizingHandler(AbstractMessageHandler): + def handle_message(self, msg: LofarMessage): + 
logger.info("handle_message(%s) ... setting msg_handled_event", msg) + msg_handled_event.set() + + with BusListenerJanitor(BusListener(handler_type=SynchonizingHandler, + exchange=self.tmp_exchange.address)) as listener: + with ToBus(self.tmp_exchange.address) as tobus: + # send test message + tobus.send(EventMessage()) + + # wait until message is handled... + self.assertTrue(msg_handled_event.wait(2)) + msg_handled_event.clear() + + # magic lookup of the listener's receiver... + frombus = list(listener._threads.values())[0]['receiver'] + # ... to force server-side connection loss + self._close_connection_of_bus_on_broker(frombus) + + # send another test message... + tobus.send(EventMessage()) + + # listener should have handled the 2nd msg as well, even though the connection was broken + # thanks to auto reconnect + self.assertTrue(msg_handled_event.wait(2)) + def load_tests(loader, tests, ignore): """add the doctests from lofar.messaging.messagebus to the unittest tests"""