Skip to content
Snippets Groups Projects
Commit 97d31bd3 authored by Ruud Overeem's avatar Ruud Overeem
Browse files

Task #8531: Modified Service to allow code-insertion at 4 new places....

Task #8531: Modified Service to allow code-insertion at 4 new places. Currently tested with 'pass' in those function to assure backward compatiblity.
parent b3ee73d2
No related branches found
No related tags found
No related merge requests found
...@@ -31,65 +31,131 @@ import logging ...@@ -31,65 +31,131 @@ import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class MessageHandlerInterface(object):
"""
Interface class for tuning the handling of a message by the Service class.
The class defines some (placeholders for) functions that the Service class calls
during the handling of messages. It can for instance be used to maintain a database connection.
The pseudocode of the Service class is:
Service(busname, function or from_MessageHandlerInterface_derived_class, ..., HandlerArguments={})
handler = <from_MessageHandlerInterface_derived_class>(HandlerArguments)
handler.before_main_loop()
while alive:
handler.loop_before_receive()
msg = wait for messages()
handler.handle_message(msg)
handler.loop_after_handling()
handler.after_main_loop()
"""
def __init__(self, **kwargs):
pass
def before_main_loop(self):
"Called before main processing loop is entered."
pass
def loop_before_receive(self):
"Called in main processing loop just before a blocking wait for messages is done."
pass
def handle_message(self, msg):
"Function the should handle the received message and return a result."
pass
def loop_after_handling(self):
"Called in the main loop after the result was send back to the requester."
pass
def after_main_loop(self):
"Called after main processing loop is finished."
pass
# create service: # create service:
class Service: class Service(object):
""" """
Service class for registering python functions with a Service name on a message bus. Service class for registering python functions with a Service name on a message bus.
create new service with Service( BusName, ServiceName, ServiceHandler ) create new service with Service(busname, servicename, servicehandler)
Additional options: busname <string> The name of the messagebus (queue or exchange) the service whould listen on.
options=<dict> for the QPID connection servicename <string> The name that the user should use the invocate the servicehandler.
numthreads=<int> amount of threads processing messages servicehandler <...> May be a function of an class that is derived from the MessageHandlerInterface.
startonwith=<bool> automatically start listening when in scope using 'with' The service uses this function or class for the handling of the messages.
verbose=<bool> show debug text Optional arguments:
options <dict> For the QPID connection
exclusive <bool> To create eclusive access to this messagebus. Default:True
numthreads <int> Amount of threads processing messages. Default:1
parsefullmessage <bool> Pass full message of only message content to the service handler. Default:False.
startonwith <bool> Automatically start listening when in scope using 'with'
verbose <bool> Show debug text. Default:False
handler_args <dict> Arguments that are passed to the constructor of the servicehandler is case the servicehandler
is a class in stead of a function.
""" """
def __init__(self, busname, servicename, servicehandler, options=None, exclusive=True, numthreads=1, parsefullmessage=False, startonwith=False, verbose=False): def __init__(self, busname, servicename, servicehandler, **kwargs):
""" """
Initialize Service object with busname (str) ,servicename (str) and servicehandler function. Initialize Service object with busname (str) ,servicename (str) and servicehandler function.
additional parameters: additional parameters:
options= <dict> Dictionary of options passed to QPID options= <dict> Dictionary of options passed to QPID
exclusive= <bool> Create an exclusive binding so no other services can consume duplicate messages (default: True) exclusive= <bool> Create an exclusive binding so no other services can consume duplicate messages (default: True)
numthreads= <int> Number of parallel threads processing messages (default: 1) numthreads= <int> Number of parallel threads processing messages (default: 1)
verbose= <bool> Output extra logging over stdout (default: False) verbose= <bool> Output extra logging over stdout (default: False)
""" """
self.BusName = busname self.busname = busname
self.ServiceName = servicename self.service_name = servicename
self.ServiceHandler = servicehandler self.service_handler = None # Handled later in this routine
self.connected = False self.connected = False
self.running = False self.running = False
self.exclusive = exclusive self.link_uuid = str(uuid.uuid4())
self.link_uuid = str(uuid.uuid4()) self.exclusive = kwargs.pop("exclusive", True)
self._numthreads = numthreads self._numthreads = kwargs.pop("numthreads", 1)
self.Verbose = verbose self.verbose = kwargs.pop("verbose", False)
self.options = {"capacity": numthreads*20} self.options = {"capacity": self._numthreads*20}
self.parsefullmessage=parsefullmessage options = kwargs.pop("options", None)
self.startonwith = startonwith self.parsefullmessage = kwargs.pop("parsefullmessage", False)
self.startonwith = kwargs.pop("startonwith", False)
self.handler_args = kwargs.pop("handler_args", None)
if len(kwargs):
raise ArgumentError("Unexpected argument passed to Serice class: %s", kwargs)
# Set appropriate flags for exclusive binding # Set appropriate flags for exclusive binding
if self.exclusive is True: if self.exclusive is True:
self.options["link"] = '{name:"' + self.link_uuid + '", x-bindings:[{key:' + self.ServiceName + ', arguments: {"qpid.exclusive-binding":True}}]}' self.options["link"] = '{name:"' + self.link_uuid + \
'", x-bindings:[{key:' + self.service_name + \
', arguments: {"qpid.exclusive-binding":True}}]}'
# only add options if it is given as a dictionary # only add options if it is given as a dictionary
if isinstance(options,dict): if isinstance(options,dict):
for key,val in options.iteritems(): for key,val in options.iteritems():
self.options[key] = val self.options[key] = val
# set up service_handler
if str(type(servicehandler)) == "<type 'instancemethod'>" or str(type(servicehandler)) == "<type 'function'>":
self.service_handler = MessageHandlerInterface()
self.service_handler.handle_message = servicehandler
else:
self.service_handler = servicehandler(self.handler_args)
if not isinstance(self.service_handler, MessageHandlerInterface):
raise TypeError("Servicehandler argument must by a function or a derived class from MessageHandlerInterface.")
def _debug(self, txt): def _debug(self, txt):
""" """
Internal use only. Internal use only.
""" """
if self.Verbose is True: if self.verbose is True:
logger.debug("[Service: %s]", txt) logger.debug("[Service: %s]", txt)
def StartListening(self, numthreads=None): def start_listening(self, numthreads=None):
""" """
Start the background threads and process incoming messages. Start the background threads and process incoming messages.
""" """
if numthreads is not None: if numthreads is not None:
self._numthreads = numthreads self._numthreads = numthreads
if self.connected is False: if self.connected is False:
raise Exception("StartListening Called on closed connections") raise Exception("start_listening Called on closed connections")
self.running = True self.running = True
self._tr = [] self._tr = []
...@@ -101,22 +167,22 @@ class Service: ...@@ -101,22 +167,22 @@ class Service:
self.okcounter.append(0) self.okcounter.append(0)
self._tr[i].start() self._tr[i].start()
def StopListening(self): def stop_listening(self):
""" """
Stop the background threads that listen to incoming messages. Stop the background threads that listen to incoming messages.
""" """
# stop all running threads # stop all running threads
if self.running is True: if self.running is True:
self.running = False self.running = False
for i in range(self._numthreads): for i in range(self._numthreads):
self._tr[i].join() self._tr[i].join()
logger.info("Thread %2d: STOPPED Listening for messages on Bus %s and service name %s." % (i, self.BusName, self.ServiceName)) logger.info("Thread %2d: STOPPED Listening for messages on Bus %s and service name %s." % (i, self.busname, self.service_name))
logger.info(" %d messages received and %d processed OK." % (self.reccounter[i], self.okcounter[i])) logger.info(" %d messages received and %d processed OK." % (self.reccounter[i], self.okcounter[i]))
def WaitForInterrupt(self): def wait_for_interrupt(self):
""" """
Useful (low cpu load) loop that waits for keyboard interrupt. Useful (low cpu load) loop that waits for keyboard interrupt.
""" """
looping = True looping = True
while looping: while looping:
try: try:
...@@ -128,18 +194,18 @@ class Service: ...@@ -128,18 +194,18 @@ class Service:
def __enter__(self): def __enter__(self):
""" """
Internal use only. Handles scope with keyword 'with' Internal use only. Handles scope with keyword 'with'
""" """
# Usually a service will be listening on a 'bus' implemented by a topic exchange # Usually a service will be listening on a 'bus' implemented by a topic exchange
if self.BusName is not None: if self.busname is not None:
self.Listen = FromBus(self.BusName+"/"+self.ServiceName, options=self.options) self.Listen = FromBus(self.busname+"/"+self.service_name, options=self.options)
self.Reply = ToBus(self.BusName) self.Reply = ToBus(self.busname)
self.Listen.open() self.Listen.open()
self.Reply.open() self.Reply.open()
# Handle case when queues are used # Handle case when queues are used
else: else:
# assume that we are listening on a queue and therefore we cannot use a generic ToBus() for replies. # assume that we are listening on a queue and therefore we cannot use a generic ToBus() for replies.
self.Listen = FromBus(self.ServiceName, options=self.options) self.Listen = FromBus(self.service_name, options=self.options)
self.Listen.open() self.Listen.open()
self.Reply=None self.Reply=None
...@@ -147,14 +213,14 @@ class Service: ...@@ -147,14 +213,14 @@ class Service:
# If required start listening on 'with' # If required start listening on 'with'
if self.startonwith is True: if self.startonwith is True:
self.StartListening() self.start_listening()
return self return self
def __exit__(self, exc_type, exc_val, exc_tb): def __exit__(self, exc_type, exc_val, exc_tb):
""" """
Internal use only. Handles scope with keyword 'with' Internal use only. Handles scope with keyword 'with'
""" """
self.StopListening() self.stop_listening()
# close the listeners # close the listeners
if self.connected is True: if self.connected is True:
self.connected = False self.connected = False
...@@ -176,8 +242,8 @@ class Service: ...@@ -176,8 +242,8 @@ class Service:
ToSend.errmsg = errtxt ToSend.errmsg = errtxt
ToSend.backtrace = backtrace ToSend.backtrace = backtrace
# show the message content if required by the Verbose flag. # show the message content if required by the verbose flag.
if self.Verbose is True: if self.verbose is True:
ToSend.show() ToSend.show()
# send the result to the RPC client # send the result to the RPC client
...@@ -196,8 +262,19 @@ class Service: ...@@ -196,8 +262,19 @@ class Service:
""" """
Internal use only. Message listener loop that receives messages and starts the attached function with the message content as argument. Internal use only. Message listener loop that receives messages and starts the attached function with the message content as argument.
""" """
logger.info( "Thread %d START Listening for messages on Bus %s and service name %s." %(index, self.BusName, self.ServiceName)) logger.info( "Thread %d START Listening for messages on Bus %s and service name %s." %(index, self.busname, self.service_name))
try:
self.service_handler.before_main_loop()
except Exception as e:
logger.error("before_main_loop() failed with %s", e)
while self.running: while self.running:
try:
self.service_handler.loop_before_receive()
except Exception as e:
logger.error("loop_before_receive() failed with %s", e)
continue
try: try:
# get the next message # get the next message
msg = self.Listen.receive(1) msg = self.Listen.receive(1)
...@@ -218,13 +295,17 @@ class Service: ...@@ -218,13 +295,17 @@ class Service:
try: try:
self._debug("Running handler") self._debug("Running handler")
if self.parsefullmessage is True: if self.parsefullmessage is True:
replymessage = self.ServiceHandler(msg) replymessage = self.service_handler.handle_message(msg)
else: else:
replymessage = self.ServiceHandler(msg.content) replymessage = self.service_handler.handle_message(msg.content)
self._debug("finished handler") self._debug("finished handler")
self._send_reply(replymessage,"OK",msg.reply_to) self._send_reply(replymessage,"OK",msg.reply_to)
self.okcounter[index] += 1 self.okcounter[index] += 1
self.Listen.ack(msg) self.Listen.ack(msg)
try:
self.service_handler.loop_after_handling()
except Exception as e:
logger.error("loop_after_handling() failed with %s", e)
continue continue
except Exception as e: except Exception as e:
...@@ -242,7 +323,7 @@ class Service: ...@@ -242,7 +323,7 @@ class Service:
del rawbacktrace[-1] del rawbacktrace[-1]
backtrace = ''.join(rawbacktrace).encode('latin-1').decode('unicode_escape') backtrace = ''.join(rawbacktrace).encode('latin-1').decode('unicode_escape')
self._debug(backtrace) self._debug(backtrace)
if self.Verbose is True: if self.verbose is True:
logger.info("[Service:] Status: %s", str(status)) logger.info("[Service:] Status: %s", str(status))
logger.info("[Service:] ERRTXT: %s", str(errtxt)) logger.info("[Service:] ERRTXT: %s", str(errtxt))
logger.info("[Service:] BackTrace: %s", str( backtrace )) logger.info("[Service:] BackTrace: %s", str( backtrace ))
...@@ -253,5 +334,10 @@ class Service: ...@@ -253,5 +334,10 @@ class Service:
excinfo = sys.exc_info() excinfo = sys.exc_info()
logger.error("[Service:] ERROR during processing of incoming message.") logger.error("[Service:] ERROR during processing of incoming message.")
traceback.print_exception(*excinfo) traceback.print_exception(*excinfo)
logger.info( "Thread %d: Resuming listening on bus %s for service %s" % (index, self.BusName, self.ServiceName)) logger.info( "Thread %d: Resuming listening on bus %s for service %s" % (index, self.busname, self.service_name))
try:
self.service_handler.after_main_loop()
except Exception as e:
logger.error("after_main_loop() failed with %s", e)
...@@ -28,4 +28,4 @@ from exceptions import * ...@@ -28,4 +28,4 @@ from exceptions import *
from messages import * from messages import *
from messagebus import * from messagebus import *
from RPC import RPC from RPC import RPC
from Service import Service from Service import Service, MessageHandlerInterface
...@@ -119,11 +119,11 @@ if __name__ == '__main__': ...@@ -119,11 +119,11 @@ if __name__ == '__main__':
# 'with' sets up the connection context and defines the scope of the service. # 'with' sets up the connection context and defines the scope of the service.
with serv1, serv2, serv3, serv4, serv5: with serv1, serv2, serv3, serv4, serv5:
# Start listening in the background. This will start as many threads as defined by the instance # Start listening in the background. This will start as many threads as defined by the instance
serv1.StartListening() serv1.start_listening()
serv2.StartListening() serv2.start_listening()
serv3.StartListening() serv3.start_listening()
serv4.StartListening() serv4.start_listening()
serv5.StartListening() serv5.start_listening()
# Redo all tests but via through RPC # Redo all tests but via through RPC
# ErrorFunc # ErrorFunc
...@@ -173,8 +173,8 @@ if __name__ == '__main__': ...@@ -173,8 +173,8 @@ if __name__ == '__main__':
print "Functions tested with RPC: All OK" print "Functions tested with RPC: All OK"
# Tell all background listener threads to stop and wait for them to finish. # Tell all background listener threads to stop and wait for them to finish.
serv1.StopListening() serv1.stop_listening()
serv2.StopListening() serv2.stop_listening()
serv3.StopListening() serv3.stop_listening()
serv4.StopListening() serv4.stop_listening()
serv5.StopListening() serv5.stop_listening()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment