From 1dcd50f0aad88e86b5716f077c42dc833c49d3a5 Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Fri, 28 Jun 2019 11:33:11 +0000 Subject: [PATCH] SW-699: fixed nr_of_messages_in_queue using requests and rabbitmq REST api --- LCS/Messaging/python/messaging/messagebus.py | 21 +++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/LCS/Messaging/python/messaging/messagebus.py b/LCS/Messaging/python/messaging/messagebus.py index 77ae7056322..deec0d1aed0 100644 --- a/LCS/Messaging/python/messaging/messagebus.py +++ b/LCS/Messaging/python/messaging/messagebus.py @@ -202,10 +202,13 @@ from typing import Optional from datetime import datetime from queue import Empty as EmptyQueueError from socket import gaierror +import json +import requests import logging logger = logging.getLogger(__name__) + # some serializers are considered 'insecure', but we know better ;) # so enable the python pickle serializer kombu.enable_insecure_serializers(['pickle']) @@ -324,6 +327,18 @@ def delete_queue(name: str, broker: str=DEFAULT_BROKER, log_level=logging.DEBUG) raise MessagingError("Could not delete queue %s on broker %s error=%s" % (name, broker, e)) +def nr_of_messages_in_queue(queue_name: str, broker: str = DEFAULT_BROKER) -> int: + """get the number of messages in the queue""" + try: + # the kombu way of getting the number of messages via a passice queue_declare is not reliable... + # so, let's use the http REST API using request + url = "http://%s:15672/api/queues/%%2F/%s" % (broker, queue_name) + response = requests.get(url, auth=(DEFAULT_USER, DEFAULT_PASSWORD)) + queue_info = json.loads(response.text) + return queue_info.get('messages', 0) + except Exception as e: + return 0 + def create_binding(exchange: str, queue: str, routing_key: str='#', durable: bool=True, broker: str=DEFAULT_BROKER, log_level=logging.DEBUG): """ create a binding between the exchange and queue, possibly filtered by the routing_key, on the given broker. @@ -643,11 +658,7 @@ class FromBus(_AbstractBus): def nr_of_messages_in_queue(self) -> int: """get the number of waiting messages in the queue""" - try: - name, msg_count, consumer_count = self._receiver.queue.queue_declare(passive=True, channel=self._connection.default_channel) - return msg_count - except: - return 0 + return nr_of_messages_in_queue(self.queue, self.broker) def __str__(self): return "[FromBus] queue: %s on broker: %s #messages=%d" % (self.queue, self.broker, self.nr_of_messages_in_queue()) -- GitLab