Skip to content
Snippets Groups Projects
Commit 1dcd50f0 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-699: fixed nr_of_messages_in_queue using requests and rabbitmq REST api

parent 955afce5
No related branches found
No related tags found
1 merge request!4Lofar release 4 0 minor fixes
...@@ -202,10 +202,13 @@ from typing import Optional ...@@ -202,10 +202,13 @@ from typing import Optional
from datetime import datetime from datetime import datetime
from queue import Empty as EmptyQueueError from queue import Empty as EmptyQueueError
from socket import gaierror from socket import gaierror
import json
import requests
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# some serializers are considered 'insecure', but we know better ;) # some serializers are considered 'insecure', but we know better ;)
# so enable the python pickle serializer # so enable the python pickle serializer
kombu.enable_insecure_serializers(['pickle']) kombu.enable_insecure_serializers(['pickle'])
...@@ -324,6 +327,18 @@ def delete_queue(name: str, broker: str=DEFAULT_BROKER, log_level=logging.DEBUG) ...@@ -324,6 +327,18 @@ def delete_queue(name: str, broker: str=DEFAULT_BROKER, log_level=logging.DEBUG)
raise MessagingError("Could not delete queue %s on broker %s error=%s" % (name, broker, e)) raise MessagingError("Could not delete queue %s on broker %s error=%s" % (name, broker, e))
def nr_of_messages_in_queue(queue_name: str, broker: str = DEFAULT_BROKER) -> int:
"""get the number of messages in the queue"""
try:
# the kombu way of getting the number of messages via a passice queue_declare is not reliable...
# so, let's use the http REST API using request
url = "http://%s:15672/api/queues/%%2F/%s" % (broker, queue_name)
response = requests.get(url, auth=(DEFAULT_USER, DEFAULT_PASSWORD))
queue_info = json.loads(response.text)
return queue_info.get('messages', 0)
except Exception as e:
return 0
def create_binding(exchange: str, queue: str, routing_key: str='#', durable: bool=True, broker: str=DEFAULT_BROKER, log_level=logging.DEBUG): def create_binding(exchange: str, queue: str, routing_key: str='#', durable: bool=True, broker: str=DEFAULT_BROKER, log_level=logging.DEBUG):
""" """
create a binding between the exchange and queue, possibly filtered by the routing_key, on the given broker. create a binding between the exchange and queue, possibly filtered by the routing_key, on the given broker.
...@@ -643,11 +658,7 @@ class FromBus(_AbstractBus): ...@@ -643,11 +658,7 @@ class FromBus(_AbstractBus):
def nr_of_messages_in_queue(self) -> int: def nr_of_messages_in_queue(self) -> int:
"""get the number of waiting messages in the queue""" """get the number of waiting messages in the queue"""
try: return nr_of_messages_in_queue(self.queue, self.broker)
name, msg_count, consumer_count = self._receiver.queue.queue_declare(passive=True, channel=self._connection.default_channel)
return msg_count
except:
return 0
def __str__(self): def __str__(self):
return "[FromBus] queue: %s on broker: %s #messages=%d" % (self.queue, self.broker, self.nr_of_messages_in_queue()) return "[FromBus] queue: %s on broker: %s #messages=%d" % (self.queue, self.broker, self.nr_of_messages_in_queue())
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment