-
Jorrit Schaap authored
SW-657: added TODO to handle exception on sending to removed-temp-queue when an rpc client did timeout
Jorrit Schaap authoredSW-657: added TODO to handle exception on sending to removed-temp-queue when an rpc client did timeout
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
Service.py 12.45 KiB
#!/usr/bin/env python3
# Service.py: Service definition for the lofar.messaging module.
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
from .messagebus import ToBus, AbstractBusListener
from .messages import ReplyMessage, RequestMessage
from .exceptions import MessageBusError, MessageFactoryError
import threading
import time
import uuid
import sys
import traceback
import logging
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class MessageHandlerInterface:
    """
    Hook interface for tuning how the Service class handles a message.

    The class defines (placeholders for) the functions that the Service class
    calls around the handling of messages. A subclass can override them, for
    instance to maintain a database connection.

    The pseudocode of the Service class is:

        Service(busname, function or from_MessageHandlerInterface_derived_class, ..., HandlerArguments={})

            handler = <from_MessageHandlerInterface_derived_class>(HandlerArguments)
            handler.prepare_loop()
            while alive:
                handler.prepare_receive()
                msg = wait for messages()
                handler.handle_message(msg)
                handler.finalize_handling(handling_result)
            handler.finalize_loop()
    """

    def __init__(self, **kwargs):
        """Create the handler; the keyword arguments are accepted but not used here."""
        # A subclass that serves several methods registers them in this map:
        # method name (the last dot-separated part of a message subject)
        # -> callable that handles messages for that method.
        self.service2MethodMap = dict()

    def prepare_loop(self):
        """Called before the main processing loop is entered. Does nothing by default."""

    def prepare_receive(self):
        """
        Called in the main processing loop just before a blocking wait for
        messages is done. Does nothing by default.
        """

    def handle_message(self, msg):
        """
        Handle the received message and return a result.

        This base implementation always raises NotImplementedError; it has to
        be overridden (or replaced) with the real handler.
        """
        raise NotImplementedError("OOPS! YOU ENDED UP IN THE MESSAGE HANDLER OF THE ABSTRACT BASE CLASS!")

    def finalize_handling(self, successful):
        """
        Called in the main loop after the result was sent back to the requester.

        @successful@ reflects the state of the handling: true/false.
        Does nothing by default.
        """

    def finalize_loop(self):
        """Called after the main processing loop is finished. Does nothing by default."""
