From 3035867576f7958a8401f73854abbf1214d229ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20K=C3=BCnsem=C3=B6ller?= <jkuensem@physik.uni-bielefeld.de> Date: Tue, 30 Apr 2019 11:04:19 +0000 Subject: [PATCH] SW-657: Modified the AbstractBusListener's use of FromBus so that the PingPongTester test works --- LCS/Messaging/python/messaging/messagebus.py | 18 ++++++++++-------- .../python/messaging/test/t_messagebus.py | 2 +- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/LCS/Messaging/python/messaging/messagebus.py b/LCS/Messaging/python/messaging/messagebus.py index 3c8bf623a54..6aa81bd049a 100644 --- a/LCS/Messaging/python/messaging/messagebus.py +++ b/LCS/Messaging/python/messaging/messagebus.py @@ -623,17 +623,18 @@ class AbstractBusListener(object): if self._listening == True: return - self._bus_listener = FromBus(self.address, broker=self.broker, connection_log_level=logging.INFO) - self._bus_listener.open() - if numthreads != None: self._numthreads = numthreads self._running.set() self._threads = {} + self._bus_listeners = {} for i in range(self._numthreads): thread = threading.Thread(target=self._loop) self._threads[thread] = self._create_thread_args(i) + _bus_listener = FromBus(self.address, broker=self.broker, connection_log_level=logging.INFO) + _bus_listener.open() + self._bus_listeners[thread] = _bus_listener thread.start() self._listening = True @@ -656,11 +657,12 @@ class AbstractBusListener(object): thread.join() logger.info("Thread %2d: STOPPED Listening for messages on %s" % (args['index'], self.address)) logger.info(" %d messages received and %d processed OK." % (args['num_received_messages'], args['num_processed_messages'])) - self._listening = False - # close the listeners - if self._bus_listener.isConnected(): - self._bus_listener.close() + # close the listeners + if self._bus_listeners[thread].isConnected(): + self._bus_listeners[thread].close() + + self._listening = False def __enter__(self): @@ -721,7 +723,7 @@ class AbstractBusListener(object): try: # get the next message - lofar_msg = self._bus_listener.receive(1) + lofar_msg = self._bus_listeners[currentThread].receive(1) # retry if timed-out if lofar_msg is None: continue diff --git a/LCS/Messaging/python/messaging/test/t_messagebus.py b/LCS/Messaging/python/messaging/test/t_messagebus.py index 8f534d60922..6404845041d 100644 --- a/LCS/Messaging/python/messaging/test/t_messagebus.py +++ b/LCS/Messaging/python/messaging/test/t_messagebus.py @@ -379,7 +379,7 @@ class SendReceiveMessage(unittest.TestCase): """ Test send/receive of an RequestMessage even when sending the message raises a ProtonException. """ - from proton._utils import BlockingSender + from proton.utils import BlockingSender from proton import ProtonException # use fancy code injection into proton to enforce a ProtonException at the correct moment -- GitLab