diff --git a/LCS/Messaging/python/messaging/messagebus.py b/LCS/Messaging/python/messaging/messagebus.py index 776c338f3586f6a41f800f76b50926bcfe70ef6a..9f0e7ed4e8d5da3a962f3db9fb0b121cf42ce2be 100644 --- a/LCS/Messaging/python/messaging/messagebus.py +++ b/LCS/Messaging/python/messaging/messagebus.py @@ -47,7 +47,7 @@ logger = logging.getLogger(__name__) DEFAULT_ADDRESS_OPTIONS = {'create': 'always'} DEFAULT_BROKER = "localhost:5672" DEFAULT_BROKER_OPTIONS = {'reconnect': True} -DEFAULT_RECEIVER_CAPACITY = 1 +DEFAULT_RECEIVER_CAPACITY = 128 DEFAULT_TIMEOUT = 5 # Construct address options string (address options object not supported well in Python) @@ -187,9 +187,6 @@ class FromBus(object): options = options if options else self.options - # Extract capacity (not supported in address string in Python, see COMMON_OPTS in qpid/messaging/driver.py) - # capacity = options.pop("capacity", DEFAULT_RECEIVER_CAPACITY) - optstr = address_options_to_str(options) what = "receiver for source: %s (broker: %s, session: %s, options: %s)" % \ @@ -200,8 +197,14 @@ class FromBus(object): # todo: options=optstr) # "%s; %s" % (address, optstr), capacity=capacity) logger.warning('[FromBus] Options are currently ignored since the switch to Proton!') # todo: get this selector to work! - self.receiver = self.connection.create_receiver(address=address, dynamic=self.dynamic) #, options=proton.reactor.Selector("subject = %s" % subject)) + self.receiver = self.connection.create_receiver(address=address, dynamic=self.dynamic, credit=DEFAULT_RECEIVER_CAPACITY) #, options=proton.reactor.Selector("subject = %s" % subject)) self.subject = subject # todo: when the selector works, get rid of the message rejection on wrong subject in receive() + + # import threading + # t = threading.Thread(target=self.connection.container.run) + # t.daemon = True + # t.start() + except proton.ProtonException: raise_exception(MessageBusError, "[FromBus] Failed to create %s" % (what,)) @@ -314,28 +317,43 @@ class FromBus(object): "[FromBus] reject() is not supported, using ack() instead") self.ack(msg) - def drain(self): + def drain(self, timeout=0.1): """Read and ack all messages until queue/exchange is empty""" while True: - recv_msg = self.receive(timeout=1.0) - if recv_msg: - self.ack(recv_msg) - else: - return - - - # todo: required? - #def nr_of_messages_in_queue(self, timeout=1.0): - # self._check_session() - - # try: - # recv = self.receiver_iter.next() - # return recv.available() - #except qpid.messaging.exceptions.Empty: # todo: find Proton alternative if necessary - # return 0 - # except Exception as e: - # raise_exception(MessageBusError, - # "[FromBus] Failed to get number of messages available in queue: %s" % self.address) + try: + if self.receiver.receive(timeout=timeout) is None: + break + self.receiver.accept() + except proton.Timeout: + break + + def nr_of_messages_in_queue(self, timeout=0.1): + """ + Get the current number of messages in this FromBus's local queue, which is at most DEFAULT_RECEIVER_CAPACITY + Please note that this is not per se equal to the number of messages in the queue at the broker! + A proton receiver can and will prefetch messages from a broker-queue, and store them in an internal (client-side) queue. + If-and-only-if a message is handled and ack'ed at the client, then the message truly disappears from the broker-queue. + :param timeout: time out in (fractional) seconds or None + :return: the current number of messages in this FromBus's local receiver queue + """ + self._check_session() + + if timeout is not None and timeout > 0: + try: + # allow the fetcher to receive some message(s) + current_nr_of_messages_in_queue = len(self.receiver.fetcher.incoming) + self.connection.container.do_work(timeout=0.5*timeout) + self.receiver.connection.wait(lambda: len(self.receiver.fetcher.incoming) != current_nr_of_messages_in_queue, + timeout=0.5*timeout) + except proton.Timeout: + pass + except Exception as e: + raise_exception(MessageBusError, + "[FromBus] Failed to get number of messages available in queue: %s" % self.address) + + # return the current number of queued incoming messages + return len(self.receiver.fetcher.incoming) + class ToBus(object): """ @@ -418,7 +436,6 @@ class ToBus(object): raise """ self.open() - logging.debug("[ToBus] enter complete") return self def close(self): diff --git a/LCS/Messaging/python/messaging/test/t_messagebus.py b/LCS/Messaging/python/messaging/test/t_messagebus.py index 1217dc80815b020dfd8f0941a33085886d535ea8..36e9f454f69f9805a11dbe650aeca5fdd1944d67 100644 --- a/LCS/Messaging/python/messaging/test/t_messagebus.py +++ b/LCS/Messaging/python/messaging/test/t_messagebus.py @@ -32,6 +32,7 @@ import logging from lofar.messaging.messages import * from lofar.messaging.messagebus import * +from lofar.messaging.messagebus import DEFAULT_RECEIVER_CAPACITY from lofar.messaging.exceptions import MessageBusError, InvalidMessage TIMEOUT = 1.0 @@ -233,6 +234,54 @@ class ToBusSendMessage(unittest.TestCase): # ======== Combined FromBus/ToBus unit tests ======== # +class QueueIntrospection(unittest.TestCase): + """ + Test sending and receiving messages, and introspecting the in-between queue + """ + + def setUp(self): + self.frombus = FromBus(QUEUE) + self.tobus = ToBus(QUEUE) + + # if there are any dangling messages in the QUEUE, they hold state between the individual tests + # make sure the queue is empty by receiving any dangling messages + with self.frombus: + self.frombus.drain() + + def test_drain_non_empty_queue(self): + with self.tobus, self.frombus: + self.tobus.send(EventMessage(content="foo")) + self.tobus.send(EventMessage(content="foo")) + self.assertGreater(self.frombus.nr_of_messages_in_queue(), 0) + + self.frombus.drain() + self.assertEqual(0, self.frombus.nr_of_messages_in_queue()) + + + def test_counting_one_message_in_queue(self): + with self.tobus, self.frombus: + self.tobus.send(EventMessage(content="foo")) + self.assertEqual(1, self.frombus.nr_of_messages_in_queue()) + + self.frombus.receive() + self.assertEqual(0, self.frombus.nr_of_messages_in_queue()) + + def test_counting_multiple_messages_in_queue(self): + # DEFAULT_RECEIVER_CAPACITY should be > 2 otherwise we cannot even store multiple messages in the local queue + self.assertGreaterEqual(DEFAULT_RECEIVER_CAPACITY, 2) + + with self.tobus, self.frombus: + MAX_NR_OF_MESSAGES = min(10, DEFAULT_RECEIVER_CAPACITY) + for i in range(MAX_NR_OF_MESSAGES): + self.tobus.send(EventMessage(content="foo")) + self.assertEqual(i+1, self.frombus.nr_of_messages_in_queue()) + + for i in range(MAX_NR_OF_MESSAGES): + self.assertEqual(MAX_NR_OF_MESSAGES-i, self.frombus.nr_of_messages_in_queue()) + self.frombus.receive() + self.assertEqual(MAX_NR_OF_MESSAGES-i-1, self.frombus.nr_of_messages_in_queue()) + + class SendReceiveMessage(unittest.TestCase): """ Class to test sending and receiving a message.