diff --git a/LCS/Messaging/python/messaging/messagebus.py b/LCS/Messaging/python/messaging/messagebus.py index b01a7ec22efeb344aea9a2ae0d4eec6d0ea7ed00..143006659b46b4874e9fc1bf90f039a2e571e4b6 100644 --- a/LCS/Messaging/python/messaging/messagebus.py +++ b/LCS/Messaging/python/messaging/messagebus.py @@ -278,6 +278,24 @@ def delete_exchange(name: str, broker: str=DEFAULT_BROKER, log_level=logging.DEB except Exception as e: raise MessagingError("Could not delete exchange %s on broker %s error=%s" % (name, broker, e)) +def exchange_exists(name: str, broker: str=DEFAULT_BROKER) -> bool: + """ + does the exchange with the given name exist on the given broker? + :param name: the name for the exchange + :param broker: a message broker address + :return True if it exists, False if not. + """ + try: + with kombu.Connection(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD) as connection: + exchange = kombu.Exchange(name, channel=connection) + try: + exchange.declare(channel=connection.default_channel, passive=True) + return True + except amqp.exceptions.NotFound: + return False + except Exception as e: + raise MessagingError("Could not test if exchange %s exists on broker %s error=%s" % (name, broker, e)) + def create_queue(name: str, durable: bool=True, broker: str=DEFAULT_BROKER, log_level=logging.DEBUG, auto_delete: bool=False) -> bool: """ @@ -329,6 +347,23 @@ def delete_queue(name: str, broker: str=DEFAULT_BROKER, log_level=logging.DEBUG) except Exception as e: raise MessagingError("Could not delete queue %s on broker %s error=%s" % (name, broker, e)) +def queue_exists(name: str, broker: str=DEFAULT_BROKER) -> bool: + """ + does the queue with the given name exist on the given broker? + :param name: the name for the queue + :param broker: a message broker address + :return True if it exists, False if not. + """ + try: + with kombu.Connection(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD) as connection: + queue = kombu.Queue(name, no_declare=True, channel=connection) + try: + queue.queue_declare(channel=connection.default_channel, passive=True) + return True + except amqp.exceptions.NotFound: + return False + except Exception as e: + raise MessagingError("Could not test if queue %s exists on broker %s error=%s" % (name, broker, e)) def nr_of_messages_in_queue(queue_name: str, broker: str = DEFAULT_BROKER) -> int: """get the number of messages in the queue""" @@ -1367,7 +1402,8 @@ class BusListener: create_bound_queue(exchange=exchange, queue=self.address, routing_key=routing_key, broker=self.broker, log_level=logging.INFO) - def designated_queue_name(self, exchange: str, routing_key: str) -> str: + @staticmethod + def designated_queue_name(exchange: str, routing_key: str="#") -> str: """ create a designated queue name based on the given exchange name, routing_key, and the current running program name. Like so: <exchange>.for.<program_name>.<listener_type_name>.on.<sanitzed_routing_key> @@ -1381,7 +1417,7 @@ class BusListener: sanitized_routing_key = "all" return "%s.queue.for.%s.%s.on.%s" % (exchange, program_name(include_extension=False), - self.__class__.__name__, + __class__.__name__, sanitized_routing_key) def is_running(self) -> bool: @@ -1585,19 +1621,31 @@ class BusListenerJanitor: def __enter__(self) -> BusListener: """enter the context, and make the bus_listener start listening. :return a reference to the buslistener, not to the janitor!""" - self._bus_listener.start_listening() - return self._bus_listener + try: + self.open() + return self._bus_listener + except Exception as e: + logger.exception(e) + self.close() + raise def __exit__(self, exc_type, exc_val, exc_tb): """leave the context, make the bus_listener stop listening, and clean up the auto-generated queue""" + self.close() + + def open(self): + """make the bus_listener start listening.""" + self._bus_listener.start_listening() + + def close(self): + """make the bus_listener stop listening, and delete listener queue""" try: + bus_listener_address = self._bus_listener.address self._bus_listener.stop_listening() - except Exception as e: - logger.error(e) finally: - logger.info("BusListenerJanitor deleting auto-generated queue: %s", self._bus_listener.address) - delete_queue(self._bus_listener.address) + logger.info("BusListenerJanitor deleting auto-generated queue: %s", bus_listener_address) + delete_queue(bus_listener_address) # do not expose create/delete_queue/exchange etc methods in all, it's not part of the public API diff --git a/LCS/Messaging/python/messaging/test/t_RPC.py b/LCS/Messaging/python/messaging/test/t_RPC.py index e51786b0b8d592d2f3f998e852caa9c1fffbbd9a..51eb067fa7167408ed23347261f53dd5e1d2ab0a 100644 --- a/LCS/Messaging/python/messaging/test/t_RPC.py +++ b/LCS/Messaging/python/messaging/test/t_RPC.py @@ -11,7 +11,7 @@ import unittest import uuid from time import sleep -from lofar.messaging.messagebus import TemporaryExchange, can_connect_to_broker +from lofar.messaging.messagebus import TemporaryExchange, can_connect_to_broker, exchange_exists, queue_exists, BusListenerJanitor from lofar.messaging.rpc import RPCClient, RPCService, RPCException, RPCTimeoutException, ServiceMessageHandler TEST_SERVICE_NAME = "%s.%s" % (__name__, uuid.uuid4()) @@ -46,12 +46,14 @@ class TestRPC(unittest.TestCase): def test_rpc_client_to_service_call(self): with TemporaryExchange(__name__) as tmp_exchange: - with RPCService(TEST_SERVICE_NAME, + tmp_exchange_address = tmp_exchange.address + with BusListenerJanitor(RPCService(TEST_SERVICE_NAME, handler_type=MyServiceMessageHandler, handler_kwargs={'my_arg1': "foo", 'my_arg2': "bar"}, exchange=tmp_exchange.address, - num_threads=1) as service: + num_threads=1)) as service: + service_queue_address = service.address self.assertTrue(service.is_listening()) self.assertTrue(service.is_running()) @@ -68,6 +70,9 @@ class TestRPC(unittest.TestCase): with self.assertRaises(RPCTimeoutException): rpc_client.execute("my_public_slow_method") + self.assertFalse(queue_exists(service_queue_address)) + self.assertFalse(exchange_exists(tmp_exchange_address)) + if __name__ == '__main__': if not can_connect_to_broker(): logger.error("Cannot connect to default rabbitmq broker. Skipping test.") diff --git a/LCS/Messaging/python/messaging/test/t_messagebus.py b/LCS/Messaging/python/messaging/test/t_messagebus.py index b7db6e25d1580e15090f5ba4025bb77cca3592d6..c13961d73bfccaddedd83fc7bbd8ce4b60cb0b78 100644 --- a/LCS/Messaging/python/messaging/test/t_messagebus.py +++ b/LCS/Messaging/python/messaging/test/t_messagebus.py @@ -37,7 +37,7 @@ from datetime import datetime from lofar.messaging.messages import * from lofar.messaging.messagebus import * from lofar.messaging.messagebus import _AbstractBus, can_connect_to_broker -from lofar.messaging.messagebus import create_queue, create_exchange, create_binding, create_bound_queue, delete_exchange, delete_queue +from lofar.messaging.messagebus import create_queue, create_exchange, create_binding, create_bound_queue, delete_exchange, delete_queue, exchange_exists, queue_exists from lofar.messaging.config import DEFAULT_USER, DEFAULT_PASSWORD from lofar.messaging.rpc import RequestMessage from lofar.messaging.exceptions import MessageBusError, MessagingRuntimeError, MessagingTimeoutError @@ -52,29 +52,43 @@ class TestCreateDeleteFunctions(unittest.TestCase): def test_create_delete_exchange(self): name = "test-exchange-%s" % (uuid.uuid4()) + + self.assertFalse(exchange_exists(name)) + # creating this new unique test exchange should succeed, and return True cause it's a new exchange self.assertTrue(create_exchange(name, durable=False)) + self.assertTrue(exchange_exists(name)) + # creating it again should return False self.assertFalse(create_exchange(name, durable=False)) # deleting it should succeed self.assertTrue(delete_exchange(name)) + self.assertFalse(exchange_exists(name)) + # deleting it again should return False as there is nothing to deleting self.assertFalse(delete_exchange(name)) def test_create_delete_queue(self): name = "test-queue-%s" % (uuid.uuid4()) + + self.assertFalse(queue_exists(name)) + # creating this new unique test queue should succeed, and return True cause it's a new queue self.assertTrue(create_queue(name, durable=False)) + self.assertTrue(queue_exists(name)) + # creating it again should return False self.assertFalse(create_queue(name, durable=False)) # deleting it should succeed self.assertTrue(delete_queue(name)) + self.assertFalse(queue_exists(name)) + # deleting it again should return False as there is nothing to deleting self.assertFalse(delete_queue(name)) @@ -113,6 +127,8 @@ class TestTemporaryExchangeAndQueue(unittest.TestCase): tmp_exchange_address = tmp_exchange.address self.assertTrue("MyTestExchange" in tmp_exchange_address) + self.assertFalse(exchange_exists(tmp_exchange_address)) + # test if the temporary exchange has been deleted when leaving scope # We should not be able to connect to it anymore with self.assertRaisesRegex(MessageBusError, '.*NOT_FOUND.*'): @@ -128,6 +144,8 @@ class TestTemporaryExchangeAndQueue(unittest.TestCase): tmp_queue_address = tmp_queue.address self.assertTrue("MyTestQueue" in tmp_queue_address) + self.assertFalse(queue_exists(tmp_queue_address)) + # test if the temporary queue has been deleted when leaving scope # We should not be able to connect to it anymore with self.assertRaisesRegex(MessageBusError, '.*NOT_FOUND.*'): @@ -139,10 +157,12 @@ class TestTemporaryExchangeAndQueue(unittest.TestCase): test the usage of the TemporaryExchange and TemporaryQueue in conjunction with normal ToBus and Frombus usage """ with TemporaryExchange("MyTestExchange") as tmp_exchange: + tmp_exchange_address = tmp_exchange.address # create a normal ToBus on this tmp_exchange with tmp_exchange.create_tobus() as tobus_on_exchange: # create a TemporaryQueue, bound to the tmp_exchange with TemporaryQueue("MyTestQueue", exchange=tmp_exchange.address) as tmp_queue: + tmp_queue_address = tmp_queue.address # create a normal FromBus on this tmp_queue with tmp_queue.create_frombus() as frombus: # and let's see if the tmp_queue can also create a tobus which then points to the bound_exchange @@ -164,6 +184,9 @@ class TestTemporaryExchangeAndQueue(unittest.TestCase): self.assertEqual(original_msg.id, received_msg.id) self.assertEqual(original_msg.content, received_msg.content) + self.assertFalse(exchange_exists(tmp_exchange_address)) + self.assertFalse(queue_exists(tmp_queue_address)) + def test_send_receive_over_temporary_queue_with_subject_filtering(self): """ test the usage of the TemporaryQueue in conjunction with normal ToBus and Frombus usage with additional filtering on subject @@ -172,6 +195,7 @@ class TestTemporaryExchangeAndQueue(unittest.TestCase): SUBJECT2 = "FAKE_SUBJECT" with TemporaryQueue("MyTestQueue", routing_key=SUBJECT) as tmp_queue: + tmp_queue_address = tmp_queue.address # create a normal To/FromBus on this tmp_queue NUM_MESSAGES_TO_SEND = 3 with tmp_queue.create_tobus() as tobus: @@ -203,6 +227,8 @@ class TestTemporaryExchangeAndQueue(unittest.TestCase): logger.info("received message: %s", received_msg) self.assertEqual(None, received_msg) + self.assertFalse(queue_exists(tmp_queue_address)) + def test_send_receive_over_temporary_exchange_with_queue_with_subject_filtering(self): """ test the usage of the TemporaryQueue in conjunction with normal ToBus and Frombus usage with additional filtering on subject @@ -211,9 +237,11 @@ class TestTemporaryExchangeAndQueue(unittest.TestCase): SUBJECT2 = "FAKE_SUBJECT" NUM_MESSAGES_TO_SEND = 3 with TemporaryExchange("MyTestExchange") as tmp_exchange: + tmp_exchange_address = tmp_exchange.address with tmp_exchange.create_tobus() as tobus: # create a TemporaryQueue, which listens for/receives only the messages with the given SUBJECT with TemporaryQueue("MyTestQueue", exchange=tmp_exchange.address, routing_key=SUBJECT) as tmp_queue: + tmp_queue_address = tmp_queue.address with tmp_queue.create_frombus() as frombus: for i in range(NUM_MESSAGES_TO_SEND): # send a message... @@ -242,17 +270,23 @@ class TestTemporaryExchangeAndQueue(unittest.TestCase): logger.info("received message: %s", received_msg) self.assertEqual(None, received_msg) + self.assertFalse(exchange_exists(tmp_exchange_address)) + self.assertFalse(queue_exists(tmp_queue_address)) + def test_send_receive_over_temporary_exchange_with_multiple_bound_queues_with_subject_filtering( self): """ test the usage of the TemporaryQueue in conjunction with normal ToBus and Frombus usage with additional filtering on subject """ with TemporaryExchange("MyTestExchange") as tmp_exchange: + tmp_exchange_address = tmp_exchange.address with tmp_exchange.create_tobus() as tobus: SUBJECT1 = "FooBarSubject" SUBJECT2 = "FAKE_SUBJECT" with TemporaryQueue("MyTestQueue", exchange=tmp_exchange.address, routing_key=SUBJECT1) as tmp_queue1, \ TemporaryQueue("MyTestQueue", exchange=tmp_exchange.address, routing_key=SUBJECT2) as tmp_queue2: + tmp_queue1_address = tmp_queue1.address + tmp_queue2_address = tmp_queue2.address # create a normal To/FromBus on this tmp_queue NUM_MESSAGES_TO_SEND = 3 @@ -295,6 +329,9 @@ class TestTemporaryExchangeAndQueue(unittest.TestCase): self.assertEqual(original_msg.content, received_msg2.content) self.assertEqual(original_msg.subject, received_msg2.subject) + self.assertFalse(exchange_exists(tmp_exchange_address)) + self.assertFalse(queue_exists(tmp_queue1_address)) + self.assertFalse(queue_exists(tmp_queue2_address)) # ======== FromBus unit tests ======== # @@ -306,7 +343,11 @@ class FromBusInitFailed(unittest.TestCase): def setUp(self): self.test_queue = TemporaryQueue(__class__.__name__) self.test_queue.open() - self.addCleanup(self.test_queue.close) + + def tearDown(self): + tmp_queue_address = self.test_queue.address + self.test_queue.close() + self.assertFalse(queue_exists(tmp_queue_address)) def test_no_broker_address(self): """ @@ -333,8 +374,11 @@ class FromBusInContext(unittest.TestCase): def setUp(self): self.test_queue = TemporaryQueue(__class__.__name__) self.test_queue.open() - self.addCleanup(self.test_queue.close) - self.error = "[FromBus] Failed to create receiver for source" + + def tearDown(self): + tmp_queue_address = self.test_queue.address + self.test_queue.close() + self.assertFalse(queue_exists(tmp_queue_address)) def test_receiver_exists(self): with FromBus(self.test_queue.address) as frombus: @@ -363,16 +407,20 @@ class ToBusInitFailed(unittest.TestCase): """ def setUp(self): - self.test_queue = TemporaryQueue(__class__.__name__) - self.test_queue.open() - self.addCleanup(self.test_queue.close) + self.test_exchange = TemporaryExchange(__class__.__name__) + self.test_exchange.open() + + def tearDown(self): + tmp_exchange_address = self.test_exchange.address + self.test_exchange.close() + self.assertFalse(exchange_exists(tmp_exchange_address)) def test_no_broker_address(self): """ Connecting to non-existent broker address must raise MessageBusError """ with self.assertRaisesRegex(MessageBusError, ".*failed to resolve broker hostname"): - with ToBus(self.test_queue.address, broker="foo.bar"): + with ToBus(self.test_exchange.address, broker="foo.bar"): pass def test_connection_refused(self): @@ -380,7 +428,7 @@ class ToBusInitFailed(unittest.TestCase): Connecting to broker on wrong port must raise MessageBusError """ with self.assertRaisesRegex(MessageBusError, ".*failed to resolve broker hostname"): - with ToBus(self.test_queue.address, broker="localhost:4"): + with ToBus(self.test_exchange.address, broker="localhost:4"): pass @@ -390,12 +438,23 @@ class SendReceiveMessage(unittest.TestCase): """ def setUp(self): - self.test_queue = TemporaryQueue(__class__.__name__) + self.test_exchange = TemporaryExchange(__class__.__name__) + self.test_exchange.open() + + self.test_queue = TemporaryQueue(__class__.__name__, exchange=self.test_exchange.address) self.test_queue.open() - self.addCleanup(self.test_queue.close) self.frombus = self.test_queue.create_frombus() - self.tobus = self.test_queue.create_tobus() + self.tobus = self.test_exchange.create_tobus() + + def tearDown(self): + tmp_queue_address = self.test_queue.address + self.test_queue.close() + self.assertFalse(queue_exists(tmp_queue_address)) + + tmp_exchange_address = self.test_exchange.address + self.test_exchange.close() + self.assertFalse(exchange_exists(tmp_exchange_address)) def _test_sendrecv(self, send_msg): """ @@ -491,7 +550,9 @@ class SendReceiveMessage(unittest.TestCase): class PriorityTest(unittest.TestCase): def test_priority(self): with TemporaryExchange(self.__class__.__name__) as tmp_exchange: + tmp_exchange_address = tmp_exchange.address with tmp_exchange.create_temporary_queue() as tmp_queue: + tmp_queue_address = tmp_queue.address msg1 = EventMessage(priority=4, subject="some.event", content=1) msg2 = EventMessage(priority=5, subject="some.event", content=2) @@ -507,6 +568,9 @@ class PriorityTest(unittest.TestCase): self.assertEqual(msg1.id, result_msg2.id) self.assertEqual(msg2.id, result_msg1.id) + self.assertFalse(exchange_exists(tmp_exchange_address)) + self.assertFalse(queue_exists(tmp_queue_address)) + class Rejector(BusListener): handled_messages = 0 @@ -531,7 +595,9 @@ class RejectorTester(unittest.TestCase): number_of_messages = 1000 with TemporaryExchange("Rejection") as tmp_exchange: + tmp_exchange_address = tmp_exchange.address with BusListenerJanitor(Rejector(tmp_exchange.address)) as rejector: + rejector_address = Rejector.designated_queue_name(tmp_exchange_address) with tmp_exchange.create_tobus() as spammer: for _ in range(number_of_messages): msg = EventMessage(content="ping", subject="spam") @@ -545,6 +611,9 @@ class RejectorTester(unittest.TestCase): logger.info("Number of messages on queue: {}".format(frombus.nr_of_messages_in_queue())) self.assertEqual(0, frombus.nr_of_messages_in_queue()) + self.assertFalse(exchange_exists(tmp_exchange_address)) + self.assertFalse(queue_exists(rejector_address)) + class PingPongPlayer(BusListener): @@ -629,10 +698,14 @@ class PingPongTester(unittest.TestCase): # setup temporary exchange, on which the player can publish their messages (ping/pong balls) with TemporaryExchange("PingPongTable") as tmp_exchange: + tmp_exchange_address = tmp_exchange.address + # create two players, on "both sides of the table" # i.e.: they each play on the tmp_exchange, but have the auto-generated designated listen queues for incoming balls with BusListenerJanitor(PingPongPlayer("Player1", "Player2", tmp_exchange.address, num_threads_per_player)) as player1: - with BusListenerJanitor(PingPongPlayer("Player2", "Player1", tmp_exchange.address, num_threads_per_player)) as player2: + player1_address = player1.address + with BusListenerJanitor(PingPongPlayer("Player2", "Player1", tmp_exchange.address, num_threads_per_player)) as player2: + player2_address = player2.address start_timestamp = datetime.utcnow() # first serve, referee throws a ping ball on the table in the direction of player1 @@ -668,6 +741,10 @@ class PingPongTester(unittest.TestCase): player1_num_turns, NUM_TURNS, player2_num_turns, NUM_TURNS, num_threads_per_player, 2*NUM_TURNS/(datetime.utcnow() - start_timestamp).total_seconds()) + self.assertFalse(exchange_exists(tmp_exchange_address)) + self.assertFalse(queue_exists(player1_address)) + self.assertFalse(queue_exists(player2_address)) + class MessageHandlerTester(unittest.TestCase): def test_handler_init_raises(self): @@ -678,11 +755,15 @@ class MessageHandlerTester(unittest.TestCase): # try to start a BusListener using this handler. Should fail and raise a MessagingRuntimeError with TemporaryExchange(self.__class__.__name__) as tmp_exchange: + tmp_exchange_name = tmp_exchange.address with self.assertRaises(MessagingRuntimeError): with BusListenerJanitor(BusListener(handler_type=RaisingHandler, - exchange=tmp_exchange.address)) as listener: + exchange=tmp_exchange_name)) as listener: pass + self.assertFalse(exchange_exists(tmp_exchange_name)) + self.assertFalse(queue_exists(BusListener.designated_queue_name(tmp_exchange_name))) + def test_empty_template_handler(self): # define a MessageHandler with a template for callback on<something> methods class BaseTemplateHandler(AbstractMessageHandler): @@ -743,22 +824,34 @@ class MessageHandlerTester(unittest.TestCase): # try to start a BusListener using a BaseTemplateHandler. Should fail and raise a TypeError with TemporaryExchange(self.__class__.__name__) as tmp_exchange: + tmp_exchange_name = tmp_exchange.address with self.assertRaises(RuntimeError): with BusListenerJanitor(BusListener(handler_type=BaseTemplateHandler, - exchange=tmp_exchange.address)) as listener: + exchange=tmp_exchange_name)) as listener: pass + self.assertFalse(exchange_exists(tmp_exchange_name)) + self.assertFalse(queue_exists(BusListener.designated_queue_name(tmp_exchange_name))) + + class ReconnectOnConnectionLossTests(unittest.TestCase): def setUp(self): self.tmp_exchange = TemporaryExchange() - self.addCleanup(self.tmp_exchange.close) - self.tmp_queue = self.tmp_exchange.create_temporary_queue() - self.addCleanup(self.tmp_queue.close) self.tmp_exchange.open() self.tmp_queue.open() + def tearDown(self): + tmp_queue_address = self.tmp_queue.address + self.tmp_queue.close() + self.assertFalse(queue_exists(tmp_queue_address)) + + tmp_exchange_address = self.tmp_exchange.address + self.tmp_exchange.close() + self.assertFalse(exchange_exists(tmp_exchange_address)) + + def _close_connection_of_bus_on_broker(self, bus: _AbstractBus): # use the http REST API using request to forcefully close the connection on the broker-side url = "http://%s:15672/api/connections/%s" % (bus.broker, bus.connection_name)