Skip to content
Snippets Groups Projects
Commit 41a94585 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-827: added two methods to check if a queue/exchange exists

parent 5456fe87
No related branches found
No related tags found
2 merge requests!74Lofar release 4 0,!71Resolve SW-827
......@@ -278,6 +278,24 @@ def delete_exchange(name: str, broker: str=DEFAULT_BROKER, log_level=logging.DEB
except Exception as e:
raise MessagingError("Could not delete exchange %s on broker %s error=%s" % (name, broker, e))
def exchange_exists(name: str, broker: str=DEFAULT_BROKER) -> bool:
"""
does the exchange with the given name exist on the given broker?
:param name: the name for the exchange
:param broker: a message broker address
:return True if it exists, False if not.
"""
try:
with kombu.Connection(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD) as connection:
exchange = kombu.Exchange(name, channel=connection)
try:
exchange.declare(channel=connection.default_channel, passive=True)
return True
except amqp.exceptions.NotFound:
return False
except Exception as e:
raise MessagingError("Could test if exchange %s exists on broker %s error=%s" % (name, broker, e))
  • Contributor

    Is the MessagingError text correct? Is there a 'not' missing?

  • Please register or sign in to reply
def create_queue(name: str, durable: bool=True, broker: str=DEFAULT_BROKER, log_level=logging.DEBUG, auto_delete: bool=False) -> bool:
"""
......@@ -329,6 +347,23 @@ def delete_queue(name: str, broker: str=DEFAULT_BROKER, log_level=logging.DEBUG)
except Exception as e:
raise MessagingError("Could not delete queue %s on broker %s error=%s" % (name, broker, e))
def queue_exists(name: str, broker: str=DEFAULT_BROKER) -> bool:
"""
does the queue with the given name exist on the given broker?
:param name: the name for the queue
:param broker: a message broker address
:return True if it exists, False if not.
"""
try:
with kombu.Connection(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD) as connection:
queue = kombu.Queue(name, no_declare=True, channel=connection)
try:
queue.queue_declare(channel=connection.default_channel, passive=True)
return True
except amqp.exceptions.NotFound:
return False
except Exception as e:
raise MessagingError("Could test if queue %s exists on broker %s error=%s" % (name, broker, e))
def nr_of_messages_in_queue(queue_name: str, broker: str = DEFAULT_BROKER) -> int:
"""get the number of messages in the queue"""
......
......@@ -37,7 +37,7 @@ from datetime import datetime
from lofar.messaging.messages import *
from lofar.messaging.messagebus import *
from lofar.messaging.messagebus import _AbstractBus, can_connect_to_broker
from lofar.messaging.messagebus import create_queue, create_exchange, create_binding, create_bound_queue, delete_exchange, delete_queue
from lofar.messaging.messagebus import create_queue, create_exchange, create_binding, create_bound_queue, delete_exchange, delete_queue, exchange_exists, queue_exists
from lofar.messaging.config import DEFAULT_USER, DEFAULT_PASSWORD
from lofar.messaging.rpc import RequestMessage
from lofar.messaging.exceptions import MessageBusError, MessagingRuntimeError, MessagingTimeoutError
......@@ -52,29 +52,43 @@ class TestCreateDeleteFunctions(unittest.TestCase):
def test_create_delete_exchange(self):
name = "test-exchange-%s" % (uuid.uuid4())
self.assertFalse(exchange_exists(name))
# creating this new unique test exchange should succeed, and return True cause it's a new exchange
self.assertTrue(create_exchange(name, durable=False))
self.assertTrue(exchange_exists(name))
# creating it again should return False
self.assertFalse(create_exchange(name, durable=False))
# deleting it should succeed
self.assertTrue(delete_exchange(name))
self.assertFalse(exchange_exists(name))
# deleting it again should return False as there is nothing to deleting
self.assertFalse(delete_exchange(name))
def test_create_delete_queue(self):
name = "test-queue-%s" % (uuid.uuid4())
self.assertFalse(queue_exists(name))
# creating this new unique test queue should succeed, and return True cause it's a new queue
self.assertTrue(create_queue(name, durable=False))
self.assertTrue(queue_exists(name))
# creating it again should return False
self.assertFalse(create_queue(name, durable=False))
# deleting it should succeed
self.assertTrue(delete_queue(name))
self.assertFalse(queue_exists(name))
# deleting it again should return False as there is nothing to deleting
self.assertFalse(delete_queue(name))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment