Commit 20eca345 authored by Auke Klazema's avatar Auke Klazema

SW-811: Merge branch 'SW-811' into 'LOFAR-Release-4_0'

Resolve SW-811

See merge request ro/lofar!51
parents ef106f73 4f42b236
......@@ -58,3 +58,10 @@ class MessageFactoryError(MessagingError):
"""
pass
class MessagingTimeoutError(MessagingError, TimeoutError):
"""
raise upon timeouts
"""
pass
......@@ -26,21 +26,24 @@ Test program for the module lofar.messaging.messagebus
import uuid
import unittest
import requests
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(format='%(asctime)s %(thread)d %(threadName)s %(levelname)s %(message)s', level=logging.INFO)
logging.basicConfig(format='%(asctime)s %(thread)d %(threadName)s %(levelname)s %(message)s', level=logging.DEBUG)
from datetime import datetime
from lofar.messaging.messages import *
from lofar.messaging.messagebus import *
from lofar.messaging.messagebus import _AbstractBus, can_connect_to_broker
from lofar.messaging.messagebus import create_queue, create_exchange, create_binding, create_bound_queue, delete_exchange, delete_queue
from lofar.messaging.config import DEFAULT_USER, DEFAULT_PASSWORD
from lofar.messaging.rpc import RequestMessage
from lofar.messaging.exceptions import MessageBusError, MessagingRuntimeError
from lofar.messaging.exceptions import MessageBusError, MessagingRuntimeError, MessagingTimeoutError
from lofar.common.datetimeutils import round_to_millisecond_precision
from time import sleep
from threading import Lock
from threading import Lock, Event as ThreadingEvent
TIMEOUT = 1.0
......@@ -745,6 +748,93 @@ class MessageHandlerTester(unittest.TestCase):
exchange=tmp_exchange.address)) as listener:
pass
class ReconnectOnConnectionLossTests(unittest.TestCase):
def setUp(self):
self.tmp_exchange = TemporaryExchange()
self.addCleanup(self.tmp_exchange.close)
self.tmp_queue = self.tmp_exchange.create_temporary_queue()
self.addCleanup(self.tmp_queue.close)
self.tmp_exchange.open()
self.tmp_queue.open()
def _close_connection_of_bus_on_broker(self, bus: _AbstractBus):
# use the http REST API using request to forcefully close the connection on the broker-side
url = "http://%s:15672/api/connections/%s" % (bus.broker, bus.connection_name)
# rabbitmq http api is sometimes lagging a bit behind...
# wait until the connection url responds with 200-ok.
while True:
response = requests.get(url, auth=(DEFAULT_USER, DEFAULT_PASSWORD))
if response.status_code == 200:
break
sleep(0.25)
# now we can delete it.
response = requests.delete(url, auth=(DEFAULT_USER, DEFAULT_PASSWORD))
self.assertEqual(204, response.status_code)
def test_tobus_send_handling_connection_loss(self):
with ToBus(self.tmp_exchange.address) as tobus:
tobus.send(EventMessage())
# force server-side connection loss
self._close_connection_of_bus_on_broker(tobus)
# try to send with timeout of 0 (so there is no opportunity for reconnection) -> MessagingTimeoutError
with self.assertRaises(MessagingTimeoutError):
tobus.send(EventMessage(), timeout=0)
# send with normal timeout, should just succeed (and not raise)
tobus.send(EventMessage(), timeout=5)
def test_frombus_send_handling_connection_loss(self):
with ToBus(self.tmp_exchange.address) as tobus:
with self.tmp_exchange.create_temporary_queue(auto_delete_on_last_disconnect=False) as tmp_queue:
with tmp_queue.create_frombus() as frombus:
# test normal send/receive -> should work
tobus.send(EventMessage())
self.assertIsNotNone(frombus.receive())
# force server-side connection loss for the receiving frombus connection
self._close_connection_of_bus_on_broker(frombus)
# test normal send/receive -> should work
tobus.send(EventMessage())
self.assertIsNotNone(frombus.receive())
def test_buslistener_handling_connection_loss(self):
msg_handled_event = ThreadingEvent()
class SynchonizingHandler(AbstractMessageHandler):
def handle_message(self, msg: LofarMessage):
logger.info("handle_message(%s) ... setting msg_handled_event", msg)
msg_handled_event.set()
with BusListenerJanitor(BusListener(handler_type=SynchonizingHandler,
exchange=self.tmp_exchange.address)) as listener:
with ToBus(self.tmp_exchange.address) as tobus:
# send test message
tobus.send(EventMessage())
# wait until mesage is handled...
self.assertTrue(msg_handled_event.wait(2))
msg_handled_event.clear()
# magic lookup of the listeners receiver...
frombus = list(listener._threads.values())[0]['receiver']
# ... to force server-side connection loss
self._close_connection_of_bus_on_broker(frombus)
# send another test message...
tobus.send(EventMessage())
# listener should have handled the 2ns msg as well, even though the connection was broken
# thanks to auto reconnect
self.assertTrue(msg_handled_event.wait(2))
def load_tests(loader, tests, ignore):
"""add the doctests from lofar.messaging.messagebus to the unittest tests"""
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment