diff --git a/LCS/Messaging/python/messaging/RPC.py b/LCS/Messaging/python/messaging/RPC.py index 309cd43daca3f9f07838434fc241d862647c17da..e2fa57b8653bbc17b20d4492739df732a488705a 100644 --- a/LCS/Messaging/python/messaging/RPC.py +++ b/LCS/Messaging/python/messaging/RPC.py @@ -41,11 +41,11 @@ class RPC(): As a side-effect the sender and session are destroyed. """ - def __init__(self, bus, service, timeout=None, ForwardExceptions=None, Verbose=None): + def __init__(self, service, busname=None, timeout=None, ForwardExceptions=None, Verbose=None): """ Initialize an Remote procedure call using: - bus= <str> Bus Name service= <str> Service Name + bus= <str> Bus Name timeout= <float> Time to wait in seconds before the call is considered a failure. Verbose= <bool> If True output extra logging to stdout. @@ -59,7 +59,7 @@ class RPC(): self.ForwardExceptions = True if Verbose is True: self.Verbose = True - self.BusName = bus + self.BusName = busname self.ServiceName = service if self.BusName is None: self.Request = ToBus(self.ServiceName) diff --git a/LCS/Messaging/python/messaging/Service.py b/LCS/Messaging/python/messaging/Service.py index 0c9171bb1847722f883bc2a7d758333d3d778522..a41b4cd7c4824bd80ffbcec01b312b025e0c1c88 100644 --- a/LCS/Messaging/python/messaging/Service.py +++ b/LCS/Messaging/python/messaging/Service.py @@ -41,22 +41,22 @@ class MessageHandlerInterface(object): Service(busname, function or from_MessageHandlerInterface_derived_class, ..., HandlerArguments={}) handler = <from_MessageHandlerInterface_derived_class>(HandlerArguments) - handler.before_main_loop() + handler.prepare_loop() while alive: - handler.in_loop_before_receive() + handler.prepare_receive() msg = wait for messages() handler.handle_message(msg) - handler.in_loop_after_handling() - handler.after_main_loop() + handler.finalize_handling(handling_result) + handler.finalize_loop() """ def __init__(self, **kwargs): pass - def before_main_loop(self): + def prepare_loop(self): "Called before main processing loop is entered." pass - def in_loop_before_receive(self): + def prepare_receive(self): "Called in main processing loop just before a blocking wait for messages is done." pass @@ -64,11 +64,12 @@ class MessageHandlerInterface(object): "Function the should handle the received message and return a result." raise Exception("OOPS! YOU ENDED UP IN THE MESSAGE HANDLER OF THE ABSTRACT BASE CLASS!") - def in_loop_after_handling(self): + def finalize_handling(self, successful): "Called in the main loop after the result was send back to the requester." + "@successful@ reflects the state of the handling: true/false" pass - def after_main_loop(self): + def finalize_loop(self): "Called after main processing loop is finished." pass @@ -94,21 +95,22 @@ class Service(object): is a class in stead of a function. """ - def __init__(self, busname, servicename, servicehandler, **kwargs): + def __init__(self, servicename, servicehandler, **kwargs): """ - Initialize Service object with busname (str) ,servicename (str) and servicehandler function. + Initialize Service object with servicename (str) and servicehandler function. additional parameters: + busname= <string> Name of the bus in case exchanges are used in stead of queues options= <dict> Dictionary of options passed to QPID exclusive= <bool> Create an exclusive binding so no other services can consume duplicate messages (default: True) numthreads= <int> Number of parallel threads processing messages (default: 1) verbose= <bool> Output extra logging over stdout (default: False) """ - self.busname = busname self.service_name = servicename - self.service_handler = None # Handled later in this routine + self.service_handler = servicehandler self.connected = False self.running = False self.link_uuid = str(uuid.uuid4()) + self.busname = kwargs.pop("busname", None) self.exclusive = kwargs.pop("exclusive", True) self._numthreads = kwargs.pop("numthreads", 1) self.verbose = kwargs.pop("verbose", False) @@ -131,16 +133,6 @@ class Service(object): for key,val in options.iteritems(): self.options[key] = val - # set up service_handler - if str(type(servicehandler)) == "<type 'instancemethod'>" or str(type(servicehandler)) == "<type 'function'>": - self.service_handler = MessageHandlerInterface() - self.service_handler.handle_message = servicehandler - else: - self.service_handler = servicehandler(**self.handler_args) - if not isinstance(self.service_handler, MessageHandlerInterface): - raise TypeError("Servicehandler argument must by a function or a derived class from MessageHandlerInterface.") - - def _debug(self, txt): """ Internal use only. @@ -162,7 +154,18 @@ class Service(object): self.reccounter = [] self.okcounter =[] for i in range(self._numthreads): - self._tr.append(threading.Thread(target=self._loop, args=[i])) + # set up service_handler + if str(type(self.service_handler)) == "<type 'instancemethod'>" or \ + str(type(self.service_handler)) == "<type 'function'>": + thread_service_handler = MessageHandlerInterface() + thread_service_handler.handle_message = self.service_handler + else: + thread_service_handler = self.service_handler(**self.handler_args) + if not isinstance(thread_service_handler, MessageHandlerInterface): + raise TypeError("Servicehandler argument must by a function or a derived class from MessageHandlerInterface.") + + self._tr.append(threading.Thread(target=self._loop, + kwargs={"index":i, "service_handler":thread_service_handler})) self.reccounter.append(0) self.okcounter.append(0) self._tr[i].start() @@ -198,16 +201,16 @@ class Service(object): """ # Usually a service will be listening on a 'bus' implemented by a topic exchange if self.busname is not None: - self.Listen = FromBus(self.busname+"/"+self.service_name, options=self.options) - self.Reply = ToBus(self.busname) - self.Listen.open() - self.Reply.open() + self.listener = FromBus(self.busname+"/"+self.service_name, options=self.options) + self.reply_bus = ToBus(self.busname) + self.listener.open() + self.reply_bus.open() # Handle case when queues are used else: # assume that we are listening on a queue and therefore we cannot use a generic ToBus() for replies. - self.Listen = FromBus(self.service_name, options=self.options) - self.Listen.open() - self.Reply=None + self.listener = FromBus(self.service_name, options=self.options) + self.listener.open() + self.reply_bus=None self.connected = True @@ -224,10 +227,10 @@ class Service(object): # close the listeners if self.connected is True: self.connected = False - if isinstance(self.Listen, FromBus): - self.Listen.close() - if isinstance(self.Reply, ToBus): - self.Reply.close() + if isinstance(self.listener, FromBus): + self.listener.close() + if isinstance(self.reply_bus, ToBus): + self.reply_bus.close() def _send_reply(self, replymessage, status, reply_to, errtxt="",backtrace=""): """ @@ -235,49 +238,51 @@ class Service(object): """ # Compose Reply message from reply and status. if isinstance(replymessage,ReplyMessage): - ToSend = replymessage + reply_msg = replymessage else: - ToSend = ReplyMessage(replymessage, reply_to) - ToSend.status = status - ToSend.errmsg = errtxt - ToSend.backtrace = backtrace + reply_msg = ReplyMessage(replymessage, reply_to) + reply_msg.status = status + reply_msg.errmsg = errtxt + reply_msg.backtrace = backtrace # show the message content if required by the verbose flag. if self.verbose is True: - ToSend.show() + reply_msg.show() # send the result to the RPC client - if isinstance(self.Reply,ToBus): - ToSend.subject = reply_to - self.Reply.send(ToSend) + if isinstance(self.reply_bus,ToBus): + reply_msg.subject = reply_to + self.reply_bus.send(reply_msg) else: try: with ToBus(reply_to) as dest: - dest.send(ToSend) + dest.send(reply_msg) except MessageBusError as e: logger.error("Failed to send reply to reply address %s" %(reply_to)) - def _loop(self, index): + def _loop(self, **kwargs): """ Internal use only. Message listener loop that receives messages and starts the attached function with the message content as argument. """ - logger.info( "Thread %d START Listening for messages on Bus %s and service name %s." %(index, self.busname, self.service_name)) + thread_idx = kwargs.pop("index") + service_handler = kwargs.pop("service_handler") + logger.info( "Thread %d START Listening for messages on Bus %s and service name %s." %(thread_idx, self.busname, self.service_name)) try: - self.service_handler.before_main_loop() + service_handler.prepare_loop() except Exception as e: - logger.error("before_main_loop() failed with %s", e) + logger.error("prepare_loop() failed with %s", e) while self.running: try: - self.service_handler.in_loop_before_receive() + service_handler.prepare_receive() except Exception as e: - logger.error("in_loop_before_receive() failed with %s", e) + logger.error("prepare_receive() failed with %s", e) continue try: # get the next message - msg = self.Listen.receive(1) + msg = self.listener.receive(1) # retry if timed-out if msg is None: continue @@ -285,27 +290,27 @@ class Service(object): # report if messages are not Service Messages if isinstance(msg, ServiceMessage) is not True: logger.error( "Received wrong messagetype %s, ServiceMessage expected." %(str(type(msg)))) - self.Listen.ack(msg) + self.listener.ack(msg) continue # Keep track of number of received messages - self.reccounter[index] += 1 + self.reccounter[thread_idx] += 1 # Execute the service handler function and send reply back to client try: self._debug("Running handler") if self.parsefullmessage is True: - replymessage = self.service_handler.handle_message(msg) + replymessage = service_handler.handle_message(msg) else: - replymessage = self.service_handler.handle_message(msg.content) + replymessage = service_handler.handle_message(msg.content) self._debug("finished handler") self._send_reply(replymessage,"OK",msg.reply_to) - self.okcounter[index] += 1 - self.Listen.ack(msg) + self.okcounter[thread_idx] += 1 + self.listener.ack(msg) try: - self.service_handler.in_loop_after_handling() + service_handler.finalize_handling(True) except Exception as e: - logger.error("in_loop_after_handling() failed with %s", e) + logger.error("finalize_handling() failed with %s", e) continue except Exception as e: @@ -328,17 +333,23 @@ class Service(object): logger.info("[Service:] ERRTXT: %s", str(errtxt)) logger.info("[Service:] BackTrace: %s", str( backtrace )) self._send_reply(None, status, msg.reply_to, errtxt=errtxt, backtrace=backtrace) + try: + service_handler.finalize_handling(False) + except Exception as e: + logger.error("finalize_handling() failed with %s", e) + continue except Exception as e: # Unknown problem in the library. Report this and continue. excinfo = sys.exc_info() logger.error("[Service:] ERROR during processing of incoming message.") traceback.print_exception(*excinfo) - logger.info( "Thread %d: Resuming listening on bus %s for service %s" % (index, self.busname, self.service_name)) + logger.info("Thread %d: Resuming listening on bus %s for service %s" % + (thread_idx, self.busname, self.service_name)) try: - self.service_handler.after_main_loop() + service_handler.finalize_loop() except Exception as e: - logger.error("after_main_loop() failed with %s", e) + logger.error("finalize_loop() failed with %s", e) __all__ = ["Service", "MessageHandlerInterface"] diff --git a/LCS/Messaging/python/messaging/test/t_service_message_handler.py b/LCS/Messaging/python/messaging/test/t_service_message_handler.py index 56d15b2c13967a9d1e02ed6f0c6254aa33c7e45a..653a4854ff9428971e8593d2e6ffdbb289923e02 100644 --- a/LCS/Messaging/python/messaging/test/t_service_message_handler.py +++ b/LCS/Messaging/python/messaging/test/t_service_message_handler.py @@ -36,157 +36,161 @@ def StringFunc(input_value): return input_value.upper() class OnlyMessageHandling(MessageHandlerInterface): - def __init__(self, arg_dict): + def __init__(self, **kwargs): MessageHandlerInterface.__init__(self) - print "Creation of OnlyMessageHandling class: %s" % arg_dict - self.handle_message = arg_dict.pop("function") - self.args = arg_dict + print "Creation of OnlyMessageHandling class: %s" % kwargs + self.handle_message = kwargs.pop("function") + self.args = kwargs class FullMessageHandling(MessageHandlerInterface): - def __init__(self, arg_dict): + def __init__(self, **kwargs): MessageHandlerInterface.__init__(self) - print "Creation of FullMessageHandling class: %s" % arg_dict - self.handle_message = arg_dict.pop("function") - self.args = arg_dict - def before_main_loop(self): - print "FullMessageHandling before_main_loop: %s" % self.args - def loop_before_receive(self): - print "FullMessageHandling loop_before_receive: %s" % self.args - def loop_after_handling(self): - print "FullMessageHandling loop_after_handling: %s" % self.args - def after_main_loop(self): - print "FullMessageHandling after_main_loop: %s" % self.args + print "Creation of FullMessageHandling class: %s" % kwargs + self.handle_message = kwargs.pop("function") + self.args = kwargs + def prepare_loop(self): + print "FullMessageHandling prepare_loop: %s" % self.args + def prepare_receive(self): + print "FullMessageHandling prepare_receive: %s" % self.args + def finalize_handling(self, successful): + print "FullMessageHandling finalize_handling: %s" % self.args + def finalize_loop(self): + print "FullMessageHandling finalize_loop: %s" % self.args class FailingMessageHandling(MessageHandlerInterface): - def __init__(self, arg_dict): + def __init__(self, **kwargs): MessageHandlerInterface.__init__(self) - print "Creation of FailingMessageHandling class: %s" % arg_dict - self.handle_message = arg_dict.pop("function") - self.args = arg_dict + print "Creation of FailingMessageHandling class: %s" % kwargs + self.handle_message = kwargs.pop("function") + self.args = kwargs self.counter = 0 - def before_main_loop(self): - print "FailingMessageHandling before_main_loop: %s" % self.args - raise Exception("oops in before_main_loop()") - def loop_before_receive(self): + def prepare_loop(self): + print "FailingMessageHandling prepare_loop: %s" % self.args + raise UserException("oops in prepare_loop()") + def prepare_receive(self): # allow one succesfull call otherwise the main loop never accepts the message :-) - print "FailingMessageHandling loop_before_receive: %s" % self.args + print "FailingMessageHandling prepare_receive: %s" % self.args if self.counter: time.sleep(1) # Prevent running around too fast - raise Exception("oops in loop_before_receive(%d)" % self.counter) + raise UserException("oops in prepare_receive(%d)" % self.counter) else: self.counter = self.counter + 1 - def loop_after_handling(self): - print "FailingMessageHandling loop_after_handling: %s" % self.args - raise Exception("oops in loop_after_handling()") - def after_main_loop(self): - print "FailingMessageHandling after_main_loop: %s" % self.args - raise Exception("oops in after_main_loop()") + def finalize_handling(self, successful): + print "FailingMessageHandling finalize_handling: %s, %s" % (self.args, successful) + raise UserException("oops in finalize_handling()") + def finalize_loop(self): + print "FailingMessageHandling finalize_loop: %s" % self.args + raise UserException("oops in finalize_loop()") if __name__ == '__main__': busname = sys.argv[1] if len(sys.argv) > 1 else "simpletest" # Register functs as a service handler listening at busname and ServiceName - serv1_plain = Service(busname, "Error1Service", ErrorFunc, numthreads=1, startonwith=True) - serv1_minimal_class = Service(busname, "Error2Service", OnlyMessageHandling, numthreads=1, startonwith=True, + serv1_plain = Service("String1Service", StringFunc, busname=busname, numthreads=1, startonwith=True) + serv1_minimal_class = Service("String2Service", OnlyMessageHandling, busname=busname, numthreads=1, startonwith=True, + handler_args={"function" : StringFunc}) + serv1_full_class = Service("String3Service", FullMessageHandling, busname=busname, numthreads=1, startonwith=True, + handler_args={"function" : StringFunc}) + serv1_failing_class = Service("String4Service", FailingMessageHandling, busname=busname, numthreads=1, startonwith=True, + handler_args={"function" : StringFunc}) + + # 'with' sets up the connection context and defines the scope of the service. + with serv1_plain, serv1_minimal_class, serv1_full_class, serv1_failing_class: + # Redo string tests via RPC + with RPC("String1Service", ForwardExceptions=True, busname=busname) as rpc: + result = rpc("aap noot mies") + if result[0] != "AAP NOOT MIES": + raise Exception("String function failed of String1Service:{}".format(result)) + print "string1Service is OK" + + with RPC("String2Service", ForwardExceptions=True, busname=busname) as rpc: + result = rpc("aap noot mies") + if result[0] != "AAP NOOT MIES": + raise Exception("String function failed of String2Service:{}".format(result)) + print "string2Service is OK" + + with RPC("String3Service", ForwardExceptions=True, busname=busname) as rpc: + result = rpc("aap noot mies") + if result[0] != "AAP NOOT MIES": + raise Exception("String function failed of String3Service:{}".format(result)) + print "string3Service is OK" + + with RPC("String4Service", ForwardExceptions=True, busname=busname) as rpc: + result = rpc("aap noot mies") + if result[0] != "AAP NOOT MIES": + raise Exception("String function failed of String4Service:{}".format(result)) + print "string4Service is OK" + + # Register functs as a service handler listening at busname and ServiceName + serv2_plain = Service("Error1Service", ErrorFunc, busname=busname, numthreads=1, startonwith=True) + serv2_minimal_class = Service("Error2Service", OnlyMessageHandling, busname=busname, numthreads=1, startonwith=True, handler_args={"function" : ErrorFunc}) - serv1_full_class = Service(busname, "Error3Service", FullMessageHandling, numthreads=1, startonwith=True, + serv2_full_class = Service("Error3Service", FullMessageHandling, busname=busname, numthreads=1, startonwith=True, handler_args={"function" : ErrorFunc}) - serv1_failing_class = Service(busname, "Error4Service", FailingMessageHandling, numthreads=1, startonwith=True, + serv2_failing_class = Service("Error4Service", FailingMessageHandling, busname=busname, numthreads=1, startonwith=True, handler_args={"function" : ErrorFunc}) # 'with' sets up the connection context and defines the scope of the service. - with serv1_plain, serv1_minimal_class, serv1_full_class, serv1_failing_class: + with serv2_plain, serv2_minimal_class, serv2_full_class, serv2_failing_class: # Redo Error tests via RPC - with RPC(busname, "Error1Service", ForwardExceptions=True) as rpc: + with RPC("Error1Service", ForwardExceptions=True, busname=busname) as rpc: try: result = rpc("aap noot mies") except RPCException as e: - print "Caught expected exception" + print "Error1Service is OK" - with RPC(busname, "Error2Service", ForwardExceptions=True) as rpc: + with RPC("Error2Service", ForwardExceptions=True, busname=busname) as rpc: try: result = rpc("aap noot mies") except RPCException as e: - print "Caught expected exception" + print "Error2Service is OK" - with RPC(busname, "Error3Service", ForwardExceptions=True) as rpc: + with RPC("Error3Service", ForwardExceptions=True, busname=busname) as rpc: try: result = rpc("aap noot mies") except RPCException as e: - print "Caught expected exception" + print "Error3Service is OK" - with RPC(busname, "Error4Service", ForwardExceptions=True) as rpc: + with RPC("Error4Service", ForwardExceptions=True, busname=busname) as rpc: try: result = rpc("aap noot mies") except Exception as e: - print "Caught expected exception" + print "Error4Service is OK" # Register functs as a service handler listening at busname and ServiceName - serv2_plain = Service(busname, "Except1Service", ExceptionFunc, numthreads=1, startonwith=True) - serv2_minimal_class = Service(busname, "Except2Service", OnlyMessageHandling, numthreads=1, startonwith=True, + serv3_plain = Service("Except1Service", ExceptionFunc, busname=busname, numthreads=1, startonwith=True) + serv3_minimal_class = Service("Except2Service", OnlyMessageHandling, busname=busname, numthreads=1, startonwith=True, handler_args={"function" : ExceptionFunc}) - serv2_full_class = Service(busname, "Except3Service", FullMessageHandling, numthreads=1, startonwith=True, + serv3_full_class = Service("Except3Service", FullMessageHandling, busname=busname, numthreads=1, startonwith=True, handler_args={"function" : ExceptionFunc}) - serv2_failing_class = Service(busname, "Except4Service", FailingMessageHandling, numthreads=1, startonwith=True, + serv3_failing_class = Service("Except4Service", FailingMessageHandling, busname=busname, numthreads=1, startonwith=True, handler_args={"function" : ExceptionFunc}) # 'with' sets up the connection context and defines the scope of the service. - with serv2_plain, serv2_minimal_class, serv2_full_class, serv2_failing_class: + with serv3_plain, serv3_minimal_class, serv3_full_class, serv3_failing_class: # Redo exception tests via RPC - with RPC(busname, "Except1Service", ForwardExceptions=True) as rpc: + with RPC("Except1Service", ForwardExceptions=True, busname=busname) as rpc: try: result = rpc("aap noot mies") except IndexError as e: - print "Caught expected exception" + print "Except1Service is OK" - with RPC(busname, "Except2Service", ForwardExceptions=True) as rpc: + with RPC("Except2Service", ForwardExceptions=True, busname=busname) as rpc: try: result = rpc("aap noot mies") except IndexError as e: - print "Caught expected exception" + print "Except2Service is OK" - with RPC(busname, "Except3Service", ForwardExceptions=True) as rpc: + with RPC("Except3Service", ForwardExceptions=True, busname=busname) as rpc: try: result = rpc("aap noot mies") except IndexError as e: - print "Caught expected exception" + print "Except3Service is OK" - with RPC(busname, "Except4Service", ForwardExceptions=True) as rpc: + with RPC("Except4Service", ForwardExceptions=True, busname=busname) as rpc: try: result = rpc("aap noot mies") except IndexError as e: - print "Caught expected exception" - - # Register functs as a service handler listening at busname and ServiceName - serv3_plain = Service(busname, "String1Service", StringFunc, numthreads=1, startonwith=True) - serv3_minimal_class = Service(busname, "String2Service", OnlyMessageHandling, numthreads=1, startonwith=True, - handler_args={"function" : StringFunc}) - serv3_full_class = Service(busname, "String3Service", FullMessageHandling, numthreads=1, startonwith=True, - handler_args={"function" : StringFunc}) - serv3_failing_class = Service(busname, "String4Service", FailingMessageHandling, numthreads=1, startonwith=True, - handler_args={"function" : StringFunc}) - - # 'with' sets up the connection context and defines the scope of the service. - with serv3_plain, serv3_minimal_class, serv3_full_class, serv3_failing_class: - # Redo string tests via RPC - with RPC(busname, "String1Service", ForwardExceptions=True) as rpc: - result = rpc("aap noot mies") - if result[0] != "AAP NOOT MIES": - raise Exception("String function failed of String1Service:{}".format(result)) - - with RPC(busname, "String2Service", ForwardExceptions=True) as rpc: - result = rpc("aap noot mies") - if result[0] != "AAP NOOT MIES": - raise Exception("String function failed of String2Service:{}".format(result)) - - with RPC(busname, "String3Service", ForwardExceptions=True) as rpc: - result = rpc("aap noot mies") - if result[0] != "AAP NOOT MIES": - raise Exception("String function failed of String3Service:{}".format(result)) - - with RPC(busname, "String4Service", ForwardExceptions=True) as rpc: - result = rpc("aap noot mies") - if result[0] != "AAP NOOT MIES": - raise Exception("String function failed of String4Service:{}".format(result)) + print "Except4Service is OK" - print "Functions tested with RPC: All OK" + print "Functions tested with RPC: All OK"