From 51d6bb5f1af844c2e98fc6b68f7444df7fbc4ea9 Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Mon, 3 Jun 2019 11:50:59 +0000 Subject: [PATCH] SW-699: major refactoring of the messagebus's BusListener and Service. Together with Jan-David and Auke, I decided to only allow a single API for the service, and we chose the most commenly used API based on the MessageHandlerInterface which we refactored into the ServiceMessageHandler. TODO: add docstrings, fix tests, adapt all Services in lofar python code. --- .../python/messagelogger/messagelogger.py | 37 +- LCS/Messaging/python/messaging/CMakeLists.txt | 2 +- LCS/Messaging/python/messaging/RPC.py | 12 +- LCS/Messaging/python/messaging/Service.py | 387 +++--- LCS/Messaging/python/messaging/__init__.py | 14 +- LCS/Messaging/python/messaging/config.py | 38 +- LCS/Messaging/python/messaging/messagebus.py | 1131 +++++++++++------ LCS/Messaging/python/messaging/messages.py | 82 +- LCS/Messaging/python/messaging/test/t_RPC.py | 31 +- .../python/messaging/test/t_messagebus.py | 171 +-- .../test/t_service_message_handler.py | 20 +- 11 files changed, 1170 insertions(+), 755 deletions(-) diff --git a/LCS/Messaging/python/messagelogger/messagelogger.py b/LCS/Messaging/python/messagelogger/messagelogger.py index 84161c8074a..044fc716dd1 100644 --- a/LCS/Messaging/python/messagelogger/messagelogger.py +++ b/LCS/Messaging/python/messagelogger/messagelogger.py @@ -30,28 +30,35 @@ import logging logger = logging.getLogger(__name__) from lofar.messaging.messages import LofarMessage -from lofar.messaging.messagebus import AbstractBusListener +from lofar.messaging.messagebus import BusListener, AbstractMessageHandler from lofar.messaging.config import DEFAULT_BROKER, DEFAULT_BUSNAME from lofar.common.util import waitForInterrupt -class MessageLogger(AbstractBusListener): - def __init__(self, exchange_name:str=DEFAULT_BUSNAME, routing_key:str="#", broker:str=DEFAULT_BROKER, - remove_content_newlines=False): - self.remove_content_newlines = remove_content_newlines - super(MessageLogger, self).__init__(exchange_name=exchange_name, +class MessageLogger(BusListener): + class _Handler(AbstractMessageHandler): + def __init__(self, remove_content_newlines: bool): + self.remove_content_newlines = remove_content_newlines + + def handle_message(self, msg: LofarMessage): + content = str(msg.content).replace(linesep, " ") if self.remove_content_newlines else str(msg.content) + logger.info("%s subject='%s' %s%scontent: %s", + msg.__class__.__name__, + msg.subject, + "priority=%s " if msg.priority != 4 else "", # only show non-default priorities + (" %s" % msg.ttl) if msg.ttl else "", + content) + return True + + def __init__(self, exchange:str=DEFAULT_BUSNAME, routing_key:str="#", broker:str=DEFAULT_BROKER, + remove_content_newlines: bool=False): + super(MessageLogger, self).__init__(handler_type=MessageLogger._Handler, + handler_kwargs={'remove_content_newlines': remove_content_newlines}, + exchange=exchange, routing_key=routing_key, num_threads=1, broker=broker) - def _handleMessage(self, msg: LofarMessage): - content = str(msg.content).replace(linesep, " ") if self.remove_content_newlines else msg.content - logger.info("%s subject='%s' priority=%s %scontent: %s", - msg.__class__.__name__, - msg.subject, - msg.priority, - (" %s" % msg.ttl) if msg.ttl else "", - content) def main(): from optparse import OptionParser @@ -77,7 +84,7 @@ def main(): logger.info("* starting messagelogger *") logger.info("**************************") - with MessageLogger(exchange_name=options.busname, routing_key=options.routing_key, + with MessageLogger(exchange=options.busname, routing_key=options.routing_key, broker=options.broker, remove_content_newlines=options.no_newlines): waitForInterrupt() diff --git a/LCS/Messaging/python/messaging/CMakeLists.txt b/LCS/Messaging/python/messaging/CMakeLists.txt index 7db764fbf38..a2208566d6e 100644 --- a/LCS/Messaging/python/messaging/CMakeLists.txt +++ b/LCS/Messaging/python/messaging/CMakeLists.txt @@ -5,7 +5,7 @@ lofar_package(PyMessaging 1.0 DEPENDS PyCommon) lofar_find_package(Python 3.4 REQUIRED) include(FindPythonModule) -find_python_module(kombu) +find_python_module(kombu REQUIRED) include(PythonInstall) diff --git a/LCS/Messaging/python/messaging/RPC.py b/LCS/Messaging/python/messaging/RPC.py index a0ca7fd6787..9962ad2a39b 100644 --- a/LCS/Messaging/python/messaging/RPC.py +++ b/LCS/Messaging/python/messaging/RPC.py @@ -20,8 +20,8 @@ # # RPC invocation with possible timeout -from .messagebus import ToBus, TemporaryQueue, DEFAULT_BROKER, DEFAULT_TIMEOUT -from .messages import RequestMessage, ReplyMessage +from lofar.messaging.messagebus import ToBus, TemporaryQueue, DEFAULT_BROKER, DEFAULT_TIMEOUT +from lofar.messaging.messages import RequestMessage, ReplyMessage import logging logger = logging.getLogger(__name__) @@ -83,7 +83,7 @@ class RPC(): self._service_name = service_name self._timeout = timeout self._broker = broker - self._request_sender = ToBus(busname, subject_based_routing=True, broker=self._broker) + self._request_sender = ToBus(busname, broker=self._broker) def open(self): """ @@ -136,7 +136,7 @@ class RPC(): timeout) with TemporaryQueue(name_prefix="Reply-" + self._service_name, - exchange_name=self._busname, + exchange=self._busname, addressed_to_me_only=True, broker=self._broker) as tmp_reply_queue: with tmp_reply_queue.create_frombus() as reply_receiver: @@ -232,7 +232,7 @@ class RPCWrapper(object): in this case: 'MyBus/MyService.foo', 'MyBus/MyService.bar', ...etc Again, define a RPCWrapper-derived class once, like so: - class MyRPC(RPCWrapper): + class MyServiceClient(RPCWrapper): def foo(self): return self.rpc('foo') @@ -300,7 +300,7 @@ class RPCWrapper(object): Close all opened rpc connections""" self.close() - def rpc(self, method=None, *args, **kwargs): + def execute_rpc(self, method=None, *args, **kwargs): '''execute the rpc call on the <bus>/<service>.<method> and return the result''' try: service_method = (self.servicename + '.' + method) if self.servicename and method \ diff --git a/LCS/Messaging/python/messaging/Service.py b/LCS/Messaging/python/messaging/Service.py index 274f50291a5..1452b9f68a2 100644 --- a/LCS/Messaging/python/messaging/Service.py +++ b/LCS/Messaging/python/messaging/Service.py @@ -20,218 +20,209 @@ # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. # -from .messagebus import ToBus, AbstractBusListener, DEFAULT_BROKER -from .messages import ReplyMessage, RequestMessage -from .exceptions import MessagingError -import sys -import traceback +""" +A ServiceMessageHandler is a special type of AbstractMessageHandler to be used in conjunction with +the specialized Service implementation of the BusListener. +The two additions on top of AbstractMessageHandler/BusListener for the ServiceMessageHandler/Service are: +1) A ServiceMessageHandler can call methods based on the RequestMessage's subject +2) The Service then responds the result with a ReplyMessage + + +Here's an example: +>>> # implement your own ServiceMessageHandler, add some nice methods that do work +... class MyServiceMessageHandler(ServiceMessageHandler): +... def foo(self, my_param): +... print("foo was called. my_param =", my_param) +... # ... do some work ... +... +... def bar(self): +... print("bar was called.") +... # ... do some work ... + +Use your handler in a Service instance +We assume you know that the TemporaryExchange is only needed here to have a working example. +>>> from lofar.messaging.messagebus import TemporaryExchange +>>> with TemporaryExchange() as tmp_exchange: +... with Service(service_name="MyService", +... handler_type=MyServiceMessageHandler, +... exchange=tmp_exchange.address): +... +... # That's it, now you have a running Service, waiting for incoming RequestMessages... +... # Normally you should use the RPC/RPCWrapper class for that, but let's keep this example simple and send some messages ourselves +... with tmp_exchange.create_tobus() as tobus: +... tobus.send(RequestMessage(subject="MyService.foo", content="whatever", +... reply_to=tmp_exchange.address)) # normally the RPC would take care of the reply queue, and handling of the ReplyMessage. +... tobus.send(RequestMessage(subject="MyService.bar", +... reply_to=tmp_exchange.address)) # normally the RPC would take care of the reply queue, and handling of the ReplyMessage. +... +... # ... do some work ... simulate this by sleeping a little... +... # ...in the mean time, the Service receives and handles the messages (on its own thread) +... from time import sleep +... sleep(0.25) +foo was called. my_param = whatever +bar was called. +""" + +from lofar.messaging.config import DEFAULT_BROKER, DEFAULT_BUSNAME +from lofar.messaging.messagebus import ToBus, BusListener, AbstractMessageHandler +from lofar.messaging.messages import ReplyMessage, RequestMessage, LofarMessage +from typing import Optional import logging +import inspect logger = logging.getLogger(__name__) -class MessageHandlerInterface(object): - """ - Interface class for tuning the handling of a message by the Service class. - The class defines some (placeholders for) functions that the Service class calls - during the handling of messages. It can for instance be used to maintain a database connection. - - The pseudocode of the Service class is: - Service(busname, function or from_MessageHandlerInterface_derived_class, ..., HandlerArguments={}) - - handler = <from_MessageHandlerInterface_derived_class>(HandlerArguments) - handler.prepare_loop() - while alive: - handler.prepare_receive() - msg = wait for messages() - handler.handle_message(msg) - handler.finalize_handling(handling_result) - handler.finalize_loop() - """ - def __init__(self, **kwargs): - # if you want your subclass to handle multiple services - # then you can specify for each service which method has to be called - # In case this map is empty, or the called service is not in this map, - # then the default handle_message is called - self.service2MethodMap = {} - - def prepare_loop(self): - "Called before main processing loop is entered." - pass - - def prepare_receive(self): - "Called in main processing loop just before a blocking wait for messages is done." - pass - - def handle_message(self, msg): - "Function the should handle the received message and return a result." - raise NotImplementedError("OOPS! YOU ENDED UP IN THE MESSAGE HANDLER OF THE ABSTRACT BASE CLASS!") - - def finalize_handling(self, successful): - "Called in the main loop after the result was send back to the requester." - "@successful@ reflects the state of the handling: true/false" - pass - - def finalize_loop(self): - "Called after main processing loop is finished." - pass - - -class Service(AbstractBusListener): - """ - Service class for registering python functions with a Service name on a message bus. - create new service with Service(busname, servicename, servicehandler) - busname <string> The name of the messagebus (queue or exchange) the service whould listen on. - servicename <string> The name that the user should use the invocate the servicehandler. - servicehandler <...> May be a function of an class that is derived from the MessageHandlerInterface. - The service uses this function or class for the handling of the messages. - Optional arguments: - options <dict> For the QPID connection - exclusive <bool> To create eclusive access to this messagebus. Default:True - numthreads <int> Amount of threads processing messages. Default:1 - verbose <bool> Show debug text. Default:False - handler_args <dict> Arguments that are passed to the constructor of the servicehandler is case the servicehandler - is a class in stead of a function. - """ - - def __init__(self, - service_name: str, - service_handler: MessageHandlerInterface, - busname: str, - num_threads: int = 1, - broker: str = DEFAULT_BROKER, - use_service_methods = False, - handler_args = None): - """ - Initialize Service object with servicename (str) and servicehandler function. - additional parameters: - busname = <string> Name of the bus in case exchanges are used in stead of queues - exclusive = <bool> Create an exclusive binding so no other services can consume duplicate messages (default: True) - numthreads = <int> Number of parallel threads processing messages (default: 1) - verbose = <bool> Output extra logging over stdout (default: False) - use_service_methods = <bool> Listen to <servicename>.* and map 2nd subject part to method. (default: False) - Example: MyService.foo calls the method foo in the handler. - """ - self.service_name = service_name - self.use_service_methods = use_service_methods - self.handler_args = handler_args - - # set up service_handler, either for a wrapped function, or a MessageHandlerInterface - if str(type(service_handler)) == "<class 'instancemethod'>" or str(type(service_handler)) == "<class 'function'>": - self.service_handler = MessageHandlerInterface() - self.service_handler.handle_message = service_handler - else: - self.service_handler = service_handler(**self.handler_args) - - if not isinstance(self.service_handler, MessageHandlerInterface): - raise TypeError("Servicehandler argument must by a function or a derived class from MessageHandlerInterface.") - - super(Service, self).__init__(exchange_name=busname, - routing_key="%s.#" % (self.service_name,), - broker=broker, num_threads=num_threads) - - def _send_reply(self, content, status, reply_to_address, errtxt="",backtrace=""): - """ - Internal use only. Send a reply message to the RPC client including exception info. - """ - - reply_msg = ReplyMessage(content=content, status=status, subject=reply_to_address) - reply_msg.status = status - reply_msg.errmsg = errtxt - reply_msg.backtrace = backtrace - - # send the result to the RPC client - try: - # send the msg to the common exchange, and use reply_to_address as subject for routing it to the reply queue. - with ToBus(self.exchange_name, broker=self.broker) as reply_to_sender: - reply_to_sender.send(reply_msg) - except MessagingError as e: - logger.error("Failed to send reply messgage to %s. Error: %s", reply_to_address, e) +class ServiceMessageHandler(AbstractMessageHandler): + def __init__(self): + self.service_name = None + self.exchange = None + self.broker = None + self._subject_to_method_map = {} + super(ServiceMessageHandler, self).__init__() + + def register_service_method(self, method_name, method): + logger.debug("%s registering method %s", self.service_name, method_name) + self._subject_to_method_map[method_name] = method + + def register_public_handler_methods(self): + excluded_class_names = (ServiceMessageHandler.__name__,) + tuple(x.__name__ for x in ServiceMessageHandler.__bases__) + + def is_public_method_of_subclass(member): + # should be member method + if not inspect.ismethod(member): + return False + + # should be "public" + if member.__name__.startswith('_'): + return False + + # should be method of the actual ServiceMessageHandler implementation + if not issubclass(member.__self__.__class__, self.__class__): + return False + + # should not be method of the excluded super classes + super_class_name = member.__qualname__.partition(".")[0] + return super_class_name not in excluded_class_names + - def _onListenLoopBegin(self): - "Called before main processing loop is entered." - self.service_handler.prepare_loop() + for m in inspect.getmembers(self, is_public_method_of_subclass): + self.register_service_method(m[0], m[1]) - def _onBeforeReceiveMessage(self): - "Called in main processing loop just before a blocking wait for messages is done." - self.service_handler.prepare_receive() - def _handleMessage(self, lofar_msg): + def handle_message(self, msg: LofarMessage): + if not isinstance(msg, RequestMessage): + raise ValueError("%s: Ignoring non-RequestMessage: %s" % (self.__class__.__name__, msg)) + try: - if not isinstance(lofar_msg, RequestMessage): - logger.warning("%s ignoring non-RequestMessage %s", self.service_name, lofar_msg) - return - - # determine which handler method has to be called - if hasattr(self.service_handler, 'service2MethodMap') and '.' in lofar_msg.subject: - subject_parts = lofar_msg.subject.split('.') - method_name = subject_parts[-1] - if method_name in self.service_handler.service2MethodMap: - # pass the handling of this message on to the specific method for this service - serviceHandlerMethod = self.service_handler.service2MethodMap[method_name] - else: - raise ValueError('Unknown method %s on service %s' % (method_name, lofar_msg.subject)) - else: - serviceHandlerMethod = self.service_handler.handle_message + result = self.__service_handle_message(msg) + reply_msg = ReplyMessage(content=result, + handled_successfully=True, + subject=msg.reply_to, + error_message=None) + except Exception as e: + logger.warning(e) + reply_msg = ReplyMessage(content=None, + handled_successfully=False, + subject=msg.reply_to, + error_message=str(e)) + + with ToBus(self.exchange, broker=self.broker) as reply_to_sender: + reply_to_sender.send(reply_msg) + + if not reply_msg.handled_successfully: + raise Exception(reply_msg.error_message) + + + def __service_handle_message(self, request_msg: RequestMessage) -> Optional[object]: + subject_prefix = "%s." % self.service_name + + if not request_msg.subject.startswith(subject_prefix): + # thanks to proper routing, this should never occur + raise ValueError("%s: unexpected subject for service_name=%s RequestMessage with subject %s" % ( + self.__class__.__name__, + self.service_name, + request_msg.subject)) - # check for positional arguments and named arguments - # depending on presence of args and kwargs, - # the signature of the handler method should vary as well - if lofar_msg.has_args and lofar_msg.has_kwargs: + # do lookup based on subject without the service_name prefix + stripped_subject = request_msg.subject.replace(subject_prefix, "", 1) + + if stripped_subject not in self._subject_to_method_map: + raise KeyError("%s: No known handler method for a RequestMessage with subject %s" % ( + self.__class__.__name__, request_msg.subject)) + + service_handler_method = self._subject_to_method_map[stripped_subject] + + try: + # depending on has_args and has_kwargs, unpack the message content as/into function parameters, + # and call the service_handler_method with the appropiate arguments. + # return True, and the function result as a tuple. + if request_msg.has_args and request_msg.has_kwargs: # both positional and named arguments # rpcargs and rpckwargs are packed in the content - rpcargs = lofar_msg.content - - # rpckwargs is the last argument in the content - # rpcargs is the rest in front - rpckwargs = rpcargs[-1] - del rpcargs[-1] - rpcargs = tuple(rpcargs) - replymessage = serviceHandlerMethod(*rpcargs, **rpckwargs) - elif lofar_msg.has_args: + rpc_args = tuple(request_msg.content[:-1]) + rpc_kwargs = request_msg.content[-1] + + return service_handler_method(*rpc_args, **rpc_kwargs) + elif request_msg.has_args: # only positional arguments - # msg.content should be a list - rpcargs = tuple(lofar_msg.content) - replymessage = serviceHandlerMethod(*rpcargs) - elif lofar_msg.has_kwargs: + # msg.content should be a list/tuple + rpc_args = tuple(request_msg.content) + return service_handler_method(*rpc_args) + elif request_msg.has_kwargs: # only named arguments # msg.content should be a dict - rpckwargs = lofar_msg.content - replymessage = serviceHandlerMethod(**rpckwargs) - elif lofar_msg.content: - rpccontent = lofar_msg.content - replymessage = serviceHandlerMethod(rpccontent) + rpc_kwargs = dict(request_msg.content) + return service_handler_method(**rpc_kwargs) + elif request_msg.content: + rpc_content = request_msg.content + return service_handler_method(rpc_content) else: - replymessage = serviceHandlerMethod() - - self._send_reply(replymessage,"OK",lofar_msg.reply_to) + return service_handler_method() except Exception as e: - # Any thrown exceptions either Service exception or unhandled exception - # during the execution of the service handler is caught here. - logger.debug("handling exception") - exc_info = sys.exc_info() - status="ERROR" - rawbacktrace = traceback.format_exception(*exc_info) - errtxt = rawbacktrace[-1] - logger.debug(rawbacktrace) - # cleanup the backtrace print by removing the first two lines and the last - del rawbacktrace[1] - del rawbacktrace[0] - del rawbacktrace[-1] - backtrace = ''.join(rawbacktrace).encode('latin-1').decode('unicode_escape').rstrip() - logger.error("exception while handling message: %s %s" % (errtxt, backtrace)) - logger.debug("[Service:] Status: %s", str(status)) - logger.debug("[Service:] ERRTXT: %s", str(errtxt)) - logger.debug("[Service:] BackTrace: %s", str( backtrace )) - self._send_reply(None, status, lofar_msg.reply_to, errtxt=errtxt, backtrace=backtrace) - - def _onAfterReceiveMessage(self, successful): - "Called in the main loop after the result was send back to the requester." - "@successful@ reflects the state of the handling: true/false" - self.service_handler.finalize_handling(successful) - - def _onListenLoopEnd(self): - "Called after main processing loop is finished." - self.service_handler.finalize_loop() - -__all__ = ["Service", "MessageHandlerInterface"] + raise Exception("%s: Error while handling msg with subject %s in service %s in method %s: %s" % ( + self.__class__.__name__, + request_msg.subject, + self.service_name, + service_handler_method.__name__, + e)) + +class Service(BusListener): + def __init__(self, service_name: str, + handler_type: ServiceMessageHandler.__class__, + handler_kwargs: dict = None, + exchange: str = DEFAULT_BUSNAME, + num_threads: int = 1, + broker: str = DEFAULT_BROKER): + + if not issubclass(handler_type, ServiceMessageHandler): + raise TypeError("handler_type should be a ServiceMessageHandler subclass") + + self.service_name = service_name + self.exchange = exchange + + # call the BusListener's contructor + # and make sure it connects to the exchange via a designated queue using the service's name as routing_key filter. + super(Service, self).__init__(exchange=exchange, routing_key="%s.#"%service_name, + handler_type = handler_type, + handler_kwargs = handler_kwargs, + num_threads=num_threads, broker=broker) + + def _create_handler(self): + service_handler = self._handler_type(**self._handler_kwargs) + service_handler.service_name = self.service_name + service_handler.exchange = self.exchange + service_handler.broker = self.broker + service_handler.register_public_handler_methods() + + return service_handler + + + +if __name__ == "__main__": + logging.basicConfig(format='%(levelname)s %(threadName)s %(message)s', level=logging.DEBUG) + logging.getLogger('lofar.messaging.messagebus').level = logging.INFO + + # run the doctests in this module + import doctest + doctest.testmod(verbose=True, report=True) \ No newline at end of file diff --git a/LCS/Messaging/python/messaging/__init__.py b/LCS/Messaging/python/messaging/__init__.py index e0535559e7b..8f0be5053c6 100644 --- a/LCS/Messaging/python/messaging/__init__.py +++ b/LCS/Messaging/python/messaging/__init__.py @@ -21,22 +21,12 @@ # $Id: __init__.py 1568 2015-09-18 15:21:11Z loose $ """ -Module initialization file. +Lofar's messaging package. +seealso:: lofar.messaging.messagebus """ -from .exceptions import * -from .messages import * -from .messagebus import * -from .RPC import * -from .Service import * -import logging from lofar.common import isProductionEnvironment, isTestEnvironment -def setQpidLogLevel(qpidLogLevel): - for name, logger in list(logging.Logger.manager.loggerDict.items()): - if name.startswith('qpid.') and isinstance(logger, logging.Logger): - logger.setLevel(qpidLogLevel) - def adaptNameToEnvironment(name): if isProductionEnvironment(): return name #return original name only for PRODUCTION LOFARENV diff --git a/LCS/Messaging/python/messaging/config.py b/LCS/Messaging/python/messaging/config.py index 106cce1cc58..b75f20ac470 100644 --- a/LCS/Messaging/python/messaging/config.py +++ b/LCS/Messaging/python/messaging/config.py @@ -1,22 +1,30 @@ +from lofar.messaging import adaptNameToEnvironment from lofar.common import isProductionEnvironment, isTestEnvironment -def adaptNameToEnvironment(name): - if isProductionEnvironment(): - return name #return original name only for PRODUCTION LOFARENV +# the DEFAULT_BROKER that's used in lofar's messaging refers to the single +# broker at either the production or test scu, depending on the runtime environment. +# For a non-production/non-test env, just use localhost. +DEFAULT_BROKER = "scu001.control.lofar" if isProductionEnvironment() else \ + "scu199.control.lofar" if isTestEnvironment() else \ + "localhost" - if isTestEnvironment(): - return 'test.%s' % name #return 'test.' prefixed name only for TEST LOFARENV +DEFAULT_PORT = 5675 if isProductionEnvironment() or isTestEnvironment() else 5672 - # in all other cases prefix queue/bus name with 'devel.' - return 'devel.%s' % name +# # import the user and password from RabbitMQ 'db'credentials +# try: +# from lofar.common.dbcredentials import DBCredentials +# _db_creds = DBCredentials().get("RabbitMQ") +# DEFAULT_USER = _db_creds.user +# DEFAULT_PASSWORD = _db_creds.password +# except: +# DEFAULT_USER = "guest" +# DEFAULT_PASSWORD = "guest" -# Default settings for often used parameters. -if isProductionEnvironment(): - DEFAULT_BROKER = "scu001.control.lofar" -elif isTestEnvironment(): - DEFAULT_BROKER = "scu199.control.lofar" -else: # development environment - DEFAULT_BROKER = "localhost" +DEFAULT_USER = "guest" +DEFAULT_PASSWORD = "guest" -# default exchange to use for all messages +# default exchange to use for publishing messages DEFAULT_BUSNAME = adaptNameToEnvironment("lofar") + +# default timeout in seconds +DEFAULT_TIMEOUT = 5 \ No newline at end of file diff --git a/LCS/Messaging/python/messaging/messagebus.py b/LCS/Messaging/python/messaging/messagebus.py index ea3d9dcd6f1..c67bffb45e2 100644 --- a/LCS/Messaging/python/messaging/messagebus.py +++ b/LCS/Messaging/python/messaging/messagebus.py @@ -1,4 +1,3 @@ - # messagebus.py: Provide an easy way exchange messages on the message bus. # # Copyright (C) 2015 @@ -18,34 +17,188 @@ # # You should have received a copy of the GNU General Public License along # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. -# -# $Id: messagebus.py 1580 2015-09-30 14:18:57Z loose $ """ -Provide an easy way exchange messages on the message bus. +LOFAR's common and simple messaging API around AMQP messaging frameworks. + +For more background information on messaging: + - AMQP (the protocol): https://www.amqp.org and https://en.wikipedia.org/wiki/Advanced_Message_Queuing_Protocol + - RabbitMQ (the broker): https://www.rabbitmq.com/ + - Kombu (the python client used in this package): https://kombu.readthedocs.io/en/stable/ + +Concepts (the quick common background summary): +Messaging is used to provide fast robust reliable communication between software programs. +Messages (containing your valuable information) are exchanged via a so-called broker. We use RabbitMQ for that. +Messages are 'published' on so called 'exchanges' on the broker. +If you're listening on that exchange, you receive the message. But if you're not listening, the message is lost for you forever. +That's why there are also 'queues' (on the broker). A queue can hold messages for you, even if you're not listening, and +you'll receive them as soon as you start listening. + +The trick of a well used messaging system is to have single or a only few exchanges on which every program publishes his +messages with a proper 'subject' (also called 'routing_key'), and have the broker route the message to zero or more +queues which are interested in these particular messages. It makes sense to have one or more designated queue's for each program, +so each program receives only the messages which are interesting for him. + +LOFAR's messaging way: +The two core classes are: + - the ToBus to publish messages on an exchange + - the FromBus to receive messages from a queue + +RabbitMQ provides a nice webinterface and CLI to setup exchanges and queues, but we also provide some convenience methods, +Let's use them here in an example to setup a simple exchange, and bind a queue to it. +We're assuming you're running a RabbitMQ broker on localhost. + +>>> create_bound_queue("my.exchange", "my.queue") + +Now we have done three things: +1) create an exchange on the broker called "my.exchange" +2) create a queue on the broker called "my.queue" +3) create a binding between the two routing all messages from the exchange to the queue + +Let's send a message to the bus: +>>> with ToBus("my.exchange") as tobus: +... tobus.send(EventMessage(subject="some.event", content="foo")) + +That's it, it's that simple. So, what just happened? +By constructing a ToBus instance in a python 'with' context we make sure that the connection to the broker is cleaned up. +Calling tobus.send can send any LofarMessage to the exchange, in this case an EventMessage. +The message has been sent (published), and we can now forget about it... + +... or we can listen for interesting messages with a FromBus, like so: +>>> with FromBus("my.queue") as frombus: +... msg = frombus.receive() +... print(msg.content) +... +foo + +Notice that we *did* receive the message, even after contructing a FromBus after the message was send! +That's because it was stored in the queue at the broker, ready to be delivered as soon as we started listening. + +Let's be nice, and cleanup our exchange and queue at the broker, like so: +>>> delete_exchange("my.exchange") +True +>>> delete_queue("my.queue") +True + + +These are the basics, now let's move one to the more interesting usage, +for example working with dynamically created/deleted exchanges/queues. +This is a typical pattern used in many tests where we want unique short-lived exchanges and queues, +which are guaranteed to be deleted upon test completion. +Another use-case is for example in the RPC/Service usage, but that's a later example. + +>>> # create a TemporaryExchange in a context, so it's automagically created and deleted +... with TemporaryExchange("my.exchange") as tmp_exchange: +... +... # create a TemporaryQueue in a context, so it's automagically created and deleted +... # connect/bind it to the tmp_exchange +... with TemporaryQueue("my.queue", exchange=tmp_exchange.address) as tmp_queue: +... +... # use the convenience factory method to create a ToBus instance on the exchange +... # notice that it's also used in a context for automatic connect/disconnect. +... with tmp_exchange.create_tobus() as tobus: +... tobus.send(EventMessage(subject="some.event", content="foo")) +... +... # and finally use the convenience factory method to create a FromBus instance on the queue +... # notice that it's also used in a context for automatic connect/disconnect. +... with tmp_queue.create_frombus() as frombus: +... msg = frombus.receive() +... print(msg.content) +... +foo + +Ok, until now the examples were simple, and only sending/receiving a single message... +In practice most of our programs are event-driven, and act on received messages. +That means we should be able to continuously listen for messages, and handle them when we receive any. +That's what the BusListener is for. It's a core class used in many lofar programs. +Let's illustrate with an example... + - Use the now familiar TemporaryExchange and TemporaryQueue + - Define a concrete implementation of an handler for the BusListener: MyMessageHandler + - Show how the BusListener is used, and how the MyMessageHandler is injected. + - an additional feature shown here is the use of the routing_key from the tmp_exchange to the tmp_queue: + only messages with subject 'some.#' are routed to the queue, and hence received by the buslistener. + +>>> with TemporaryExchange("my.exchange") as tmp_exchange: +... with tmp_exchange.create_tobus() as tobus: +... with TemporaryQueue("my.queue", exchange=tmp_exchange.address, routing_key="some.#") as tmp_queue: +... +... # implement an example AbstractMessageHandler which just prints the received message subject and content +... class MyMessageHandler(AbstractMessageHandler): +... def handle_message(self, lofar_msg): +... print(lofar_msg.subject, lofar_msg.content) +... # ... do some more fancy stuff with the msg... +... +... # construct a BusListener instance in a context, +... # so it starts/stops listening and and handling messages automagically +... with BusListener(MyMessageHandler, queue=tmp_queue.address): +... tobus.send(EventMessage(subject="some.event", content="foo")) +... tobus.send(EventMessage(subject="another.event", content="xyz")) +... tobus.send(EventMessage(subject="some.event", content="bar")) +... +... # ... do some work ... simulate this by sleeping a little... +... # ...in the mean time, BusListener receives and handles the messages (on its own thread) +... from time import sleep +... sleep(0.5) +... +some.event foo +some.event bar + +In practice you might find it a too big hassle to setup a designated queue for such a listener. It's also easy to make +a mess of queue names, routing keys, etc etc... So, isn't there a uniform and simple way to set up a designated queue +for each listener? Yes, there is: just provide the general exchange name and a routing_key to the listener, and the +designated queue is created automagically for you, with the following standard name: <exchange_name>.for.<program_name>.<routing_key> +Please note that this queue is not deleted upon exit, and that's by design! This way, all our lofar programs use the same +paradigm to create queues, and broker queue confiration is simplified and uniform. +Suppose that, as in the following example, or in unittests etc, you would like to leave the system as clean as you found it. +Then you want to get rid of the auto-generated queue for this listener. Use the BusListenerJanitor! +Let's illustrate this with a slight midification of the above example... + - Use the now familiar TemporaryExchange (but no TemporaryQueue!) + - Define again aconcrete implementation of an BusListener: BusListener + - Show how the BusListener is used: + - let it bind automagically to the exchange this time, via the standarized auto-generated queue (including filtering on subject 'some.#') + - use the BusListenerJanitor to do the cleanup of the auto-generated queue for us. + + +>>> with TemporaryExchange("my.exchange") as tmp_exchange: +... with tmp_exchange.create_tobus() as tobus: +... +... # construct a BusListener instance in a BusListenerJanitor context, +... # so it starts/stops listening and and handling messages automagically +... # and have the auto-generated buslistener's queue be auto-deleted via the janitor. +... with BusListenerJanitor(BusListener(MyMessageHandler, exchange=tmp_exchange.address, routing_key="some.#")): +... tobus.send(EventMessage(subject="some.event", content="foo")) +... tobus.send(EventMessage(subject="another.event", content="xyz")) +... tobus.send(EventMessage(subject="some.event", content="bar")) +... +... # ... do some work ... simulate this by sleeping a little... +... # ...in the mean time, BusListener receives and handles the messages (on its own thread) +... from time import sleep +... sleep(0.25) +... +some.event foo +some.event bar + + + """ -from amqp.exceptions import NotFound -from lofar.messaging.exceptions import MessagingError -from lofar.messaging.messages import convert_to_lofar_message -from lofar.common.util import raise_exception -from lofar.common import isProductionEnvironment, isTestEnvironment + +from lofar.messaging.exceptions import * +from lofar.messaging.messages import * +from lofar.messaging.config import DEFAULT_BROKER, DEFAULT_BUSNAME, DEFAULT_TIMEOUT, DEFAULT_PORT, DEFAULT_USER, DEFAULT_PASSWORD from lofar.common.threading_utils import TimeoutLock from lofar.common.util import program_name -import kombu -import kombu.exceptions -import amqp.exceptions -import logging +import kombu, kombu.exceptions, amqp.exceptions import uuid import threading +from typing import Optional from datetime import datetime from queue import Empty as EmptyQueueError +from socket import gaierror +import logging logger = logging.getLogger(__name__) -from .config import DEFAULT_BROKER, DEFAULT_BUSNAME -DEFAULT_TIMEOUT = 5 - # some serializers are considered 'insecure', but we know better ;) # so enable the python pickle serializer kombu.enable_insecure_serializers(['pickle']) @@ -54,6 +207,7 @@ kombu.enable_insecure_serializers(['pickle']) logging.getLogger("amqp").setLevel(logging.INFO) + def create_exchange(name: str, durable: bool=True, broker: str=DEFAULT_BROKER) -> bool: """ create a message exchange of type topic with the given name on the given broker @@ -61,10 +215,10 @@ def create_exchange(name: str, durable: bool=True, broker: str=DEFAULT_BROKER) - :param durable: if True, then the exchange 'survives' broker restarts :param broker: a message broker address :raises: MessagingError if the exchange could not be created - :return True if created, False if not-created because it already exists + :return True if created, False if not-created (because it already exists) """ try: - with kombu.Connection(hostname=broker) as connection: + with kombu.Connection(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD) as connection: exchange = kombu.Exchange(name, durable=durable, type='topic') try: exchange.declare(channel=connection.default_channel, passive=True) @@ -82,10 +236,10 @@ def delete_exchange(name: str, broker: str=DEFAULT_BROKER) -> bool: :param name: the name for the exchange :param broker: a message broker address :raises: MessagingError if the exchange could not be deleted - :return True if deleted, False if not-deleted because it does not exist + :return True if deleted, False if not-deleted (because it does not exist) """ try: - with kombu.Connection(hostname=broker) as connection: + with kombu.Connection(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD) as connection: exchange = kombu.Exchange(name, channel=connection) try: exchange.declare(channel=connection.default_channel, passive=True) @@ -105,10 +259,10 @@ def create_queue(name: str, durable: bool=True, broker: str=DEFAULT_BROKER) -> b :param durable: if True, then the queue 'survives' broker restarts :param broker: a message broker address :raises: MessagingError if the queue could not be created - :return True if created, False if not-created because it already exists + :return True if created, False if not-created (because it already exists) """ try: - with kombu.Connection(hostname=broker) as connection: + with kombu.Connection(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD) as connection: queue = kombu.Queue(name, durable=durable) try: queue.queue_declare(channel=connection.default_channel, passive=True) @@ -126,10 +280,10 @@ def delete_queue(name: str, broker: str=DEFAULT_BROKER) -> bool: :param name: the name for the queue :param broker: a message broker address :raises: MessagingError if the queue could not be deleted - :return True if deleted, False if not-deleted because it does not exist + :return True if deleted, False if not-deleted (because it does not exist) """ try: - with kombu.Connection(hostname=broker) as connection: + with kombu.Connection(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD) as connection: queue = kombu.Queue(name, no_declare=True, channel=connection) try: queue.queue_declare(channel=connection.default_channel, passive=True) @@ -142,170 +296,160 @@ def delete_queue(name: str, broker: str=DEFAULT_BROKER) -> bool: raise MessagingError("Could not delete queue %s on broker %s error=%s" % (name, broker, e)) -def create_binding(exchange_name: str, queue_name: str, routing_key: str='#', durable: bool=True, broker: str=DEFAULT_BROKER): +def create_binding(exchange: str, queue: str, routing_key: str='#', durable: bool=True, broker: str=DEFAULT_BROKER): """ create a binding between the exchange and queue, possibly filtered by the routing_key, on the given broker. please note that this only creates the binding, not the exchange, nor the queue. See also create_bound_queue(...) - :param exchange_name: the name for the exchange - :param queue_name: the name for the queue + :param exchange: the name for the exchange + :param queue: the name for the queue :param routing_key: filter only messages with the given routing_key to the queue :param durable: if True, then the queue 'survives' broker restarts :param broker: a message broker address """ try: - with kombu.Connection(hostname=broker) as connection: - exchange = kombu.Exchange(exchange_name, durable=durable, type='topic', no_declare=True) - queue = kombu.Queue(queue_name, exchange=exchange, routing_key=routing_key, durable=durable, no_declare=True) + with kombu.Connection(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD) as connection: + exchange = kombu.Exchange(exchange, durable=durable, type='topic', no_declare=True) + queue = kombu.Queue(queue, exchange=exchange, routing_key=routing_key, durable=durable, no_declare=True) queue.queue_bind(channel=connection.default_channel) - logger.debug("Added binding from exchange %s to queue %s with routing_key %s on broker %s", exchange_name, - queue_name, + logger.debug("Added binding from exchange %s to queue %s with routing_key %s on broker %s", exchange, + queue, routing_key, broker) return True except amqp.exceptions.NotFound as e: - raise MessagingError("Could not create binding from exchange %s to queue %s with routing_key %s " \ - " on broker %s because either the exchange or queue does not exist. error=%s" % (exchange_name, - queue_name, + raise MessageBusError("Could not create binding from exchange %s to queue %s with routing_key %s " \ + " on broker %s because either the exchange or queue does not exist. error=%s" % (exchange, + queue, routing_key, broker, e)) except Exception as e: - raise MessagingError("Could not create binding from exchange %s to queue %s with routing_key %s " \ - " on broker %s error=%s" % (exchange_name, queue_name, routing_key, broker, e)) + raise MessageBusError("Could not create binding from exchange %s to queue %s with routing_key %s " \ + " on broker %s error=%s" % (exchange, queue, routing_key, broker, e)) -def create_bound_queue(exchange_name, queue_name, routing_key='#', durable=True, broker=DEFAULT_BROKER): +def create_bound_queue(exchange: str, queue: str, routing_key: str='#', durable: bool=True, broker: str=DEFAULT_BROKER): """ - create a exchange (if needed), queue (if needed), and the in-between binding, possibly filtered by the routing_key, on the given broker. - :param exchange_name: the name for the exchange - :param queue_name: the name for the queue + create an exchange (if needed), queue (if needed), and the in-between binding, possibly filtered by the routing_key, on the given broker. + :param exchange: the name for the exchange + :param queue: the name for the queue :param routing_key: filter only messages with the given routing_key to the queue :param durable: if True, then the queue 'survives' broker restarts :param broker: a message broker address """ - create_exchange(exchange_name, durable=durable, broker=broker) - create_queue(queue_name, durable=durable, broker=broker) - create_binding(exchange_name, queue_name, routing_key, durable=durable, broker=broker) + create_exchange(exchange, durable=durable, broker=broker) + create_queue(queue, durable=durable, broker=broker) + create_binding(exchange, queue, routing_key, durable=durable, broker=broker) class _AbstractBus: """ - Common class for ToBus and Frombus, providing an easy way to connectto the qpid message bus. - Note that most methods require that a FromBus/ToBus object is used *inside* a - context. When entering the context, the connection with the broker is - opened, and a session and a receiver are created. When exiting the context, - the connection to the broker is closed; as a side-effect the receiver(s) - and session are destroyed. - - note:: The rationale behind using a context is that this is (unfortunately) - the *only* way that we can guarantee proper resource management. If there - were a __deinit__() as counterpart to __init__(), we could have used that. - We cannot use __del__(), because it is not the counterpart of __init__(), - but that of __new__(). + Common class for ToBus and FromBus, providing an common way to connect to the amqp message bus. """ - def __init__(self, address, broker=None, connection_log_level=logging.DEBUG): + def __init__(self, address: str, broker: str=DEFAULT_BROKER): """ - Initializer. - :param address: valid Qpid address - :param broker: valid Qpid broker URL, e.g. "localhost:5672" + Constructor, specifying the address of the queue or exchange to connect to on the given broker. + :param address: the 'name' of the queue or exchange to connect to. + :param broker: the valid broker url, like 'localhost' """ - self.address, self.subject = _AbstractBus._split_address_and_subject(address) - self.broker = broker if broker else DEFAULT_BROKER - self._connected = False - self._connection_log_level = connection_log_level + self.address = address + self.broker = broker + self._connection = None self._lock = TimeoutLock() - @staticmethod - def _split_address_and_subject(address): - """ - :return: Tuple of real address and (optional) subject - """ - # parse/split address into queue/exchange name and (optional) subject - if address and '/' in address: - return address.split('/') - - return address, None - - def is_connected(self): - return self._connected - - def isConnected(self): - #deprecated. Use is_connected() instead. Kept for backwards compatibility. - return self.is_connected() + def is_connected(self) -> bool: + """Is this instance connected to the bus? """ + with self._lock: + return self._connection is not None def open(self): """ - The following actions will be performed when entering a context: - * connect to the broker - * add a receiver - The connection to the broker will be closed if any of these failed. - :raise MessagingError: if any of the above actions failed. - :return: self + Open a connection to the broker, and connect to the endpoint (a receiver for a FromBus, a sender for a ToBus) + It is recommended to not use open() and close() directly, but in a 'with' context. + :raise MessagingError: in case connecting to the broker or the address failed. """ try: with self._lock: - if self._connected: + if self.is_connected(): return logger.debug("[%s] Connecting to broker: %s", self.__class__.__name__, self.broker) - self.connection = kombu.Connection(hostname=self.broker) - self.connection.connect() + self._connection = kombu.Connection(hostname=self.broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD) + self._connection.connect() logger.debug("[%s] Connected to broker: %s", self.__class__.__name__, self.broker) # let the subclass (FromBus or ToBus) create a receiver of sender self._connect_to_endpoint() - self._connected = True - except Exception as ex: error_msg = "[%s] Connecting to %s at broker %s failed: %s" % (self.__class__.__name__, self.address, self.broker, ex) - if isinstance(ex, NotFound): + if isinstance(ex, amqp.exceptions.NotFound) or isinstance(ex, gaierror) or isinstance(ex, OSError): + # log "normal" known exceptions as error... logging.error(error_msg) else: + # log other exceptions with stack trace... logging.exception(error_msg) - raise MessagingError(error_msg) + # it is possible that a connection was established already + # raising the MessageBusError below will not call the close method via the 'with' context manager + # because the __enter__ was not finished yet + # so, we have to close any connections here our selves. + self.close() + + # ... and finally raise a MessageBusError ourselves with an error message + raise MessageBusError(error_msg) def close(self): """ - Disconnect from the subscribed address and close the connection. + Disconnect from the endpoint (a receiver for a FromBus, a sender for a ToBus), and close the connection to the broker. + It is recommended to not use open() and close() directly, but in a 'with' context. + :raise MessagingError: in case disconnecting from the broker or the address failed. """ - try: - with self._lock: - if not self._connected: - return + with self._lock: + if not self.is_connected(): + return + try: self._disconnect_from_endpoint() - - logger.debug("[%s] Disconnecting from broker: %s", self.__class__.__name__, self.broker) - self.connection.close() - logger.log(self._connection_log_level, "[%s] Disconnected from broker: %s", self.__class__.__name__, self.broker) - except Exception as ex: - if isinstance(ex, AttributeError) and 'drain_events' in str(ex): - # looks like a kombu bug, just continue - pass - else: - error_msg = "[%s] Disconnecting from %s at broker %s failed: %s" % (self.__class__.__name__, - self.address, - self.broker, - ex) + except Exception as ex: + error_msg = "[%s] Disconnecting from endpoint %s at broker %s failed: %s" % (self.__class__.__name__, + self.address, + self.broker, + ex) logger.exception(error_msg) raise MessagingError(error_msg) - finally: - del self.connection - self.connection = None - self._connected = False - def __enter__(self): + try: + logger.debug("[%s] Disconnecting from broker: %s", self.__class__.__name__, self.broker) + self._connection.close() + logger.debug("[%s] Disconnected from broker: %s", self.__class__.__name__, self.broker) + except Exception as ex: + if isinstance(ex, AttributeError) and 'drain_events' in str(ex): + # looks like a kombu bug, just continue + pass + else: + error_msg = "[%s] Disconnecting from %s at broker %s failed: %s" % (self.__class__.__name__, + self.address, + self.broker, + ex) + + logger.exception(error_msg) + raise MessagingError(error_msg) + finally: + self._connection = None + + def __enter__(self) -> '_AbstractBus': + """Open the connection when entering a 'with' context.""" self.open() return self def __exit__(self, exc_type, exc_val, exc_tb): + """Close the connection leaving the 'with' context.""" self.close() def _connect_to_endpoint(self): @@ -319,64 +463,76 @@ class _AbstractBus: class FromBus(_AbstractBus): """ - This class provides an easy way to fetch messages from the message bus. - Note that most methods require that a FromBus object is used *inside* a - context. When entering the context, the connection with the broker is - opened, and a session and a receiver are created. When exiting the context, - the connection to the broker is closed; as a side-effect the receiver(s) - and session are destroyed. - - note:: The rationale behind using a context is that this is (unfortunately) - the *only* way that we can guarantee proper resource management. If there - were a __deinit__() as counterpart to __init__(), we could have used that. - We cannot use __del__(), because it is not the counterpart of __init__(), - but that of __new__(). + A FromBus provides an easy way to fetch messages from the message bus. + The recommended way is to use a FromBus in a 'with' context, like so. + + >>> # use a TemporaryQueue where we can let the FromBus connect to + ... with TemporaryQueue() as tmp_queue: + ... # create a new FromBus, use it in a context. + ... with FromBus(address=tmp_queue.address) as frombus: + ... print("connected =", frombus.is_connected()) + ... + ... # try to receive a message (there is None, cause nobody sent any) + ... msg = frombus.receive(timeout=0.1) + ... print("msg =", msg) + ... + ... # left context, so is_connected should be false now. + ... print("connected =", frombus.is_connected()) + ... + connected = True + msg = None + connected = False """ - def __init__(self, address, broker=None): + def __init__(self, address: str, broker: str=DEFAULT_BROKER): """ - Initializer. - :param address: valid Qpid address - :param broker: valid Qpid broker URL, e.g. "localhost:5672" + Constructor, specifying the address of the queue to connect to on the given broker. + :param address: the 'name' of the queue to connect to. + :param broker: the valid broker url, like 'localhost' """ + self._unacked_messages = {} + self._receiver = None super(FromBus, self).__init__(address=address, broker=broker) - def _connect_to_endpoint(self): - logger.debug("[FromBus] Connecting receiver to: %s with subject: %s on broker: %s" % (self.address, - self.subject, - self.broker)) + """ + Implementation of abstract method. Connect a receiver to the broker queue specified by address. + Can raise kombu/amqp exceptions, which are handled in the _AbstractBus. + """ + logger.debug("[FromBus] Connecting receiver to: %s on broker: %s", self.address, self.broker) queue = kombu.Queue(self.address, no_declare=True) # try to passivly declare the queue on the broker, raises if non existent - queue.queue_declare(passive=True, channel=self.connection.default_channel) + queue.queue_declare(passive=True, channel=self._connection.default_channel) - self._receiver = self.connection.SimpleQueue(queue) + self._receiver = self._connection.SimpleQueue(queue) - logger.log(self._connection_log_level, "[FromBus] Connected receiver to: %s on broker: %s" % (self.address, self.broker)) + logger.debug("[FromBus] Connected receiver to: %s on broker: %s", self.address, self.broker) def _disconnect_from_endpoint(self): + """ + Implementation of abstract method. Disconnect the receiver from the broker queue. + Can raise kombu/amqp exceptions, which are handled in the _AbstractBus. + """ if self._receiver is not None: - logger.debug("[FromBus] Disconnecting receiver from bus: %s with subject: %s on broker: %s" % (self.address, - self.subject, - self.broker)) + logger.debug("[FromBus] Disconnecting receiver from bus: %s on broker: %s", self.address, self.broker) self._receiver.close() self._receiver = None - logger.log(self._connection_log_level, "[FromBus] Disconnected receiver from bus: %s with subject: %s on broker: %s" % (self.address, - self.subject, - self.broker)) + logger.debug("[FromBus] Disconnected receiver from bus: %s on broker: %s", + self.address, self.broker) - def receive(self, timeout=DEFAULT_TIMEOUT): + def receive(self, timeout: float=DEFAULT_TIMEOUT, acknowledge: bool = True) -> Optional[LofarMessage]: """ - Receive the next message from any of the queues we're listening on. + Receive the next message from the queue we're listening on. :param timeout: maximum time in seconds to wait for a message. - :return: received message, None if timeout occurred. + :param acknowledge: if True, then automatically acknowledge the received message + :return: received message, or None if timeout occurred. """ - msg = None + kombu_msg = None start = datetime.utcnow() with self._lock.timeout_context(timeout=timeout) as have_lock: @@ -385,22 +541,21 @@ class FromBus(_AbstractBus): try: elapsed_sec = (datetime.utcnow() - start).total_seconds() - msg = self._receiver.get(timeout=max(timeout-elapsed_sec, 0.001)) - logger.debug("[FromBus] Message received on: %s mgs: %s" % (self.address, msg)) + kombu_msg = self._receiver.get(timeout=max(timeout-elapsed_sec, 0.001)) + logger.debug("[FromBus] Message received on: %s mgs: %s" % (self.address, kombu_msg)) # convert kombu msg to lofarmessage - lofar_msg = convert_to_lofar_message(msg) - - # filter by subject here in the client. - # See: https://derickbailey.com/2015/07/22/airport-baggage-claims-selective-consumers-and-rabbitmq-anti-patterns/ - # RabbitMQ/Kombu think having consumers which filter by subject is an anti-pattern, so it's not supported out of the box. - # So, let's filter here, and reject any message that does not match. - if self.subject and self.subject != lofar_msg.subject: - msg.reject(requeue=False) - return None - else: - msg.ack() - return lofar_msg + lofar_msg = convert_to_lofar_message(kombu_msg) + + # keep track of unacked messages + # the outside world only knows about lofar messages, so track them based on the lofar_message id. + # also keep track of thread id, because ack'ing/rejecting messages across threads is a bad idea! + self._unacked_messages[lofar_msg.id] = (kombu_msg, threading.current_thread().ident) + + if acknowledge: + self.ack(lofar_msg) + + return lofar_msg except kombu.exceptions.TimeoutError: return None @@ -408,75 +563,112 @@ class FromBus(_AbstractBus): return None except Exception as e: logger.exception(e) - if msg: - msg.reject() - raise_exception(MessagingError, - "[FromBus] unknown exception while receiving message on %s: %s" % (self.address, e)) + if kombu_msg: + kombu_msg.reject() + logger.debug("rejected msg %s", kombu_msg) + raise MessagingError("[FromBus] unknown exception while receiving message on %s: %s" % (self.address, e)) + + def ack(self, lofar_msg: LofarMessage): + """ + Acknowledge the message to the broker. + :param lofar_msg: the message to be ack'ed + """ + with self._lock: + kombu_msg, thread_id = self._unacked_messages.pop(lofar_msg.id, (None, None)) + if kombu_msg: + if threading.current_thread().ident != thread_id: + raise RuntimeError("Cannot acknowledge messages across threads") + + kombu_msg.ack() + logger.debug("acknowledged %s", lofar_msg) + + def reject(self, lofar_msg: LofarMessage): + """ + Reject the message to the broker. + :param lofar_msg: the message to be rejected + """ + with self._lock: + kombu_msg, thread_id = self._unacked_messages.pop(lofar_msg.id, (None, None)) + if kombu_msg: + if threading.current_thread().ident != thread_id: + raise RuntimeError("Cannot reject messages across threads") + + kombu_msg.reject() + logger.debug("rejected %s", lofar_msg) class ToBus(_AbstractBus): """ - This class provides an easy way to post messages onto the message bus. - - Note that most methods require that a ToBus object is used *inside* a - context. When entering the context, the connection with the broker is - opened, and a session and a sender are created. When exiting the context, - the connection to the broker is closed; as a side-effect the sender and - session are destroyed. - - note:: The rationale behind using a context is that this is (unfortunately) - the *only* way that we can guarantee proper resource management. If there - were a __deinit__() as counterpart to __init__(), we could have used that. - We cannot use __del__(), because it is not the counterpart of __init__(), - but that of __new__(). + A ToBus provides an easy way to send/publish messages to the message bus. + The recommended way is to use a ToBus in a 'with' context, like so. + + >>> # use a TemporaryExchange where we can let the ToBus connect to + ... with TemporaryExchange() as tmp_exchange: + ... # create a new ToBus, use it in a context. + ... with ToBus(exchange=tmp_exchange.address) as tobus: + ... print("connected =", tobus.is_connected()) + ... + ... # send a message to the exchange on the broker + ... tobus.send(EventMessage(content='foo')) + ... + ... # left context, so is_connected should be false now. + ... print("connected =", tobus.is_connected()) + ... + connected = True + connected = False """ - def __init__(self, address:str=DEFAULT_BUSNAME, broker=None, subject_based_routing=True): + def __init__(self, exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER): """ - Initializer. - :param address: valid Qpid address - :param broker: valid Qpid broker URL, e.g. "localhost:5672" - :param subject_based_routing: if True, then publish the message on this bus'es address which is assumed to be an exchange, - and set the given subject as routing_key of the message. - This assumes the broker is configured correcly to filter on the given - subject/routing_key, so the message is delivered to the correct consumer(s). - If False, then publish the message directly to this bus'es address which is assumed to be a queue. + Constructor, specifying the address of the exchange to connect to on the given broker. + :param exchange: the name of the exchange to connect to. + :param broker: the valid broker url, like 'localhost' """ - super(ToBus, self).__init__(address=address, broker=broker) - - self._subject_based_routing = subject_based_routing + self._sender = None + super(ToBus, self).__init__(address=exchange, broker=broker) def _connect_to_endpoint(self): + """ + Implementation of abstract method. Connect a sender to the broker exchange specified by address. + Can raise kombu/amqp exceptions, which are handled in the _AbstractBus. + """ logger.debug("[ToBus] Connecting sender to: %s on broker: %s" % (self.address, self.broker)) - # self._sender = self.connection.create_sender(address=self.address) - self._sender = kombu.Producer(self.connection) + # self._sender = self._connection.create_sender(address=self.address) + self._sender = kombu.Producer(self._connection) - logger.log(self._connection_log_level, "[ToBus] Connected sender to: %s on broker: %s" % (self.address, self.broker)) + logger.debug("[ToBus] Connected sender to: %s on broker: %s" % (self.address, self.broker)) def _disconnect_from_endpoint(self): + """ + Implementation of abstract method. Disconnect the sender from the broker exchange specified by address. + Can raise kombu/amqp exceptions, which are handled in the _AbstractBus. + """ if self._sender is not None: logger.debug("[ToBus] Disconnecting sender from: %s on broker: %s" % (self.address, self.broker)) self._sender.close() self._sender = None - logger.log(self._connection_log_level, "[ToBus] Disconnected sender from: %s on broker: %s" % (self.address, self.broker)) + logger.debug("[ToBus] Disconnected sender from: %s on broker: %s" % (self.address, self.broker)) - def send(self, message): + def send(self, message: LofarMessage, direct_address: str=None): """ - Send a message to the exchange (target) we're connected to. + Send a message to the exchange we're connected to, or... see param subject_based_routing :param message: message to be sent - :return: + :param direct_address: if given then publish the message directly this address which is assumed to be an existing queue on the broker. + Otherwise (the default behaviour) publish the message to this bus'es address which is assumed to be an exchange, + and use the message's subject as routing_key. This assumes the broker is configured correcly to filter on the given + subject/routing_key, so the message is delivered to the correct consumer(s). """ try: logger.debug("[ToBus] Sending message to: %s (%s)", self.address, message) kwargs_dict = message.as_kombu_publish_kwargs() - if self._subject_based_routing: + if direct_address: + kwargs_dict['routing_key'] = direct_address + kwargs_dict['exchange'] = '' + else: kwargs_dict['routing_key'] = message.subject kwargs_dict['exchange'] = self.address - else: - kwargs_dict['routing_key'] = self.address - kwargs_dict['exchange'] = '' with self._lock: self._sender.publish(serializer='pickle', **kwargs_dict) @@ -484,20 +676,26 @@ class ToBus(_AbstractBus): logger.debug("[ToBus] Sent message to: %s", self.address) except Exception as e: - raise_exception(MessagingError, "[ToBus] Failed to send message to: %s error=%s" % (self.address, e)) + raise MessagingError("[ToBus] Failed to send message to: %s error=%s" % (self.address, e)) class TemporaryExchange: """ A TemporaryExchange instance can be used to setup a dynamic temporary exchange which is closed and deleted automagically when leaving context. + + Particularly useful for testing, like so: + >>> with TemporaryExchange("my.exchange") as tmp_exchange: + ... with tmp_exchange.create_tobus() as tobus: + ... tobus.send(EventMessage(subject="some.event", content="foo")) + """ - def __init__(self, name_prefix: str, broker: str=DEFAULT_BROKER): + def __init__(self, name_prefix: str=None, broker: str=DEFAULT_BROKER): """ Create a TemporaryExchange instance with an optional name on the given broker. :param name_prefix: prefix for the final address which also includes a uuid. :param broker: the message broker to connect to. """ - self._name_prefix = name_prefix + self._name_prefix = name_prefix or "" self.broker = broker self._tmp_exchange = None self.address = None @@ -522,7 +720,8 @@ class TemporaryExchange: It is advised to use the TemporaryExchange instance in a 'with' context, which guarantees the close call. """ # create an identifiable address based on the given name which is also (almost) unique, and readable. - self.address = "%s-tmp-exchange-%s" % (self._name_prefix, uuid.uuid4().hex[:8]) + self.address = "%stmp-exchange-%s" % (self._name_prefix+"-" if self._name_prefix else "", + uuid.uuid4().hex[:8]) logger.debug("Creating TemporaryExchange at %s ...", self.address) create_exchange(name=self.address, broker=self.broker) logger.debug("Created TemporaryExchange at %s", self.address) @@ -541,41 +740,74 @@ class TemporaryExchange: self.address = None def __str__(self): - return "TemporaryExchange address=%s".format(self.address) + return "TemporaryExchange address=%s" % self.address def create_tobus(self): """ Factory method to create a ToBus instance which is connected to this TemporaryExchange :return: ToBus """ - return ToBus(broker=self.broker, address=self.address) + return ToBus(broker=self.broker, exchange=self.address) + class TemporaryQueue: """ A TemporaryQueue instance can be used to setup a dynamic temporary queue which is closed and deleted automagically when leaving context. - Together with the factory methods create_frombus and/or create_tobus it gives us to following simple but often used use case: - - with TemporaryQueue("MyTestQueue") as tmp_queue: - with tmp_queue.create_tobus() as tobus, tmp_queue.create_frombus() as frombus: - # send a message... - original_msg = EventMessage(content="foobar") - tobus.send(original_msg) - - # ...receive the message. - received_msg = frombus.receive() + Together with the TemporaryExchange and factory methods create_frombus and/or create_tobus it gives us + the following simple but often used use case: + + >>> with TemporaryExchange("my.exchange") as tmp_exchange: + ... with TemporaryQueue("my.queue", exchange=tmp_exchange.address, routing_key="some.#") as tmp_queue: + ... with tmp_queue.create_frombus() as frombus: + ... msg = frombus.receive(0.1) + ... print(msg) + ... + ... with tmp_exchange.create_tobus() as tobus: + ... tobus.send(EventMessage(subject="some.event", content="foo")) + ... tobus.send(EventMessage(subject="another.event", content="xyz")) + ... tobus.send(EventMessage(subject="some.event", content="bar")) + ... + ... msg = frombus.receive(0.1) + ... print(msg.content) + ... msg = frombus.receive(0.1) + ... print(msg.content) + ... msg = frombus.receive(0.1) + ... print(msg) + None + foo + bar + None Alternative use cases with only a tobus or only a frombus on the tmp_queue are also possible. + Here's an example with two TemporaryQueues, each with their own address, and a ToBus sending messages to each queue directly. + + >>> with TemporaryQueue(addressed_to_me_only=True) as tmp_queue1, TemporaryQueue(addressed_to_me_only=True) as tmp_queue2: + ... + ... with ToBus("") as tobus: + ... tobus.send(EventMessage(content='foo'), tmp_queue1.address) + ... tobus.send(EventMessage(content='bar'), tmp_queue2.address) + ... + ... with tmp_queue1.create_frombus() as frombus: + ... msg = frombus.receive(0.1) + ... print(msg.content) + ... + ... with tmp_queue2.create_frombus() as frombus: + ... msg = frombus.receive(0.1) + ... print(msg.content) + foo + bar + """ - def __init__(self, name_prefix: str=None, exchange_name: str=None, + def __init__(self, name_prefix: str=None, exchange: str=None, routing_key: str="#", addressed_to_me_only: bool = False, broker=DEFAULT_BROKER): """ Create a TemporaryQueue instance with an optional name on the given broker. :param name_prefix: Optional prefix for the final address which also includes a uuid. - :param exchange_name: Optional exchange name to bind this queue to (with the given routing_key). + :param exchange: Optional exchange name to bind this queue to (with the given routing_key). If the exchange does not exist it is created and deleted automagically. - :param routing_key: Optional routing_key for binding this queue to the given exchange_name. + :param routing_key: Optional routing_key for binding this queue to the given exchange. If "#" (the default), then route all messages to this queue. This routing_key can be overruled by addressed_to_me_only. :param addressed_to_me_only: If True then apply the tmp-queue's address as binding routing key, @@ -585,7 +817,7 @@ class TemporaryQueue: """ self._name_prefix = name_prefix self.broker = broker - self._bound_exchange_name = exchange_name + self._bound_exchange = exchange self._routing_key = routing_key self._addressed_to_me_only = addressed_to_me_only self._created_exchange = False @@ -611,31 +843,32 @@ class TemporaryQueue: It is advised to use the TemporaryQueue instance in a 'with' context, which guarantees the close call. """ # create an identifiable address based on the given name which is also (almost) unique, and readable. - self.address = "%s-tmp-queue-%s" % (self._name_prefix, uuid.uuid4().hex[:8]) + self.address = "%stmp-queue-%s" % (self._name_prefix+"-" if self._name_prefix else "", + uuid.uuid4().hex[:8]) logger.debug("Creating TemporaryQueue at %s ...", self.address) - if not self._bound_exchange_name: + if not self._bound_exchange: # if there is no exhange to bind to, # then we create an exchange with the same name as the queue, # and route all messages from the exchange to the queue. # That's because messaging is designed to only publish messages to exchanges. - self._bound_exchange_name = self.address + self._bound_exchange = "exchange-for-" + self.address # create the tmp queue... create_queue(self.address, broker=self.broker, durable=False) # create the exchange (if needed), and remember if we need to destoy it (if it was created) - self._created_exchange = create_exchange(self._bound_exchange_name, + self._created_exchange = create_exchange(self._bound_exchange, broker=self.broker, durable=False) # and finally create the binding # if no routing_key given, then use this tmp-queue's specific address as routing key - create_binding(exchange_name=self._bound_exchange_name, queue_name=self.address, + create_binding(exchange=self._bound_exchange, queue=self.address, routing_key=self.address if self._addressed_to_me_only else self._routing_key , broker=self.broker, durable=False) logger.debug("Created TemporaryQueue %s bound to exchange %s with routing_key %s", - self.address, self._bound_exchange_name, self._routing_key) + self.address, self._bound_exchange, self._routing_key) def close(self): @@ -650,7 +883,7 @@ class TemporaryQueue: logger.error(e) try: if self._created_exchange: - delete_exchange(self._bound_exchange_name) + delete_exchange(self._bound_exchange) except Exception as e: logger.error(e) logger.debug("Closed TemporaryQueue at %s", self.address) @@ -658,68 +891,236 @@ class TemporaryQueue: def __str__(self): return "TemporaryQueue address=%s bound to exchange=%s with routing_key=%s" % ( - self.address, self._bound_exchange_name, self._routing_key) + self.address, self._bound_exchange, self._routing_key) - def create_frombus(self, subject=None): + def create_frombus(self) -> FromBus: """ - Factory method to create a FromBus instance which is connected to this TemporaryQueue - :param subject: Optional subject string to filter for. Only messages which match this subject are received. + Convenience factory method to create a FromBus instance which is connected to this TemporaryQueue :return: FromBus """ - return FromBus(broker=self.broker, - address="%s/%s" % (self.address, subject) if subject else self.address) + return FromBus(broker=self.broker, address=self.address) - def create_tobus(self): + def create_tobus(self) -> ToBus: """ - Factory method to create a ToBus instance which is connected to this TemporaryQueue + Convenience factory method to create a ToBus instance which is connected to the bound exchange of this TemporaryQueue if any. :return: ToBus """ - return ToBus(broker=self.broker, address=self._bound_exchange_name) - + return ToBus(broker=self.broker, exchange=self._bound_exchange) -class AbstractBusListener: +class AbstractMessageHandler: """ - AbstractBusListener class for handling messages which are received on a message bus. + The AbstractMessageHandler is the base class which defines the following message handling pattern: + - the method on_listen_loop_begin is called at startup + - in the loop, for each message these methods are called: + - on_before_receive_message + - handle_message + - on_after_receive_message + - finally the method on_listen_loop_end is called when the loop ends + + Only the handle_message method raises a NotImplementedError, and thus needs to be implemented in the subclass. + The other four methods have empty bodies, so their default behaviour is no-op. + Typical usage is to derive from this class and implement the handle_message method with concrete logic. """ - def __init__(self, exchange_name: str, routing_key: str, queue_name: str = None, num_threads: int = 1, broker: str = DEFAULT_BROKER): + def on_listen_loop_begin(self): + """Called before main processing loop is entered. + Typical usage for overriding this method is to create thread-bound connections to external resources like databases. """ - .....TODO.... - :param num_threads: the number of receiver/handler threads. Default=1, use higher number if it makes sense, - for example when you are waiting for a slow database while handling the message. + pass + + def on_before_receive_message(self): + """Called in main processing loop just before a blocking receive for a message.""" + pass + + def handle_message(self, msg: LofarMessage): + """Implement this method in your subclass to handle the received message + Raise an exception if you want to reject the incoming message on the broker. + :param msg: the received message to be handled """ - self.exchange_name = exchange_name - self.routing_key = routing_key + raise NotImplementedError("Please implement the handle_message method in your subclass to handle the received message") + + def on_after_receive_message(self): + """Called in the main loop after the messages was handled. + """ + pass + + def on_listen_loop_end(self): + """Called after main processing loop is finished.""" + pass + + +class BusListener: + """ + The BusListener is the core class to connect to a given bus, listen for messages, and handle each messages upon arrival. + The listening/handling is done on one or more background threads, so 'normal' program business logic can just go on in the foreground. + The actual handling of the message is deferred to a conrete implementation of an AbstractMessageHandler, so typical usage + is to derive from the AbstractMessageHandler class and implement the handle_message method with concrete logic, and + inject that into the buslistener. (Dependency Injection or Inversion of Control design pattern) + + Here's a simple but concrete example: + + >>> # implement an example MyHandler which just prints the received message subject and content + ... class MyHandler(AbstractMessageHandler): + ... def handle_message(self, lofar_msg): + ... print(lofar_msg.subject, lofar_msg.content) + ... # ... do some more fancy stuff with the msg... + + + And here's how it's used (TemporaryExchange and TemporaryQueue are used again to have an isolated test) + >>> with TemporaryExchange("my.exchange") as tmp_exchange: + ... with tmp_exchange.create_tobus() as tobus: + ... with TemporaryQueue("my.queue", exchange=tmp_exchange.address, routing_key="some.#") as tmp_queue: + ... + ... # construct a BusListener instance in a context, + ... # so it starts/stops listening and and handling messages automagically, + ... # and inject the MyHandler + ... with BusListener(handler_type=MyHandler, queue=tmp_queue.address): + ... tobus.send(EventMessage(subject="some.event", content="foo")) + ... tobus.send(EventMessage(subject="another.event", content="xyz")) + ... tobus.send(EventMessage(subject="some.event", content="bar")) + ... + ... # ... do some work ... simulate this by sleeping a little... + ... # ...in the mean time, BusListener receives and handles the messages (on its own thread) + ... from time import sleep + ... sleep(0.25) + ... + some.event foo + some.event bar + + There are 4 more methods to (possibly) override, which are all executed on the background thread(s) + - on_listen_loop_begin and on_listen_loop_end which are typically used to setup connections to a database (one per thread, to make each connection thread-specific) + - on_before_receive_message and on_after_receive_message which are called just before and after receiving a message. + + >>> # implement an example AbstractMessageHandler which just + ... # collects message contents in a 'database' and + ... # prints the all received message subject and content at the end. + ... class MyDBConnectedHandler(AbstractMessageHandler): + ... def __init__(self, db_name, user, password): + ... self._db_name = db_name + ... self._user = user + ... self._password = password + ... print("Connecting to database %s with user %s" % (self._db_name, self._user)) + ... self._db = {} # use dict as fake database for this example. You would normally connect to a real database here. + ... + ... def on_listen_loop_end(self): + ... print(sorted(self._db.values())) # You would normally disconnect from a real database here. + ... + ... def handle_message(self, lofar_msg): + ... # ... do some more fancy stuff with the msg, like storing it in the database + ... self._db[lofar_msg.id] = lofar_msg.content + + And here's how it's used (TemporaryExchange and TemporaryQueue are used again to have an isolated test) + >>> with TemporaryExchange("my.exchange") as tmp_exchange: + ... with tmp_exchange.create_tobus() as tobus: + ... with TemporaryQueue("my.queue", exchange=tmp_exchange.address, routing_key="some.#") as tmp_queue: + ... + ... # construct a BusListener instance in a context, + ... # so it starts/stops listening and and handling messages automagically + ... with BusListener(handler_type=MyDBConnectedHandler, + ... handler_kwargs={'db_name': 'my_db', 'user': 'my_user', 'password': 'my_password' }, + ... queue=tmp_queue.address): + ... tobus.send(EventMessage(subject="some.event", content="foo")) + ... tobus.send(EventMessage(subject="another.event", content="xyz")) + ... tobus.send(EventMessage(subject="some.event", content="bar")) + ... + ... # ... do some work ... simulate this by sleeping a little... + ... # ...in the mean time, BusListener receives and handles the messages (on its own thread) + ... from time import sleep + ... sleep(0.25) + Connecting to database my_db with user my_user + ['bar', 'foo'] + + """ + + def __init__(self, handler_type: AbstractMessageHandler.__class__, + handler_kwargs: dict = None, + exchange: str = None, routing_key: str = "#", + queue: str = None, + num_threads: int = 1, + broker: str = DEFAULT_BROKER): + """ + Create a buslistener instance. Either specify exchange and routing_key, or a queue. Not both. + + If you specify a queue to listen on, then it is assumed the queue exists and you've taken care in the queue configuration + on the broker that the correct messages are routed to this queue. + + The recommended way though is to specify an exchange and routing_key. Then this buslistener creates a designated queue + on the broker, specifically for this listener with the following constructed name: <exchange>.for.<program_name>.<routing_key> + The designated queue is bound to the given exchange with the given routing_key. + + The rational behind this is that: + - this saves a lot of 'infrastructure' (queue/binding) configuration on the broker. + - the designated queues are named in a consistent way. + - the designated queues only receive messages this listener is interested in. + - monitoring tools (like the RabbitMQ web interface) can see what programs (or services within programs) are consuming messages, and at what rate. + + We intentionally do not remove the queue and binding upon closing this listener, + so messages are stored/kept on the broker in the queue for this listener for later processing once the program and this listener restarts. + + If you realy would like to have automatic cleanup of the created queue (for example in tests), + then use this buslistener in a BusListenerJanitor's context. + + :param handler_type: TODO!!!!!!!! + :param handler_kwargs: TODO!!!!!!!! + :param exchange: Bind the listener to this given exchange with the given routing key via an auto-generated designated queue. Ignored if a the 'queue' parameter is specified. + :param routing_key: Bind the listener to this given exchange with the given routing key via an auto-generated designated queue. Ignored if a the 'queue' parameter is specified. + :param queue: Listen on this specific given queue. (Not recommended) + :param num_threads: the number of receiver/handler threads. + default=1, use higher number only if it makes sense, for example when you are + waiting for a slow database while handling the message. + :param broker: a message broker address + :raises: ValueError if both exchange and queue are given. These parameters are mutually exclusive. + :raises: MessagingError if the exchange could not be created + """ + + if not isinstance(handler_type, type): + raise TypeError("handler_type should be a ServiceMessageHandler subclass, not an instance!") + + if not issubclass(handler_type, AbstractMessageHandler): + raise TypeError("handler_type should be a AbstractMessageHandler subclass") + + self._handler_type = handler_type + self._handler_kwargs = dict(handler_kwargs) if handler_kwargs else {} self.broker = broker self._num_threads = num_threads + self._threads = [] self._running = threading.Event() self._listening = False - if queue_name: - self.queue_name = queue_name + if (exchange and queue) or (not exchange and not queue): + raise ValueError("Provide either 'exchange' (with a routing_key) or 'queue'.") + + if queue: + self.address = queue else: - # a buslistener should listen (consume) on a specific queue for this listener. - # this listener is/should be specific for the program it's used in, and for the routing_key it's interested in. - # so, we create a consumer address based on those parameters, and create the queue and binding on the fly if needed. - # This saves use a lot of infrastructure configuration. - # We intentionally do not remove the queue and binding upon closing this listener, so messages are stored on the broker - # in the queue for this listener for later processing once the program and this listener restarts. - # If you would like to have automatic cleanup of the created queue, then use the buslistener in a BusListenerJanitor's context. - sanitized_routing_key = self.routing_key.replace(".#", "").replace(".*", "").replace("#", "").replace("*", "") - if not sanitized_routing_key: - sanitized_routing_key = "all" - self.queue_name = "%s.for.%s.%s" % (exchange_name, - program_name(include_extension=False), - sanitized_routing_key) - - # make sure the queue is bound to the exchange - create_bound_queue(exchange_name=exchange_name, queue_name=self.queue_name, routing_key=routing_key, broker=self.broker) - - def isRunning(self): + self.address = self.designated_queue_name(exchange, routing_key) + + # make sure the queue is bound to the exchange + # any created queue or binding is not removed on exit. See rational above. + create_bound_queue(exchange=exchange, queue=self.address, routing_key=routing_key, broker=self.broker) + + @staticmethod + def designated_queue_name(exchange: str, routing_key: str) -> str: + """ + create a designated queue name based on the given exchange name, routing_key, and the current running program name. + Like so: <exchange>.for.<program_name>.<sanitzed_routing_key> + In case the routing_key filters for wildcards only, then the routing key is replaced by 'all' + :param exchange: the exchange name to which the designated queue will bind + :param routing_key: the routing_key which is used for the binding. Any wildcards like ".#"/".*" are removed. + :return: <exchange>.for.<program_name>.<sanitzed_routing_key> + """ + sanitized_routing_key = routing_key.replace(".#","").replace(".*","").replace("#","").replace("*","") + if not sanitized_routing_key: + sanitized_routing_key = "all" + return "%s.queue.for.%s.%s" % (exchange, program_name(include_extension=False), sanitized_routing_key) + + def is_running(self) -> bool: + """Is this listener running its background listen/handle loops?""" return self._running.isSet() - def isListening(self): + def is_listening(self) -> bool: + """Is this listener listening?""" return self._listening def start_listening(self): @@ -731,13 +1132,16 @@ class AbstractBusListener: self._listening = True self._running.set() - self._threads = [] for i in range(self._num_threads): - thread = threading.Thread(target=self._loop, - name="ListenerThread_%s_%d" % (self.queue_name, i)) + thread_name = "ListenerThread_%s_%d" % (self.address, i) + thread = threading.Thread(target=self._listen_loop, + name=thread_name) self._threads.append(thread) thread.start() + if not thread.is_alive(): + raise MessagingError("Could not start listener thread: %s" % thread_name) + def stop_listening(self): """ Stop the background threads that listen to incoming messages. @@ -748,131 +1152,120 @@ class AbstractBusListener: self._listening = False - if self.isRunning(): + if self.is_running(): self._running.clear() for thread in self._threads: logger.debug("%s: STOPPING Listening for messages on %s at broker %s" % - (thread.name, self.queue_name, self.broker if self.broker else DEFAULT_BROKER)) + (thread.name, self.address, self.broker if self.broker else DEFAULT_BROKER)) thread.join() - logger.info("%s: STOPPED Listening for messages on %s" % (thread.name, self.queue_name)) + logger.info("%s: STOPPED Listening for messages on %s" % (thread.name, self.address)) - def __enter__(self): - """ - Internal use only. Handles scope with keyword 'with' + def __enter__(self) -> 'BusListener': + """enter the context, and make the bus_listener start listening. + :return self """ self.start_listening() return self def __exit__(self, exc_type, exc_val, exc_tb): - """ - Internal use only. Handles scope with keyword 'with' + """leave the context, and make the bus_listener stop listening. + :return self """ self.stop_listening() - def _onListenLoopBegin(self): - "Called before main processing loop is entered." - pass - - def _onBeforeReceiveMessage(self): - "Called in main processing loop just before a blocking wait for messages is done." - pass - - def _handleMessage(self, msg): - "Implement this method in your subclass to handle a received message" - raise NotImplementedError("Please implement the _handleMessage method in your subclass to handle a received message") + def _create_handler(self): + return self._handler_type(**self._handler_kwargs) - def _onAfterReceiveMessage(self, successful): - "Called in the main loop after the result was send back to the requester." - "@successful@ reflects the state of the handling: true/false" - pass - - def _onListenLoopEnd(self): - "Called after main processing loop is finished." - pass - - def _loop(self): + def _listen_loop(self): """ Internal use only. Message listener loop that receives messages and starts the attached function with the message content as argument. """ logger.info( "%s START Listening for messages on %s at broker %s" % - (threading.currentThread().name, self.queue_name, self.broker if self.broker else DEFAULT_BROKER)) + (threading.currentThread().name, self.address, self.broker if self.broker else DEFAULT_BROKER)) + + # create an instance of the given handler for this background thread + # (to keep the internals of the handler thread agnostic) + thread_handler = self._create_handler() + try: - self._onListenLoopBegin() + thread_handler.on_listen_loop_begin() except Exception as e: - logger.error("onListenLoopBegin() failed with %s", e) - return + logger.warning("onListenLoopBegin() failed with %s", e) - with FromBus(self.queue_name, broker=self.broker) as receiver: - while self.isRunning(): + with FromBus(self.address, broker=self.broker) as receiver: + while self.is_running(): try: - self._onBeforeReceiveMessage() + thread_handler.on_before_receive_message() except Exception as e: - logger.error("onBeforeReceiveMessage() failed with %s", e) + logger.exception("on_before_receive_message() failed: %s", e) pass try: # get the next message - lofar_msg = receiver.receive(1) - # retry if timed-out + lofar_msg = receiver.receive(.1, acknowledge=False) + # retry loop if timed-out if lofar_msg is None: continue - # Execute the handler function and send reply back to client + # Execute the handler function try: - self._handleMessage(lofar_msg) - - try: - self._onAfterReceiveMessage(True) - except Exception as e: - logger.error("onAfterReceiveMessage() failed with %s", e) - continue + thread_handler.handle_message(lofar_msg) + receiver.ack(lofar_msg) + except Exception as e: + logger.error("Handling of %s failed. Rejecting message. Error: %s", lofar_msg, e) + receiver.reject(lofar_msg) + try: + thread_handler.on_after_receive_message() except Exception as e: - import traceback - logger.warning("Handling of message failed with %s: %s\nMessage: %s", e, - traceback.format_exc(), - lofar_msg.content) - - try: - self._onAfterReceiveMessage(False) - except Exception as e: - logger.error("onAfterReceiveMessage() failed with %s", e) - continue + logger.exception("on_after_receive_message() failed: %s", e) except Exception as e: # Unknown problem in the library. Report this and continue. logger.exception("[%s:] ERROR during processing of incoming message: %s", self.__class__.__name__, e) try: - self._onListenLoopEnd() + thread_handler.on_listen_loop_end() except Exception as e: - logger.error("finalize_loop() failed with %s", e) + logger.exception("on_listen_loop_end() failed: %s", e) + class BusListenerJanitor: - """The BusListenerJanitor can help you out cleaning up auto-generated consumer queues. + """The BusListenerJanitor cleans up auto-generated consumer queues. It is intended specifically for use in a 'with' context in a test, or short-lived use-case. Typical example: - - with BusListenerJanitor(MyBusListenerClass(my_exchange_name, my_routing_key)) as my_bus_listener: - # do something.... - - my_bus_listener.some_funky_method() - - # do more.... - - # at this moment, the BusListenerJanitor leaves scope, - # so, it makes the my_bus_listener stop_listening, - # and then cleans up the auto-generated queue's. - print("done") + >>> # implement an example AbstractMessageHandler which just prints the received message subject and content + ... class MyMessageHandler(AbstractMessageHandler): + ... def handle_message(self, lofar_msg): + ... print(lofar_msg.subject, lofar_msg.content) + ... # ... do some more fancy stuff with the msg... + + And here's how it's used together with the BusListenerJanitor. + >>> with TemporaryExchange("my.exchange") as tmp_exchange: + ... with tmp_exchange.create_tobus() as tobus: + ... # construct a BusListener instance in a BusListenerJanitor's context, + ... # so it starts/stops listening and and handling messages automagically + ... # and the auto-generated queue is also cleaned up after leaving context + ... with BusListenerJanitor(BusListener(MyMessageHandler, exchange=tmp_exchange.address, routing_key="some.#")): + ... tobus.send(EventMessage(subject="some.event", content="foo")) + ... tobus.send(EventMessage(subject="another.event", content="xyz")) + ... tobus.send(EventMessage(subject="some.event", content="bar")) + ... + ... # ... do some work ... simulate this by sleeping a little... + ... # ...in the mean time, BusListener receives and handles the messages (on its own thread) + ... from time import sleep + ... sleep(0.25) + ... + some.event foo + some.event bar """ - def __init__(self, bus_listener: AbstractBusListener): - """Create a janitor for the given bus_listener - """ + def __init__(self, bus_listener: BusListener): + """Create a janitor for the given bus_listener""" self._bus_listener = bus_listener - def __enter__(self): + def __enter__(self) -> BusListener: """enter the context, and make the bus_listener start listening. :return a reference to the buslistener, not to the janitor!""" self._bus_listener.start_listening() @@ -881,32 +1274,18 @@ class BusListenerJanitor: def __exit__(self, exc_type, exc_val, exc_tb): """leave the context, make the bus_listener stop listening, and clean up the auto-generated queue""" - self._bus_listener.stop_listening() - delete_queue(self._bus_listener.queue_name) + try: + self._bus_listener.stop_listening() + except Exception as e: + logger.error(e) + finally: + logger.info("BusListenerJanitor deleting auto-generated queue: %s", self._bus_listener.address) + delete_queue(self._bus_listener.address) + if __name__ == "__main__": - logging.basicConfig(format='%(asctime)s %(thread)d %(threadName)s %(levelname)s %(message)s', level=logging.DEBUG) - - from lofar.messaging.messages import EventMessage - - with ToBus("my.lofar.exchange") as tobus: - msg = EventMessage(content="whoopsie", subject="foobar_FOOOOOOBAR") - tobus.send(msg) - - with FromBus("my.lofar.all.queue") as frombus: - msg = frombus.receive() - print(msg) - - msg = EventMessage(content="message over tmp queue", subject="bladibla") - - with TemporaryQueue("abc") as tmp: - logger.info('------------------------') - with tmp.create_tobus() as tmp_tobus: - logger.info('------------------------') - tmp_tobus.send(msg) - logger.info('------------------------') - with tmp.create_frombus() as tmp_frombus: - logger.info('------------------------') - msg = tmp_frombus.receive() - logger.info("received: %s", msg) - logger.info('------------------------') + logging.basicConfig(format='%(levelname)s %(message)s', level=logging.INFO) + + # run the doctests in this module + import doctest + doctest.testmod(verbose=True, report=True) \ No newline at end of file diff --git a/LCS/Messaging/python/messaging/messages.py b/LCS/Messaging/python/messaging/messages.py index 222dd833f7f..6eb09a4ac68 100644 --- a/LCS/Messaging/python/messaging/messages.py +++ b/LCS/Messaging/python/messaging/messages.py @@ -24,10 +24,10 @@ Message classes used by the package lofar.messaging. """ -import kombu.message import uuid +from typing import Optional -from lofar.messaging.exceptions import MessageFactoryError +from lofar.messaging.exceptions import MessageFactoryError, InvalidMessage def convert_to_lofar_message(kombu_msg): """ @@ -39,35 +39,37 @@ def convert_to_lofar_message(kombu_msg): message `qmsg` contains a type name that is not registered with the factory. """ - message_type = kombu_msg.headers['MessageType'] - - if message_type == 'EventMessage': - return EventMessage(content=kombu_msg.payload, - subject=kombu_msg.headers.get('Subject'), - id=kombu_msg.headers.get('MessageId')) - if message_type == 'CommandMessage': - return CommandMessage(content=kombu_msg.payload, - subject=kombu_msg.headers.get('Subject'), - id=kombu_msg.headers.get('MessageId')) - if message_type == 'RequestMessage': - return RequestMessage(content=kombu_msg.payload, - reply_to=kombu_msg.properties.get('reply_to'), - subject=kombu_msg.headers.get('Subject'), - id=kombu_msg.headers.get('MessageId'), - has_args=kombu_msg.headers.get('has_args', False), - has_kwargs=kombu_msg.headers.get('has_kwargs', False)) - if message_type == 'ReplyMessage': - return ReplyMessage(content=kombu_msg.payload, - status=kombu_msg.headers.get('status'), - subject=kombu_msg.headers.get('Subject'), - id=kombu_msg.headers.get('MessageId'), - errmsg=kombu_msg.headers.get('errmsg'), - backtrace=kombu_msg.headers.get('backtrace')) + try: + message_type = kombu_msg.headers['MessageType'] + + if message_type == 'EventMessage': + return EventMessage(content=kombu_msg.payload, + subject=kombu_msg.headers.get('Subject'), + id=kombu_msg.headers.get('MessageId')) + if message_type == 'CommandMessage': + return CommandMessage(content=kombu_msg.payload, + subject=kombu_msg.headers.get('Subject'), + id=kombu_msg.headers.get('MessageId')) + if message_type == 'RequestMessage': + return RequestMessage(content=kombu_msg.payload, + reply_to=kombu_msg.properties.get('reply_to'), + subject=kombu_msg.headers.get('Subject'), + id=kombu_msg.headers.get('MessageId'), + has_args=kombu_msg.headers.get('has_args', False), + has_kwargs=kombu_msg.headers.get('has_kwargs', False)) + if message_type == 'ReplyMessage': + return ReplyMessage(content=kombu_msg.payload, + handled_successfully=kombu_msg.headers.get('handled_successfully'), + subject=kombu_msg.headers.get('Subject'), + id=kombu_msg.headers.get('MessageId'), + error_message=kombu_msg.headers.get('error_message')) + except Exception as e: + raise InvalidMessage(str(e)) raise MessageFactoryError("Unable to create LofarMessage of type %s for kombu-msg: %s" % (message_type, kombu_msg)) -class LofarMessage(object): +class LofarMessage: """ Describes the content of a message, which can be constructed from either a set of fields, or from an existing QPID message. @@ -159,8 +161,7 @@ class RequestMessage(LofarMessage): subsystem. A service message must contain a valid ``ReplyTo`` property. """ - #TODO: refactor args kwargs quirks - def __init__(self, content, reply_to, subject:str=None, priority:int=4, ttl:float=None, id=None, has_args=False, has_kwargs=False): + def __init__(self, subject:str, reply_to:str, content=None, priority:int=4, ttl:float=None, id=None, has_args=False, has_kwargs=False): super(RequestMessage, self).__init__(content=content, subject=subject, priority=priority, ttl=ttl, id=id) self.reply_to = reply_to self.has_args = has_args @@ -173,6 +174,10 @@ class RequestMessage(LofarMessage): publish_kwargs['headers']['has_kwargs'] = self.has_kwargs return publish_kwargs + def __str__(self): + return "%s reply_to=%s" % (super(RequestMessage, self).__str__(), self.reply_to) + + class ReplyMessage(LofarMessage): """ Message class used for reply messages. Reply messages are part of the @@ -180,17 +185,20 @@ class ReplyMessage(LofarMessage): message. These use topic exchanges and thus are routed by the 'subject' property """ - def __init__(self, content, status, subject:str=None, priority:int=4, ttl:float=None, id=None, errmsg:str="", backtrace:str=None): + def __init__(self, content, handled_successfully: bool, subject:str=None, priority:int=4, ttl:float=None, id=None, error_message: Optional[str]= ""): super(ReplyMessage, self).__init__(content=content, subject=subject, priority=priority, ttl=ttl, id=id) - self.status = status - self.errmsg = errmsg - self.backtrace = backtrace + self.handled_successfully = handled_successfully + self.error_message = error_message def as_kombu_publish_kwargs(self): publish_kwargs = super(ReplyMessage, self).as_kombu_publish_kwargs() - publish_kwargs['headers']['status'] = self.status - publish_kwargs['headers']['errmsg'] = self.errmsg - publish_kwargs['headers']['backtrace'] = self.backtrace + publish_kwargs['headers']['handled_successfully'] = self.handled_successfully + publish_kwargs['headers']['error_message'] = self.error_message return publish_kwargs -__all__ = ["EventMessage", "RequestMessage", "ReplyMessage", "CommandMessage"] + def __str__(self): + return "%s handled_successfully=%s%s" % ( + super(ReplyMessage, self).__str__(), + self.handled_successfully, + " error=%s" % self.error_message if self.error_message else "") + diff --git a/LCS/Messaging/python/messaging/test/t_RPC.py b/LCS/Messaging/python/messaging/test/t_RPC.py index 77924e488ec..5888cb9fcb3 100644 --- a/LCS/Messaging/python/messaging/test/t_RPC.py +++ b/LCS/Messaging/python/messaging/test/t_RPC.py @@ -12,8 +12,9 @@ import logging logger = logging.getLogger(__name__) -from lofar.messaging import TemporaryExchange, BusListenerJanitor -from lofar.messaging import Service, RPC, RPCWrapper, RPCException, RPCTimeoutException +from lofar.messaging.messagebus import TemporaryExchange, BusListenerJanitor +from lofar.messaging.Service import Service +from lofar.messaging.RPC import RPC, RPCWrapper, RPCException, RPCTimeoutException class UserException(Exception): "Always thrown in one of the functions" @@ -122,12 +123,12 @@ def main(): with TemporaryExchange("TEST") as test_exchange: # Register functions as a service handler listening at busname and ServiceName - serv1 = Service("ErrorService", ErrorFunc, busname=test_exchange.address, num_threads=2) - serv2 = Service("ExceptionService", ExceptionFunc, busname=test_exchange.address, num_threads=2) - serv3 = Service("StringService", StringFunc, busname=test_exchange.address, num_threads=2) - serv4 = Service("ListService", ListFunc, busname=test_exchange.address, num_threads=2) - serv5 = Service("DictService", DictFunc, busname=test_exchange.address, num_threads=2) - serv6 = Service("TimeoutService", TimeoutFunc, busname=test_exchange.address, num_threads=2) + serv1 = Service("ErrorService", ErrorFunc, exchange=test_exchange.address, num_threads=2) + serv2 = Service("ExceptionService", ExceptionFunc, exchange=test_exchange.address, num_threads=2) + serv3 = Service("StringService", StringFunc, exchange=test_exchange.address, num_threads=2) + serv4 = Service("ListService", ListFunc, exchange=test_exchange.address, num_threads=2) + serv5 = Service("DictService", DictFunc, exchange=test_exchange.address, num_threads=2) + serv6 = Service("TimeoutService", TimeoutFunc, exchange=test_exchange.address, num_threads=2) # 'with' sets up the connection context and defines the scope of the service. # also use each service inside a BusListenerJanitor context to auto-cleanup auto-generated listener queues @@ -137,7 +138,7 @@ def main(): # # Redo all tests but via through RPC # # ErrorFunc - # with RPC("ErrorService", busname=test_exchange.address) as rpc: + # with RPC("ErrorService", exchange=test_exchange.address) as rpc: # try: # result = rpc("aap noot mies") # except RPCException as e: @@ -145,7 +146,7 @@ def main(): # raise # # # ExceptionFunc - # with RPC("ExceptionService", busname=test_exchange.address) as rpc: + # with RPC("ExceptionService", exchange=test_exchange.address) as rpc: # try: # result = rpc("aap noot mies") # except RPCException as e: @@ -153,7 +154,7 @@ def main(): # raise # # # StringFunc - # with RPC("StringService", busname=test_exchange.address) as rpc: + # with RPC("StringService", exchange=test_exchange.address) as rpc: # try: # result = rpc([25]) # except RPCException as e: @@ -165,7 +166,7 @@ def main(): # raise Exception("String function failed:{}".format(result)) # # # ListFunc - # with RPC("ListService", busname=test_exchange.address) as rpc: + # with RPC("ListService", exchange=test_exchange.address) as rpc: # try: # result = rpc("25") # except RPCException as e: @@ -176,7 +177,7 @@ def main(): # raise Exception("List function failed:{}".format(result)) # # # DictFunc - # with RPC("DictService", busname=test_exchange.address) as rpc: + # with RPC("DictService", exchange=test_exchange.address) as rpc: # try: # result = rpc([25]) # except RPCException as e: @@ -187,7 +188,7 @@ def main(): # raise Exception("Dict function failed:{}".format(result)) # # # TimeoutFunc - # with RPC("TimeoutService", busname=test_exchange.address, timeout=1) as rpc: + # with RPC("TimeoutService", exchange=test_exchange.address, timeout=1) as rpc: # try: # result = rpc("some random string") # raise Exception("TimeoutService did not timeout as expected...") @@ -220,7 +221,7 @@ def main(): # and use the MyRPC RPCWrapper class for testing - with MyRPC(busname=test_exchange.address, timeout=1) as my_rpc: + with MyRPC(exchange=test_exchange.address, timeout=1) as my_rpc: try: result = my_rpc.ErrorFunc("aap noot mies") except RPCException as e: diff --git a/LCS/Messaging/python/messaging/test/t_messagebus.py b/LCS/Messaging/python/messaging/test/t_messagebus.py index 4536db012f2..c16913911e1 100644 --- a/LCS/Messaging/python/messaging/test/t_messagebus.py +++ b/LCS/Messaging/python/messaging/test/t_messagebus.py @@ -31,7 +31,7 @@ from datetime import datetime from lofar.messaging.messages import * from lofar.messaging.messagebus import * -from lofar.messaging.exceptions import MessagingError +from lofar.messaging.exceptions import MessageBusError from lofar.common.datetimeutils import round_to_millisecond_precision from time import sleep from threading import Lock @@ -73,24 +73,24 @@ class TestCreateDeleteFunctions(unittest.TestCase): self.assertFalse(delete_queue(name)) def test_create_binding(self): - exchange_name = "test-exchange-%s" % (uuid.uuid4()) - queue_name = "test-queue-%s" % (uuid.uuid4()) + exchange = "test-exchange-%s" % (uuid.uuid4()) + queue = "test-queue-%s" % (uuid.uuid4()) # try to create the binding on non-existing exchange/queue - with self.assertRaisesRegex(MessagingError, ".*does not exist.*"): - create_binding(exchange_name=exchange_name, queue_name=queue_name) + with self.assertRaisesRegex(MessageBusError, ".*does not exist.*"): + create_binding(exchange=exchange, queue=queue) try: # now, do make sure the exchange/queue exist - create_exchange(exchange_name) - create_queue(queue_name) + create_exchange(exchange) + create_queue(queue) # and do the actual binding test - self.assertTrue(create_binding(exchange_name=exchange_name, queue_name=queue_name)) + self.assertTrue(create_binding(exchange=exchange, queue=queue)) finally: # and cleanup the exchange/queue - delete_queue(queue_name) - delete_exchange(exchange_name) + delete_queue(queue) + delete_exchange(exchange) @@ -109,7 +109,7 @@ class TestTemporaryExchangeAndQueue(unittest.TestCase): # test if the temporary exchange has been deleted when leaving scope # We should not be able to connect to it anymore - with self.assertRaisesRegex(MessagingError, '.*NOT_FOUND.*'): + with self.assertRaisesRegex(MessageBusError, '.*NOT_FOUND.*'): with FromBus(tmp_exchange_address): pass @@ -124,7 +124,7 @@ class TestTemporaryExchangeAndQueue(unittest.TestCase): # test if the temporary queue has been deleted when leaving scope # We should not be able to connect to it anymore - with self.assertRaisesRegex(MessagingError, '.*NOT_FOUND.*'): + with self.assertRaisesRegex(MessageBusError, '.*NOT_FOUND.*'): with FromBus(tmp_queue_address): pass @@ -136,7 +136,7 @@ class TestTemporaryExchangeAndQueue(unittest.TestCase): # create a normal ToBus on this tmp_exchange with tmp_exchange.create_tobus() as tobus_on_exchange: # create a TemporaryQueue, bound to the tmp_exchange - with TemporaryQueue("MyTestQueue", exchange_name=tmp_exchange.address) as tmp_queue: + with TemporaryQueue("MyTestQueue", exchange=tmp_exchange.address) as tmp_queue: # create a normal FromBus on this tmp_queue with tmp_queue.create_frombus() as frombus: # and let's see if the tmp_queue can also create a tobus which then points to the bound_exchange @@ -162,14 +162,15 @@ class TestTemporaryExchangeAndQueue(unittest.TestCase): """ test the usage of the TemporaryQueue in conjunction with normal ToBus and Frombus usage with additional filtering on subject """ - with TemporaryQueue("MyTestQueue") as tmp_queue: + SUBJECT = "FooBarSubject" + SUBJECT2 = "FAKE_SUBJECT" + + with TemporaryQueue("MyTestQueue", routing_key=SUBJECT) as tmp_queue: # create a normal To/FromBus on this tmp_queue - SUBJECT = "FooBarSubject" - SUBJECT2 = "FAKE_SUBJECT" NUM_MESSAGES_TO_SEND = 3 with tmp_queue.create_tobus() as tobus: # create a FromBus, which listens for/receives only the messages with the given SUBJECT - with tmp_queue.create_frombus(SUBJECT) as frombus: + with tmp_queue.create_frombus() as frombus: for i in range(NUM_MESSAGES_TO_SEND): # send a message... original_msg = EventMessage(subject=SUBJECT, @@ -178,7 +179,7 @@ class TestTemporaryExchangeAndQueue(unittest.TestCase): tobus.send(original_msg) # ...receive the message... - received_msg = frombus.receive(timeout=1) + received_msg = frombus.receive(timeout=0.1) logger.info("received message: %s", received_msg) # and test if they are equal @@ -192,7 +193,7 @@ class TestTemporaryExchangeAndQueue(unittest.TestCase): tobus.send(original_msg) # ... and try to receive it (should yield None, because of the non-matching subject) - received_msg = frombus.receive(timeout=1) + received_msg = frombus.receive(timeout=0.1) logger.info("received message: %s", received_msg) self.assertEqual(None, received_msg) @@ -200,16 +201,14 @@ class TestTemporaryExchangeAndQueue(unittest.TestCase): """ test the usage of the TemporaryQueue in conjunction with normal ToBus and Frombus usage with additional filtering on subject """ + SUBJECT = "FooBarSubject" + SUBJECT2 = "FAKE_SUBJECT" + NUM_MESSAGES_TO_SEND = 3 with TemporaryExchange("MyTestExchange") as tmp_exchange: with tmp_exchange.create_tobus() as tobus: - with TemporaryQueue("MyTestQueue", exchange_name=tmp_exchange.address) as tmp_queue: - # create a normal To/FromBus on this tmp_queue - SUBJECT = "FooBarSubject" - SUBJECT2 = "FAKE_SUBJECT" - NUM_MESSAGES_TO_SEND = 3 - - # create a FromBus, which listens for/receives only the messages with the given SUBJECT - with tmp_queue.create_frombus(SUBJECT) as frombus: + # create a TemporaryQueue, which listens for/receives only the messages with the given SUBJECT + with TemporaryQueue("MyTestQueue", exchange=tmp_exchange.address, routing_key=SUBJECT) as tmp_queue: + with tmp_queue.create_frombus() as frombus: for i in range(NUM_MESSAGES_TO_SEND): # send a message... original_msg = EventMessage(subject=SUBJECT, @@ -219,7 +218,7 @@ class TestTemporaryExchangeAndQueue(unittest.TestCase): tobus.send(original_msg) # ...receive the message... - received_msg = frombus.receive(timeout=1) + received_msg = frombus.receive(timeout=0.1) logger.info("received message: %s", received_msg) # and test if they are equal @@ -233,7 +232,7 @@ class TestTemporaryExchangeAndQueue(unittest.TestCase): tobus.send(original_msg) # ... and try to receive it (should yield None, because of the non-matching subject) - received_msg = frombus.receive(timeout=1) + received_msg = frombus.receive(timeout=0.1) logger.info("received message: %s", received_msg) self.assertEqual(None, received_msg) @@ -246,8 +245,8 @@ class TestTemporaryExchangeAndQueue(unittest.TestCase): with tmp_exchange.create_tobus() as tobus: SUBJECT1 = "FooBarSubject" SUBJECT2 = "FAKE_SUBJECT" - with TemporaryQueue("MyTestQueue", exchange_name=tmp_exchange.address, routing_key=SUBJECT1) as tmp_queue1, \ - TemporaryQueue("MyTestQueue", exchange_name=tmp_exchange.address, routing_key=SUBJECT2) as tmp_queue2: + with TemporaryQueue("MyTestQueue", exchange=tmp_exchange.address, routing_key=SUBJECT1) as tmp_queue1, \ + TemporaryQueue("MyTestQueue", exchange=tmp_exchange.address, routing_key=SUBJECT2) as tmp_queue2: # create a normal To/FromBus on this tmp_queue NUM_MESSAGES_TO_SEND = 3 @@ -262,8 +261,8 @@ class TestTemporaryExchangeAndQueue(unittest.TestCase): tobus.send(original_msg) # ...receive the message... - received_msg1 = frombus1.receive(timeout=1) - received_msg2 = frombus2.receive(timeout=1) + received_msg1 = frombus1.receive(timeout=0.1) + received_msg2 = frombus2.receive(timeout=0.1) self.assertIsNotNone(received_msg1) self.assertIsNone(received_msg2) logger.info("received message: %s", received_msg1) @@ -279,8 +278,8 @@ class TestTemporaryExchangeAndQueue(unittest.TestCase): tobus.send(original_msg) # ... and try to receive it - received_msg1 = frombus1.receive(timeout=1) - received_msg2 = frombus2.receive(timeout=1) + received_msg1 = frombus1.receive(timeout=0.1) + received_msg2 = frombus2.receive(timeout=0.1) self.assertIsNone(received_msg1) self.assertIsNotNone(received_msg2) logger.info("received message: %s", received_msg2) @@ -305,17 +304,17 @@ class FromBusInitFailed(unittest.TestCase): def test_no_broker_address(self): """ - Connecting to non-existent broker address must raise MessagingError + Connecting to non-existent broker address must raise MessageBusError """ - with self.assertRaisesRegex(MessagingError, ".*failed to resolve broker hostname"): + with self.assertRaisesRegex(MessageBusError, ".*failed to resolve broker hostname"): with FromBus(self.test_queue.address, broker="foo.bar"): pass def test_connection_refused(self): """ - Connecting to broker on wrong port must raise MessagingError + Connecting to broker on wrong port must raise MessageBusError """ - with self.assertRaisesRegex(MessagingError, ".*failed to resolve broker hostname"): + with self.assertRaisesRegex(MessageBusError, ".*failed to resolve broker hostname"): with FromBus("fake" + self.test_queue.address, broker="localhost:4"): pass @@ -335,6 +334,13 @@ class FromBusInContext(unittest.TestCase): with FromBus(self.test_queue.address) as frombus: self.assertTrue(frombus._receiver is not None) + def test_connect_fails(self): + random_non_existing_address = str(uuid.uuid4()) + + with self.assertRaisesRegex(MessageBusError, ".*failed*"): + with FromBus(random_non_existing_address) as frombus: + self.assertTrue(frombus._receiver is not None) + def test_receive_timeout(self): """ Getting a message when there's none must yield None after timeout. @@ -357,17 +363,17 @@ class ToBusInitFailed(unittest.TestCase): def test_no_broker_address(self): """ - Connecting to non-existent broker address must raise MessagingError + Connecting to non-existent broker address must raise MessageBusError """ - with self.assertRaisesRegex(MessagingError, ".*failed to resolve broker hostname"): + with self.assertRaisesRegex(MessageBusError, ".*failed to resolve broker hostname"): with ToBus(self.test_queue.address, broker="foo.bar"): pass def test_connection_refused(self): """ - Connecting to broker on wrong port must raise MessagingError + Connecting to broker on wrong port must raise MessageBusError """ - with self.assertRaisesRegex(MessagingError, ".*failed to resolve broker hostname"): + with self.assertRaisesRegex(MessageBusError, ".*failed to resolve broker hostname"): with ToBus(self.test_queue.address, broker="localhost:4"): pass @@ -474,20 +480,51 @@ class SendReceiveMessage(unittest.TestCase): self.assertEqual(content, recv_msg.content) -class PingPongPlayer(AbstractBusListener): +class PingPongPlayer(BusListener): """ Helper class with a simple purpose: - listen on one queue, - - when receiving a message, send answer on second queue, flipping message contents between ping and pong. - This tests the AbstractBusListener's behaviour, and tests if the underlying messaging lib can cope with multithreading. + - when receiving a message, send answer on exchange, flipping message contents between ping and pong. + + This is NOT the intended way of using the BusListener and AbstractMessageHandler... This weird construct is + used to test the multi-threaded BusListener's behaviour, and tests if the underlying messaging lib can cope with multithreading. """ + + class Handler(AbstractMessageHandler): + def __init__(self, player, opponent_name): + self.player = player + self.opponent_name = opponent_name + + def handle_message(self, msg): + """Implementation of BusListener.handle_message + log received message, and send a response message to the pingpong_table_exchange where it will be routed to the opponent's queue, + flipping ping for pong and vice versa + """ + logger.info("%s: received %s on %s", self.player.name, msg.content, self.player.address) + + response_msg = EventMessage(content="ping" if msg.content == "pong" else "pong", + subject=self.opponent_name) + + logger.info("%s: sending %s to %s", self.player.name, response_msg.content, self.player.response_bus.address) + + # do not lock around the player's response_bus to test internal thread safety + self.player.response_bus.send(response_msg) + + with self.player.lock: # do lock 'normal' assignement of variables + self.player.num_turns += 1 + + return True + + def __init__(self, name, opponent_name, pingpong_table_exchange, num_threads): self.name = name self.opponent_name = opponent_name self.num_turns = 0 self.response_bus = ToBus(pingpong_table_exchange) self.lock = Lock() # a lock to keep track of self.num_turns in a multithreaded environment - super(PingPongPlayer, self).__init__(exchange_name=pingpong_table_exchange, + super(PingPongPlayer, self).__init__(handler_type=PingPongPlayer.Handler, + handler_kwargs={'player': self, 'opponent_name': opponent_name}, + exchange=pingpong_table_exchange, routing_key=self.name, num_threads=num_threads) @@ -503,30 +540,9 @@ class PingPongPlayer(AbstractBusListener): with self.lock: return self.num_turns - def _handleMessage(self, msg): - """Implementation of AbstractBusListener._handleMessage - log received message, and send response. - """ - logger.info("%s: received %s on %s", self.name, msg.content, self.queue_name) - self.send_response(msg.content) - - def send_response(self, value): - """ - Send a response message to the pingpong_table_exchange where it will be routed to the opponent's queue, - flipping ping for pong and vice versa - """ - response_msg = EventMessage(content="ping" if value == "pong" else "pong", - subject=self.opponent_name) - - logger.info("%s: sending %s to %s", self.name, response_msg.content, self.response_bus.address) - self.response_bus.send(response_msg) - - with self.lock: - self.num_turns += 1 - class PingPongTester(unittest.TestCase): """Test an event driven message ping/pong game, where two 'players' respond to each other. - This test should work regardless of the number of threads the each 'player'/AbstractBusListener uses""" + This test should work regardless of the number of threads the each 'player'/BusListener uses""" def test_single_thread_per_player(self): self._play(1) @@ -575,7 +591,7 @@ class PingPongTester(unittest.TestCase): if player1_num_turns >= NUM_TURNS and player2_num_turns >= NUM_TURNS : break - sleep(0.01) + sleep(0.1) # assert on players who did not finish the game self.assertGreaterEqual(player1.get_num_turns(), NUM_TURNS) @@ -585,7 +601,18 @@ class PingPongTester(unittest.TestCase): player1_num_turns, NUM_TURNS, player2_num_turns, NUM_TURNS, num_threads_per_player, 2*NUM_TURNS/(datetime.utcnow() - start_timestamp).total_seconds()) +def load_tests(loader, tests, ignore): + """add the doctests from lofar.messaging.messagebus to the unittest tests""" + import doctest + import lofar.messaging.messagebus + tests.addTests(doctest.DocTestSuite(lofar.messaging.messagebus)) + return tests + if __name__ == '__main__': - logging.basicConfig(format='%(asctime)s %(thread)d %(threadName)s %(levelname)s %(message)s', level=logging.DEBUG) - unittest.main() + logging.basicConfig(format='%(asctime)s %(thread)d %(threadName)s %(levelname)s %(message)s', level=logging.INFO) + + import tracemalloc + tracemalloc.start() + + unittest.main(defaultTest='PingPongTester.test_two_threads_per_player') diff --git a/LCS/Messaging/python/messaging/test/t_service_message_handler.py b/LCS/Messaging/python/messaging/test/t_service_message_handler.py index 3ef87438297..906f6b2fea0 100644 --- a/LCS/Messaging/python/messaging/test/t_service_message_handler.py +++ b/LCS/Messaging/python/messaging/test/t_service_message_handler.py @@ -5,10 +5,14 @@ It defines 5 functions and first calls those functions directly to check that the functions are OK. Next the same tests are done with the RPC and Service classes in between. This should give the same results. """ -import logging import sys import time -from lofar.messaging import * +from lofar.messaging.messagebus import TemporaryExchange, BusListenerJanitor +from lofar.messaging.Service import ServiceMessageHandler, Service +from lofar.messaging.RPC import RPC + +import logging +logger = logging.getLogger(__name__) class UserException(Exception): "Always thrown in one of the functions" @@ -34,16 +38,16 @@ def StringFunc(input_value): raise InvalidArgType("Input value must be of the type 'string'") return input_value.upper() -class OnlyMessageHandling(MessageHandlerInterface): +class OnlyMessageHandling(ServiceMessageHandler): def __init__(self, **kwargs): - MessageHandlerInterface.__init__(self) + ServiceMessageHandler.__init__(self) logger.info("Creation of OnlyMessageHandling class: %s" % kwargs) self.handle_message = kwargs.pop("function") self.args = kwargs -class FullMessageHandling(MessageHandlerInterface): +class FullMessageHandling(ServiceMessageHandler): def __init__(self, **kwargs): - MessageHandlerInterface.__init__(self) + ServiceMessageHandler.__init__(self) logger.info("Creation of FullMessageHandling class: %s" % kwargs) self.handle_message = kwargs.pop("function") self.args = kwargs @@ -56,9 +60,9 @@ class FullMessageHandling(MessageHandlerInterface): def finalize_loop(self): logger.info("FullMessageHandling finalize_loop: %s" % self.args) -class FailingMessageHandling(MessageHandlerInterface): +class FailingMessageHandling(ServiceMessageHandler): def __init__(self, **kwargs): - MessageHandlerInterface.__init__(self) + ServiceMessageHandler.__init__(self) logger.info("Creation of FailingMessageHandling class: %s" % kwargs) self.handle_message = kwargs.pop("function") self.args = kwargs -- GitLab