Skip to content
Snippets Groups Projects
Commit 1abc43ee authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-516: added new class TemporaryQueue which can be used when a...

SW-516: added new class TemporaryQueue which can be used when a temporary-auto-delete-queue is needed, like for example in the tests.
parent f1111853
No related branches found
No related tags found
No related merge requests found
......@@ -205,9 +205,9 @@ class FromBus(object):
# t.daemon = True
# t.start()
except proton.ProtonException:
except proton.ProtonException as pe:
raise_exception(MessageBusError,
"[FromBus] Failed to create %s" % (what,))
"[FromBus] Failed to create %s: %s" % (what, pe))
logger.debug("[FromBus] Created %s", what)
def receive(self, timeout=DEFAULT_TIMEOUT, logDebugMessages=True):
......@@ -556,6 +556,85 @@ class ToBus(object):
logger.debug("[ToBus] Message sent to: %s subject: %s" % (self.address, qmsg.subject))
class TemporaryQueue(object):
"""
A TemporaryQueue instance can be used to setup a dynamic temporary queue which is closed and deleted automagically when leaving context.
Together with the factory methods create_frombus and/or create_tobus it gives us to following simple but often used use case:
with TemporaryQueue("MyTestQueue") as tmp_queue:
with tmp_queue.create_tobus() as tobus, tmp_queue.create_frombus() as frombus:
# send a message...
original_msg = EventMessage(content="foobar")
tobus.send(original_msg)
# ...receive the message.
received_msg = frombus.receive()
Alternative use cases with only a tobus or only a frombus on the tmp_queue are also possible.
"""
def __init__(self, name=None, broker="localhost"):
"""
Create a TemporaryQueue instance with an optional name on the given broker.
:param name: Optional name, which is part of the final address which also includes a uuid.
:param broker: the qpid broker to connect to.
"""
self.name = name
self.broker = broker
self._dynamic_receiver = None
self.address = None
def __enter__(self):
"""
Opens/creates the temporary queue. It is automatically closed when leaving context in __exit__.
:return: self.
"""
self.open()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""
Close/remove the temporary queue.
"""
self.close()
def open(self):
"""
Open/create the temporary queue.
It is advised to use the TemporaryQueue instance in a 'with' context, which guarantees the close call.
"""
logger.info("Creating TemporaryQueue...")
connection = proton.utils.BlockingConnection(self.broker)
self._dynamic_receiver = connection.create_receiver(address=None, dynamic=True, name=self.name)
self.address = self._dynamic_receiver.link.remote_source.address
logger.info("Created TemporaryQueue at %s", self.address)
def close(self):
"""
Close/remove the temporary queue.
It is advised to use the TemporaryQueue instance in a 'with' context, which guarantees the close call.
"""
logger.debug("Closing TemporaryQueue at %s", self.address)
self._dynamic_receiver.close()
self._dynamic_receiver.connection.close()
self._dynamic_receiver = None
logger.info("Closed TemporaryQueue at %s", self.address)
self.address = None
def create_frombus(self):
"""
Factory method to create a FromBus instance which is connected to this TemporaryQueue
:return: FromBus
"""
return FromBus(broker=self.broker, address=self.address)
def create_tobus(self):
"""
Factory method to create a ToBus instance which is connected to this TemporaryQueue
:return: ToBus
"""
return ToBus(broker=self.broker, address=self.address)
class AbstractBusListener(object):
"""
AbstractBusListener class for handling messages which are received on a message bus.
......@@ -768,4 +847,4 @@ class AbstractBusListener(object):
logger.error("finalize_loop() failed with %s", e)
__all__ = ["FromBus", "ToBus", "AbstractBusListener"]
__all__ = ["FromBus", "ToBus", "TemporaryQueue", "AbstractBusListener"]
......@@ -35,10 +35,50 @@ from lofar.messaging.messagebus import *
from lofar.messaging.messagebus import DEFAULT_RECEIVER_CAPACITY
from lofar.messaging.exceptions import MessageBusError, InvalidMessage
TIMEOUT = 1.0
QUEUE = sys.argv[-1] if len(sys.argv) > 1 and "t_messagebus.py" not in sys.argv[-1] else "t_messagebus.queue"
logger = logging.getLogger(__name__)
TIMEOUT = 1.0
class TestTemporaryQueue(unittest.TestCase):
"""Test the TemporaryQueue class"""
def test_temporary_is_really_temporary(self):
"""
test if the temporary queue is really removed after usage
"""
tmp_queue_address = None
with TemporaryQueue("MyTestQueue") as tmp_queue:
tmp_queue_address = tmp_queue.address
self.assertTrue("MyTestQueue" in tmp_queue_address)
# test if the temporary queue has been deleted when leaving scope
# We should not be able to connect to it anymore
regexp = re.escape("[FromBus] Failed to create receiver for source") + '.*' + 'Node not found: %s' % tmp_queue_address + '.*'
with self.assertRaisesRegex(MessageBusError, regexp):
with FromBus(tmp_queue_address):
pass
def test_send_receive_over_temporary_queue(self):
"""
test the usage of the TemporaryQueue in conjunction with normal ToBus and Frombus usage
"""
with TemporaryQueue("MyTestQueue") as tmp_queue:
# create a normal To/FromBus on this tmp_queue
with tmp_queue.create_tobus() as tobus, tmp_queue.create_frombus() as frombus:
# send a message...
original_msg = EventMessage(content="foobar")
tobus.send(original_msg)
# ...receive the message...
received_msg = frombus.receive()
# and test if they are equal
self.assertEqual(original_msg.id, received_msg.id)
self.assertEqual(original_msg.body, received_msg.body)
# create a TemporaryQueue for testing. Is automagically deleted upon exit.
with TemporaryQueue("t_messagebus") as test_queue:
# ======== FromBus unit tests ======== #
......@@ -57,7 +97,7 @@ class FromBusInitFailed(unittest.TestCase):
regexp = re.escape(self.error)
regexp += '.*' + 'No address associated with hostname|Name or service not known' + '.*'
with self.assertRaisesRegex(MessageBusError, regexp):
with FromBus(QUEUE, broker="foo.bar", broker_options={'reconnect': False}):
with FromBus(test_queue.address, broker="foo.bar", broker_options={'reconnect': False}):
pass
def test_connection_refused(self):
......@@ -66,7 +106,7 @@ class FromBusInitFailed(unittest.TestCase):
"""
regexp = re.escape(self.error) + '.*' + '(Connection refused|111)' + '.*'
with self.assertRaisesRegex(MessageBusError, regexp):
with FromBus("fake" + QUEUE, broker="localhost:4", broker_options={'reconnect': False}):
with FromBus("fake" + test_queue.address, broker="localhost:4", broker_options={'reconnect': False}):
pass
......@@ -76,7 +116,7 @@ class FromBusNotInContext(unittest.TestCase):
"""
def setUp(self):
self.frombus = FromBus(QUEUE)
self.frombus = FromBus(test_queue.address)
self.error = re.escape("[FromBus] No active receiver") + '.*'
def test_add_queue_raises(self):
......@@ -127,25 +167,24 @@ class FromBusInContext(unittest.TestCase):
"""
Adding a non-existent queue must raise MessageBusError
"""
queue = "fake" + QUEUE
queue = "fake" + test_queue.address
regexp = re.escape(self.error) + '.*' + 'Node not found: %s' % queue + '.*'
with self.assertRaisesRegex(MessageBusError, regexp):
with FromBus(QUEUE) as frombus:
with FromBus(test_queue.address) as frombus:
frombus._add_queue(queue)
def test_receiver_succeeds(self):
"""
Adding an existing queue must succeed
Note JK: I removed the multiple queue thing since I don't see it actually being used (or being useful)
"""
with FromBus(QUEUE) as frombus:
with FromBus(test_queue.address) as frombus:
self.assertTrue(frombus.receiver is not None)
def test_receive_timeout(self):
"""
Getting a message when there's none must yield None after timeout.
"""
with FromBus(QUEUE) as frombus:
with FromBus(test_queue.address) as frombus:
self.assertIsNone(frombus.receive(timeout=TIMEOUT))
......@@ -166,7 +205,7 @@ class ToBusInitFailed(unittest.TestCase):
regexp = re.escape(self.error)
regexp += '.*' + '(No address associated with hostname|Name or service not known)'
with self.assertRaisesRegex(MessageBusError, regexp):
with ToBus(QUEUE, broker="foo.bar", broker_options={'reconnect': False}):
with ToBus(test_queue.address, broker="foo.bar", broker_options={'reconnect': False}):
pass
def test_connection_refused(self):
......@@ -175,7 +214,7 @@ class ToBusInitFailed(unittest.TestCase):
"""
regexp = re.escape(self.error) + '.*' + '(Connection refused|111)' + '.*'
with self.assertRaisesRegex(MessageBusError, regexp):
with ToBus(QUEUE, broker="localhost:4", broker_options={'reconnect': False}):
with ToBus(test_queue.address, broker="localhost:4", broker_options={'reconnect': False}):
pass
......@@ -192,7 +231,7 @@ class ToBusSendMessage(unittest.TestCase):
If a ToBus object is used outside a context, then there's no active
session, and a MessageBusError must be raised.
"""
tobus = ToBus(QUEUE)
tobus = ToBus(test_queue.address)
regexp = re.escape("[ToBus] No active sender") + '.*'
with self.assertRaisesRegex(MessageBusError, regexp):
tobus.send(None)
......@@ -204,7 +243,7 @@ class ToBusSendMessage(unittest.TestCase):
the ToBus object.
"""
with self.assertRaises(AttributeError): # Due to sender not being there for close
with ToBus(QUEUE) as tobus:
with ToBus(test_queue.address) as tobus:
tobus.sender = None
regexp = re.escape("[ToBus] No active sender") + ".*"
with self.assertRaisesRegex(MessageBusError, regexp):
......@@ -216,17 +255,17 @@ class ToBusSendMessage(unittest.TestCase):
Note that this can only happen if someone has deliberately tampered with
the ToBus object (e.g., by using the protected _add_queue() method).
"""
with ToBus(QUEUE) as tobus:
with ToBus(test_queue.address) as tobus:
regexp = re.escape("[ToBus] More than one sender")
with self.assertRaisesRegex(MessageBusError, regexp):
tobus._add_queue(QUEUE, {})
tobus._add_queue(test_queue.address, {})
def test_send_invalid_message_raises(self):
"""
If an invalid message is sent (i.e., not an LofarMessage), then an
InvalidMessage must be raised.
"""
with ToBus(QUEUE) as tobus:
with ToBus(test_queue.address) as tobus:
regexp = re.escape("Invalid message type")
with self.assertRaisesRegex(InvalidMessage, regexp):
tobus.send("Blah blah blah")
......@@ -240,10 +279,10 @@ class QueueIntrospection(unittest.TestCase):
"""
def setUp(self):
self.frombus = FromBus(QUEUE)
self.tobus = ToBus(QUEUE)
self.frombus = FromBus(test_queue.address)
self.tobus = ToBus(test_queue.address)
# if there are any dangling messages in the QUEUE, they hold state between the individual tests
# if there are any dangling messages in the test_queue.address, they hold state between the individual tests
# make sure the queue is empty by receiving any dangling messages
with self.frombus:
self.frombus.drain()
......@@ -281,17 +320,16 @@ class QueueIntrospection(unittest.TestCase):
self.frombus.receive()
self.assertEqual(MAX_NR_OF_MESSAGES-i-1, self.frombus.nr_of_messages_in_queue())
class SendReceiveMessage(unittest.TestCase):
"""
Class to test sending and receiving a message.
"""
def setUp(self):
self.frombus = FromBus(QUEUE)
self.tobus = ToBus(QUEUE)
self.frombus = FromBus(test_queue.address)
self.tobus = ToBus(test_queue.address)
# if there are any dangling messages in the QUEUE, they hold state between the individual tests
# if there are any dangling messages in the test_queue.address, they hold state between the individual tests
# make sure the queue is empty by receiving any dangling messages
with self.frombus:
self.frombus.drain()
......@@ -336,7 +374,7 @@ class SendReceiveMessage(unittest.TestCase):
Test send/receive of an RequestMessage, containing a byte array.
"""
content = {"request": "Do Something", "argument": "Very Often"}
self._test_sendrecv(RequestMessage(content, reply_to=QUEUE))
self._test_sendrecv(RequestMessage(content, reply_to=test_queue.address))
def test_sendrecv_request_message_with_large_content_map(self):
"""
......@@ -345,16 +383,10 @@ class SendReceiveMessage(unittest.TestCase):
We circumvent this in ToBus.send and FromBus.receive by converting long strings in a dict to a buffer and back.
"""
content = {"key1": "short message", "key2": "long message " + (2**17)*'a'}
self._test_sendrecv(RequestMessage(content, reply_to=QUEUE))
self._test_sendrecv(RequestMessage(content, reply_to=test_queue.address))
# main program should run within context of the TemporaryQueue test_queue as well
# because the tests are using this test_queue
if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG)
# delete last cmdlime argument if it holds the test-queue-name,
# so it is not passed on to the unittest framework.
# see also t_messagebus.run
if len(sys.argv) > 1 and sys.argv[-1].strip() != "t_messagebus.py":
del sys.argv[-1]
unittest.main()
#!/bin/bash -e
# Cleanup on normal exit and on SIGHUP, SIGINT, SIGQUIT, and SIGTERM
trap 'qpid-config del queue --force $queue' 0 1 2 3 15
# Generate randome queue name
queue=$(< /dev/urandom tr -dc [:alnum:] | head -c16)
# Create the queue
qpid-config add queue $queue
# Run the unit test
source python-coverage.sh
python_coverage_test "Messaging/python" t_messagebus.py $queue
python_coverage_test "Messaging/python" t_messagebus.py
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment