Skip to content
Snippets Groups Projects
Commit 414e0d46 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #8888: refactored method names. added exclusive binding.

parent cea9185b
No related branches found
No related tags found
No related merge requests found
......@@ -36,7 +36,6 @@ import sys
import uuid
import threading
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Default settings for often used parameters.
......@@ -436,6 +435,12 @@ class AbstractBusListener(object):
self.frombus_options = {"capacity": self._numthreads*20}
options = kwargs.pop("options", None)
# Set appropriate flags for exclusive binding
if self.exclusive == True:
self.frombus_options["link"] = '{name:"' + str(uuid.uuid4()) + \
'", x-bindings:[{key:' + self.service_name + \
', arguments: {"qpid.exclusive-binding":True}}]}'
# only add options if it is given as a dictionary
if isinstance(options,dict):
for key,val in options.iteritems():
......@@ -448,6 +453,9 @@ class AbstractBusListener(object):
if self.verbose == True:
logger.debug("[%s: %s]", self.__class__.__name__, txt)
def isListening(self):
return self._listening
def start_listening(self, numthreads=None):
"""
Start the background threads and process incoming messages.
......@@ -506,24 +514,24 @@ class AbstractBusListener(object):
"""
self.stop_listening()
def prepare_loop(self):
def onListenLoopBegin(self):
"Called before main processing loop is entered."
pass
def prepare_receive(self):
def onBeforeReceiveMessage(self):
"Called in main processing loop just before a blocking wait for messages is done."
pass
def handle_message(self, msg):
def handleMessage(self, msg):
"Implement this method in your subclass to handle a received message"
raise NotImplementedError("Please implement the handle_message method in your subclass to handle a received message")
def finalize_handling(self, successful):
def onAfterReceiveMessage(self, successful):
"Called in the main loop after the result was send back to the requester."
"@successful@ reflects the state of the handling: true/false"
pass
def finalize_loop(self):
def onListenLoopEnd(self):
"Called after main processing loop is finished."
pass
......@@ -536,15 +544,15 @@ class AbstractBusListener(object):
thread_idx = args['index']
logger.info( "Thread %d START Listening for messages on %s" %(thread_idx, self.address))
try:
self.prepare_loop()
self.onListenLoopBegin()
except Exception as e:
logger.error("prepare_loop() failed with %s", e)
logger.error("onListenLoopBegin() failed with %s", e)
while self.running[0]:
try:
self.prepare_receive()
self.onBeforeReceiveMessage()
except Exception as e:
logger.error("prepare_receive() failed with %s", e)
logger.error("onBeforeReceiveMessage() failed with %s", e)
continue
try:
......@@ -561,7 +569,7 @@ class AbstractBusListener(object):
try:
self._debug("Running handler")
self.handle_message(lofar_msg)
self.handleMessage(lofar_msg)
self._debug("Finished handler")
......@@ -570,9 +578,9 @@ class AbstractBusListener(object):
args['num_processed_messages'] += 1
try:
self.finalize_handling(True)
self.onAfterReceiveMessage(True)
except Exception as e:
logger.error("finalize_handling() failed with %s", e)
logger.error("onAfterReceiveMessage() failed with %s", e)
continue
except Exception as e:
......@@ -580,9 +588,9 @@ class AbstractBusListener(object):
# during the execution of the service handler is caught here.
self._debug(str(e))
try:
self.finalize_handling(False)
self.onAfterReceiveMessage(False)
except Exception as e:
logger.error("finalize_handling() failed with %s", e)
logger.error("onAfterReceiveMessage() failed with %s", e)
continue
except Exception as e:
......@@ -591,7 +599,7 @@ class AbstractBusListener(object):
logger.info("Thread %d: Resuming listening on %s " % (thread_idx, self.address))
try:
self.finalize_loop()
self.onListenLoopEnd()
except Exception as e:
logger.error("finalize_loop() failed with %s", e)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment