diff --git a/LCS/Messaging/python/messaging/Service.py b/LCS/Messaging/python/messaging/Service.py index aacc512325f3b3b0c7dcd1d935e5b115cc53dcfe..637f69731638933fc2b713000fbe0f8838add61d 100644 --- a/LCS/Messaging/python/messaging/Service.py +++ b/LCS/Messaging/python/messaging/Service.py @@ -31,65 +31,131 @@ import logging logger = logging.getLogger(__name__) +class MessageHandlerInterface(object): + """ + Interface class for tuning the handling of a message by the Service class. + The class defines some (placeholders for) functions that the Service class calls + during the handling of messages. It can for instance be used to maintain a database connection. + + The pseudocode of the Service class is: + Service(busname, function or from_MessageHandlerInterface_derived_class, ..., HandlerArguments={}) + + handler = <from_MessageHandlerInterface_derived_class>(HandlerArguments) + handler.before_main_loop() + while alive: + handler.loop_before_receive() + msg = wait for messages() + handler.handle_message(msg) + handler.loop_after_handling() + handler.after_main_loop() + """ + def __init__(self, **kwargs): + pass + + def before_main_loop(self): + "Called before main processing loop is entered." + pass + + def loop_before_receive(self): + "Called in main processing loop just before a blocking wait for messages is done." + pass + + def handle_message(self, msg): + "Function the should handle the received message and return a result." + pass + + def loop_after_handling(self): + "Called in the main loop after the result was send back to the requester." + pass + + def after_main_loop(self): + "Called after main processing loop is finished." + pass + + # create service: -class Service: +class Service(object): """ Service class for registering python functions with a Service name on a message bus. - create new service with Service( BusName, ServiceName, ServiceHandler ) - Additional options: - options=<dict> for the QPID connection - numthreads=<int> amount of threads processing messages - startonwith=<bool> automatically start listening when in scope using 'with' - verbose=<bool> show debug text + create new service with Service(busname, servicename, servicehandler) + busname <string> The name of the messagebus (queue or exchange) the service whould listen on. + servicename <string> The name that the user should use the invocate the servicehandler. + servicehandler <...> May be a function of an class that is derived from the MessageHandlerInterface. + The service uses this function or class for the handling of the messages. + Optional arguments: + options <dict> For the QPID connection + exclusive <bool> To create eclusive access to this messagebus. Default:True + numthreads <int> Amount of threads processing messages. Default:1 + parsefullmessage <bool> Pass full message of only message content to the service handler. Default:False. + startonwith <bool> Automatically start listening when in scope using 'with' + verbose <bool> Show debug text. Default:False + handler_args <dict> Arguments that are passed to the constructor of the servicehandler is case the servicehandler + is a class in stead of a function. """ - def __init__(self, busname, servicename, servicehandler, options=None, exclusive=True, numthreads=1, parsefullmessage=False, startonwith=False, verbose=False): + def __init__(self, busname, servicename, servicehandler, **kwargs): """ - Initialize Service object with busname (str) ,servicename (str) and servicehandler function. + Initialize Service object with busname (str) ,servicename (str) and servicehandler function. additional parameters: options= <dict> Dictionary of options passed to QPID exclusive= <bool> Create an exclusive binding so no other services can consume duplicate messages (default: True) - numthreads= <int> Number of parallel threads processing messages (default: 1) + numthreads= <int> Number of parallel threads processing messages (default: 1) verbose= <bool> Output extra logging over stdout (default: False) """ - self.BusName = busname - self.ServiceName = servicename - self.ServiceHandler = servicehandler - self.connected = False - self.running = False - self.exclusive = exclusive - self.link_uuid = str(uuid.uuid4()) - self._numthreads = numthreads - self.Verbose = verbose - self.options = {"capacity": numthreads*20} - self.parsefullmessage=parsefullmessage - self.startonwith = startonwith + self.busname = busname + self.service_name = servicename + self.service_handler = None # Handled later in this routine + self.connected = False + self.running = False + self.link_uuid = str(uuid.uuid4()) + self.exclusive = kwargs.pop("exclusive", True) + self._numthreads = kwargs.pop("numthreads", 1) + self.verbose = kwargs.pop("verbose", False) + self.options = {"capacity": self._numthreads*20} + options = kwargs.pop("options", None) + self.parsefullmessage = kwargs.pop("parsefullmessage", False) + self.startonwith = kwargs.pop("startonwith", False) + self.handler_args = kwargs.pop("handler_args", None) + if len(kwargs): + raise ArgumentError("Unexpected argument passed to Serice class: %s", kwargs) # Set appropriate flags for exclusive binding if self.exclusive is True: - self.options["link"] = '{name:"' + self.link_uuid + '", x-bindings:[{key:' + self.ServiceName + ', arguments: {"qpid.exclusive-binding":True}}]}' + self.options["link"] = '{name:"' + self.link_uuid + \ + '", x-bindings:[{key:' + self.service_name + \ + ', arguments: {"qpid.exclusive-binding":True}}]}' # only add options if it is given as a dictionary if isinstance(options,dict): for key,val in options.iteritems(): self.options[key] = val + # set up service_handler + if str(type(servicehandler)) == "<type 'instancemethod'>" or str(type(servicehandler)) == "<type 'function'>": + self.service_handler = MessageHandlerInterface() + self.service_handler.handle_message = servicehandler + else: + self.service_handler = servicehandler(self.handler_args) + if not isinstance(self.service_handler, MessageHandlerInterface): + raise TypeError("Servicehandler argument must by a function or a derived class from MessageHandlerInterface.") + + def _debug(self, txt): """ - Internal use only. - """ - if self.Verbose is True: + Internal use only. + """ + if self.verbose is True: logger.debug("[Service: %s]", txt) - def StartListening(self, numthreads=None): + def start_listening(self, numthreads=None): """ - Start the background threads and process incoming messages. - """ + Start the background threads and process incoming messages. + """ if numthreads is not None: self._numthreads = numthreads if self.connected is False: - raise Exception("StartListening Called on closed connections") + raise Exception("start_listening Called on closed connections") self.running = True self._tr = [] @@ -101,22 +167,22 @@ class Service: self.okcounter.append(0) self._tr[i].start() - def StopListening(self): - """ - Stop the background threads that listen to incoming messages. - """ + def stop_listening(self): + """ + Stop the background threads that listen to incoming messages. + """ # stop all running threads if self.running is True: self.running = False for i in range(self._numthreads): self._tr[i].join() - logger.info("Thread %2d: STOPPED Listening for messages on Bus %s and service name %s." % (i, self.BusName, self.ServiceName)) + logger.info("Thread %2d: STOPPED Listening for messages on Bus %s and service name %s." % (i, self.busname, self.service_name)) logger.info(" %d messages received and %d processed OK." % (self.reccounter[i], self.okcounter[i])) - def WaitForInterrupt(self): - """ - Useful (low cpu load) loop that waits for keyboard interrupt. - """ + def wait_for_interrupt(self): + """ + Useful (low cpu load) loop that waits for keyboard interrupt. + """ looping = True while looping: try: @@ -128,18 +194,18 @@ class Service: def __enter__(self): """ - Internal use only. Handles scope with keyword 'with' + Internal use only. Handles scope with keyword 'with' """ # Usually a service will be listening on a 'bus' implemented by a topic exchange - if self.BusName is not None: - self.Listen = FromBus(self.BusName+"/"+self.ServiceName, options=self.options) - self.Reply = ToBus(self.BusName) + if self.busname is not None: + self.Listen = FromBus(self.busname+"/"+self.service_name, options=self.options) + self.Reply = ToBus(self.busname) self.Listen.open() self.Reply.open() # Handle case when queues are used else: # assume that we are listening on a queue and therefore we cannot use a generic ToBus() for replies. - self.Listen = FromBus(self.ServiceName, options=self.options) + self.Listen = FromBus(self.service_name, options=self.options) self.Listen.open() self.Reply=None @@ -147,14 +213,14 @@ class Service: # If required start listening on 'with' if self.startonwith is True: - self.StartListening() + self.start_listening() return self def __exit__(self, exc_type, exc_val, exc_tb): """ Internal use only. Handles scope with keyword 'with' """ - self.StopListening() + self.stop_listening() # close the listeners if self.connected is True: self.connected = False @@ -176,8 +242,8 @@ class Service: ToSend.errmsg = errtxt ToSend.backtrace = backtrace - # show the message content if required by the Verbose flag. - if self.Verbose is True: + # show the message content if required by the verbose flag. + if self.verbose is True: ToSend.show() # send the result to the RPC client @@ -196,8 +262,19 @@ class Service: """ Internal use only. Message listener loop that receives messages and starts the attached function with the message content as argument. """ - logger.info( "Thread %d START Listening for messages on Bus %s and service name %s." %(index, self.BusName, self.ServiceName)) + logger.info( "Thread %d START Listening for messages on Bus %s and service name %s." %(index, self.busname, self.service_name)) + try: + self.service_handler.before_main_loop() + except Exception as e: + logger.error("before_main_loop() failed with %s", e) + while self.running: + try: + self.service_handler.loop_before_receive() + except Exception as e: + logger.error("loop_before_receive() failed with %s", e) + continue + try: # get the next message msg = self.Listen.receive(1) @@ -218,13 +295,17 @@ class Service: try: self._debug("Running handler") if self.parsefullmessage is True: - replymessage = self.ServiceHandler(msg) + replymessage = self.service_handler.handle_message(msg) else: - replymessage = self.ServiceHandler(msg.content) + replymessage = self.service_handler.handle_message(msg.content) self._debug("finished handler") self._send_reply(replymessage,"OK",msg.reply_to) self.okcounter[index] += 1 self.Listen.ack(msg) + try: + self.service_handler.loop_after_handling() + except Exception as e: + logger.error("loop_after_handling() failed with %s", e) continue except Exception as e: @@ -242,7 +323,7 @@ class Service: del rawbacktrace[-1] backtrace = ''.join(rawbacktrace).encode('latin-1').decode('unicode_escape') self._debug(backtrace) - if self.Verbose is True: + if self.verbose is True: logger.info("[Service:] Status: %s", str(status)) logger.info("[Service:] ERRTXT: %s", str(errtxt)) logger.info("[Service:] BackTrace: %s", str( backtrace )) @@ -253,5 +334,10 @@ class Service: excinfo = sys.exc_info() logger.error("[Service:] ERROR during processing of incoming message.") traceback.print_exception(*excinfo) - logger.info( "Thread %d: Resuming listening on bus %s for service %s" % (index, self.BusName, self.ServiceName)) + logger.info( "Thread %d: Resuming listening on bus %s for service %s" % (index, self.busname, self.service_name)) + + try: + self.service_handler.after_main_loop() + except Exception as e: + logger.error("after_main_loop() failed with %s", e) diff --git a/LCS/Messaging/python/messaging/__init__.py b/LCS/Messaging/python/messaging/__init__.py index ab3e262510f54c3fa75b93fc31e859908c22085f..d513f1111d2406cf58b294371a096bce6c7467fd 100644 --- a/LCS/Messaging/python/messaging/__init__.py +++ b/LCS/Messaging/python/messaging/__init__.py @@ -28,4 +28,4 @@ from exceptions import * from messages import * from messagebus import * from RPC import RPC -from Service import Service +from Service import Service, MessageHandlerInterface diff --git a/LCS/Messaging/python/messaging/test/t_RPC.py b/LCS/Messaging/python/messaging/test/t_RPC.py index a3703443793320f746b9aef0f2c7200fa4a8c0e1..72131d5c1d371883ca20989b041d437cbbd3c7d4 100644 --- a/LCS/Messaging/python/messaging/test/t_RPC.py +++ b/LCS/Messaging/python/messaging/test/t_RPC.py @@ -119,11 +119,11 @@ if __name__ == '__main__': # 'with' sets up the connection context and defines the scope of the service. with serv1, serv2, serv3, serv4, serv5: # Start listening in the background. This will start as many threads as defined by the instance - serv1.StartListening() - serv2.StartListening() - serv3.StartListening() - serv4.StartListening() - serv5.StartListening() + serv1.start_listening() + serv2.start_listening() + serv3.start_listening() + serv4.start_listening() + serv5.start_listening() # Redo all tests but via through RPC # ErrorFunc @@ -173,8 +173,8 @@ if __name__ == '__main__': print "Functions tested with RPC: All OK" # Tell all background listener threads to stop and wait for them to finish. - serv1.StopListening() - serv2.StopListening() - serv3.StopListening() - serv4.StopListening() - serv5.StopListening() + serv1.stop_listening() + serv2.stop_listening() + serv3.stop_listening() + serv4.stop_listening() + serv5.stop_listening()