Skip to content
Snippets Groups Projects
Commit ff70745c authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-516: reimplemtented FromBus drain and nr_of_messages_in_queue methods for proton/python3

parent 5974039f
No related branches found
No related tags found
No related merge requests found
...@@ -47,7 +47,7 @@ logger = logging.getLogger(__name__) ...@@ -47,7 +47,7 @@ logger = logging.getLogger(__name__)
DEFAULT_ADDRESS_OPTIONS = {'create': 'always'} DEFAULT_ADDRESS_OPTIONS = {'create': 'always'}
DEFAULT_BROKER = "localhost:5672" DEFAULT_BROKER = "localhost:5672"
DEFAULT_BROKER_OPTIONS = {'reconnect': True} DEFAULT_BROKER_OPTIONS = {'reconnect': True}
DEFAULT_RECEIVER_CAPACITY = 1 DEFAULT_RECEIVER_CAPACITY = 128
DEFAULT_TIMEOUT = 5 DEFAULT_TIMEOUT = 5
# Construct address options string (address options object not supported well in Python) # Construct address options string (address options object not supported well in Python)
...@@ -187,9 +187,6 @@ class FromBus(object): ...@@ -187,9 +187,6 @@ class FromBus(object):
options = options if options else self.options options = options if options else self.options
# Extract capacity (not supported in address string in Python, see COMMON_OPTS in qpid/messaging/driver.py)
# capacity = options.pop("capacity", DEFAULT_RECEIVER_CAPACITY)
optstr = address_options_to_str(options) optstr = address_options_to_str(options)
what = "receiver for source: %s (broker: %s, session: %s, options: %s)" % \ what = "receiver for source: %s (broker: %s, session: %s, options: %s)" % \
...@@ -200,8 +197,14 @@ class FromBus(object): ...@@ -200,8 +197,14 @@ class FromBus(object):
# todo: options=optstr) # "%s; %s" % (address, optstr), capacity=capacity) # todo: options=optstr) # "%s; %s" % (address, optstr), capacity=capacity)
logger.warning('[FromBus] Options are currently ignored since the switch to Proton!') logger.warning('[FromBus] Options are currently ignored since the switch to Proton!')
# todo: get this selector to work! # todo: get this selector to work!
self.receiver = self.connection.create_receiver(address=address, dynamic=self.dynamic) #, options=proton.reactor.Selector("subject = %s" % subject)) self.receiver = self.connection.create_receiver(address=address, dynamic=self.dynamic, credit=DEFAULT_RECEIVER_CAPACITY) #, options=proton.reactor.Selector("subject = %s" % subject))
self.subject = subject # todo: when the selector works, get rid of the message rejection on wrong subject in receive() self.subject = subject # todo: when the selector works, get rid of the message rejection on wrong subject in receive()
# import threading
# t = threading.Thread(target=self.connection.container.run)
# t.daemon = True
# t.start()
except proton.ProtonException: except proton.ProtonException:
raise_exception(MessageBusError, raise_exception(MessageBusError,
"[FromBus] Failed to create %s" % (what,)) "[FromBus] Failed to create %s" % (what,))
...@@ -314,28 +317,43 @@ class FromBus(object): ...@@ -314,28 +317,43 @@ class FromBus(object):
"[FromBus] reject() is not supported, using ack() instead") "[FromBus] reject() is not supported, using ack() instead")
self.ack(msg) self.ack(msg)
def drain(self): def drain(self, timeout=0.1):
"""Read and ack all messages until queue/exchange is empty""" """Read and ack all messages until queue/exchange is empty"""
while True: while True:
recv_msg = self.receive(timeout=1.0) try:
if recv_msg: if self.receiver.receive(timeout=timeout) is None:
self.ack(recv_msg) break
else: self.receiver.accept()
return except proton.Timeout:
break
def nr_of_messages_in_queue(self, timeout=0.1):
"""
Get the current number of messages in this FromBus's local queue, which is at most DEFAULT_RECEIVER_CAPACITY
Please note that this is not per se equal to the number of messages in the queue at the broker!
A proton receiver can and will prefetch messages from a broker-queue, and store them in an internal (client-side) queue.
If-and-only-if a message is handled and ack'ed at the client, then the message truly disappears from the broker-queue.
:param timeout: time out in (fractional) seconds or None
:return: the current number of messages in this FromBus's local receiver queue
"""
self._check_session()
if timeout is not None and timeout > 0:
try:
# allow the fetcher to receive some message(s)
current_nr_of_messages_in_queue = len(self.receiver.fetcher.incoming)
self.connection.container.do_work(timeout=0.5*timeout)
self.receiver.connection.wait(lambda: len(self.receiver.fetcher.incoming) != current_nr_of_messages_in_queue,
timeout=0.5*timeout)
except proton.Timeout:
pass
except Exception as e:
raise_exception(MessageBusError,
"[FromBus] Failed to get number of messages available in queue: %s" % self.address)
# todo: required? # return the current number of queued incoming messages
#def nr_of_messages_in_queue(self, timeout=1.0): return len(self.receiver.fetcher.incoming)
# self._check_session()
# try:
# recv = self.receiver_iter.next()
# return recv.available()
#except qpid.messaging.exceptions.Empty: # todo: find Proton alternative if necessary
# return 0
# except Exception as e:
# raise_exception(MessageBusError,
# "[FromBus] Failed to get number of messages available in queue: %s" % self.address)
class ToBus(object): class ToBus(object):
""" """
...@@ -418,7 +436,6 @@ class ToBus(object): ...@@ -418,7 +436,6 @@ class ToBus(object):
raise raise
""" """
self.open() self.open()
logging.debug("[ToBus] enter complete")
return self return self
def close(self): def close(self):
......
...@@ -32,6 +32,7 @@ import logging ...@@ -32,6 +32,7 @@ import logging
from lofar.messaging.messages import * from lofar.messaging.messages import *
from lofar.messaging.messagebus import * from lofar.messaging.messagebus import *
from lofar.messaging.messagebus import DEFAULT_RECEIVER_CAPACITY
from lofar.messaging.exceptions import MessageBusError, InvalidMessage from lofar.messaging.exceptions import MessageBusError, InvalidMessage
TIMEOUT = 1.0 TIMEOUT = 1.0
...@@ -233,6 +234,54 @@ class ToBusSendMessage(unittest.TestCase): ...@@ -233,6 +234,54 @@ class ToBusSendMessage(unittest.TestCase):
# ======== Combined FromBus/ToBus unit tests ======== # # ======== Combined FromBus/ToBus unit tests ======== #
class QueueIntrospection(unittest.TestCase):
"""
Test sending and receiving messages, and introspecting the in-between queue
"""
def setUp(self):
self.frombus = FromBus(QUEUE)
self.tobus = ToBus(QUEUE)
# if there are any dangling messages in the QUEUE, they hold state between the individual tests
# make sure the queue is empty by receiving any dangling messages
with self.frombus:
self.frombus.drain()
def test_drain_non_empty_queue(self):
with self.tobus, self.frombus:
self.tobus.send(EventMessage(content="foo"))
self.tobus.send(EventMessage(content="foo"))
self.assertGreater(self.frombus.nr_of_messages_in_queue(), 0)
self.frombus.drain()
self.assertEqual(0, self.frombus.nr_of_messages_in_queue())
def test_counting_one_message_in_queue(self):
with self.tobus, self.frombus:
self.tobus.send(EventMessage(content="foo"))
self.assertEqual(1, self.frombus.nr_of_messages_in_queue())
self.frombus.receive()
self.assertEqual(0, self.frombus.nr_of_messages_in_queue())
def test_counting_multiple_messages_in_queue(self):
# DEFAULT_RECEIVER_CAPACITY should be > 2 otherwise we cannot even store multiple messages in the local queue
self.assertGreaterEqual(DEFAULT_RECEIVER_CAPACITY, 2)
with self.tobus, self.frombus:
MAX_NR_OF_MESSAGES = min(10, DEFAULT_RECEIVER_CAPACITY)
for i in range(MAX_NR_OF_MESSAGES):
self.tobus.send(EventMessage(content="foo"))
self.assertEqual(i+1, self.frombus.nr_of_messages_in_queue())
for i in range(MAX_NR_OF_MESSAGES):
self.assertEqual(MAX_NR_OF_MESSAGES-i, self.frombus.nr_of_messages_in_queue())
self.frombus.receive()
self.assertEqual(MAX_NR_OF_MESSAGES-i-1, self.frombus.nr_of_messages_in_queue())
class SendReceiveMessage(unittest.TestCase): class SendReceiveMessage(unittest.TestCase):
""" """
Class to test sending and receiving a message. Class to test sending and receiving a message.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment