diff --git a/LCS/Messaging/python/messaging/Service.py b/LCS/Messaging/python/messaging/Service.py index 3a7434a2a1c7a88bc75dd0ec9e7885fc6abf82b3..5ce419653ee366e5d5cbd2ce92d1876d9cf93f44 100644 --- a/LCS/Messaging/python/messaging/Service.py +++ b/LCS/Messaging/python/messaging/Service.py @@ -20,8 +20,8 @@ # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. # -from .messagebus import ToBus,FromBus -from .messages import ReplyMessage,RequestMessage +from .messagebus import ToBus, FromBus, AbstractBusListener +from .messages import ReplyMessage, RequestMessage import threading import time import uuid @@ -78,8 +78,7 @@ class MessageHandlerInterface(object): pass -# create service: -class Service(object): +class Service(AbstractBusListener): """ Service class for registering python functions with a Service name on a message bus. create new service with Service(busname, servicename, servicehandler) @@ -97,7 +96,7 @@ class Service(object): is a class in stead of a function. """ - def __init__(self, servicename, servicehandler, **kwargs): + def __init__(self, servicename, servicehandler, broker=None, **kwargs): """ Initialize Service object with servicename (str) and servicehandler function. additional parameters: @@ -109,120 +108,57 @@ class Service(object): """ self.service_name = servicename self.service_handler = servicehandler - self.connected = False - self.running = [False] - self.link_uuid = str(uuid.uuid4()) self.busname = kwargs.pop("busname", None) - self.exclusive = kwargs.pop("exclusive", True) - self._numthreads = kwargs.pop("numthreads", 1) - self.verbose = kwargs.pop("verbose", False) - self.options = {"capacity": self._numthreads*20} - options = kwargs.pop("options", None) self.parsefullmessage = kwargs.pop("parsefullmessage", False) self.handler_args = kwargs.pop("handler_args", {}) - self.listening = False - if len(kwargs): - raise AttributeError("Unexpected argument passed to Service class: %s", kwargs) - - # Set appropriate flags for exclusive binding - if self.exclusive == True: - self.options["link"] = '{name:"' + self.link_uuid + \ - '", x-bindings:[{key:' + self.service_name + \ - ', arguments: {"qpid.exclusive-binding":True}}]}' - - # only add options if it is given as a dictionary - if isinstance(options,dict): - for key,val in options.iteritems(): - self.options[key] = val - - def _debug(self, txt): - """ - Internal use only. - """ - if self.verbose == True: - logger.debug("[Service: %s]", txt) + + address = self.busname+"/"+self.service_name if self.busname else self.service_name + + super(Service, self).__init__(address, broker, **kwargs) def start_listening(self, numthreads=None): """ Start the background threads and process incoming messages. """ - if self.listening == True: + if self.isListening(): return # Usually a service will be listening on a 'bus' implemented by a topic exchange if self.busname != None: - self.listener = FromBus(self.busname+"/"+self.service_name, options=self.options) self.reply_bus = ToBus(self.busname) - self.listener.open() self.reply_bus.open() - # Handle case when queues are used else: - # assume that we are listening on a queue and therefore we cannot use a generic ToBus() for replies. - self.listener = FromBus(self.service_name, options=self.options) - self.listener.open() self.reply_bus=None - self.connected = True - - if numthreads != None: - self._numthreads = numthreads - - # use a list to ensure that threads always 'see' changes in the running state. - self.running = [ True ] - self._tr = [] - self.reccounter = [] - self.okcounter =[] - for i in range(self._numthreads): - # set up service_handler - if str(type(self.service_handler)) == "<type 'instancemethod'>" or \ - str(type(self.service_handler)) == "<type 'function'>": - thread_service_handler = MessageHandlerInterface() - thread_service_handler.handle_message = self.service_handler - else: - thread_service_handler = self.service_handler(**self.handler_args) - if not isinstance(thread_service_handler, MessageHandlerInterface): - raise TypeError("Servicehandler argument must by a function or a derived class from MessageHandlerInterface.") - - self._tr.append(threading.Thread(target=self._loop, - kwargs={"index":i, "service_handler":thread_service_handler})) - self.reccounter.append(0) - self.okcounter.append(0) - self._tr[i].start() - self.listening = True + # create listener FromBus in super class + super(Service, self).start_listening(numthreads=numthreads) def stop_listening(self): """ Stop the background threads that listen to incoming messages. """ - # stop all running threads - if self.running[0] == True: - self.running[0] = False - for i in range(self._numthreads): - self._tr[i].join() - logger.info("Thread %2d: STOPPED Listening for messages on Bus %s and service name %s." % (i, self.busname, self.service_name)) - logger.info(" %d messages received and %d processed OK." % (self.reccounter[i], self.okcounter[i])) - self.listening = False - # close the listeners - if self.connected == True: - if isinstance(self.listener, FromBus): - self.listener.close() - if isinstance(self.reply_bus, ToBus): - self.reply_bus.close() - self.connected = False + if isinstance(self.reply_bus, ToBus): + self.reply_bus.close() + # close the listeners + super(Service, self).stop_listening() + + def _create_thread_args(self, index): + # set up service_handler + if str(type(self.service_handler)) == "<type 'instancemethod'>" or \ + str(type(self.service_handler)) == "<type 'function'>": + thread_service_handler = MessageHandlerInterface() + thread_service_handler.handle_message = self.service_handler + else: + thread_service_handler = self.service_handler(**self.handler_args) - def __enter__(self): - """ - Internal use only. Handles scope with keyword 'with' - """ - self.start_listening() - return self + if not isinstance(thread_service_handler, MessageHandlerInterface): + raise TypeError("Servicehandler argument must by a function or a derived class from MessageHandlerInterface.") - def __exit__(self, exc_type, exc_val, exc_tb): - """ - Internal use only. Handles scope with keyword 'with' - """ - self.stop_listening() + # add service_handler to default args for thread + args = super(Service, self)._create_thread_args(index) + args['service_handler'] = thread_service_handler + return args def _send_reply(self, replymessage, status, reply_to, errtxt="",backtrace=""): """ @@ -277,133 +213,95 @@ class Service(object): except MessageBusError as e: logger.error("Failed to send reply messgage to reply address %s" %(reply_to)) + def _getServiceHandlerForCurrentThread(self): + currentThread = threading.currentThread() + args = self._threads[currentThread] + return args['service_handler'] - def _loop(self, **kwargs): - """ - Internal use only. Message listener loop that receives messages and starts the attached function with the message content as argument. - """ - thread_idx = kwargs.pop("index") - service_handler = kwargs.pop("service_handler") - logger.info( "Thread %d START Listening for messages on Bus %s and service name %s." %(thread_idx, self.busname, self.service_name)) - try: - service_handler.prepare_loop() - except Exception as e: - logger.error("prepare_loop() failed with %s", e) + def onListenLoopBegin(self): + "Called before main processing loop is entered." + self._getServiceHandlerForCurrentThread().prepare_loop() - while self.running[0]: - try: - service_handler.prepare_receive() - except Exception as e: - logger.error("prepare_receive() failed with %s", e) - continue + def onBeforeReceiveMessage(self): + "Called in main processing loop just before a blocking wait for messages is done." + self._getServiceHandlerForCurrentThread().prepare_receive() - try: - # get the next message - lofar_msg = self.listener.receive(1) - # retry if timed-out - if lofar_msg is None: - continue - - # report if messages are not Service Messages - if not isinstance(lofar_msg, RequestMessage): - logger.error( "Received wrong messagetype %s, RequestMessage expected." %(str(type(lofar_msg)))) - self.listener.ack(lofar_msg) - continue - - # Keep track of number of received messages - self.reccounter[thread_idx] += 1 - - # Execute the service handler function and send reply back to client - try: - self._debug("Running handler") - - # determine which handler method has to be called - if hasattr(service_handler, 'service2MethodMap') and lofar_msg.subject in service_handler.service2MethodMap: - # pass the handling of this message on to the specific method for this service - serviceHandlerMethod = service_handler.service2MethodMap[lofar_msg.subject] - else: - serviceHandlerMethod = service_handler.handle_message - - if self.parsefullmessage is True: - replymessage = serviceHandlerMethod(lofar_msg) - else: - # check for positional arguments and named arguments - # depending on presence of args and kwargs, - # the signature of the handler method should vary as well - if lofar_msg.has_args and lofar_msg.has_kwargs: - # both positional and named arguments - # rpcargs and rpckwargs are packed in the content - rpcargs = lofar_msg.content - - # rpckwargs is the last argument in the content - # rpcargs is the rest in front - rpckwargs = rpcargs[-1] - del rpcargs[-1] - rpcargs = tuple(rpcargs) - replymessage = serviceHandlerMethod(*rpcargs, **rpckwargs) - elif lofar_msg.has_args: - # only positional arguments - # msg.content should be a list - rpcargs = tuple(lofar_msg.content) - replymessage = serviceHandlerMethod(*rpcargs) - elif lofar_msg.has_kwargs: - # only named arguments - # msg.content should be a dict - rpckwargs = lofar_msg.content - replymessage = serviceHandlerMethod(**rpckwargs) - elif lofar_msg.content: - rpccontent = lofar_msg.content - replymessage = serviceHandlerMethod(rpccontent) - else: - replymessage = serviceHandlerMethod() - - self._debug("finished handler") - self._send_reply(replymessage,"OK",lofar_msg.reply_to) - self.okcounter[thread_idx] += 1 - self.listener.ack(lofar_msg) - try: - service_handler.finalize_handling(True) - except Exception as e: - logger.error("finalize_handling() failed with %s", e) - continue - - except Exception as e: - # Any thrown exceptions either Service exception or unhandled exception - # during the execution of the service handler is caught here. - self._debug("handling exception") - exc_info = sys.exc_info() - status="ERROR" - rawbacktrace = traceback.format_exception(*exc_info) - errtxt = rawbacktrace[-1] - self._debug(rawbacktrace) - # cleanup the backtrace print by removing the first two lines and the last - del rawbacktrace[1] - del rawbacktrace[0] - del rawbacktrace[-1] - backtrace = ''.join(rawbacktrace).encode('latin-1').decode('unicode_escape') - self._debug(backtrace) - if self.verbose is True: - logger.info("[Service:] Status: %s", str(status)) - logger.info("[Service:] ERRTXT: %s", str(errtxt)) - logger.info("[Service:] BackTrace: %s", str( backtrace )) - self._send_reply(None, status, lofar_msg.reply_to, errtxt=errtxt, backtrace=backtrace) - try: - service_handler.finalize_handling(False) - except Exception as e: - logger.error("finalize_handling() failed with %s", e) - continue - - except Exception as e: - # Unknown problem in the library. Report this and continue. - excinfo = sys.exc_info() - logger.error("[Service:] ERROR during processing of incoming message.") - traceback.print_exception(*excinfo) - logger.info("Thread %d: Resuming listening on bus %s for service %s" % - (thread_idx, self.busname, self.service_name)) + def handleMessage(self, lofar_msg): + service_handler = self._getServiceHandlerForCurrentThread() try: - service_handler.finalize_loop() + self._debug("Running handler") + + # determine which handler method has to be called + if hasattr(service_handler, 'service2MethodMap') and lofar_msg.subject in service_handler.service2MethodMap: + # pass the handling of this message on to the specific method for this service + serviceHandlerMethod = service_handler.service2MethodMap[lofar_msg.subject] + else: + serviceHandlerMethod = service_handler.handle_message + + if self.parsefullmessage is True: + replymessage = serviceHandlerMethod(lofar_msg) + else: + # check for positional arguments and named arguments + # depending on presence of args and kwargs, + # the signature of the handler method should vary as well + if lofar_msg.has_args and lofar_msg.has_kwargs: + # both positional and named arguments + # rpcargs and rpckwargs are packed in the content + rpcargs = lofar_msg.content + + # rpckwargs is the last argument in the content + # rpcargs is the rest in front + rpckwargs = rpcargs[-1] + del rpcargs[-1] + rpcargs = tuple(rpcargs) + replymessage = serviceHandlerMethod(*rpcargs, **rpckwargs) + elif lofar_msg.has_args: + # only positional arguments + # msg.content should be a list + rpcargs = tuple(lofar_msg.content) + replymessage = serviceHandlerMethod(*rpcargs) + elif lofar_msg.has_kwargs: + # only named arguments + # msg.content should be a dict + rpckwargs = lofar_msg.content + replymessage = serviceHandlerMethod(**rpckwargs) + elif lofar_msg.content: + rpccontent = lofar_msg.content + replymessage = serviceHandlerMethod(rpccontent) + else: + replymessage = serviceHandlerMethod() + + self._send_reply(replymessage,"OK",lofar_msg.reply_to) + except Exception as e: - logger.error("finalize_loop() failed with %s", e) + # Any thrown exceptions either Service exception or unhandled exception + # during the execution of the service handler is caught here. + self._debug("handling exception") + exc_info = sys.exc_info() + status="ERROR" + rawbacktrace = traceback.format_exception(*exc_info) + errtxt = rawbacktrace[-1] + self._debug(rawbacktrace) + # cleanup the backtrace print by removing the first two lines and the last + del rawbacktrace[1] + del rawbacktrace[0] + del rawbacktrace[-1] + backtrace = ''.join(rawbacktrace).encode('latin-1').decode('unicode_escape') + self._debug(backtrace) + if self.verbose is True: + logger.info("[Service:] Status: %s", str(status)) + logger.info("[Service:] ERRTXT: %s", str(errtxt)) + logger.info("[Service:] BackTrace: %s", str( backtrace )) + self._send_reply(None, status, lofar_msg.reply_to, errtxt=errtxt, backtrace=backtrace) + + def onAfterReceiveMessage(self, successful): + "Called in the main loop after the result was send back to the requester." + "@successful@ reflects the state of the handling: true/false" + self._getServiceHandlerForCurrentThread().finalize_handling(successful) + + def onListenLoopEnd(self): + "Called after main processing loop is finished." + self._getServiceHandlerForCurrentThread().finalize_loop() __all__ = ["Service", "MessageHandlerInterface"]