diff --git a/LCS/Messaging/python/messaging/messagebus.py b/LCS/Messaging/python/messaging/messagebus.py index 3c8bf623a54c959eed6178336862b59e8abdb1d0..6aa81bd049a4f96a9b1f1eb51487d29e2e161df5 100644 --- a/LCS/Messaging/python/messaging/messagebus.py +++ b/LCS/Messaging/python/messaging/messagebus.py @@ -623,17 +623,18 @@ class AbstractBusListener(object): if self._listening == True: return - self._bus_listener = FromBus(self.address, broker=self.broker, connection_log_level=logging.INFO) - self._bus_listener.open() - if numthreads != None: self._numthreads = numthreads self._running.set() self._threads = {} + self._bus_listeners = {} for i in range(self._numthreads): thread = threading.Thread(target=self._loop) self._threads[thread] = self._create_thread_args(i) + _bus_listener = FromBus(self.address, broker=self.broker, connection_log_level=logging.INFO) + _bus_listener.open() + self._bus_listeners[thread] = _bus_listener thread.start() self._listening = True @@ -656,11 +657,12 @@ class AbstractBusListener(object): thread.join() logger.info("Thread %2d: STOPPED Listening for messages on %s" % (args['index'], self.address)) logger.info(" %d messages received and %d processed OK." % (args['num_received_messages'], args['num_processed_messages'])) - self._listening = False - # close the listeners - if self._bus_listener.isConnected(): - self._bus_listener.close() + # close the listeners + if self._bus_listeners[thread].isConnected(): + self._bus_listeners[thread].close() + + self._listening = False def __enter__(self): @@ -721,7 +723,7 @@ class AbstractBusListener(object): try: # get the next message - lofar_msg = self._bus_listener.receive(1) + lofar_msg = self._bus_listeners[currentThread].receive(1) # retry if timed-out if lofar_msg is None: continue diff --git a/LCS/Messaging/python/messaging/test/t_messagebus.py b/LCS/Messaging/python/messaging/test/t_messagebus.py index 8f534d60922b6142aa7992ff291d83ac57fdc795..6404845041d6ab40d47f88df6729a3dd2f0884a2 100644 --- a/LCS/Messaging/python/messaging/test/t_messagebus.py +++ b/LCS/Messaging/python/messaging/test/t_messagebus.py @@ -379,7 +379,7 @@ class SendReceiveMessage(unittest.TestCase): """ Test send/receive of an RequestMessage even when sending the message raises a ProtonException. """ - from proton._utils import BlockingSender + from proton.utils import BlockingSender from proton import ProtonException # use fancy code injection into proton to enforce a ProtonException at the correct moment