From 41a94585aa8cabbab7d312f54f18e52bb7187738 Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Mon, 7 Oct 2019 14:49:38 +0200 Subject: [PATCH] SW-827: added two methods to check if a queue/exchange exists --- LCS/Messaging/python/messaging/messagebus.py | 35 +++++++++++++++++++ .../python/messaging/test/t_messagebus.py | 16 ++++++++- 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/LCS/Messaging/python/messaging/messagebus.py b/LCS/Messaging/python/messaging/messagebus.py index b01a7ec22ef..2f6160dac32 100644 --- a/LCS/Messaging/python/messaging/messagebus.py +++ b/LCS/Messaging/python/messaging/messagebus.py @@ -278,6 +278,24 @@ def delete_exchange(name: str, broker: str=DEFAULT_BROKER, log_level=logging.DEB except Exception as e: raise MessagingError("Could not delete exchange %s on broker %s error=%s" % (name, broker, e)) +def exchange_exists(name: str, broker: str=DEFAULT_BROKER) -> bool: + """ + does the exchange with the given name exist on the given broker? + :param name: the name for the exchange + :param broker: a message broker address + :return True if it exists, False if not. + """ + try: + with kombu.Connection(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD) as connection: + exchange = kombu.Exchange(name, channel=connection) + try: + exchange.declare(channel=connection.default_channel, passive=True) + return True + except amqp.exceptions.NotFound: + return False + except Exception as e: + raise MessagingError("Could test if exchange %s exists on broker %s error=%s" % (name, broker, e)) + def create_queue(name: str, durable: bool=True, broker: str=DEFAULT_BROKER, log_level=logging.DEBUG, auto_delete: bool=False) -> bool: """ @@ -329,6 +347,23 @@ def delete_queue(name: str, broker: str=DEFAULT_BROKER, log_level=logging.DEBUG) except Exception as e: raise MessagingError("Could not delete queue %s on broker %s error=%s" % (name, broker, e)) +def queue_exists(name: str, broker: str=DEFAULT_BROKER) -> bool: + """ + does the queue with the given name exist on the given broker? + :param name: the name for the queue + :param broker: a message broker address + :return True if it exists, False if not. + """ + try: + with kombu.Connection(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD) as connection: + queue = kombu.Queue(name, no_declare=True, channel=connection) + try: + queue.queue_declare(channel=connection.default_channel, passive=True) + return True + except amqp.exceptions.NotFound: + return False + except Exception as e: + raise MessagingError("Could test if queue %s exists on broker %s error=%s" % (name, broker, e)) def nr_of_messages_in_queue(queue_name: str, broker: str = DEFAULT_BROKER) -> int: """get the number of messages in the queue""" diff --git a/LCS/Messaging/python/messaging/test/t_messagebus.py b/LCS/Messaging/python/messaging/test/t_messagebus.py index b7db6e25d15..3e08e839fdf 100644 --- a/LCS/Messaging/python/messaging/test/t_messagebus.py +++ b/LCS/Messaging/python/messaging/test/t_messagebus.py @@ -37,7 +37,7 @@ from datetime import datetime from lofar.messaging.messages import * from lofar.messaging.messagebus import * from lofar.messaging.messagebus import _AbstractBus, can_connect_to_broker -from lofar.messaging.messagebus import create_queue, create_exchange, create_binding, create_bound_queue, delete_exchange, delete_queue +from lofar.messaging.messagebus import create_queue, create_exchange, create_binding, create_bound_queue, delete_exchange, delete_queue, exchange_exists, queue_exists from lofar.messaging.config import DEFAULT_USER, DEFAULT_PASSWORD from lofar.messaging.rpc import RequestMessage from lofar.messaging.exceptions import MessageBusError, MessagingRuntimeError, MessagingTimeoutError @@ -52,29 +52,43 @@ class TestCreateDeleteFunctions(unittest.TestCase): def test_create_delete_exchange(self): name = "test-exchange-%s" % (uuid.uuid4()) + + self.assertFalse(exchange_exists(name)) + # creating this new unique test exchange should succeed, and return True cause it's a new exchange self.assertTrue(create_exchange(name, durable=False)) + self.assertTrue(exchange_exists(name)) + # creating it again should return False self.assertFalse(create_exchange(name, durable=False)) # deleting it should succeed self.assertTrue(delete_exchange(name)) + self.assertFalse(exchange_exists(name)) + # deleting it again should return False as there is nothing to deleting self.assertFalse(delete_exchange(name)) def test_create_delete_queue(self): name = "test-queue-%s" % (uuid.uuid4()) + + self.assertFalse(queue_exists(name)) + # creating this new unique test queue should succeed, and return True cause it's a new queue self.assertTrue(create_queue(name, durable=False)) + self.assertTrue(queue_exists(name)) + # creating it again should return False self.assertFalse(create_queue(name, durable=False)) # deleting it should succeed self.assertTrue(delete_queue(name)) + self.assertFalse(queue_exists(name)) + # deleting it again should return False as there is nothing to deleting self.assertFalse(delete_queue(name)) -- GitLab