diff --git a/LCS/Messaging/python/messaging/messagebus.py b/LCS/Messaging/python/messaging/messagebus.py index ecba41ef17fa999d53ffba561cd73093f14f67df..77ae7056322d06970a10d45d1eaebb78baf880e2 100644 --- a/LCS/Messaging/python/messaging/messagebus.py +++ b/LCS/Messaging/python/messaging/messagebus.py @@ -273,13 +273,14 @@ def delete_exchange(name: str, broker: str=DEFAULT_BROKER, log_level=logging.DEB raise MessagingError("Could not delete exchange %s on broker %s error=%s" % (name, broker, e)) -def create_queue(name: str, durable: bool=True, broker: str=DEFAULT_BROKER, log_level=logging.DEBUG) -> bool: +def create_queue(name: str, durable: bool=True, broker: str=DEFAULT_BROKER, log_level=logging.DEBUG, auto_delete: bool=False) -> bool: """ create a message queue with the given name on the given broker :param name: the name for the queue :param durable: if True, then the queue 'survives' broker restarts :param broker: a message broker address :param log_level: optional logging level (to add/reduce spamming) + :param auto_delete: if True, then the queue is automatically deleted when the last consumer disconnects. :raises: MessagingError if the queue could not be created :return True if created, False if not-created (because it already exists) """ @@ -287,6 +288,7 @@ def create_queue(name: str, durable: bool=True, broker: str=DEFAULT_BROKER, log_ with kombu.Connection(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD) as connection: queue = kombu.Queue(name, durable=durable, + auto_delete=auto_delete, max_priority=9 # need to set max_priority to get a queue that respects priorities on messages. ) try: @@ -353,7 +355,7 @@ def create_binding(exchange: str, queue: str, routing_key: str='#', durable: boo raise MessageBusError("Could not create binding from exchange %s to queue %s with routing_key %s " \ " on broker %s error=%s" % (exchange, queue, routing_key, broker, e)) -def create_bound_queue(exchange: str, queue: str, routing_key: str='#', durable: bool=True, broker: str=DEFAULT_BROKER, log_level=logging.DEBUG): +def create_bound_queue(exchange: str, queue: str, routing_key: str='#', durable: bool=True, broker: str=DEFAULT_BROKER, log_level=logging.DEBUG, auto_delete: bool=False): """ create an exchange (if needed), queue (if needed), and the in-between binding, possibly filtered by the routing_key, on the given broker. :param exchange: the name for the exchange @@ -361,10 +363,11 @@ def create_bound_queue(exchange: str, queue: str, routing_key: str='#', durable: :param routing_key: filter only messages with the given routing_key to the queue :param durable: if True, then the queue 'survives' broker restarts :param broker: a message broker address + :param auto_delete: if True, then the queue is automatically deleted when the last consumer disconnects. :param log_level: optional logging level (to add/reduce spamming) """ create_exchange(exchange, durable=durable, broker=broker, log_level=log_level) - create_queue(queue, durable=durable, broker=broker, log_level=log_level) + create_queue(queue, durable=durable, broker=broker, log_level=log_level, auto_delete=auto_delete) create_binding(exchange, queue, routing_key, durable=durable, broker=broker, log_level=log_level) @@ -935,11 +938,11 @@ class TemporaryQueue: self._bound_exchange = "exchange-for-" + self.address # create the tmp queue... - create_queue(self.address, broker=self.broker, durable=False) + create_queue(self.address, broker=self.broker, durable=False, auto_delete=True) - # create the exchange (if needed), and remember if we need to destoy it (if it was created) - self._created_exchange = create_exchange(self._bound_exchange, - broker=self.broker, durable=False) + # create the exchange (if needed), and remember if we need to destroy it (if it was created) + self._created_exchange = create_exchange(self._bound_exchange, broker=self.broker, + durable=False) # and finally create the binding # if no routing_key given, then use this tmp-queue's specific address as routing key