diff --git a/LCS/Messaging/python/messaging/messagebus.py b/LCS/Messaging/python/messaging/messagebus.py index 9f0e7ed4e8d5da3a962f3db9fb0b121cf42ce2be..036740affbcfcf6fa3e17e29c92f88fa442ec4f4 100644 --- a/LCS/Messaging/python/messaging/messagebus.py +++ b/LCS/Messaging/python/messaging/messagebus.py @@ -205,9 +205,9 @@ class FromBus(object): # t.daemon = True # t.start() - except proton.ProtonException: + except proton.ProtonException as pe: raise_exception(MessageBusError, - "[FromBus] Failed to create %s" % (what,)) + "[FromBus] Failed to create %s: %s" % (what, pe)) logger.debug("[FromBus] Created %s", what) def receive(self, timeout=DEFAULT_TIMEOUT, logDebugMessages=True): @@ -556,6 +556,85 @@ class ToBus(object): logger.debug("[ToBus] Message sent to: %s subject: %s" % (self.address, qmsg.subject)) +class TemporaryQueue(object): + """ + A TemporaryQueue instance can be used to setup a dynamic temporary queue which is closed and deleted automagically when leaving context. + Together with the factory methods create_frombus and/or create_tobus it gives us to following simple but often used use case: + + with TemporaryQueue("MyTestQueue") as tmp_queue: + with tmp_queue.create_tobus() as tobus, tmp_queue.create_frombus() as frombus: + # send a message... + original_msg = EventMessage(content="foobar") + tobus.send(original_msg) + + # ...receive the message. + received_msg = frombus.receive() + + Alternative use cases with only a tobus or only a frombus on the tmp_queue are also possible. + """ + def __init__(self, name=None, broker="localhost"): + """ + Create a TemporaryQueue instance with an optional name on the given broker. + :param name: Optional name, which is part of the final address which also includes a uuid. + :param broker: the qpid broker to connect to. + """ + self.name = name + self.broker = broker + self._dynamic_receiver = None + self.address = None + + def __enter__(self): + """ + Opens/creates the temporary queue. It is automatically closed when leaving context in __exit__. + :return: self. + """ + self.open() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """ + Close/remove the temporary queue. + """ + self.close() + + def open(self): + """ + Open/create the temporary queue. + It is advised to use the TemporaryQueue instance in a 'with' context, which guarantees the close call. + """ + logger.info("Creating TemporaryQueue...") + connection = proton.utils.BlockingConnection(self.broker) + self._dynamic_receiver = connection.create_receiver(address=None, dynamic=True, name=self.name) + self.address = self._dynamic_receiver.link.remote_source.address + logger.info("Created TemporaryQueue at %s", self.address) + + def close(self): + """ + Close/remove the temporary queue. + It is advised to use the TemporaryQueue instance in a 'with' context, which guarantees the close call. + """ + logger.debug("Closing TemporaryQueue at %s", self.address) + self._dynamic_receiver.close() + self._dynamic_receiver.connection.close() + self._dynamic_receiver = None + logger.info("Closed TemporaryQueue at %s", self.address) + self.address = None + + def create_frombus(self): + """ + Factory method to create a FromBus instance which is connected to this TemporaryQueue + :return: FromBus + """ + return FromBus(broker=self.broker, address=self.address) + + def create_tobus(self): + """ + Factory method to create a ToBus instance which is connected to this TemporaryQueue + :return: ToBus + """ + return ToBus(broker=self.broker, address=self.address) + + class AbstractBusListener(object): """ AbstractBusListener class for handling messages which are received on a message bus. @@ -768,4 +847,4 @@ class AbstractBusListener(object): logger.error("finalize_loop() failed with %s", e) -__all__ = ["FromBus", "ToBus", "AbstractBusListener"] +__all__ = ["FromBus", "ToBus", "TemporaryQueue", "AbstractBusListener"] diff --git a/LCS/Messaging/python/messaging/test/t_messagebus.py b/LCS/Messaging/python/messaging/test/t_messagebus.py index 36e9f454f69f9805a11dbe650aeca5fdd1944d67..1ca57dbb3f589e945c5bf6b515f3541f8bfd43de 100644 --- a/LCS/Messaging/python/messaging/test/t_messagebus.py +++ b/LCS/Messaging/python/messaging/test/t_messagebus.py @@ -35,326 +35,358 @@ from lofar.messaging.messagebus import * from lofar.messaging.messagebus import DEFAULT_RECEIVER_CAPACITY from lofar.messaging.exceptions import MessageBusError, InvalidMessage -TIMEOUT = 1.0 -QUEUE = sys.argv[-1] if len(sys.argv) > 1 and "t_messagebus.py" not in sys.argv[-1] else "t_messagebus.queue" logger = logging.getLogger(__name__) +TIMEOUT = 1.0 -# ======== FromBus unit tests ======== # - -class FromBusInitFailed(unittest.TestCase): - """ - Class to test initialization failures of FromBus - """ - - def setUp(self): - self.error = "[FromBus] Initialization failed" +class TestTemporaryQueue(unittest.TestCase): + """Test the TemporaryQueue class""" - def test_no_broker_address(self): + def test_temporary_is_really_temporary(self): """ - Connecting to non-existent broker address must raise MessageBusError + test if the temporary queue is really removed after usage """ - regexp = re.escape(self.error) - regexp += '.*' + 'No address associated with hostname|Name or service not known' + '.*' + tmp_queue_address = None + with TemporaryQueue("MyTestQueue") as tmp_queue: + tmp_queue_address = tmp_queue.address + self.assertTrue("MyTestQueue" in tmp_queue_address) + + # test if the temporary queue has been deleted when leaving scope + # We should not be able to connect to it anymore + regexp = re.escape("[FromBus] Failed to create receiver for source") + '.*' + 'Node not found: %s' % tmp_queue_address + '.*' with self.assertRaisesRegex(MessageBusError, regexp): - with FromBus(QUEUE, broker="foo.bar", broker_options={'reconnect': False}): + with FromBus(tmp_queue_address): pass - def test_connection_refused(self): + def test_send_receive_over_temporary_queue(self): """ - Connecting to broker on wrong port must raise MessageBusError + test the usage of the TemporaryQueue in conjunction with normal ToBus and Frombus usage """ - regexp = re.escape(self.error) + '.*' + '(Connection refused|111)' + '.*' - with self.assertRaisesRegex(MessageBusError, regexp): - with FromBus("fake" + QUEUE, broker="localhost:4", broker_options={'reconnect': False}): - pass - - -class FromBusNotInContext(unittest.TestCase): - """ - Class to test that exception is raised when FromBus is used outside context - """ + with TemporaryQueue("MyTestQueue") as tmp_queue: + # create a normal To/FromBus on this tmp_queue + with tmp_queue.create_tobus() as tobus, tmp_queue.create_frombus() as frombus: + # send a message... + original_msg = EventMessage(content="foobar") + tobus.send(original_msg) - def setUp(self): - self.frombus = FromBus(QUEUE) - self.error = re.escape("[FromBus] No active receiver") + '.*' + # ...receive the message... + received_msg = frombus.receive() - def test_add_queue_raises(self): - """ - Adding a queue when outside context must raise MessageBusError - """ - with self.assertRaises(MessageBusError): - self.frombus._add_queue("fooqueue") + # and test if they are equal + self.assertEqual(original_msg.id, received_msg.id) + self.assertEqual(original_msg.body, received_msg.body) - def test_receive_raises(self): - """ - Getting a message when outside context must raise MessageBusError - """ - with self.assertRaisesRegex(MessageBusError, self.error): - self.frombus.receive() - def test_ack_raises(self): - """ - Ack-ing a message when outside context must raise MessageBusError - """ - with self.assertRaisesRegex(MessageBusError, self.error): - self.frombus.ack(None) +# create a TemporaryQueue for testing. Is automagically deleted upon exit. +with TemporaryQueue("t_messagebus") as test_queue: - def test_nack_raises(self): - """ - Nack-ing a message when outside context must raise MessageBusError - """ - with self.assertRaisesRegex(MessageBusError, self.error): - self.frombus.nack(None) + # ======== FromBus unit tests ======== # - def test_reject_raises(self): + class FromBusInitFailed(unittest.TestCase): """ - Rejecting a message when outside context must raise MessageBusError + Class to test initialization failures of FromBus """ - with self.assertRaisesRegex(MessageBusError, self.error): - self.frombus.reject(None) + def setUp(self): + self.error = "[FromBus] Initialization failed" -class FromBusInContext(unittest.TestCase): - """ - Class to test FromBus when inside context. - """ + def test_no_broker_address(self): + """ + Connecting to non-existent broker address must raise MessageBusError + """ + regexp = re.escape(self.error) + regexp += '.*' + 'No address associated with hostname|Name or service not known' + '.*' + with self.assertRaisesRegex(MessageBusError, regexp): + with FromBus(test_queue.address, broker="foo.bar", broker_options={'reconnect': False}): + pass + + def test_connection_refused(self): + """ + Connecting to broker on wrong port must raise MessageBusError + """ + regexp = re.escape(self.error) + '.*' + '(Connection refused|111)' + '.*' + with self.assertRaisesRegex(MessageBusError, regexp): + with FromBus("fake" + test_queue.address, broker="localhost:4", broker_options={'reconnect': False}): + pass - def setUp(self): - self.error = "[FromBus] Failed to create receiver for source" - def test_receiver_fails(self): + class FromBusNotInContext(unittest.TestCase): """ - Adding a non-existent queue must raise MessageBusError + Class to test that exception is raised when FromBus is used outside context """ - queue = "fake" + QUEUE - regexp = re.escape(self.error) + '.*' + 'Node not found: %s' % queue + '.*' - with self.assertRaisesRegex(MessageBusError, regexp): - with FromBus(QUEUE) as frombus: - frombus._add_queue(queue) - def test_receiver_succeeds(self): - """ - Adding an existing queue must succeed - Note JK: I removed the multiple queue thing since I don't see it actually being used (or being useful) - """ - with FromBus(QUEUE) as frombus: - self.assertTrue(frombus.receiver is not None) + def setUp(self): + self.frombus = FromBus(test_queue.address) + self.error = re.escape("[FromBus] No active receiver") + '.*' - def test_receive_timeout(self): - """ - Getting a message when there's none must yield None after timeout. - """ - with FromBus(QUEUE) as frombus: - self.assertIsNone(frombus.receive(timeout=TIMEOUT)) + def test_add_queue_raises(self): + """ + Adding a queue when outside context must raise MessageBusError + """ + with self.assertRaises(MessageBusError): + self.frombus._add_queue("fooqueue") + def test_receive_raises(self): + """ + Getting a message when outside context must raise MessageBusError + """ + with self.assertRaisesRegex(MessageBusError, self.error): + self.frombus.receive() -# ======== ToBus unit tests ======== # + def test_ack_raises(self): + """ + Ack-ing a message when outside context must raise MessageBusError + """ + with self.assertRaisesRegex(MessageBusError, self.error): + self.frombus.ack(None) -class ToBusInitFailed(unittest.TestCase): - """ - Class to test initialization failures of ToBus - """ + def test_nack_raises(self): + """ + Nack-ing a message when outside context must raise MessageBusError + """ + with self.assertRaisesRegex(MessageBusError, self.error): + self.frombus.nack(None) - def setUp(self): - self.error = "[ToBus] Initialization failed" + def test_reject_raises(self): + """ + Rejecting a message when outside context must raise MessageBusError + """ + with self.assertRaisesRegex(MessageBusError, self.error): + self.frombus.reject(None) - def test_no_broker_address(self): - """ - Connecting to non-existent broker address must raise MessageBusError - """ - regexp = re.escape(self.error) - regexp += '.*' + '(No address associated with hostname|Name or service not known)' - with self.assertRaisesRegex(MessageBusError, regexp): - with ToBus(QUEUE, broker="foo.bar", broker_options={'reconnect': False}): - pass - def test_connection_refused(self): + class FromBusInContext(unittest.TestCase): """ - Connecting to broker on wrong port must raise MessageBusError + Class to test FromBus when inside context. """ - regexp = re.escape(self.error) + '.*' + '(Connection refused|111)' + '.*' - with self.assertRaisesRegex(MessageBusError, regexp): - with ToBus(QUEUE, broker="localhost:4", broker_options={'reconnect': False}): - pass + def setUp(self): + self.error = "[FromBus] Failed to create receiver for source" -class ToBusSendMessage(unittest.TestCase): - """ - Class to test different error conditions when sending a message - """ + def test_receiver_fails(self): + """ + Adding a non-existent queue must raise MessageBusError + """ + queue = "fake" + test_queue.address + regexp = re.escape(self.error) + '.*' + 'Node not found: %s' % queue + '.*' + with self.assertRaisesRegex(MessageBusError, regexp): + with FromBus(test_queue.address) as frombus: + frombus._add_queue(queue) - def setUp(self): - pass + def test_receiver_succeeds(self): + """ + Adding an existing queue must succeed + """ + with FromBus(test_queue.address) as frombus: + self.assertTrue(frombus.receiver is not None) - def test_send_outside_context_raises(self): - """ - If a ToBus object is used outside a context, then there's no active - session, and a MessageBusError must be raised. - """ - tobus = ToBus(QUEUE) - regexp = re.escape("[ToBus] No active sender") + '.*' - with self.assertRaisesRegex(MessageBusError, regexp): - tobus.send(None) + def test_receive_timeout(self): + """ + Getting a message when there's none must yield None after timeout. + """ + with FromBus(test_queue.address) as frombus: + self.assertIsNone(frombus.receive(timeout=TIMEOUT)) - def test_no_senders_raises(self): - """ - If there are no senders, then a MessageBusError must be raised. - Note that this can only happen if someone has deliberately tampered with - the ToBus object. - """ - with self.assertRaises(AttributeError): # Due to sender not being there for close - with ToBus(QUEUE) as tobus: - tobus.sender = None - regexp = re.escape("[ToBus] No active sender") + ".*" - with self.assertRaisesRegex(MessageBusError, regexp): - tobus.send(None) - def test_multiple_senders_raises(self): - """ - If there's more than one sender, then a MessageBusError must be raised. - Note that this can only happen if someone has deliberately tampered with - the ToBus object (e.g., by using the protected _add_queue() method). - """ - with ToBus(QUEUE) as tobus: - regexp = re.escape("[ToBus] More than one sender") - with self.assertRaisesRegex(MessageBusError, regexp): - tobus._add_queue(QUEUE, {}) + # ======== ToBus unit tests ======== # - def test_send_invalid_message_raises(self): + class ToBusInitFailed(unittest.TestCase): """ - If an invalid message is sent (i.e., not an LofarMessage), then an - InvalidMessage must be raised. + Class to test initialization failures of ToBus """ - with ToBus(QUEUE) as tobus: - regexp = re.escape("Invalid message type") - with self.assertRaisesRegex(InvalidMessage, regexp): - tobus.send("Blah blah blah") - - -# ======== Combined FromBus/ToBus unit tests ======== # - -class QueueIntrospection(unittest.TestCase): - """ - Test sending and receiving messages, and introspecting the in-between queue - """ - - def setUp(self): - self.frombus = FromBus(QUEUE) - self.tobus = ToBus(QUEUE) - # if there are any dangling messages in the QUEUE, they hold state between the individual tests - # make sure the queue is empty by receiving any dangling messages - with self.frombus: - self.frombus.drain() + def setUp(self): + self.error = "[ToBus] Initialization failed" - def test_drain_non_empty_queue(self): - with self.tobus, self.frombus: - self.tobus.send(EventMessage(content="foo")) - self.tobus.send(EventMessage(content="foo")) - self.assertGreater(self.frombus.nr_of_messages_in_queue(), 0) - - self.frombus.drain() - self.assertEqual(0, self.frombus.nr_of_messages_in_queue()) - - - def test_counting_one_message_in_queue(self): - with self.tobus, self.frombus: - self.tobus.send(EventMessage(content="foo")) - self.assertEqual(1, self.frombus.nr_of_messages_in_queue()) - - self.frombus.receive() - self.assertEqual(0, self.frombus.nr_of_messages_in_queue()) - - def test_counting_multiple_messages_in_queue(self): - # DEFAULT_RECEIVER_CAPACITY should be > 2 otherwise we cannot even store multiple messages in the local queue - self.assertGreaterEqual(DEFAULT_RECEIVER_CAPACITY, 2) + def test_no_broker_address(self): + """ + Connecting to non-existent broker address must raise MessageBusError + """ + regexp = re.escape(self.error) + regexp += '.*' + '(No address associated with hostname|Name or service not known)' + with self.assertRaisesRegex(MessageBusError, regexp): + with ToBus(test_queue.address, broker="foo.bar", broker_options={'reconnect': False}): + pass + + def test_connection_refused(self): + """ + Connecting to broker on wrong port must raise MessageBusError + """ + regexp = re.escape(self.error) + '.*' + '(Connection refused|111)' + '.*' + with self.assertRaisesRegex(MessageBusError, regexp): + with ToBus(test_queue.address, broker="localhost:4", broker_options={'reconnect': False}): + pass - with self.tobus, self.frombus: - MAX_NR_OF_MESSAGES = min(10, DEFAULT_RECEIVER_CAPACITY) - for i in range(MAX_NR_OF_MESSAGES): - self.tobus.send(EventMessage(content="foo")) - self.assertEqual(i+1, self.frombus.nr_of_messages_in_queue()) - for i in range(MAX_NR_OF_MESSAGES): - self.assertEqual(MAX_NR_OF_MESSAGES-i, self.frombus.nr_of_messages_in_queue()) - self.frombus.receive() - self.assertEqual(MAX_NR_OF_MESSAGES-i-1, self.frombus.nr_of_messages_in_queue()) + class ToBusSendMessage(unittest.TestCase): + """ + Class to test different error conditions when sending a message + """ + def setUp(self): + pass -class SendReceiveMessage(unittest.TestCase): - """ - Class to test sending and receiving a message. - """ + def test_send_outside_context_raises(self): + """ + If a ToBus object is used outside a context, then there's no active + session, and a MessageBusError must be raised. + """ + tobus = ToBus(test_queue.address) + regexp = re.escape("[ToBus] No active sender") + '.*' + with self.assertRaisesRegex(MessageBusError, regexp): + tobus.send(None) + + def test_no_senders_raises(self): + """ + If there are no senders, then a MessageBusError must be raised. + Note that this can only happen if someone has deliberately tampered with + the ToBus object. + """ + with self.assertRaises(AttributeError): # Due to sender not being there for close + with ToBus(test_queue.address) as tobus: + tobus.sender = None + regexp = re.escape("[ToBus] No active sender") + ".*" + with self.assertRaisesRegex(MessageBusError, regexp): + tobus.send(None) + + def test_multiple_senders_raises(self): + """ + If there's more than one sender, then a MessageBusError must be raised. + Note that this can only happen if someone has deliberately tampered with + the ToBus object (e.g., by using the protected _add_queue() method). + """ + with ToBus(test_queue.address) as tobus: + regexp = re.escape("[ToBus] More than one sender") + with self.assertRaisesRegex(MessageBusError, regexp): + tobus._add_queue(test_queue.address, {}) - def setUp(self): - self.frombus = FromBus(QUEUE) - self.tobus = ToBus(QUEUE) + def test_send_invalid_message_raises(self): + """ + If an invalid message is sent (i.e., not an LofarMessage), then an + InvalidMessage must be raised. + """ + with ToBus(test_queue.address) as tobus: + regexp = re.escape("Invalid message type") + with self.assertRaisesRegex(InvalidMessage, regexp): + tobus.send("Blah blah blah") - # if there are any dangling messages in the QUEUE, they hold state between the individual tests - # make sure the queue is empty by receiving any dangling messages - with self.frombus: - self.frombus.drain() - def _test_sendrecv(self, send_msg): - """ - Helper class that implements the send/receive logic and message checks. - :param send_msg: Message to send - """ - with self.tobus, self.frombus: - self.tobus.send(send_msg) - recv_msg = self.frombus.receive(timeout=TIMEOUT) - self.frombus.ack(recv_msg) - self.assertEqual( - (send_msg.SystemName, send_msg.MessageId, send_msg.MessageType), - (recv_msg.SystemName, recv_msg.MessageId, recv_msg.MessageType)) - self.assertEqual(send_msg.body, recv_msg.body) - - def test_sendrecv_event_message(self): - """ - Test send/receive of an EventMessage, containing a string. - """ - content = "An event message" - self._test_sendrecv(EventMessage(content)) + # ======== Combined FromBus/ToBus unit tests ======== # - def test_sendrecv_monitoring_message(self): + class QueueIntrospection(unittest.TestCase): """ - Test send/receive of an MonitoringMessage, containing a python list. + Test sending and receiving messages, and introspecting the in-between queue """ - content = ["A", "monitoring", "message"] - self._test_sendrecv(MonitoringMessage(content)) - def test_sendrecv_progress_message(self): - """ - Test send/receive of an ProgressMessage, containing a python dict. - """ - content = {"Progress": "Message"} - self._test_sendrecv(ProgressMessage(content)) + def setUp(self): + self.frombus = FromBus(test_queue.address) + self.tobus = ToBus(test_queue.address) - def test_sendrecv_request_message(self): - """ - Test send/receive of an RequestMessage, containing a byte array. - """ - content = {"request": "Do Something", "argument": "Very Often"} - self._test_sendrecv(RequestMessage(content, reply_to=QUEUE)) + # if there are any dangling messages in the test_queue.address, they hold state between the individual tests + # make sure the queue is empty by receiving any dangling messages + with self.frombus: + self.frombus.drain() - def test_sendrecv_request_message_with_large_content_map(self): - """ - Test send/receive of an RequestMessage, containing a dict with a large string value. - Qpid, cannot (de)serialize strings > 64k in a dict - We circumvent this in ToBus.send and FromBus.receive by converting long strings in a dict to a buffer and back. - """ - content = {"key1": "short message", "key2": "long message " + (2**17)*'a'} - self._test_sendrecv(RequestMessage(content, reply_to=QUEUE)) + def test_drain_non_empty_queue(self): + with self.tobus, self.frombus: + self.tobus.send(EventMessage(content="foo")) + self.tobus.send(EventMessage(content="foo")) + self.assertGreater(self.frombus.nr_of_messages_in_queue(), 0) + self.frombus.drain() + self.assertEqual(0, self.frombus.nr_of_messages_in_queue()) -if __name__ == '__main__': - logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG) - # delete last cmdlime argument if it holds the test-queue-name, - # so it is not passed on to the unittest framework. - # see also t_messagebus.run - if len(sys.argv) > 1 and sys.argv[-1].strip() != "t_messagebus.py": - del sys.argv[-1] + def test_counting_one_message_in_queue(self): + with self.tobus, self.frombus: + self.tobus.send(EventMessage(content="foo")) + self.assertEqual(1, self.frombus.nr_of_messages_in_queue()) - unittest.main() + self.frombus.receive() + self.assertEqual(0, self.frombus.nr_of_messages_in_queue()) + + def test_counting_multiple_messages_in_queue(self): + # DEFAULT_RECEIVER_CAPACITY should be > 2 otherwise we cannot even store multiple messages in the local queue + self.assertGreaterEqual(DEFAULT_RECEIVER_CAPACITY, 2) + + with self.tobus, self.frombus: + MAX_NR_OF_MESSAGES = min(10, DEFAULT_RECEIVER_CAPACITY) + for i in range(MAX_NR_OF_MESSAGES): + self.tobus.send(EventMessage(content="foo")) + self.assertEqual(i+1, self.frombus.nr_of_messages_in_queue()) + + for i in range(MAX_NR_OF_MESSAGES): + self.assertEqual(MAX_NR_OF_MESSAGES-i, self.frombus.nr_of_messages_in_queue()) + self.frombus.receive() + self.assertEqual(MAX_NR_OF_MESSAGES-i-1, self.frombus.nr_of_messages_in_queue()) + + class SendReceiveMessage(unittest.TestCase): + """ + Class to test sending and receiving a message. + """ + + def setUp(self): + self.frombus = FromBus(test_queue.address) + self.tobus = ToBus(test_queue.address) + + # if there are any dangling messages in the test_queue.address, they hold state between the individual tests + # make sure the queue is empty by receiving any dangling messages + with self.frombus: + self.frombus.drain() + + def _test_sendrecv(self, send_msg): + """ + Helper class that implements the send/receive logic and message checks. + :param send_msg: Message to send + """ + with self.tobus, self.frombus: + self.tobus.send(send_msg) + recv_msg = self.frombus.receive(timeout=TIMEOUT) + self.frombus.ack(recv_msg) + self.assertEqual( + (send_msg.SystemName, send_msg.MessageId, send_msg.MessageType), + (recv_msg.SystemName, recv_msg.MessageId, recv_msg.MessageType)) + self.assertEqual(send_msg.body, recv_msg.body) + + def test_sendrecv_event_message(self): + """ + Test send/receive of an EventMessage, containing a string. + """ + content = "An event message" + self._test_sendrecv(EventMessage(content)) + + def test_sendrecv_monitoring_message(self): + """ + Test send/receive of an MonitoringMessage, containing a python list. + """ + content = ["A", "monitoring", "message"] + self._test_sendrecv(MonitoringMessage(content)) + + def test_sendrecv_progress_message(self): + """ + Test send/receive of an ProgressMessage, containing a python dict. + """ + content = {"Progress": "Message"} + self._test_sendrecv(ProgressMessage(content)) + + def test_sendrecv_request_message(self): + """ + Test send/receive of an RequestMessage, containing a byte array. + """ + content = {"request": "Do Something", "argument": "Very Often"} + self._test_sendrecv(RequestMessage(content, reply_to=test_queue.address)) + + def test_sendrecv_request_message_with_large_content_map(self): + """ + Test send/receive of an RequestMessage, containing a dict with a large string value. + Qpid, cannot (de)serialize strings > 64k in a dict + We circumvent this in ToBus.send and FromBus.receive by converting long strings in a dict to a buffer and back. + """ + content = {"key1": "short message", "key2": "long message " + (2**17)*'a'} + self._test_sendrecv(RequestMessage(content, reply_to=test_queue.address)) + + # main program should run within context of the TemporaryQueue test_queue as well + # because the tests are using this test_queue + if __name__ == '__main__': + logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG) + unittest.main() diff --git a/LCS/Messaging/python/messaging/test/t_messagebus.run b/LCS/Messaging/python/messaging/test/t_messagebus.run index e225bb14fa8abccee7fabc6f7d9436f3ebb9c745..3e1084662fc7845cd197389f84645a43d7ba6e3f 100755 --- a/LCS/Messaging/python/messaging/test/t_messagebus.run +++ b/LCS/Messaging/python/messaging/test/t_messagebus.run @@ -1,13 +1,5 @@ #!/bin/bash -e -# Cleanup on normal exit and on SIGHUP, SIGINT, SIGQUIT, and SIGTERM -trap 'qpid-config del queue --force $queue' 0 1 2 3 15 - -# Generate randome queue name -queue=$(< /dev/urandom tr -dc [:alnum:] | head -c16) - -# Create the queue -qpid-config add queue $queue # Run the unit test source python-coverage.sh -python_coverage_test "Messaging/python" t_messagebus.py $queue +python_coverage_test "Messaging/python" t_messagebus.py