class Service(AbstractBusListener):
    """
    Service class for registering python functions with a service name on a message bus.

    Create a new service with Service(servicename, servicehandler, broker=None, **kwargs).

    servicename    <string>  The name that the user should use to invoke the servicehandler.
    servicehandler <...>     May be a function (or bound method), or a class that is derived
                             from the MessageHandlerInterface. The service uses this function
                             or class for the handling of the messages.

    Optional keyword arguments consumed by this class:
    busname             <string>  Name of the bus in case exchanges are used instead of queues.
    use_service_methods <bool>    Listen to <servicename>.* so the last part of the subject
                                  selects a handler method. Default: False
    parsefullmessage    <bool>    Pass the full message instead of only the message content
                                  to the service handler. Default: False
    handler_args        <dict>    Arguments that are passed to the constructor of the
                                  servicehandler in case the servicehandler is a class
                                  instead of a function. Default: {}

    All remaining keyword arguments are forwarded unchanged to AbstractBusListener.
    As previously documented here these include (AbstractBusListener is authoritative
    for their exact meaning and defaults):
    options    <dict>  For the QPID connection
    exclusive  <bool>  To create exclusive access to this messagebus.
    numthreads <int>   Amount of threads processing messages.
    verbose    <bool>  Show debug text.
    """

    def __init__(self, servicename, servicehandler, broker=None, **kwargs):
        """
        Initialize Service object with servicename (str) and servicehandler.

        See the class docstring for the keyword arguments. busname,
        use_service_methods, parsefullmessage and handler_args are popped from
        kwargs here; whatever is left is handed to AbstractBusListener together
        with the listen address and the broker.

        Example for use_service_methods: MyService.foo calls the method that the
        handler registered under 'foo' in its service2MethodMap.
        """
        self.service_name = servicename
        self.service_handler = servicehandler
        self.busname = kwargs.pop("busname", None)
        self.use_service_methods = kwargs.pop("use_service_methods", False)
        self.parsefullmessage = kwargs.pop("parsefullmessage", False)
        self.handler_args = kwargs.pop("handler_args", {})

        # If the service_handler wants to map the last part of the subject to a
        # method, then we need to listen to <servicename>.*
        if self.use_service_methods:
            listen_name = self.service_name + '.*'
        else:
            listen_name = self.service_name

        # Prefix with the bus name only when one was given.
        if self.busname:
            address = self.busname + "/" + listen_name
        else:
            address = listen_name

        super(Service, self).__init__(address, broker, **kwargs)

    def _create_thread_args(self, index):
        """
        Build the per-thread argument dict and add a fresh service handler to it.

        A plain function or bound method is wrapped in a MessageHandlerInterface
        whose handle_message is that callable. Anything else is treated as a
        handler class (or factory) and is called with **handler_args.

        Raises TypeError when the resulting handler is not a MessageHandlerInterface.
        """
        # Local import: inspect is not among this module's top-level imports.
        import inspect

        handler_source = self.service_handler
        if inspect.isfunction(handler_source) or inspect.ismethod(handler_source):
            thread_service_handler = MessageHandlerInterface()
            thread_service_handler.handle_message = handler_source
        else:
            thread_service_handler = handler_source(**self.handler_args)

        if not isinstance(thread_service_handler, MessageHandlerInterface):
            raise TypeError("Servicehandler argument must by a function or a derived class from MessageHandlerInterface.")

        # add service_handler to default args for thread
        args = super(Service, self)._create_thread_args(index)
        args['service_handler'] = thread_service_handler
        return args

    def _send_reply(self, replymessage, status, reply_to, errtxt="", backtrace=""):
        """
        Internal use only. Send a reply message to the RPC client including exception info.

        replymessage  Either a ReplyMessage, which is sent as-is after the status
                      fields are set, or any other value, which is wrapped in a
                      new ReplyMessage addressed to reply_to.
        status        Stored in the reply's status field ("OK" or "ERROR" in this class).
        reply_to      Address the reply is sent to.
        errtxt        Stored in the reply's errmsg field.
        backtrace     Stored in the reply's backtrace field.

        A MessageBusError while sending is logged and not re-raised.
        """
        # Compose Reply message from reply and status.
        if isinstance(replymessage, ReplyMessage):
            reply_msg = replymessage
        else:
            reply_msg = ReplyMessage(replymessage, reply_to)
        reply_msg.status = status
        reply_msg.errmsg = errtxt
        reply_msg.backtrace = backtrace

        # show the message content if required by the verbose flag.
        if self.verbose:
            reply_msg.show()

        # send the result to the RPC client
        try:
            with ToBus(reply_to, broker=self.broker, connection_log_level=logging.DEBUG) as dest:
                dest.send(reply_msg)
        except MessageBusError as e:
            logger.error("Failed to send reply messgage to reply address %s. Error: %s", reply_to, e)

    def _getServiceHandlerForCurrentThread(self):
        """Return the service handler stored in the thread args of the calling thread."""
        args = self._threads[threading.current_thread()]
        return args['service_handler']

    def _onListenLoopBegin(self):
        """Called before main processing loop is entered."""
        self._getServiceHandlerForCurrentThread().prepare_loop()

    def _onBeforeReceiveMessage(self):
        """Called in main processing loop just before a blocking wait for messages is done."""
        self._getServiceHandlerForCurrentThread().prepare_receive()

    def _handleMessage(self, lofar_msg):
        """
        Dispatch one received message to the service handler and send the reply.

        The handler method is looked up in the handler's service2MethodMap using the
        last dot-separated part of the subject; subjects without a '.' (or handlers
        without a service2MethodMap) go to handle_message. A dotted subject whose
        last part is not in the map results in a ValueError, which is reported to
        the client like any other handler exception.

        On success an "OK" reply with the handler's return value is sent. Any
        Exception raised while handling is caught, logged, and sent back as an
        "ERROR" reply carrying the error text and a trimmed backtrace.
        """
        service_handler = self._getServiceHandlerForCurrentThread()

        try:
            # determine which handler method has to be called
            if hasattr(service_handler, 'service2MethodMap') and '.' in lofar_msg.subject:
                method_name = lofar_msg.subject.split('.')[-1]
                if method_name not in service_handler.service2MethodMap:
                    raise ValueError('Unknown method %s on service %s' % (method_name, lofar_msg.subject))
                # pass the handling of this message on to the specific method for this service
                serviceHandlerMethod = service_handler.service2MethodMap[method_name]
            else:
                serviceHandlerMethod = service_handler.handle_message

            # The call signature depends on which of args/kwargs the message carries.
            if self.parsefullmessage is True:
                replymessage = serviceHandlerMethod(lofar_msg)
            elif lofar_msg.has_args and lofar_msg.has_kwargs:
                # Both positional and named arguments are packed in the body:
                # the kwargs dict is the last element, the positional args precede it.
                # Slicing (instead of del) leaves the message body untouched.
                body = lofar_msg.body
                replymessage = serviceHandlerMethod(*tuple(body[:-1]), **body[-1])
            elif lofar_msg.has_args:
                # only positional arguments; msg.body should be a list
                replymessage = serviceHandlerMethod(*tuple(lofar_msg.body))
            elif lofar_msg.has_kwargs:
                # only named arguments; msg.body should be a dict
                replymessage = serviceHandlerMethod(**lofar_msg.body)
            elif lofar_msg.body:
                replymessage = serviceHandlerMethod(lofar_msg.body)
            else:
                replymessage = serviceHandlerMethod()

            # TODO: check for timeout and/or presence of response queue!
            self._send_reply(replymessage, "OK", lofar_msg.reply_to)

        except Exception:
            # Any thrown exception, either a Service exception or an unhandled exception
            # during the execution of the service handler, is caught here.
            self._debug("handling exception")
            status = "ERROR"
            rawbacktrace = traceback.format_exception(*sys.exc_info())
            errtxt = rawbacktrace[-1]
            self._debug(rawbacktrace)

            # cleanup the backtrace print by removing the first two lines and the last
            trimmed = ''.join(rawbacktrace[2:-1])
            try:
                backtrace = trimmed.encode('latin-1').decode('unicode_escape')
            except UnicodeError:
                # Non-latin-1 characters or a malformed escape sequence (e.g. '\U' in a
                # path): fall back to the unescaped text so the error reply is still sent.
                backtrace = trimmed

            logger.error("exception while handling message: %s\n%s", errtxt, backtrace)
            if self.verbose:
                logger.error("[Service:] Status: %s", str(status))
                logger.error("[Service:] ERRTXT: %s", str(errtxt))
                logger.error("[Service:] BackTrace: %s", str(backtrace))
            self._send_reply(None, status, lofar_msg.reply_to, errtxt=errtxt, backtrace=backtrace)

    def _onAfterReceiveMessage(self, successful):
        """
        Called in the main loop after the result was sent back to the requester.

        @successful@ reflects the state of the handling: true/false
        """
        self._getServiceHandlerForCurrentThread().finalize_handling(successful)

    def _onListenLoopEnd(self):
        """Called after main processing loop is finished."""
        self._getServiceHandlerForCurrentThread().finalize_loop()
# Names exported by `from <this module> import *`.
__all__ = ["Service", "MessageHandlerInterface"]