diff --git a/LCS/Messaging/python/messaging/messagebus.py b/LCS/Messaging/python/messaging/messagebus.py index e8710e3cd1090439ac7d2cfc7907dc78f0648d51..6505db4430010b33fcace1d5d50cd8586c6876af 100644 --- a/LCS/Messaging/python/messaging/messagebus.py +++ b/LCS/Messaging/python/messaging/messagebus.py @@ -36,7 +36,6 @@ import sys import uuid import threading -logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Default settings for often used parameters. @@ -436,6 +435,12 @@ class AbstractBusListener(object): self.frombus_options = {"capacity": self._numthreads*20} options = kwargs.pop("options", None) + # Set appropriate flags for exclusive binding + if self.exclusive == True: + self.frombus_options["link"] = '{name:"' + str(uuid.uuid4()) + \ + '", x-bindings:[{key:' + self.service_name + \ + ', arguments: {"qpid.exclusive-binding":True}}]}' + # only add options if it is given as a dictionary if isinstance(options,dict): for key,val in options.iteritems(): @@ -448,6 +453,9 @@ class AbstractBusListener(object): if self.verbose == True: logger.debug("[%s: %s]", self.__class__.__name__, txt) + def isListening(self): + return self._listening + def start_listening(self, numthreads=None): """ Start the background threads and process incoming messages. @@ -506,24 +514,24 @@ class AbstractBusListener(object): """ self.stop_listening() - def prepare_loop(self): + def onListenLoopBegin(self): "Called before main processing loop is entered." pass - def prepare_receive(self): + def onBeforeReceiveMessage(self): "Called in main processing loop just before a blocking wait for messages is done." pass - def handle_message(self, msg): + def handleMessage(self, msg): "Implement this method in your subclass to handle a received message" raise NotImplementedError("Please implement the handle_message method in your subclass to handle a received message") - def finalize_handling(self, successful): + def onAfterReceiveMessage(self, successful): "Called in the main loop after the result was send back to the requester." "@successful@ reflects the state of the handling: true/false" pass - def finalize_loop(self): + def onListenLoopEnd(self): "Called after main processing loop is finished." pass @@ -536,15 +544,15 @@ class AbstractBusListener(object): thread_idx = args['index'] logger.info( "Thread %d START Listening for messages on %s" %(thread_idx, self.address)) try: - self.prepare_loop() + self.onListenLoopBegin() except Exception as e: - logger.error("prepare_loop() failed with %s", e) + logger.error("onListenLoopBegin() failed with %s", e) while self.running[0]: try: - self.prepare_receive() + self.onBeforeReceiveMessage() except Exception as e: - logger.error("prepare_receive() failed with %s", e) + logger.error("onBeforeReceiveMessage() failed with %s", e) continue try: @@ -561,7 +569,7 @@ class AbstractBusListener(object): try: self._debug("Running handler") - self.handle_message(lofar_msg) + self.handleMessage(lofar_msg) self._debug("Finished handler") @@ -570,9 +578,9 @@ class AbstractBusListener(object): args['num_processed_messages'] += 1 try: - self.finalize_handling(True) + self.onAfterReceiveMessage(True) except Exception as e: - logger.error("finalize_handling() failed with %s", e) + logger.error("onAfterReceiveMessage() failed with %s", e) continue except Exception as e: @@ -580,9 +588,9 @@ class AbstractBusListener(object): # during the execution of the service handler is caught here. self._debug(str(e)) try: - self.finalize_handling(False) + self.onAfterReceiveMessage(False) except Exception as e: - logger.error("finalize_handling() failed with %s", e) + logger.error("onAfterReceiveMessage() failed with %s", e) continue except Exception as e: @@ -591,7 +599,7 @@ class AbstractBusListener(object): logger.info("Thread %d: Resuming listening on %s " % (thread_idx, self.address)) try: - self.finalize_loop() + self.onListenLoopEnd() except Exception as e: logger.error("finalize_loop() failed with %s", e)