From 0fa632221ec3788f9c26ebe4b38bfd16a6f46a58 Mon Sep 17 00:00:00 2001
From: Jorrit Schaap <schaap@astron.nl>
Date: Thu, 16 May 2019 14:16:20 +0000
Subject: [PATCH] SW-699: major rewrite of FromBus/ToBus, AbstractBusListener
 and RPC/Service based on kombu/rabbitmq. This allows us to use a single lofar
 exchange (multiple exchanges are still allowed), and using auto-generated
 designated queues with subject based routin_keys.

---
 LCS/Messaging/python/messaging/RPC.py         | 107 +--
 LCS/Messaging/python/messaging/Service.py     | 183 ++--
 LCS/Messaging/python/messaging/__init__.py    |  11 +-
 LCS/Messaging/python/messaging/messagebus.py  | 885 ++++++++++--------
 LCS/Messaging/python/messaging/messages.py    | 328 ++-----
 LCS/Messaging/python/messaging/test/t_RPC.py  | 212 +++--
 .../python/messaging/test/t_messagebus.py     | 287 +++---
 .../python/messaging/test/t_messages.py       | 236 +----
 .../test/t_service_message_handler.py         | 156 +--
 9 files changed, 1048 insertions(+), 1357 deletions(-)

diff --git a/LCS/Messaging/python/messaging/RPC.py b/LCS/Messaging/python/messaging/RPC.py
index 25eff1ee0d1..bf662fc4294 100644
--- a/LCS/Messaging/python/messaging/RPC.py
+++ b/LCS/Messaging/python/messaging/RPC.py
@@ -20,9 +20,8 @@
 #
 
 #  RPC invocation with possible timeout
-from .messagebus import ToBus, TemporaryQueue, DEFAULT_BROKER, DEFAULT_BUSNAME
+from .messagebus import ToBus, TemporaryQueue, DEFAULT_BROKER, DEFAULT_TIMEOUT
 from .messages import RequestMessage, ReplyMessage
-import uuid
 import logging
 
 logger = logging.getLogger(__name__)
@@ -68,7 +67,7 @@ class RPCTimeoutException(RPCException):
 
 class RPC():
     """
-    This class provides an easy way to invoke a Remote Rrocedure Call to a
+    This class provides an easy way to invoke a Remote Procedure Call to a
     Services on the message bus.
 
     Note that most methods require that the RPC object is used *inside* a
@@ -79,43 +78,25 @@ class RPC():
     As a side-effect the sender and session are destroyed.
 
     """
-    def __init__(self, service, broker=None, **kwargs ):
-        """
-        Initialize an Remote procedure call using:
-            service= <str>    Service Name
-            busname= <str>    Bus Name
-            broker= <str>     qpid broker, default None which is localhost
-            timeout= <float>  Time to wait in seconds before the call is considered a failure.
-            Verbose= <bool>   If True output extra logging to stdout.
-
-        Use with extra care: ForwardExceptions= <bool>
-            This enables forwarding exceptions from the server side tobe raised at the client side durting RPC invocation.
-        """
-        self.timeout           = kwargs.pop("timeout", None)
-        self.ForwardExceptions = kwargs.pop("ForwardExceptions", False)
-        self.Verbose           = kwargs.pop("Verbose", False)
-        self.BusName           = kwargs.pop("busname", DEFAULT_BUSNAME)
-        self.ServiceName       = service
-        self.broker            = broker if broker else DEFAULT_BROKER
-
-        self.request_sender    = ToBus(address=self.BusName, subject=self.ServiceName, broker=self.broker)
-
-        if len(kwargs):
-            raise AttributeError("Unexpected argument passed to RPC class: %s" %( kwargs ))
+    def __init__(self, service_name: str, busname: str, timeout: int=DEFAULT_TIMEOUT, broker: str=DEFAULT_BROKER):
+        self._service_name     = service_name
+        self._timeout          = timeout
+        self._broker           = broker
+        self._request_sender   = ToBus(busname, subject_based_routing=True, broker=self._broker)
 
     def open(self):
         """
         Start accepting requests.
         """
 
-        self.request_sender.open()
+        self._request_sender.open()
 
     def close(self):
         """
         Stop accepting requests.
         """
 
-        self.request_sender.close()
+        self._request_sender.close()
 
     def __enter__(self):
         """
@@ -144,29 +125,37 @@ class RPC():
 
     def execute(self, *args, **kwargs):
         '''execute the remote procedure call'''
-        timeout = kwargs.pop("timeout", self.timeout)
+        timeout = kwargs.pop("timeout", self._timeout)
         Content = _args_as_content(*args, **kwargs)
         HasArgs, HasKwArgs = _analyze_args(args, kwargs)
 
-        logger.debug("executing rpc call to address=%s subject=%s with timeout=%s",
-                     self.request_sender.address,
-                     self.request_sender.subject,
+        logger.debug("executing rpc call to address=%s subject/service=%s with timeout=%s",
+                     self._request_sender.address,
+                     self._service_name,
                      timeout)
 
-        tmp_queue_postfix = self.request_sender.address
-        with TemporaryQueue(name=tmp_queue_postfix, broker=self.broker) as tmp_queue:
-            with tmp_queue.create_frombus(connection_log_level=logging.DEBUG) as reply_receiver:
-                request_msg = RequestMessage(content=Content, reply_to=reply_receiver.address,
+        with TemporaryQueue(name_prefix=self._service_name + "-reply",
+                            broker=self._broker) as tmp_reply_queue:
+            with tmp_reply_queue.create_frombus() as reply_receiver:
+                request_msg = RequestMessage(content=Content,
+                                             reply_to=reply_receiver.address,
+                                             subject=self._service_name,
                                              has_args=HasArgs, has_kwargs=HasKwArgs)
                 if timeout:
                     request_msg.ttl = timeout
 
-                self.request_sender.send(request_msg)
+                self._request_sender.send(request_msg)
+
+                logger.debug("executed rpc call to address=%s subject/service=%s waiting for answer on %s",
+                             self._request_sender.address,
+                             self._service_name,
+                             reply_receiver.address)
+
                 answer = reply_receiver.receive(timeout)
 
-                logger.debug("executed rpc call to address=%s subject=%s received answer on %s",
-                             self.request_sender.address,
-                             self.request_sender.subject,
+                logger.debug("executed rpc call to address=%s subject/service=%s received answer on %s",
+                             self._request_sender.address,
+                             self._service_name,
                              reply_receiver.address)
 
         status = {}
@@ -174,8 +163,8 @@ class RPC():
         if answer is None:
             status["state"] = "TIMEOUT"
             status["errmsg"] = "RPC Timed out with call to service_bus: %s subject: %s, receiving on tmp_queue: %s request_msg: %s" % (
-                self.request_sender.address,
-                self.request_sender.subject,
+                self._request_sender.address,
+                self._request_sender.subject,
                 reply_receiver.address,
                 request_msg)
             status["backtrace"] = ""
@@ -191,29 +180,9 @@ class RPC():
 
         # return content and status if status is 'OK'
         if (answer.status == "OK"):
-            return (answer.body, answer.status)
+            return (answer.content, answer.status)
 
-        # Compile error handling from status
-        try:
-            status["state"] = answer.status
-            status["errmsg"] = answer.errmsg
-            status["backtrace"] = answer.backtrace
-        except Exception as e:
-            status["state"] = "ERROR"
-            status["errmsg"] = "Return state in message not found"
-            status["backtrace"] = ""
-            raise RPCException(status)
-
-        # Does the client expect us to throw the exception?
-        if self.ForwardExceptions is True:
-            excep_mod = __import__("builtins")
-            excep_class_ = getattr(excep_mod, answer.errmsg.split(':')[0], None)
-            if (excep_class_ != None):
-                instance = excep_class_("%s%s" % (answer.errmsg.split(':',1)[1].strip(), answer.backtrace))
-                raise (instance)
-            else:
-                raise RPCException(answer.errmsg)
-        return (None, status)
+        raise RPCException(answer.errmsg)
 
 class RPCWrapper(object):
     """
@@ -315,7 +284,7 @@ class RPCWrapper(object):
     def close(self):
         '''Close all opened rpc connections'''
         for rpc in list(self._serviceRPCs.values()):
-            logger.info('closing rpc connection %s at %s', rpc.request_sender.address, rpc.broker)
+            logger.info('closing rpc connection %s at %s', rpc._request_sender.address, rpc._broker)
             rpc.close()
 
     def __enter__(self):
@@ -331,10 +300,6 @@ class RPCWrapper(object):
     def rpc(self, method=None, *args, **kwargs):
         '''execute the rpc call on the <bus>/<service>.<method> and return the result'''
         try:
-            if self.timeout:
-                rpckwargs = {'timeout': self.timeout,
-                             'Verbose': self.verbose}
-
             service_method = (self.servicename + '.' + method) if self.servicename and method \
                                 else self.servicename if self.servicename else method
 
@@ -342,8 +307,8 @@ class RPCWrapper(object):
             if service_method not in self._serviceRPCs:
                 # not in cache
                 # so, create RPC for this service method, open it, and cache it
-                rpc = RPC(service_method, busname=self.busname, broker=self.broker, ForwardExceptions=True, **rpckwargs)
-                logger.info('opening rpc connection method=%s address=%s broker=%s', service_method, rpc.request_sender.address, rpc.broker)
+                rpc = RPC(service_method, busname=self.busname, broker=self.broker, timeout=self.timeout)
+                logger.info('opening rpc connection method=%s address=%s broker=%s', service_method, rpc._request_sender.address, rpc._broker)
                 rpc.open()
                 self._serviceRPCs[service_method] = rpc
 
diff --git a/LCS/Messaging/python/messaging/Service.py b/LCS/Messaging/python/messaging/Service.py
index 288bf51a000..a53fe389b8f 100644
--- a/LCS/Messaging/python/messaging/Service.py
+++ b/LCS/Messaging/python/messaging/Service.py
@@ -20,12 +20,9 @@
 # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
 #
 
-from .messagebus import ToBus, AbstractBusListener, DEFAULT_BUSNAME
+from .messagebus import ToBus, AbstractBusListener, DEFAULT_BROKER
 from .messages import ReplyMessage, RequestMessage
-from .exceptions import MessageBusError, MessageFactoryError
-import threading
-import time
-import uuid
+from .exceptions import MessagingError
 import sys
 import traceback
 import logging
@@ -91,147 +88,118 @@ class Service(AbstractBusListener):
        options <dict>          For the QPID connection
        exclusive <bool>        To create eclusive access to this messagebus. Default:True
        numthreads <int>        Amount of threads processing messages. Default:1
-       parsefullmessage <bool> Pass full message of only message content to the service handler. Default:False.
        verbose <bool>          Show debug text. Default:False
        handler_args <dict>     Arguments that are passed to the constructor of the servicehandler is case the servicehandler
                                is a class in stead of a function.
     """
 
-    def __init__(self, servicename, servicehandler, broker=None, **kwargs):
+    def __init__(self,
+                 service_name: str,
+                 service_handler: MessageHandlerInterface,
+                 exchange_name: str,
+                 num_threads: int = 1,
+                 broker: str = DEFAULT_BROKER,
+                 **kwargs):
         """
         Initialize Service object with servicename (str) and servicehandler function.
         additional parameters:
-            sevicename          = <string> Name of the service to call
+            busname             = <string> Name of the bus in case exchanges are used in stead of queues
             exclusive           = <bool>   Create an exclusive binding so no other services can consume duplicate messages (default: True)
             numthreads          = <int>    Number of parallel threads processing messages (default: 1)
             verbose             = <bool>   Output extra logging over stdout (default: False)
             use_service_methods = <bool>   Listen to <servicename>.* and map 2nd subject part to method. (default: False)
                                            Example: MyService.foo calls the method foo in the handler.
         """
-        self.service_name        = servicename
-        self.service_handler     = servicehandler
-        self.busname             = kwargs.pop("busname", DEFAULT_BUSNAME)
+        self.service_name        = service_name
         self.use_service_methods = kwargs.pop("use_service_methods", False)
-        self.parsefullmessage    = kwargs.pop("parsefullmessage", False)
         self.handler_args        = kwargs.pop("handler_args", {})
 
-        # A dedicated queue for us
-        self.address             = "%s.command.%s" % (self.busname, self.service_name)
-
-        # if the service_handler wants to map the 2nd part of the subject to a method
-        # then we need to listen to <servicename>.#
-        if self.use_service_methods:
-           subject = self.service_name + ".#"
+        # set up service_handler, either for a wrapped function, or a MessageHandlerInterface
+        if str(type(service_handler)) == "<class 'instancemethod'>" or str(type(service_handler)) == "<class 'function'>":
+            self.service_handler = MessageHandlerInterface()
+            self.service_handler.handle_message = service_handler
         else:
-           subject = self.service_name
-
-        # TODO: Create queue "address"
-        # TODO: Bind queue "address" to exchange "self.busname" with filter "subject"
+            self.service_handler = service_handler(**self.handler_args)
 
-        super(Service, self).__init__(self.address, subject=subject, broker=broker, **kwargs)
-
-    def _create_thread_args(self, index):
-        # set up service_handler
-        if str(type(self.service_handler)) == "<class 'instancemethod'>" or \
-            str(type(self.service_handler)) == "<class 'function'>":
-            thread_service_handler = MessageHandlerInterface()
-            thread_service_handler.handle_message = self.service_handler
-        else:
-            thread_service_handler = self.service_handler(**self.handler_args)
-
-        if not isinstance(thread_service_handler, MessageHandlerInterface):
+        if not isinstance(self.service_handler, MessageHandlerInterface):
             raise TypeError("Servicehandler argument must by a function or a derived class from MessageHandlerInterface.")
 
-        # add service_handler to default args for thread
-        args = super(Service, self)._create_thread_args(index)
-        args['service_handler'] = thread_service_handler
-        return args
+        super(Service, self).__init__(exchange_name=exchange_name,
+                                      routing_key="%s.#" % (self.service_name,),
+                                      broker=broker, num_threads=num_threads)
 
-    def _send_reply(self, replymessage, status, reply_to, errtxt="",backtrace=""):
+    def _send_reply(self, content, status, reply_to_address, errtxt="",backtrace=""):
         """
         Internal use only. Send a reply message to the RPC client including exception info.
         """
-        # Compose Reply message from reply and status.
-        if isinstance(replymessage, ReplyMessage):
-            reply_msg = replymessage
-        else:
-            reply_msg = ReplyMessage(replymessage, reply_to)
+
+        reply_msg = ReplyMessage(content=content, status=status, subject=reply_to_address)
         reply_msg.status = status
         reply_msg.errmsg = errtxt
         reply_msg.backtrace = backtrace
 
-        # show the message content if required by the verbose flag.
-        if self.verbose:
-            reply_msg.show()
-
         # send the result to the RPC client
         try:
-            with ToBus(reply_to, broker=self.broker, connection_log_level=logging.DEBUG) as dest:
-                dest.send(reply_msg)
-        except MessageBusError as e:
-            logger.error("Failed to send reply messgage to reply address %s. Error: %s", reply_to, e)
-
-    def _getServiceHandlerForCurrentThread(self):
-        currentThread = threading.currentThread()
-        args = self._threads[currentThread]
-        return args['service_handler']
+            # send the msg to the reply_to_address directly (no subject_based_routing)
+            with ToBus(reply_to_address, broker=self.broker, subject_based_routing=False) as reply_to_sender:
+                reply_to_sender.send(reply_msg)
+        except MessagingError as e:
+            logger.error("Failed to send reply messgage to %s. Error: %s", reply_to_address, e)
 
     def _onListenLoopBegin(self):
         "Called before main processing loop is entered."
-        self._getServiceHandlerForCurrentThread().prepare_loop()
+        self.service_handler.prepare_loop()
 
     def _onBeforeReceiveMessage(self):
         "Called in main processing loop just before a blocking wait for messages is done."
-        self._getServiceHandlerForCurrentThread().prepare_receive()
+        self.service_handler.prepare_receive()
 
     def _handleMessage(self, lofar_msg):
-        service_handler = self._getServiceHandlerForCurrentThread()
-
         try:
+            if not isinstance(lofar_msg, RequestMessage):
+                logger.warning("%s ignoring non-RequestMessage %s", self.service_name, lofar_msg)
+
             # determine which handler method has to be called
-            if hasattr(service_handler, 'service2MethodMap') and '.' in lofar_msg.subject:
+            if hasattr(self.service_handler, 'service2MethodMap') and '.' in lofar_msg.subject:
                 subject_parts = lofar_msg.subject.split('.')
                 method_name = subject_parts[-1]
-                if method_name in service_handler.service2MethodMap:
+                if method_name in self.service_handler.service2MethodMap:
                     # pass the handling of this message on to the specific method for this service
-                    serviceHandlerMethod = service_handler.service2MethodMap[method_name]
+                    serviceHandlerMethod = self.service_handler.service2MethodMap[method_name]
                 else:
                     raise ValueError('Unknown method %s on service %s' % (method_name, lofar_msg.subject))
             else:
-                serviceHandlerMethod = service_handler.handle_message
-
-            if self.parsefullmessage is True:
-                replymessage = serviceHandlerMethod(lofar_msg)
+                serviceHandlerMethod = self.service_handler.handle_message
+
+            # check for positional arguments and named arguments
+            # depending on presence of args and kwargs,
+            # the signature of the handler method should vary as well
+            if lofar_msg.has_args and lofar_msg.has_kwargs:
+                # both positional and named arguments
+                # rpcargs and rpckwargs are packed in the content
+                rpcargs = lofar_msg.content
+
+                # rpckwargs is the last argument in the content
+                # rpcargs is the rest in front
+                rpckwargs = rpcargs[-1]
+                del rpcargs[-1]
+                rpcargs = tuple(rpcargs)
+                replymessage = serviceHandlerMethod(*rpcargs, **rpckwargs)
+            elif lofar_msg.has_args:
+                # only positional arguments
+                # msg.content should be a list
+                rpcargs = tuple(lofar_msg.content)
+                replymessage = serviceHandlerMethod(*rpcargs)
+            elif lofar_msg.has_kwargs:
+                # only named arguments
+                # msg.content should be a dict
+                rpckwargs = lofar_msg.content
+                replymessage = serviceHandlerMethod(**rpckwargs)
+            elif lofar_msg.content:
+                rpccontent = lofar_msg.content
+                replymessage = serviceHandlerMethod(rpccontent)
             else:
-                # check for positional arguments and named arguments
-                # depending on presence of args and kwargs,
-                # the signature of the handler method should vary as well
-                if lofar_msg.has_args and lofar_msg.has_kwargs:
-                    # both positional and named arguments
-                    # rpcargs and rpckwargs are packed in the content
-                    rpcargs = lofar_msg.body
-
-                    # rpckwargs is the last argument in the content
-                    # rpcargs is the rest in front
-                    rpckwargs = rpcargs[-1]
-                    del rpcargs[-1]
-                    rpcargs = tuple(rpcargs)
-                    replymessage = serviceHandlerMethod(*rpcargs, **rpckwargs)
-                elif lofar_msg.has_args:
-                    # only positional arguments
-                    # msg.body should be a list
-                    rpcargs = tuple(lofar_msg.body)
-                    replymessage = serviceHandlerMethod(*rpcargs)
-                elif lofar_msg.has_kwargs:
-                    # only named arguments
-                    # msg.body should be a dict
-                    rpckwargs = lofar_msg.body
-                    replymessage = serviceHandlerMethod(**rpckwargs)
-                elif lofar_msg.body:
-                    rpccontent = lofar_msg.body
-                    replymessage = serviceHandlerMethod(rpccontent)
-                else:
-                    replymessage = serviceHandlerMethod()
+                replymessage = serviceHandlerMethod()
 
             #TODO: check for timeout and/or presence of response queue!
             self._send_reply(replymessage,"OK",lofar_msg.reply_to)
@@ -239,31 +207,30 @@ class Service(AbstractBusListener):
         except Exception as e:
             # Any thrown exceptions either Service exception or unhandled exception
             # during the execution of the service handler is caught here.
-            self._debug("handling exception")
+            logger.debug("handling exception")
             exc_info = sys.exc_info()
             status="ERROR"
             rawbacktrace = traceback.format_exception(*exc_info)
             errtxt = rawbacktrace[-1]
-            self._debug(rawbacktrace)
+            logger.debug(rawbacktrace)
             # cleanup the backtrace print by removing the first two lines and the last
             del rawbacktrace[1]
             del rawbacktrace[0]
             del rawbacktrace[-1]
-            backtrace = ''.join(rawbacktrace).encode('latin-1').decode('unicode_escape')
-            logger.error("exception while handling message: %s\n%s" % (errtxt, backtrace))
-            if self.verbose:
-                logger.error("[Service:] Status: %s", str(status))
-                logger.error("[Service:] ERRTXT: %s", str(errtxt))
-                logger.error("[Service:] BackTrace: %s", str( backtrace ))
+            backtrace = ''.join(rawbacktrace).encode('latin-1').decode('unicode_escape').rstrip()
+            logger.error("exception while handling message: %s %s" % (errtxt, backtrace))
+            logger.debug("[Service:] Status: %s", str(status))
+            logger.debug("[Service:] ERRTXT: %s", str(errtxt))
+            logger.debug("[Service:] BackTrace: %s", str( backtrace ))
             self._send_reply(None, status, lofar_msg.reply_to, errtxt=errtxt, backtrace=backtrace)
 
     def _onAfterReceiveMessage(self, successful):
         "Called in the main loop after the result was send back to the requester."
         "@successful@ reflects the state of the handling: true/false"
-        self._getServiceHandlerForCurrentThread().finalize_handling(successful)
+        self.service_handler.finalize_handling(successful)
 
     def _onListenLoopEnd(self):
         "Called after main processing loop is finished."
-        self._getServiceHandlerForCurrentThread().finalize_loop()
+        self.service_handler.finalize_loop()
 
 __all__ = ["Service", "MessageHandlerInterface"]
diff --git a/LCS/Messaging/python/messaging/__init__.py b/LCS/Messaging/python/messaging/__init__.py
index 5a0ec953fee..e0535559e7b 100644
--- a/LCS/Messaging/python/messaging/__init__.py
+++ b/LCS/Messaging/python/messaging/__init__.py
@@ -24,16 +24,25 @@
 Module initialization file.
 """
 
-from .config import *
 from .exceptions import *
 from .messages import *
 from .messagebus import *
 from .RPC import *
 from .Service import *
 import logging
+from lofar.common import isProductionEnvironment, isTestEnvironment
 
 def setQpidLogLevel(qpidLogLevel):
     for name, logger in list(logging.Logger.manager.loggerDict.items()):
         if name.startswith('qpid.') and isinstance(logger, logging.Logger):
             logger.setLevel(qpidLogLevel)
 
+def adaptNameToEnvironment(name):
+    if isProductionEnvironment():
+        return name #return original name only for PRODUCTION LOFARENV
+
+    if isTestEnvironment():
+        return 'test.%s' % name #return 'test.' prefixed name only for TEST LOFARENV
+
+    # in all other cases prefix queue/bus name with 'devel.'
+    return 'devel.%s' % name
diff --git a/LCS/Messaging/python/messaging/messagebus.py b/LCS/Messaging/python/messaging/messagebus.py
index fd05be61aea..caec8dad3cc 100644
--- a/LCS/Messaging/python/messaging/messagebus.py
+++ b/LCS/Messaging/python/messaging/messagebus.py
@@ -1,4 +1,3 @@
-#!/usr/bin/env python3
 
 # messagebus.py: Provide an easy way exchange messages on the message bus.
 #
@@ -25,40 +24,171 @@
 """
 Provide an easy way exchange messages on the message bus.
 """
-
-from lofar.messaging.exceptions import MessageBusError, MessageFactoryError
-from lofar.messaging.messages import to_qpid_message, MESSAGE_FACTORY
-from lofar.messaging.config import DEFAULT_BROKER, DEFAULT_BUSNAME
-from lofar.common.util import raise_exception, is_iterable
-from lofar.common.datetimeutils import to_milliseconds_since_unix_epoch, from_milliseconds_since_unix_epoch
+from amqp.exceptions import NotFound
+from lofar.messaging.exceptions import MessagingError
+from lofar.messaging.messages import convert_to_lofar_message
+from lofar.common.util import raise_exception
 from lofar.common import isProductionEnvironment, isTestEnvironment
+from lofar.common.threading_utils import TimeoutLock
+from lofar.common.util import program_name
 
-import proton
-import proton.utils
-import proton.reactor
+import kombu
+import kombu.exceptions
+import amqp.exceptions
 import logging
 import uuid
 import threading
-from datetime import datetime, timedelta
-from copy import deepcopy
-from time import sleep
+from datetime import datetime
+from queue import Empty as EmptyQueueError
 
 logger = logging.getLogger(__name__)
 
-DEFAULT_RECEIVER_CAPACITY = 1
+from .config import DEFAULT_BROKER, DEFAULT_BUSNAME
 DEFAULT_TIMEOUT = 5
 
-class _ProtonSubjectFilter(proton.reactor.Filter):
+# some serializers are considered 'insecure', but we know better ;)
+# so enable the python pickle serializer
+kombu.enable_insecure_serializers(['pickle'])
+
+# make default kombu/amqp logger less spammy
+logging.getLogger("amqp").setLevel(logging.INFO)
+
+
+def create_exchange(name: str, durable: bool=True, broker: str=DEFAULT_BROKER) -> bool:
     """
-    helper class for filtering by subject
+    create a message exchange of type topic with the given name on the given broker
+    :param name: the name for the exchange
+    :param durable: if True, then the exchange 'survives' broker restarts
+    :param broker: a message broker address
+    :raises: MessagingError if the exchange could not be created
+    :return True if created, False if not-created because it already exists
     """
-    def __init__(self, value):
-        filter_dict = { proton.symbol('subject-filter'):
-                            proton.Described(proton.symbol('apache.org:legacy-amqp-topic-binding:string'), value) }
-        super(_ProtonSubjectFilter, self).__init__(filter_dict)
+    try:
+        with kombu.Connection(hostname=broker) as connection:
+            exchange = kombu.Exchange(name, durable=durable, type='topic')
+            try:
+                exchange.declare(channel=connection.default_channel, passive=True)
+            except amqp.exceptions.NotFound:
+                exchange.declare(channel=connection.default_channel)
+                logger.debug("Created exchange %s on broker %s", name, broker)
+                return True
+        return False
+    except Exception as e:
+        raise MessagingError("Could not create exchange %s on broker %s error=%s" % (name, broker, e))
+
+def delete_exchange(name: str, broker: str=DEFAULT_BROKER) -> bool:
+    """
+    delete the exchange with the given name on the given broker
+    :param name: the name for the exchange
+    :param broker: a message broker address
+    :raises: MessagingError if the exchange could not be deleted
+    :return True if deleted, False if not-deleted because it does not exist
+    """
+    try:
+        with kombu.Connection(hostname=broker) as connection:
+            exchange = kombu.Exchange(name, channel=connection)
+            try:
+                exchange.declare(channel=connection.default_channel, passive=True)
+                exchange.delete()
+                logger.debug("Deleted exchange %s on broker %s", name, broker)
+                return True
+            except amqp.exceptions.NotFound:
+                return False
+    except Exception as e:
+        raise MessagingError("Could not delete exchange %s on broker %s error=%s" % (name, broker, e))
 
 
-class _AbstractBus():
+def create_queue(name: str, durable: bool=True, broker: str=DEFAULT_BROKER) -> bool:
+    """
+    create a message queue with the given name on the given broker
+    :param name: the name for the queue
+    :param durable: if True, then the queue 'survives' broker restarts
+    :param broker: a message broker address
+    :raises: MessagingError if the queue could not be created
+    :return True if created, False if not-created because it already exists
+    """
+    try:
+        with kombu.Connection(hostname=broker) as connection:
+            queue = kombu.Queue(name, durable=durable)
+            try:
+                queue.queue_declare(channel=connection.default_channel, passive=True)
+            except amqp.exceptions.NotFound:
+                queue.queue_declare(channel=connection.default_channel)
+                logger.debug("Created queue %s on broker %s", name, broker)
+                return True
+        return False
+    except Exception as e:
+        raise MessagingError("Could not create queue %s on broker %s error=%s" % (name, broker, e))
+
+def delete_queue(name: str, broker: str=DEFAULT_BROKER) -> bool:
+    """
+    delete the message queue with the given name on the given broker (any messages in the queue are discarded)
+    :param name: the name for the queue
+    :param broker: a message broker address
+    :raises: MessagingError if the queue could not be deleted
+    :return True if deleted, False if not-deleted because it does not exist
+    """
+    try:
+        with kombu.Connection(hostname=broker) as connection:
+            queue = kombu.Queue(name, no_declare=True, channel=connection)
+            try:
+                queue.queue_declare(channel=connection.default_channel, passive=True)
+                queue.delete(if_unused=False, if_empty=False)
+                logger.debug("Deleted queue %s on broker %s", name, broker)
+                return True
+            except amqp.exceptions.NotFound:
+                return False
+    except Exception as e:
+        raise MessagingError("Could not delete queue %s on broker %s error=%s" % (name, broker, e))
+
+
+def create_binding(exchange_name: str, queue_name: str, routing_key: str='#', durable: bool=True, broker: str=DEFAULT_BROKER):
+    """
+    create a binding between the exchange and queue, possibly filtered by the routing_key, on the given broker.
+    please note that this only creates the binding, not the exchange, nor the queue. See also create_bound_queue(...)
+    :param exchange_name: the name for the exchange
+    :param queue_name: the name for the queue
+    :param routing_key: filter only messages with the given routing_key to the queue
+    :param durable: if True, then the queue 'survives' broker restarts
+    :param broker: a message broker address
+    """
+    try:
+        with kombu.Connection(hostname=broker) as connection:
+            exchange = kombu.Exchange(exchange_name, durable=durable, type='topic', no_declare=True)
+            queue = kombu.Queue(queue_name, exchange=exchange, routing_key=routing_key, durable=durable, no_declare=True)
+            queue.queue_bind(channel=connection.default_channel)
+            logger.debug("Added binding from exchange %s to queue %s with routing_key %s on broker %s", exchange_name,
+                                                                                                        queue_name,
+                                                                                                        routing_key,
+                                                                                                        broker)
+            return True
+    except amqp.exceptions.NotFound as e:
+        raise MessagingError("Could not create binding from exchange %s to queue %s with routing_key %s " \
+                " on broker %s because either the exchange or queue does not exist. error=%s" % (exchange_name,
+                                                                                                 queue_name,
+                                                                                                 routing_key,
+                                                                                                 broker,
+                                                                                                 e))
+    except Exception as e:
+        raise MessagingError("Could not create binding from exchange %s to queue %s with routing_key %s " \
+                             " on broker %s error=%s" % (exchange_name, queue_name, routing_key, broker, e))
+
+def create_bound_queue(exchange_name, queue_name, routing_key='#', durable=True, broker=DEFAULT_BROKER):
+    """
+    create a exchange (if needed), queue (if needed), and the in-between binding, possibly filtered by the routing_key, on the given broker.
+    :param exchange_name: the name for the exchange
+    :param queue_name: the name for the queue
+    :param routing_key: filter only messages with the given routing_key to the queue
+    :param durable: if True, then the queue 'survives' broker restarts
+    :param broker: a message broker address
+    """
+    create_exchange(exchange_name, durable=durable, broker=broker)
+    create_queue(queue_name, durable=durable, broker=broker)
+    create_binding(exchange_name, queue_name, routing_key, durable=durable, broker=broker)
+
+
+
+class _AbstractBus:
     """
     Common class for ToBus and Frombus, providing an easy way to connectto the qpid message bus.
     Note that most methods require that a FromBus/ToBus object is used *inside* a
@@ -74,18 +204,27 @@ class _AbstractBus():
     but that of __new__().
     """
 
-    def __init__(self, address, subject=None, broker=None, connection_log_level=logging.INFO, auto_reconnect=True):
+    def __init__(self, address, broker=None):
         """
         Initializer.
         :param address: valid Qpid address
         :param broker: valid Qpid broker URL, e.g. "localhost:5672"
         """
-        self.address = address
-        self.subject = subject
+        self.address, self.subject = _AbstractBus._split_address_and_subject(address)
         self.broker = broker if broker else DEFAULT_BROKER
         self._connected = False
-        self._connection_log_level = connection_log_level
-        self._auto_reconnect = auto_reconnect
+        self._lock = TimeoutLock()
+
+    @staticmethod
+    def _split_address_and_subject(address):
+        """
+        :return: Tuple of real address and (optional) subject
+        """
+        # parse/split address into queue/exchange name and (optional) subject
+        if address and '/' in address:
+            return address.split('/')
+
+        return address, None
 
     def is_connected(self):
         return self._connected
@@ -101,98 +240,67 @@ class _AbstractBus():
         * add a receiver
         The connection to the broker will be closed if any of these failed.
         :param retry_on_connection_error: If True then keep on trying to open the connection.
-        :raise MessageBusError: if any of the above actions failed.
+        :raise MessagingError: if any of the above actions failed.
         :return: self
         """
-        if self._connected:
-            return
-
         try:
-            logger.debug("[%s] Connecting to broker: %s", self.__class__.__name__, self.broker)
-            self.connection = proton.utils.BlockingConnection(self.broker)
-            logger.debug("[%s] Connected to broker: %s", self.__class__.__name__, self.broker)
+            with self._lock:
+                if self._connected:
+                    return
+
+                logger.debug("[%s] Connecting to broker: %s", self.__class__.__name__, self.broker)
+                self.connection = kombu.Connection(hostname=self.broker)
+                self.connection.connect()
+                logger.debug("[%s] Connected to broker: %s", self.__class__.__name__, self.broker)
 
-            # let the subclass (FromBus or ToBus) create a receiver of sender
-            self._connect_to_endpoint()
+                # let the subclass (FromBus or ToBus) create a receiver of sender
+                self._connect_to_endpoint()
 
-            self._connected = True
+                self._connected = True
 
-        except proton.ProtonException as ex:
+        except Exception as ex:
             error_msg = "[%s] Connecting to %s at broker %s failed: %s" % (self.__class__.__name__,
                                                                            self.address,
                                                                            self.broker,
                                                                            ex)
-            logger.exception(error_msg)
-            if self._auto_reconnect and retry_on_connection_error and isinstance(ex, proton.ConnectionException):
-                self.reconnect(num_retries=None, retry_wait_time=10, timeout=None)
+            if isinstance(ex, NotFound):
+                logging.error(error_msg)
             else:
-                raise MessageBusError(error_msg)
+                logging.exception(error_msg)
+
+            raise MessagingError(error_msg)
 
     def close(self):
         """
         Disconnect from the subscribed address and close the connection.
         """
-        if not self._connected:
-            return
-
         try:
-            self._disconnect_from_endpoint()
-
-            logger.debug("[%s] Disconnecting from broker: %s", self.__class__.__name__, self.broker)
-            self.connection.close()
-            logger.debug("[%s] Disconnected from broker: %s", self.__class__.__name__, self.broker)
-        except proton.ProtonException as ex:
-            error_msg = "[%s] Connecting to %s at broker %s failed: %s" % (self.__class__.__name__,
-                                                                           self.address,
-                                                                           self.broker,
-                                                                           ex)
+            with self._lock:
+                if not self._connected:
+                    return
+
+                self._disconnect_from_endpoint()
+
+                logger.debug("[%s] Disconnecting from broker: %s", self.__class__.__name__, self.broker)
+                self.connection.close()
+                logger.debug("[%s] Disconnected from broker: %s", self.__class__.__name__, self.broker)
+        except Exception as ex:
+            if isinstance(ex, AttributeError) and 'drain_events' in str(ex):
+                # looks like a kombu bug, just continue
+                pass
+            else:
+                error_msg = "[%s] Disconnecting from %s at broker %s failed: %s" % (self.__class__.__name__,
+                                                                                    self.address,
+                                                                                    self.broker,
+                                                                                    ex)
 
-            logger.exception(error_msg)
-            raise MessageBusError(error_msg)
+                logger.exception(error_msg)
+                raise MessagingError(error_msg)
         finally:
             del self.connection
             self.connection = None
             self._connected = False
 
-    def reconnect(self, num_retries=1, retry_wait_time=10, timeout=DEFAULT_TIMEOUT):
-        """
-        (Try to) reconnect to the messagebus.
-        :param num_retries: number of times to retry the reconnect. If None, retry indefinitely or until <timeout> seconds passed.
-        :param retry_wait_time: waiting time in seconds between retry attempts
-        :param timeout: timeout in seconds. Stop attempting to reconnect after this timeout, even when num_retries is not reach yet.
-                        if timeout<0 or None, then never do timeout.
-        :return: bool indicating if (re)connect was successful
-        """
-
-        logger.info("trying to reconnect to %s", self.address)
-        retry_cnt = 0
-        reconnect_start_time = datetime.utcnow()
-        while num_retries is None or retry_cnt < num_retries:
-            try:
-                self.close()
-            except Exception as ex:
-                logger.exception(ex)
-
-            try:
-                # try to open the connection, but let this current reconnect loop handle the reconnects -> retry_on_connection_error=False
-                self.open(retry_on_connection_error=False)
-            except Exception as ex:
-                logger.exception(ex)
-
-            if self.is_connected():
-                return True
-
-            if timeout is not None and timeout >= 0:
-                if datetime.utcnow() - reconnect_start_time > timedelta(seconds=timeout):
-                    logger.error("could not reconnect to %s within %s seconds", self.address, timeout)
-                    return False
-
-            logger.info("waiting %s seconds before retrying to connect to %s", retry_wait_time, self.address)
-            sleep(retry_wait_time)
-            retry_cnt += 1
-
-        return self.is_connected()
-
     def __enter__(self):
         self.open()
         return self
@@ -225,34 +333,28 @@ class FromBus(_AbstractBus):
     but that of __new__().
     """
 
-    def __init__(self, address, subject=None, broker=None, broker_options=None, connection_log_level=logging.INFO, auto_reconnect=True):
+    def __init__(self, address, broker=None):
         """
         Initializer.
         :param address: valid Qpid address
         :param broker: valid Qpid broker URL, e.g. "localhost:5672"
-        :param broker_options: OBSOLETE
         """
-        super(FromBus, self).__init__(address=address, subject=subject, broker=broker,
-                                      connection_log_level=connection_log_level,
-                                      auto_reconnect=auto_reconnect)
+        super(FromBus, self).__init__(address=address, broker=broker)
 
-        if broker_options:
-            logger.warning("broker_options are obsolete. address=%s subject=%s broker=%s broker_options=%s",
-                           self.address, self.subject, self.broker, broker_options)
 
     def _connect_to_endpoint(self):
-        logger.debug("[FromBus] Connecting receiver to bus: %s with subject: %s on broker: %s" % (self.address,
+        logger.debug("[FromBus] Connecting receiver to: %s with subject: %s on broker: %s" % (self.address,
                                                                                                   self.subject,
                                                                                                   self.broker))
 
-        self._receiver = self.connection.create_receiver(address=self.address,
-                                                         credit=DEFAULT_RECEIVER_CAPACITY,
-                                                         options=_ProtonSubjectFilter(self.subject) if self.subject else None)
+        queue = kombu.Queue(self.address, no_declare=True)
+
+        # try to passivly declare the queue on the broker, raises if non existent
+        queue.queue_declare(passive=True, channel=self.connection.default_channel)
+
+        self._receiver = self.connection.SimpleQueue(queue)
 
-        logger.log(self._connection_log_level,
-                   "[FromBus] Connected receiver to bus: %s with subject: %s on broker: %s" % (self.address,
-                                                                                               self.subject,
-                                                                                               self.broker))
+        logger.info("[FromBus] Connected receiver to: %s on broker: %s" % (self.address, self.broker))
 
     def _disconnect_from_endpoint(self):
         if self._receiver is not None:
@@ -263,123 +365,53 @@ class FromBus(_AbstractBus):
             self._receiver.close()
             self._receiver = None
 
-            logger.log(self._connection_log_level,
-                       "[FromBus] Disconnected receiver from bus: %s with subject: %s on broker: %s" % (self.address,
-                                                                                                        self.subject,
-                                                                                                        self.broker))
+            logger.info("[FromBus] Disconnected receiver from bus: %s with subject: %s on broker: %s" % (self.address,
+                                                                                                         self.subject,
+                                                                                                         self.broker))
 
-    def receive(self, timeout=DEFAULT_TIMEOUT, logDebugMessages=False):
+    def receive(self, timeout=DEFAULT_TIMEOUT):
         """
         Receive the next message from any of the queues we're listening on.
         :param timeout: maximum time in seconds to wait for a message.
-        :param logDebugMessages: do/don't log superfluous debug messages (to reduce spam in logs)
         :return: received message, None if timeout occurred.
         """
 
-        if logDebugMessages:
-            logger.debug("[FromBus] Waiting %s seconds for next message", timeout)
-        try:
-            while True: # break when message is acceptable
-                msg = self._receiver.receive(timeout=timeout)
-                if msg is not None:
-                    if logDebugMessages:
-                        logger.debug("[FromBus] Message received on: %s subject: %s" % (self.address, msg.subject))
-                    break  # handle this message
-
-        except proton.Timeout:
-            logger.debug("[FromBus] No message received within %s seconds", timeout)
-            return None
-        except proton.ProtonException:
-            raise_exception(MessageBusError,
-                            "[FromBus] Failed to fetch message from: "
-                            "%s" % self.address)
-        except Exception as e:
-            #FIXME: what if another exception is raised? should we reconnect?
-            logger.error(e)
-            self.reject()
-            raise_exception(MessageBusError,
-                            "[FromBus] unknown exception while receiving message on %s: %s" % (self.address, e))
-
-        try:
-            # convert proton.timestamps back to datetimes
-            msg.body = _convert_timestamps_to_datetimes(msg.body)
-
-            # convert proton/qpid msg to lofarmessage
-            lofar_msg = MESSAGE_FACTORY.create(msg)
-            logger.debug("[FromBus] received %s on bus %s" % (lofar_msg, self.address))
-
-            # acknowledge the message on the broker
-            self.ack()
-
-            return lofar_msg
-        except Exception as e:
-            self.reject()
-            raise_exception(MessageBusError, "[FromBus] Message rejected. Error=%s".format(e))
-
+        msg = None
+        start = datetime.utcnow()
 
-    def ack(self):
-        """
-        Acknowledge the last received message. This will inform Qpid that the message can
-        safely be removed from the queue.
-        """
-        try:
-            self._receiver.accept()    # with proton, we can only unspecifically accecpt the last received message
-        except Exception as e:
-            # This seems to happen quite often...
-            #logger.debug("[FromBus] Could not acknowledge message: %s error=%s", msg, e)
-            pass
-        else:
-            logger.debug("[FromBus] acknowledged last message")
+        with self._lock.timeout_context(timeout=timeout) as have_lock:
+            if not have_lock:
+                return None
 
-    def reject(self):
-        """
-        Reject a message. This will inform Qpid that the message should not be
-        redelivered.
-        """
-        try:
-            self._receiver.reject()    # with proton, we can only unspecifically reject the last received message
-        except Exception as e:
-            # This seems to happen quite often...
-            logger.exception("[FromBus] Could not reject last message error=%s", e)
-            pass
-        else:
-            logger.debug("[FromBus] rejected last message")
-
-    def drain(self, timeout=0.1):
-        """Read and ack all messages until queue/exchange is empty"""
-        while True:
             try:
-                if self._receiver.receive(timeout=timeout) is None:
-                    break
-                self._receiver.accept()
-            except proton.Timeout:
-                break
-
-    def nr_of_messages_in_queue(self, timeout=0.1):
-        """
-        Get the current number of messages in this FromBus's local queue, which is at most DEFAULT_RECEIVER_CAPACITY
-        Please note that this is not per se equal to the number of messages in the queue at the broker!
-        A proton receiver can and will prefetch messages from a broker-queue, and store them in an internal (client-side) queue.
-        If-and-only-if a message is handled and ack'ed at the client, then the message truly disappears from the broker-queue.
-        :param timeout: time out in (fractional) seconds or None
-        :return: the current number of messages in this FromBus's local receiver queue
-        """
-        if timeout is not None and timeout > 0:
-            try:
-                # allow the fetcher to receive some message(s)
-                current_nr_of_messages_in_queue = len(self._receiver.fetcher.incoming)
-                self.connection.container.do_work(timeout=0.5*timeout)
-                self._receiver.connection.wait(lambda: len(self._receiver.fetcher.incoming) != current_nr_of_messages_in_queue,
-                                               timeout=0.5*timeout)
-            except proton.Timeout:
-               pass
+                elapsed_sec = (datetime.utcnow() - start).total_seconds()
+                msg = self._receiver.get(timeout=max(timeout-elapsed_sec, 0.001))
+                logger.debug("[FromBus] Message received on: %s mgs: %s" % (self.address, msg))
+
+                # convert kombu msg to lofarmessage
+                lofar_msg = convert_to_lofar_message(msg)
+
+                # filter by subject here in the client.
+                # See: https://derickbailey.com/2015/07/22/airport-baggage-claims-selective-consumers-and-rabbitmq-anti-patterns/
+                # RabbitMQ/Kombu think having consumers which filter by subject is an anti-pattern, so it's not supported out of the box.
+                # So, let's filter here, and reject any message that does not match.
+                if self.subject and self.subject != lofar_msg.subject:
+                    msg.reject(requeue=False)
+                    return None
+                else:
+                    msg.ack()
+                    return lofar_msg
+
+            except kombu.exceptions.TimeoutError:
+                return None
+            except EmptyQueueError:
+                return None
             except Exception as e:
-              raise_exception(MessageBusError,
-                              "[FromBus] Failed to get number of messages available in queue: %s error=%s" % (self.address, e))
-
-        # return the current number of queued incoming messages
-        return len(self._receiver.fetcher.incoming)
-
+                logger.exception(e)
+                if msg:
+                    msg.reject()
+                raise_exception(MessagingError,
+                                "[FromBus] unknown exception while receiving message on %s: %s" % (self.address, e))
 
 class ToBus(_AbstractBus):
     """
@@ -398,84 +430,125 @@ class ToBus(_AbstractBus):
     but that of __new__().
     """
 
-    def __init__(self, address, subject=None, broker=None, broker_options=None, connection_log_level=logging.INFO, auto_reconnect=True):
+    def __init__(self, address:str=DEFAULT_BUSNAME, broker=None, subject_based_routing=True):
         """
         Initializer.
         :param address: valid Qpid address
         :param broker: valid Qpid broker URL, e.g. "localhost:5672"
-        :param broker_options: OBSOLETE
+        :param subject_based_routing: if True, then publish the message on this bus'es address which is assumed to be an exchange,
+                                               and set the given subject as routing_key of the message.
+                                               This assumes the broker is configured correcly to filter on the given
+                                               subject/routing_key, so the message is delivered to the correct consumer(s).
+                                      If False, then publish the message directly to this bus'es address which is assumed to be a queue.
         """
-        super(ToBus, self).__init__(address=address, subject=subject, broker=broker,
-                                    connection_log_level=connection_log_level,
-                                    auto_reconnect=auto_reconnect)
+        super(ToBus, self).__init__(address=address, broker=broker)
 
-        if broker_options:
-            logger.warning("broker_options are obsolete. address=%s subject=%s broker=%s broker_options=%s",
-                           self.address, self.subject, self.broker, broker_options)
+        self._subject_based_routing = subject_based_routing
 
     def _connect_to_endpoint(self):
-        logger.debug("[ToBus] Connecting sender to bus: %s on broker: %s" % (self.address, self.broker))
+        logger.debug("[ToBus] Connecting sender to: %s on broker: %s" % (self.address, self.broker))
 
-        self._sender = self.connection.create_sender(address=self.address)
+        # self._sender = self.connection.create_sender(address=self.address)
+        self._sender = kombu.Producer(self.connection)
 
-        logger.log(self._connection_log_level,
-                   "[ToBus] Connected sender to bus: %s on broker: %s" % (self.address, self.broker))
+        logger.info("[ToBus] Connected sender to: %s on broker: %s" % (self.address, self.broker))
 
     def _disconnect_from_endpoint(self):
         if self._sender is not None:
-            logger.debug("[ToBus] Disconnecting sender from bus: %s on broker: %s" % (self.address, self.broker))
+            logger.debug("[ToBus] Disconnecting sender from: %s on broker: %s" % (self.address, self.broker))
             self._sender.close()
             self._sender = None
-            logger.log(self._connection_log_level,
-                       "[ToBus] Disconnected sender from bus: %s on broker: %s" % (self.address, self.broker))
+            logger.info("[ToBus] Disconnected sender from: %s on broker: %s" % (self.address, self.broker))
 
-    def send(self, message, timeout=DEFAULT_TIMEOUT):
+    def send(self, message):
         """
         Send a message to the exchange (target) we're connected to.
         :param message: message to be sent
-        :param timeout: maximum time in seconds to wait for send action
         :return:
         """
         try:
-            qmsg_body_original = None
+            logger.debug("[ToBus] Sending message to: %s (%s)", self.address, message)
 
-            qmsg = to_qpid_message(message)
+            kwargs_dict = message.as_kombu_publish_kwargs()
 
-            # convert datetimes in (nested) dicts/lists to proton.timestamps,
-            # convert them back on the other end
-            # make copy of qmsg.body first, because we are modifying the contents, and we don't want any side effects
-            qmsg_body_original = deepcopy(qmsg.body)
-            qmsg.body = _convert_datetimes_to_timestamps(qmsg.body)
+            if self._subject_based_routing:
+                kwargs_dict['routing_key'] = message.subject
+                kwargs_dict['exchange'] = self.address
+            else:
+                kwargs_dict['routing_key'] = self.address
+                kwargs_dict['exchange'] = ''
 
-            if self.subject:
-                qmsg.subject = self.subject
+            with self._lock:
+                self._sender.publish(**kwargs_dict, serializer='pickle')
 
-            logger.debug("[ToBus] Sending message to: %s (%s)", self.address, qmsg)
+            logger.debug("[ToBus] Sent message to: %s", self.address)
 
-            sending_start_time = datetime.utcnow()
-            while True:
-                try:
-                    # if datetime.utcnow() - sending_start_time > timedelta(seconds=timeout):
-                    #     raise TimeoutError("Could not send msg to %s within %s seconds", self.address, timeout)
+        except Exception as e:
+            raise_exception(MessagingError, "[ToBus] Failed to send message to: %s error=%s" % (self.address, e))
+
+
+class TemporaryExchange(object):
+    """
+    A TemporaryExchange instance can be used to setup a dynamic temporary exchange which is closed and deleted automagically when leaving context.
+    """
+    def __init__(self, name, broker=DEFAULT_BROKER):
+        """
+        Create a TemporaryExchange instance with an optional name on the given broker.
+        :param name: name, which is part of the final address which also includes a uuid.
+        :param broker: the qpid broker to connect to.
+        """
+        self.name = name
+        self.broker = broker
+        self._tmp_exchange = None
+        self.address = None
+
+    def __enter__(self):
+        """
+        Opens/creates the temporary exchange. It is automatically closed when leaving context in __exit__.
+        :return: self.
+        """
+        self.open()
+        return self
 
-                    self._sender.send(qmsg, timeout=timeout)
-                    break # no exception, so sending succeeded, break out of loop.
-                except proton.ProtonException as ex:
-                    logger.error("error while sending message to %s, trying to reconnect... error=%s", self.address, ex)
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        """
+        Close/remove the temporary exchange.
+        """
+        self.close()
 
-                    if not self.reconnect(num_retries=None, retry_wait_time=5, timeout=timeout):
-                        raise # reconnecting did not help, re-raise original exception....
-                    # yes, reconnect worked, try sending again in the while loop
+    def open(self):
+        """
+        Open/create the temporary exchange.
+        It is advised to use the TemporaryExchange instance in a 'with' context, which guarantees the close call.
+        """
+        # create an identifiable address based on the given name which is also (almost) unique, and readable.
+        self.address = "%s-tmp-exchange-%s" % (self.name, uuid.uuid4().hex[:8])
+        logger.debug("Creating TemporaryExchange at %s ...", self.address)
+        create_exchange(name=self.address, broker=self.broker)
+        logger.debug("Created TemporaryExchange at %s", self.address)
 
+    def close(self):
+        """
+        Close/remove the temporary exchange.
+        It is advised to use the TemporaryExchange instance in a 'with' context, which guarantees the close call.
+        """
+        logger.debug("Closing TemporaryExchange at %s", self.address)
+        try:
+            delete_exchange(self.address)
         except Exception as e:
-            raise_exception(MessageBusError, "[ToBus] Failed to send message to: %s error=%s" % (self.address, e))
-        finally:
-            # restore the original body (in case it was modified)
-            if qmsg_body_original is not None:
-                qmsg.body = qmsg_body_original
+            logger.error(e)
+        logger.debug("Closed TemporaryExchange at %s", self.address)
+        self.address = None
 
-        logger.debug("[ToBus] Sent message to: %s subject: %s" % (self.address, qmsg.subject))
+    def __str__(self):
+        return "TemporaryExchange address=%s".format(self.address)
 
+    def create_tobus(self):
+        """
+        Factory method to create a ToBus instance which is connected to this TemporaryExchange
+        :return: ToBus
+        """
+        return ToBus(broker=self.broker, address=self.address)
 
 class TemporaryQueue(object):
     """
@@ -493,15 +566,15 @@ class TemporaryQueue(object):
 
     Alternative use cases with only a tobus or only a frombus on the tmp_queue are also possible.
     """
-    def __init__(self, name=None, broker=DEFAULT_BROKER):
+    def __init__(self, name_prefix=None, broker=DEFAULT_BROKER):
         """
         Create a TemporaryQueue instance with an optional name on the given broker.
-        :param name: Optional name, which is part of the final address which also includes a uuid.
+        :param name_prefix: Optional name, which is part of the final address which also includes a uuid.
         :param broker: the qpid broker to connect to.
         """
-        self.name = name
+        self.name_prefix = name_prefix
         self.broker = broker
-        self._dynamic_receiver = None
+        self._tmp_queue = None
         self.address = None
 
     def __enter__(self):
@@ -523,10 +596,10 @@ class TemporaryQueue(object):
         Open/create the temporary queue.
         It is advised to use the TemporaryQueue instance in a 'with' context, which guarantees the close call.
         """
-        logger.debug("Creating TemporaryQueue...")
-        connection = proton.utils.BlockingConnection(self.broker)
-        self._dynamic_receiver = connection.create_receiver(address=None, dynamic=True, name=self.name)
-        self.address = self._dynamic_receiver.link.remote_source.address
+        # create an identifiable address based on the given name which is also (almost) unique, and readable.
+        self.address = "%s-tmp-queue-%s" % (self.name_prefix, uuid.uuid4().hex[:8])
+        logger.debug("Creating TemporaryQueue at %s ...", self.address)
+        create_bound_queue(exchange_name=self.address, queue_name=self.address, routing_key="#", broker=self.broker)
         logger.debug("Created TemporaryQueue at %s", self.address)
 
     def close(self):
@@ -535,31 +608,35 @@ class TemporaryQueue(object):
         It is advised to use the TemporaryQueue instance in a 'with' context, which guarantees the close call.
         """
         logger.debug("Closing TemporaryQueue at %s", self.address)
-        self._dynamic_receiver.close()
-        self._dynamic_receiver.connection.close()
-        self._dynamic_receiver = None
+        try:
+            delete_queue(self.address)
+        except Exception as e:
+            logger.error(e)
+        try:
+            delete_exchange(self.address)
+        except Exception as e:
+            logger.error(e)
         logger.debug("Closed TemporaryQueue at %s", self.address)
         self.address = None
 
     def __str__(self):
         return "TemporaryQueue address=%s".format(self.address)
 
-    def create_frombus(self, subject=None, connection_log_level=logging.INFO):
+    def create_frombus(self, subject=None):
         """
         Factory method to create a FromBus instance which is connected to this TemporaryQueue
         :param subject: Optional subject string to filter for. Only messages which match this subject are received.
         :return: FromBus
         """
         return FromBus(broker=self.broker,
-                       address="%s/%s" % (self.address, subject) if subject else self.address,
-                       connection_log_level=connection_log_level)
+                       address="%s/%s" % (self.address, subject) if subject else self.address)
 
-    def create_tobus(self, connection_log_level=logging.INFO):
+    def create_tobus(self):
         """
         Factory method to create a ToBus instance which is connected to this TemporaryQueue
         :return: ToBus
         """
-        return ToBus(broker=self.broker, address=self.address, connection_log_level=connection_log_level)
+        return ToBus(broker=self.broker, address=self.address)
 
 
 class AbstractBusListener(object):
@@ -568,34 +645,30 @@ class AbstractBusListener(object):
     Typical usage is to derive from this class and implement the handle_message method with concrete logic.
     """
 
-    def __init__(self, address, subject=None, broker=None, **kwargs):
+    def __init__(self, exchange_name, routing_key, num_threads=1, broker=DEFAULT_BROKER):
         """
-        Initialize AbstractBusListener object with address (str).
-        :param address: valid Qpid address
-        additional parameters in kwargs:
-            numthreads=   <int>  Number of parallel threads processing messages (default: 1)
-            verbose=      <bool>  Output extra logging over stdout (default: False)
+        .....TODO....
+        :param num_threads: the number of receiver/handler threads. Default=1, use higher number if it makes sense,
+                            for example when you are waiting for a slow database while handling the message.
         """
-        self.address          = address
-        self.subject          = subject
+        self.exchange_name    = exchange_name
+        self.routing_key      = routing_key
         self.broker           = broker
+        self._num_threads     = num_threads
         self._running         = threading.Event()
         self._listening       = False
-        self._numthreads      = kwargs.pop("numthreads", 1)
-        self.verbose          = kwargs.pop("verbose", False)
-
-        if len(kwargs):
-            raise AttributeError("Unexpected argument passed to AbstractBusListener constructor: %s", kwargs)
 
-        # The part of the subject before the first dot. Used in many derived classes for logging and method-name construction.
-        self.subject_prefix   = (self.subject.split('.')[0]+'.') if '.' in self.subject else ''
-
-    def _debug(self, txt):
-        """
-        Internal use only.
-        """
-        if self.verbose == True:
-            logger.debug("[%s: %s]", self.__class__.__name__, txt)
+        # a buslistener should listen (consume) on a specific queue for this listener.
+        # this listener is/should be specific for the program it's used in, and for the routing_key it's interested in.
+        # so, we create a consumer address based on those parameters, and create the queue and binding on the fly if needed.
+        # This saves use a lot of infrastructure configuration.
+        # We intentionally do not remove the queue and binding upon closing this listener, so messages are stored on the broker
+        # in the queue for this listener for later processing once the program and this listener restarts.
+        # If you would like to have automatic cleanup of the created queue, then use the buslistener in a BusListenerJanitor's context.
+        self.queue_address = "%s.for.%s.%s" % (exchange_name,
+                                               program_name(include_extension=False),
+                                               routing_key.replace(".#", "").replace(".*", ""))
+        create_bound_queue(exchange_name=exchange_name, queue_name=self.queue_address, routing_key=routing_key, broker=self.broker)
 
     def isRunning(self):
         return self._running.isSet()
@@ -603,33 +676,22 @@ class AbstractBusListener(object):
     def isListening(self):
         return self._listening
 
-    def start_listening(self, numthreads=None):
+    def start_listening(self):
         """
         Start the background threads and process incoming messages.
         """
         if self._listening == True:
             return
 
-        if numthreads != None:
-            self._numthreads = numthreads
-
         self._running.set()
-        self._threads = {}
-        self._bus_listeners = {}
-        for i in range(self._numthreads):
-            thread = threading.Thread(target=self._loop)
-            self._threads[thread] = self._create_thread_args(i)
-            _bus_listener = FromBus(self.address, subject=self.subject, broker=self.broker, connection_log_level=logging.INFO)
-            _bus_listener.open()
-            self._bus_listeners[thread] = _bus_listener
+        self._threads = []
+        for i in range(self._num_threads):
+            thread = threading.Thread(target=self._loop,
+                                      name="ListenerThread_%s_%d" % (self.queue_address, i))
+            self._threads.append(thread)
             thread.start()
         self._listening = True
 
-    def _create_thread_args(self, index):
-        return {'index':index,
-                'num_received_messages':0,
-                'num_processed_messages':0}
-
     def stop_listening(self):
         """
         Stop the background threads that listen to incoming messages.
@@ -638,20 +700,13 @@ class AbstractBusListener(object):
         if self.isRunning():
             self._running.clear()
 
-            for thread, args in list(self._threads.items()):
-                logger.debug("Thread %2d: STOPPING Listening for messages on %s at broker %s" %
-                             (args['index'], self.address, self.broker if self.broker else DEFAULT_BROKER))
+            for thread in self._threads:
+                logger.debug("%s: STOPPING Listening for messages on %s at broker %s" %
+                             (thread.name, self.queue_address, self.broker if self.broker else DEFAULT_BROKER))
                 thread.join()
-                logger.info("Thread %2d: STOPPED Listening for messages on %s" % (args['index'], self.address))
-                logger.info("           %d messages received and %d processed OK." % (args['num_received_messages'], args['num_processed_messages']))
-
-                # close the listeners
-                if self._bus_listeners[thread].isConnected():
-                    self._bus_listeners[thread].close()
-
+                logger.info("%s: STOPPED Listening for messages on %s" % (thread.name, self.queue_address))
         self._listening = False
 
-
     def __enter__(self):
         """
         Internal use only. Handles scope with keyword 'with'
@@ -690,110 +745,122 @@ class AbstractBusListener(object):
         """
         Internal use only. Message listener loop that receives messages and starts the attached function with the message content as argument.
         """
-        currentThread = threading.currentThread()
-        args = self._threads[currentThread]
-        thread_idx = args['index']
-        logger.info( "Thread %d START Listening for messages on %s at broker %s" %
-                    (thread_idx, self.address, self.broker if self.broker else DEFAULT_BROKER))
+        logger.info( "%s START Listening for messages on %s at broker %s" %
+                    (threading.currentThread().name, self.queue_address, self.broker if self.broker else DEFAULT_BROKER))
         try:
             self._onListenLoopBegin()
         except Exception as e:
             logger.error("onListenLoopBegin() failed with %s", e)
             return
 
-        while self.isRunning():
-            try:
-                self._onBeforeReceiveMessage()
-            except Exception as e:
-                logger.error("onBeforeReceiveMessage() failed with %s", e)
-                pass
-
-            try:
-                # get the next message
-                lofar_msg = self._bus_listeners[currentThread].receive(1)
-                # retry if timed-out
-                if lofar_msg is None:
-                    continue
-
-                # Keep track of number of received messages
-                args['num_received_messages'] += 1
+        with FromBus(self.queue_address, broker=self.broker) as receiver:
+            while self.isRunning():
+                try:
+                    self._onBeforeReceiveMessage()
+                except Exception as e:
+                    logger.error("onBeforeReceiveMessage() failed with %s", e)
+                    pass
 
-                # Execute the handler function and send reply back to client
                 try:
-                    self._debug("Running handler")
+                    # get the next message
+                    lofar_msg = receiver.receive(1)
+                    # retry if timed-out
+                    if lofar_msg is None:
+                        continue
 
-                    self._handleMessage(lofar_msg)
+                    # Execute the handler function and send reply back to client
+                    try:
+                        logger.debug("Running handler")
 
-                    self._debug("Finished handler")
+                        self._handleMessage(lofar_msg)
 
-                    args['num_processed_messages'] += 1
+                        logger.debug("Finished handler")
+
+                        try:
+                            self._onAfterReceiveMessage(True)
+                        except Exception as e:
+                            logger.error("onAfterReceiveMessage() failed with %s", e)
+                            continue
 
-                    try:
-                        self._onAfterReceiveMessage(True)
                     except Exception as e:
-                        logger.error("onAfterReceiveMessage() failed with %s", e)
+                        import traceback
+                        logger.warning("Handling of message failed with %s: %s\nMessage: %s", e,
+                                                                                              traceback.format_exc(),
+                                                                                              lofar_msg.content)
+
+                        try:
+                            self._onAfterReceiveMessage(False)
+                        except Exception as e:
+                            logger.error("onAfterReceiveMessage() failed with %s", e)
                         continue
 
                 except Exception as e:
-                    import traceback
-                    logger.warning("Handling of message failed with %s: %s\nMessage: %s", e, traceback.format_exc(),lofar_msg.body)
-
-                    # Any thrown exceptions either Service exception or unhandled exception
-                    # during the execution of the service handler is caught here.
-                    self._debug(str(e))
-                    try:
-                        self._onAfterReceiveMessage(False)
-                    except Exception as e:
-                        logger.error("onAfterReceiveMessage() failed with %s", e)
-                    continue
+                    # Unknown problem in the library. Report this and continue.
+                    logger.exception("[%s:] ERROR during processing of incoming message: %s", self.__class__.__name__, e)
 
+            try:
+                self._onListenLoopEnd()
             except Exception as e:
-                # Unknown problem in the library. Report this and continue.
-                logger.exception("[%s:] ERROR during processing of incoming message: %s", self.__class__.__name__, e)
-                logger.info("Thread %d: Resuming listening on %s " % (thread_idx, self.address))
-
-        try:
-            self._onListenLoopEnd()
-        except Exception as e:
-            logger.error("finalize_loop() failed with %s", e)
+                logger.error("finalize_loop() failed with %s", e)
 
-def _convert_datetimes_to_timestamps(thing):
-    """recursively convert python datetimes to proton timestamps"""
-    if isinstance(thing, dict):
-        return { k: _convert_datetimes_to_timestamps(v) if is_iterable(v) else
-                    proton.timestamp(to_milliseconds_since_unix_epoch(v)) if isinstance(v, datetime) else
-                    v
-                 for k, v in thing.items()}
+class BusListenerJanitor:
+    """The BusListenerJanitor can help you out cleaning up auto-generated consumer queues.
+       It is intended specifically for use in a 'with' context in a test, or short-lived use-case.
 
-    if isinstance(thing, list):
-        return [ _convert_datetimes_to_timestamps(v) if is_iterable(v) else
-                 proton.timestamp(to_milliseconds_since_unix_epoch(v)) if isinstance(v, datetime) else
-                 v
-                 for v in thing]
+       Typical example:
 
-    if isinstance(thing, datetime):
-        return proton.timestamp(to_milliseconds_since_unix_epoch(thing))
+       with BusListenerJanitor(MyBusListenerClass(my_exchange_name, my_routing_key)) as my_bus_listener:
+           # do something....
 
-    return thing
+           my_bus_listener.some_funky_method()
 
-def _convert_timestamps_to_datetimes(thing):
-    """recursively convert proton timestamps to python datetimes"""
-    if isinstance(thing, dict):
-        return { k: _convert_timestamps_to_datetimes(v) if is_iterable(v) else
-                    from_milliseconds_since_unix_epoch(v) if isinstance(v, proton.timestamp) else
-                    v
-                 for k, v in thing.items()}
+           # do more....
 
-    if isinstance(thing, list):
-        return [ _convert_timestamps_to_datetimes(v) if is_iterable(v) else
-                 from_milliseconds_since_unix_epoch(v) if isinstance(v, proton.timestamp) else
-                 v
-                 for v in thing ]
-
-    if isinstance(thing, proton.timestamp):
-        return from_milliseconds_since_unix_epoch(thing)
-
-    return thing
+       # at this moment, the BusListenerJanitor leaves scope,
+       # so, it makes the my_bus_listener stop_listening,
+       # and then cleans up the auto-generated queue's.
+       print("done")
+       """
+    def __init__(self, bus_listener: AbstractBusListener):
+        """Create a janitor for the given bus_listener
+        """
+        self._bus_listener = bus_listener
 
+    def __enter__(self):
+        """enter the context, and make the bus_listener start listening.
+        :return a reference to the buslistener, not to the janitor!"""
+        self._bus_listener.start_listening()
+        return self._bus_listener
 
-__all__ = ["FromBus", "ToBus", "TemporaryQueue", "AbstractBusListener"]
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        """leave the context, make the bus_listener stop listening,
+        and clean up the auto-generated queue"""
+        self._bus_listener.stop_listening()
+        delete_queue(self._bus_listener.queue_address)
+
+if __name__ == "__main__":
+    logging.basicConfig(format='%(asctime)s %(thread)d %(threadName)s %(levelname)s %(message)s', level=logging.DEBUG)
+
+    from lofar.messaging.messages import EventMessage
+
+    with ToBus("my.lofar.exchange") as tobus:
+        msg = EventMessage(content="whoopsie", subject="foobar_FOOOOOOBAR")
+        tobus.send(msg)
+
+    with FromBus("my.lofar.all.queue") as frombus:
+        msg = frombus.receive()
+        print(msg)
+
+    msg = EventMessage(content="message over tmp queue", subject="bladibla")
+
+    with TemporaryQueue("abc") as tmp:
+        logger.info('------------------------')
+        with tmp.create_tobus() as tmp_tobus:
+            logger.info('------------------------')
+            tmp_tobus.send(msg)
+            logger.info('------------------------')
+            with tmp.create_frombus() as tmp_frombus:
+                logger.info('------------------------')
+                msg = tmp_frombus.receive()
+                logger.info("received: %s", msg)
+                logger.info('------------------------')
diff --git a/LCS/Messaging/python/messaging/messages.py b/LCS/Messaging/python/messaging/messages.py
index 7d996081d70..94e20a3770b 100644
--- a/LCS/Messaging/python/messaging/messages.py
+++ b/LCS/Messaging/python/messaging/messages.py
@@ -24,107 +24,45 @@
 Message classes used by the package lofar.messaging.
 """
 
-import proton
+import kombu.message
 import uuid
 
-from lofar.common.factory import Factory
-from lofar.messaging.exceptions import InvalidMessage, MessageFactoryError
-
-
-# Valid QPID message fields (from proton.Message):
-_QPID_MESSAGE_FIELDS = set(['body', 'properties', 'instructions', 'annotations',
-                            'content_type', 'correlation_id', 'durable', 'id', 'priority',
-                            'reply_to', 'subject', 'ttl', 'user_id'])
-# previously used valid QPID message fields (from qpid.messaging.Message):
-    #'content', 'content_type', 'correlation_id', 'durable', 'id',
-    #'priority', 'properties', 'reply_to', 'subject', 'ttl', 'user_id'])
-
-
-def _validate_qpid_message(qmsg):
-    """
-    Validate Qpid message `qmsg`. A Qpid message is required to contain the
-    following properties in order to be considered valid:
-    - "SystemName"  : "LOFAR"
-    - "MessageType" : message type string
-    - "MessageId"   : a valid UUID string
-    :raises InvalidMessage: if any of the required properties are missing in
-    the Qpid message
-    """
-    required_props = ["SystemName", "MessageType", "MessageId"]
-    if not isinstance(qmsg, proton.Message):
-        raise InvalidMessage(
-            "Not a Qpid Message: %r" % type(qmsg)
-        )
-    msg_props = qmsg.properties
-    if not isinstance(msg_props, dict):
-        raise InvalidMessage(
-            "Invalid message properties type: %r (expected %r)" %
-            (type(msg_props), type(dict()))
-        )
-    illegal_props = _QPID_MESSAGE_FIELDS.intersection(msg_props)
-    if illegal_props:
-        raise InvalidMessage(
-            "Illegal message propert%s (Qpid reserved): %r" %
-            ("ies" if len(illegal_props) > 1 else "y", ', '.join(illegal_props))
-        )
-    missing_props = set(required_props).difference(msg_props)
-    if missing_props:
-        raise InvalidMessage(
-            "Missing message propert%s: %s" %
-            ("ies" if len(missing_props) > 1 else "y", ', '.join(missing_props))
-        )
-    sysname, _, msgid = (msg_props[prop] for prop in required_props)
-    if sysname != "LOFAR":
-        raise InvalidMessage(
-            "Invalid message property 'SystemName': %s" % sysname
-        )
-    try:
-        uuid.UUID(msgid)
-    except Exception:
-        raise InvalidMessage(
-            "Invalid message property 'MessageId': %s" % msgid
-        )
-
-def to_qpid_message(msg):
-    """
-    Convert `msg` into a Qpid message.
-    :param msg: Message to be converted into a Qpid message.
-    :return: Qpid message
-    :raise InvalidMessage if `msg` cannot be converted into a Qpid message.
-    """
-    if isinstance(msg, proton.Message):
-        return msg
-    if isinstance(msg, LofarMessage):
-        return msg.qpid_msg
-    raise InvalidMessage("Invalid message type: %r" % type(msg))
-
-class MessageFactory(Factory):
-    """
-    Factory to produce LofarMessage objects.
-    """
-
-    def create(self, qmsg):
-        """
-        Override the create method to restrict the number of arguments to one
-        and to do some extra testing.
-        :param qmsg: Qpid message that must be converted into an LofarMessage.
-        :return: Instance of a child class of LofarMessage.
-        :raise MessageFactoryError: if the MessageType property of the Qpid
-        message `qmsg` contains a type name that is not registered with the
-        factory.
-        """
-        _validate_qpid_message(qmsg)
-        clsid = qmsg.properties['MessageType']
-        msg = super(MessageFactory, self).create(clsid, qmsg)
-        if msg is None:
-            raise MessageFactoryError(
-                "Don't know how to create a message of type %s. "
-                "Did you register the class with the factory?" % clsid)
-        return msg
-
-
-# Global MessageFactory object, to be used by the lofar.messaging package,
-MESSAGE_FACTORY = MessageFactory()
+from lofar.messaging.exceptions import MessageFactoryError
+
+def convert_to_lofar_message(kombu_msg):
+    """
+    Override the create method to restrict the number of arguments to one
+    and to do some extra testing.
+    :param kombu_msg: Kombu message that must be converted into an LofarMessage.
+    :return: Instance of a child class of LofarMessage.
+    :raise MessageFactoryError: if the MessageType property of the Qpid
+    message `qmsg` contains a type name that is not registered with the
+    factory.
+    """
+    message_type = kombu_msg.headers['MessageType']
+
+    if message_type == 'EventMessage':
+        return EventMessage(content=kombu_msg.payload,
+                            subject=kombu_msg.headers.get('Subject'),
+                            id=kombu_msg.headers.get('MessageId'))
+    if message_type == 'CommandMessage':
+        return CommandMessage(content=kombu_msg.payload,
+                              subject=kombu_msg.headers.get('Subject'),
+                              id=kombu_msg.headers.get('MessageId'))
+    if message_type == 'RequestMessage':
+        return RequestMessage(content=kombu_msg.payload,
+                              reply_to=kombu_msg.properties.get('reply_to'),
+                              subject=kombu_msg.headers.get('Subject'),
+                              id=kombu_msg.headers.get('MessageId'))
+    if message_type == 'ReplyMessage':
+        return ReplyMessage(content=kombu_msg.payload,
+                            status=kombu_msg.headers.get('status'),
+                            subject=kombu_msg.headers.get('Subject'),
+                            id=kombu_msg.headers.get('MessageId'),
+                            errmsg=kombu_msg.headers.get('errmsg'),
+                            backtrace=kombu_msg.headers.get('backtrace'))
+
+    raise MessageFactoryError("Unable to create LofarMessage of type %s for kombu-msg: %s" % (message_type, kombu_msg))
 
 
 class LofarMessage(object):
@@ -139,7 +77,7 @@ class LofarMessage(object):
     message properties and provide direct access to them.
     """
 
-    def __init__(self, content=None):
+    def __init__(self, content=None, subject=None, priority=4, id=None):
         """Constructor.
 
         :param content: Content can either be a qpid.messaging.Message object,
@@ -154,77 +92,20 @@ class LofarMessage(object):
         initialize our attributes; otherwise a `KeyError` exception will be
         raised.
         """
-        if isinstance(content, proton.Message):
-            _validate_qpid_message(content)
-            self.__dict__['_qpid_msg'] = content
-        else:
-            try:
-                # Note: these were accepted earlier. Proton does not seem to care...
-                if type(content) not in (list, str, bytes, dict, type(None)):
-                    raise KeyError(type(content))
-                self.__dict__['_qpid_msg'] = proton.Message(content)
-            except KeyError:
-                raise InvalidMessage(
-                    "Unsupported content type: %r" % type(content))
-            else:
-                if not self._qpid_msg.properties:
-                    self._qpid_msg.properties = {}
-                self._qpid_msg.properties.update({
-                    'SystemName': 'LOFAR',
-                    'MessageId': str(uuid.uuid4()),
-                    'MessageType': self.__class__.__name__})
-
-    def __getattr__(self, name):
-        """
-        Catch read-access to attributes to fetch our message properties from
-        the Qpid message properties field. Direct access to the Qpid message
-        properties is not allowed.
-
-        :raises: AttributeError
-        """
-        if name != 'properties':
-            if name in _QPID_MESSAGE_FIELDS:
-                return getattr(self.__dict__['_qpid_msg'], name)
-            if name in self.__dict__['_qpid_msg'].__dict__['properties']:
-                return self.__dict__['_qpid_msg'].__dict__['properties'][name]
-            if name == 'content':
-                return self.__dict__['_qpid_msg'].body
-        raise AttributeError("%r object has no attribute %r" %
-                             (self.__class__.__name__, name))
-
-    def __setattr__(self, name, value):
-        """
-        Catch write-access to data members to put our message properties into
-        the Qpid message properties field. Direct access to the Qpid message
-        properties is not allowed.
-
-        :raises: AttributeError
-        """
-        if name != 'properties':
-            if name in _QPID_MESSAGE_FIELDS:
-                setattr(self.__dict__['_qpid_msg'], name, value)
-            else:
-                self.__dict__['_qpid_msg'].__dict__['properties'][name] = value
-        else:
-            raise AttributeError("%r object has no attribute %r" %
-                                 (self.__class__.__name__, name))
-
-    def prop_names(self):
-        """
-        Return a list of all the message property names that are currently
-        defined.
-        """
-        return list(
-            _QPID_MESSAGE_FIELDS.union(self._qpid_msg.properties) -
-            set(['properties'])
-        )
-
-    @property
-    def qpid_msg(self):
-        """
-        Return the Qpid message object itself.
-        """
-        return self._qpid_msg
+        self.content = content
+        self.subject = subject
+        self.priority = priority
+        self.id = uuid.uuid4() if id is None else uuid.UUID(id)
+
+    def as_kombu_publish_kwargs(self):
+        return {'body':self.content,
+                'priority': self.priority,
+                'routing_key': self.subject,
+                'headers': {'SystemName': 'LOFAR',
+                            'MessageId': str(self.id),
+                            'MessageType': self.__class__.__name__,
+                            'Subject': self.subject}
+                }
 
     def show(self):
         """
@@ -232,12 +113,14 @@ class LofarMessage(object):
         between user-defined properties and standard Qpid properties.
         """
         print(str(self))
-
+        
     def __str__(self):
-        return "%s subject: %s id: %s%s" % (self.__class__.__name__,
-                                            self.subject,
-                                            self.MessageId,
-                                            (" reply_to: %s" % (self.reply_to,)) if self.reply_to else "")
+        content_str = str(self.content)
+        delimited_content_str = content_str if len(content_str) <= 32 else (content_str[:32] + "...")
+        return "%s subject=%s id=%s content=%s" % (self.__class__.__name__,
+                                                   self.subject,
+                                                   self.id,
+                                                   delimited_content_str)
 
 
 class EventMessage(LofarMessage):
@@ -247,34 +130,19 @@ class EventMessage(LofarMessage):
     will be stored in a persistent queue for later delivery.
     """
 
-    def __init__(self, content=None, context=None):
-        super(EventMessage, self).__init__(content)
-        if (context!=None):
-            self.durable = True
-            self.subject = context
-
-
-class MonitoringMessage(LofarMessage):
-    """
-    Message class used for monitoring messages. Monitoring messages are
-    publish-subscribe type of messages. They will be not be queued, so they
-    will be lost if there are no subscribers.
-    """
-
-    def __init__(self, content=None):
-        super(MonitoringMessage, self).__init__(content)
-
+    def __init__(self, content=None, subject=None, id=None):
+        super(EventMessage, self).__init__(content, subject=subject, id=id)
 
-class ProgressMessage(LofarMessage):
+class CommandMessage(LofarMessage):
     """
-    Message class used for progress messages. Progress messages are
-    publish-subscribe type of messages. They will be not be queued, so they
-    will be lost if there are no subscribers.
+    Message class used for command messages. Commands will typically be sent
+    to a controller. Command messages are messages that *must* be delivered.
+    If the message cannot be delivered to the recipient, it will be stored in
+    a persistent queue for later delivery.
     """
 
-    def __init__(self, content=None):
-        super(ProgressMessage, self).__init__(content)
-
+    def __init__(self, content=None, subject=None, id=None):
+        super(CommandMessage, self).__init__(content=content, subject=subject, id=id)
 
 class RequestMessage(LofarMessage):
     """
@@ -284,15 +152,16 @@ class RequestMessage(LofarMessage):
     """
 
     #TODO: refactor args kwargs quirks
-    def __init__(self, content=None, **kwargs): #reply_to=None, has_args=None, has_kwargs=None):
-        super(RequestMessage, self).__init__(content)
-        if("has_args" in kwargs):
-            self.has_args = kwargs["has_args"]
-        if("has_kwargs" in kwargs):
-            self.has_kwargs = kwargs["has_kwargs"]
-        if("reply_to" in kwargs):
-            self.reply_to = kwargs["reply_to"]
+    def __init__(self, content, reply_to, subject=None, id=None, has_args=False, has_kwargs=False):
+        super(RequestMessage, self).__init__(content=content, subject=subject, id=id)
+        self.reply_to = reply_to
+        self.has_args = has_args
+        self.has_kwargs = has_kwargs
 
+    def as_kombu_publish_kwargs(self):
+        publish_kwargs = super(RequestMessage, self).as_kombu_publish_kwargs()
+        publish_kwargs['reply_to'] = self.reply_to
+        return publish_kwargs
 
 class ReplyMessage(LofarMessage):
     """
@@ -301,36 +170,17 @@ class ReplyMessage(LofarMessage):
     message. These use topic exchanges and thus are routed by the 'subject' property
     """
 
-    def __init__(self, content=None, reply_to=None):
-        super(ReplyMessage, self).__init__(content)
-        if (reply_to!=None):
-            self.subject = reply_to
-            self.has_args   = False
-            self.has_kwargs = False
-
-class CommandMessage(LofarMessage):
-    """
-    Message class used for command messages. Commands will typically be sent
-    to a controller. Command messages are messages that *must* be delivered.
-    If the message cannot be delivered to the recipient, it will be stored in
-    a persistent queue for later delivery.
-    """
-
-    def __init__(self, content=None, context=None, recipients=None, **kwargs):
-        super(CommandMessage, self).__init__(
-                content,
-                **kwargs)
-        self.durable = True
-        self.context=context
-        self.recipients=recipients
-
+    def __init__(self, content, status, subject=None, id=None, errmsg="", backtrace=None):
+        super(ReplyMessage, self).__init__(content=content, subject=subject, id=id)
+        self.status    = status
+        self.errmsg    = errmsg
+        self.backtrace = backtrace
 
-MESSAGE_FACTORY.register("EventMessage", EventMessage)
-MESSAGE_FACTORY.register("MonitoringMessage", MonitoringMessage)
-MESSAGE_FACTORY.register("ProgressMessage", ProgressMessage)
-MESSAGE_FACTORY.register("RequestMessage", RequestMessage)
-MESSAGE_FACTORY.register("ReplyMessage", ReplyMessage)
-MESSAGE_FACTORY.register("CommandMessage", CommandMessage)
+    def as_kombu_publish_kwargs(self):
+        publish_kwargs = super(ReplyMessage, self).as_kombu_publish_kwargs()
+        publish_kwargs['headers']['status'] = self.status
+        publish_kwargs['headers']['errmsg'] = self.errmsg
+        publish_kwargs['headers']['backtrace'] = self.backtrace
+        return publish_kwargs
 
-__all__ = ["EventMessage", "MonitoringMessage", "ProgressMessage",
-	   "RequestMessage", "ReplyMessage", "CommandMessage"]
+__all__ = ["EventMessage", "RequestMessage", "ReplyMessage", "CommandMessage"]
diff --git a/LCS/Messaging/python/messaging/test/t_RPC.py b/LCS/Messaging/python/messaging/test/t_RPC.py
index 6daaf3a9b0d..f6157086f6d 100644
--- a/LCS/Messaging/python/messaging/test/t_RPC.py
+++ b/LCS/Messaging/python/messaging/test/t_RPC.py
@@ -5,11 +5,15 @@ It defines 5 functions and first calls those functions directly to check
 that the functions are OK. Next the same tests are done with the RPC and
 Service classes in between. This should give the same results.
 """
-from pprint import pformat
 from time import sleep
 from contextlib import ExitStack
 
-from lofar.messaging import Service, RPC, TemporaryQueue, RPCTimeoutException
+import logging
+
+logger = logging.getLogger(__name__)
+
+from lofar.messaging import TemporaryExchange, BusListenerJanitor
+from lofar.messaging import Service, RPC, RPCWrapper, RPCException, RPCTimeoutException
 
 class UserException(Exception):
     "Always thrown in one of the functions"
@@ -18,10 +22,9 @@ class InvalidArgType(Exception):
     "Thrown when the input is wrong for one of the functions"
     pass
 
-# create several function:
 def ErrorFunc(input_value):
     " Always thrown a predefined exception"
-    raise UserException("Exception thrown by the user")
+    raise UserException("Intentional exception for testing")
 
 def ExceptionFunc(input_value):
     "Generate a exception not caught by the function"
@@ -72,13 +75,10 @@ def DictFunc(input_value):
             result[key] = value
     return result
 
-if __name__ == '__main__':
+
+def main():
     # First do basic test for the functions
     # ErrorFunc
-    import logging
-    logging.basicConfig(format='%(asctime)s %(process)d %(levelname)s %(message)s', level=logging.INFO)
-    logger = logging.getLogger(__name__)
-
     try:
         result = ErrorFunc("aap noot mies")
     except UserException as e:
@@ -119,89 +119,155 @@ if __name__ == '__main__':
 
     logger.info("Functions tested outside RPC: All OK")
 
-    with TemporaryQueue("t_RPC") as test_queue:
-
-        # Register functs as a service handler listening at busname and ServiceName
-        serv1 = Service("ErrorService",     ErrorFunc,     busname=test_queue.address, numthreads=1)
-        serv2 = Service("ExceptionService", ExceptionFunc, busname=test_queue.address, numthreads=1)
-        serv3 = Service("StringService",    StringFunc,    busname=test_queue.address, numthreads=1)
-        serv4 = Service("ListService",      ListFunc,      busname=test_queue.address, numthreads=1)
-        serv5 = Service("DictService",      DictFunc,      busname=test_queue.address, numthreads=1)
-        serv6 = Service("TimeoutService",   TimeoutFunc,   busname=test_queue.address, numthreads=1)
+    with TemporaryExchange("TEST") as test_exchange:
 
+        # Register functions as a service handler listening at busname and ServiceName
+        serv1 = Service("ErrorService",     ErrorFunc,     exchange_name=test_exchange.address, num_threads=2)
+        serv2 = Service("ExceptionService", ExceptionFunc, exchange_name=test_exchange.address, num_threads=2)
+        serv3 = Service("StringService",    StringFunc,    exchange_name=test_exchange.address, num_threads=2)
+        serv4 = Service("ListService",      ListFunc,      exchange_name=test_exchange.address, num_threads=2)
+        serv5 = Service("DictService",      DictFunc,      exchange_name=test_exchange.address, num_threads=2)
+        serv6 = Service("TimeoutService",   TimeoutFunc,   exchange_name=test_exchange.address, num_threads=2)
 
         # 'with' sets up the connection context and defines the scope of the service.
-        with ExitStack() as stack:
-            for arg in (serv1, serv2, serv3, serv4, serv5, serv6):
-                stack.enter_context(arg)
-
-            # Start listening in the background. This will start as many threads as defined by the instance
-            serv1.start_listening()
-            serv2.start_listening()
-            serv3.start_listening()
-            serv4.start_listening()
-            serv5.start_listening()
-            serv6.start_listening()
-
-            # Redo all tests but via through RPC
-            # ErrorFunc
-            with RPC("ErrorService", busname=test_queue.address) as rpc:
+        # also use each service inside a BusListenerJanitor context to auto-cleanup auto-generated listener queues
+        with BusListenerJanitor(serv1), BusListenerJanitor(serv2), \
+             BusListenerJanitor(serv3), BusListenerJanitor(serv4), \
+             BusListenerJanitor(serv5), BusListenerJanitor(serv6):
+
+            # # Redo all tests but via through RPC
+            # # ErrorFunc
+            # with RPC("ErrorService", busname=test_exchange.address) as rpc:
+            #     try:
+            #         result = rpc("aap noot mies")
+            #     except RPCException as e:
+            #         if not 'UserException' in str(e):
+            #             raise
+            #
+            # # ExceptionFunc
+            # with RPC("ExceptionService", busname=test_exchange.address) as rpc:
+            #     try:
+            #         result = rpc("aap noot mies")
+            #     except RPCException as e:
+            #         if not 'IndexError' in str(e):
+            #             raise
+            #
+            # # StringFunc
+            # with RPC("StringService", busname=test_exchange.address) as rpc:
+            #     try:
+            #         result = rpc([25])
+            #     except RPCException as e:
+            #         if not 'InvalidArgType' in str(e):
+            #             raise
+            #
+            #     result = rpc("aap noot mies")
+            #     if result[0] != "AAP NOOT MIES":
+            #         raise Exception("String function failed:{}".format(result))
+            #
+            # # ListFunc
+            # with RPC("ListService", busname=test_exchange.address) as rpc:
+            #     try:
+            #         result = rpc("25")
+            #     except RPCException as e:
+            #         if not 'InvalidArgType' in str(e):
+            #             raise
+            #     result = rpc(["aap", 25, [1, 2], {'mies' : "meisje"}])
+            #     if result[0] != ["AAP", 25, [1, 2], {'mies' : "MEISJE"}]:
+            #         raise Exception("List function failed:{}".format(result))
+            #
+            # # DictFunc
+            # with RPC("DictService", busname=test_exchange.address) as rpc:
+            #     try:
+            #         result = rpc([25])
+            #     except RPCException as e:
+            #         if not 'InvalidArgType' in str(e):
+            #             raise
+            #     result = rpc({'mies' : "meisje", "aap" : 125, "noot" : [2, 3]})
+            #     if result[0] != {'mies' : "MEISJE", "aap" : 125, "noot" : [2, 3]}:
+            #         raise Exception("Dict function failed:{}".format(result))
+            #
+            # # TimeoutFunc
+            # with RPC("TimeoutService", busname=test_exchange.address, timeout=1) as rpc:
+            #     try:
+            #         result = rpc("some random string")
+            #         raise Exception("TimeoutService did not timeout as expected...")
+            #     except RPCTimeoutException as e:
+            #         logger.info("TimoutService timed out as expected. RPCTimeoutException: %s", e.args)
+
+            logger.info("Functions tested with RPC: All OK")
+            logger.info("************************************************")
+
+            # let's do the same tests, but now with the rpc calls wrapped in a RPCWrapper
+            # define a RPCWrapper subclass wrapping the above service calls...
+            class MyRPC(RPCWrapper):
+                def ErrorFunc(self, input_value):
+                    return self.rpc('ErrorService', input_value)
+
+                def ExceptionFunc(self, input_value):
+                    return self.rpc('ExceptionService', input_value)
+
+                def StringFunc(self, input_value):
+                    return self.rpc('StringService', input_value)
+
+                def TimeoutFunc(self, input_value):
+                    return self.rpc('TimeoutService', input_value)
+
+                def ListFunc(self, input_value):
+                    return self.rpc('ListService', input_value)
+
+                def DictFunc(self, input_value):
+                    return self.rpc('DictService', input_value)
+
+
+            # and use the MyRPC RPCWrapper class for testing
+            with MyRPC(busname=test_exchange.address, timeout=1) as my_rpc:
                 try:
-                    result = rpc("aap noot mies")
-                except UserException as e:
-                    pass
+                    result = my_rpc.ErrorFunc("aap noot mies")
+                except RPCException as e:
+                    if not 'UserException' in str(e):
+                        raise
 
-            # ExceptionFunc
-            with RPC("ExceptionService", busname=test_queue.address) as rpc:
                 try:
-                    result = rpc("aap noot mies")
-                except IndexError as e:
-                    pass
+                    result = my_rpc.ExceptionFunc("aap noot mies")
+                except RPCException as e:
+                    if not 'IndexError' in str(e):
+                        raise
 
-            # StringFunc
-            with RPC("StringService", busname=test_queue.address) as rpc:
                 try:
-                    result = rpc([25])
-                except InvalidArgType as e:
-                    pass
-                result = rpc("aap noot mies")
-                if result[0] != "AAP NOOT MIES":
+                    result = my_rpc.StringFunc([25])
+                except RPCException as e:
+                    if not 'InvalidArgType' in str(e):
+                        raise
+
+                result = my_rpc.StringFunc("aap noot mies")
+                if result != "AAP NOOT MIES":
                     raise Exception("String function failed:{}".format(result))
 
-            # ListFunc
-            with RPC("ListService", busname=test_queue.address) as rpc:
                 try:
-                    result = rpc("25")
-                except InvalidArgType as e:
-                    pass
-                result = rpc(["aap", 25, [1, 2], {'mies' : "meisje"}])
-                if result[0] != ["AAP", 25, [1, 2], {'mies' : "MEISJE"}]:
+                    result = my_rpc.ListFunc("25")
+                except RPCException as e:
+                    if not 'InvalidArgType' in str(e):
+                        raise
+                result = my_rpc.ListFunc(["aap", 25, [1, 2], {'mies' : "meisje"}])
+                if result != ["AAP", 25, [1, 2], {'mies' : "MEISJE"}]:
                     raise Exception("List function failed:{}".format(result))
 
-            # DictFunc
-            with RPC("DictService", busname=test_queue.address) as rpc:
                 try:
-                    result = rpc([25])
-                except InvalidArgType as e:
-                    pass
-                result = rpc({'mies' : "meisje", "aap" : 125, "noot" : [2, 3]})
-                if result[0] != {'mies' : "MEISJE", "aap" : 125, "noot" : [2, 3]}:
+                    result = my_rpc.DictFunc([25])
+                except RPCException as e:
+                    if not 'InvalidArgType' in str(e):
+                        raise
+                result = my_rpc.DictFunc({'mies' : "meisje", "aap" : 125, "noot" : [2, 3]})
+                if result != {'mies' : "MEISJE", "aap" : 125, "noot" : [2, 3]}:
                     raise Exception("Dict function failed:{}".format(result))
 
-            # TimeoutFunc
-            with RPC("TimeoutService", busname=test_queue.address, timeout=1) as rpc:
                 try:
-                    result = rpc("some random string")
+                    result = my_rpc.TimeoutFunc("some random string")
                     raise Exception("TimeoutService did not timeout as expected...")
                 except RPCTimeoutException as e:
                     logger.info("TimoutService timed out as expected. RPCTimeoutException: %s", e.args)
 
-            logger.info("Functions tested with RPC: All OK")
 
-            # Tell all background listener threads to stop and wait for them to finish.
-            serv1.stop_listening()
-            serv2.stop_listening()
-            serv3.stop_listening()
-            serv4.stop_listening()
-            serv5.stop_listening()
-            serv6.stop_listening()
+if __name__ == '__main__':
+    logging.basicConfig(format='%(asctime)s %(process)d %(levelname)s %(message)s', level=logging.DEBUG)
+    main()
diff --git a/LCS/Messaging/python/messaging/test/t_messagebus.py b/LCS/Messaging/python/messaging/test/t_messagebus.py
index 87d871826ec..9cf58522ede 100644
--- a/LCS/Messaging/python/messaging/test/t_messagebus.py
+++ b/LCS/Messaging/python/messaging/test/t_messagebus.py
@@ -31,10 +31,8 @@ from datetime import datetime
 
 from lofar.messaging.messages import *
 from lofar.messaging.messagebus import *
-from lofar.messaging.messagebus import DEFAULT_RECEIVER_CAPACITY
-from lofar.messaging.exceptions import MessageBusError, InvalidMessage
+from lofar.messaging.exceptions import MessagingError
 from lofar.common.datetimeutils import round_to_millisecond_precision
-from lofar.common.util import convertIntKeysToString, convertStringDigitKeysToInt
 from time import sleep
 from threading import Lock
 
@@ -42,6 +40,61 @@ logger = logging.getLogger(__name__)
 
 TIMEOUT = 1.0
 
+
+class TestCreateDeleteFunctions(unittest.TestCase):
+    """Test the various create/delete exchange/queue/binding funcions"""
+
+    def test_create_delete_exchange(self):
+        name = "test-exchange-%s" % (uuid.uuid4())
+        # creating this new unique test exchange should succeed, and return True cause it's a new exchange
+        self.assertTrue(create_exchange(name, durable=False))
+
+        # creating it again should return False
+        self.assertFalse(create_exchange(name, durable=False))
+
+        # deleting it should succeed
+        self.assertTrue(delete_exchange(name))
+
+        # deleting it again should return False as there is nothing to deleting
+        self.assertFalse(delete_exchange(name))
+
+    def test_create_delete_queue(self):
+        name = "test-queue-%s" % (uuid.uuid4())
+        # creating this new unique test queue should succeed, and return True cause it's a new queue
+        self.assertTrue(create_queue(name, durable=False))
+
+        # creating it again should return False
+        self.assertFalse(create_queue(name, durable=False))
+
+        # deleting it should succeed
+        self.assertTrue(delete_queue(name))
+
+        # deleting it again should return False as there is nothing to deleting
+        self.assertFalse(delete_queue(name))
+
+    def test_create_binding(self):
+        exchange_name = "test-exchange-%s" % (uuid.uuid4())
+        queue_name = "test-queue-%s" % (uuid.uuid4())
+
+        # try to create the binding on non-existing exchange/queue
+        with self.assertRaisesRegex(MessagingError, ".*does not exist.*"):
+            create_binding(exchange_name=exchange_name, queue_name=queue_name)
+
+        try:
+            # now, do make sure the exchange/queue exist
+            create_exchange(exchange_name)
+            create_queue(queue_name)
+
+            # and do the actual binding test
+            self.assertTrue(create_binding(exchange_name=exchange_name, queue_name=queue_name))
+        finally:
+            # and cleanup the exchange/queue
+            delete_queue(queue_name)
+            delete_exchange(exchange_name)
+
+
+
+
 class TestTemporaryQueue(unittest.TestCase):
     """Test the TemporaryQueue class"""
 
@@ -56,8 +109,7 @@ class TestTemporaryQueue(unittest.TestCase):
 
         # test if the temporary queue has been deleted when leaving scope
         # We should not be able to connect to it anymore
-        regexp = '.*Node not found: %s' % tmp_queue_address
-        with self.assertRaisesRegex(MessageBusError, regexp):
+        with self.assertRaisesRegex(MessagingError, '.*NOT_FOUND.*'):
             with FromBus(tmp_queue_address):
                 pass
 
@@ -77,7 +129,7 @@ class TestTemporaryQueue(unittest.TestCase):
 
                 # and test if they are equal
                 self.assertEqual(original_msg.id, received_msg.id)
-                self.assertEqual(original_msg.body, received_msg.body)
+                self.assertEqual(original_msg.content, received_msg.content)
 
     def test_send_receive_over_temporary_queue_with_subject_filtering(self):
         """
@@ -93,39 +145,30 @@ class TestTemporaryQueue(unittest.TestCase):
                 with tmp_queue.create_frombus(SUBJECT) as frombus:
                     for i in range(NUM_MESSAGES_TO_SEND):
                         # send a message...
-                        original_msg = EventMessage(context=SUBJECT,
-                                                    content="test message %d with subject='%s'".format(i, SUBJECT))
+                        original_msg = EventMessage(subject=SUBJECT,
+                                                    content="test message %d with subject='%s'" % (i, SUBJECT))
                         logger.info("Sending message: %s", original_msg)
                         tobus.send(original_msg)
 
                         # ...receive the message...
-                        received_msg = frombus.receive(timeout=0.1)
+                        received_msg = frombus.receive(timeout=1)
                         logger.info("received message: %s", received_msg)
 
                         # and test if they are equal
                         self.assertEqual(original_msg.id, received_msg.id)
-                        self.assertEqual(original_msg.body, received_msg.body)
+                        self.assertEqual(original_msg.content, received_msg.content)
                         self.assertEqual(original_msg.subject, received_msg.subject)
 
                         # now send a message with a different subject...
-                        original_msg = EventMessage(context=SUBJECT2, content="foobar")
+                        original_msg = EventMessage(subject=SUBJECT2, content="foobar")
                         logger.info("Sending message: %s", original_msg)
                         tobus.send(original_msg)
 
                         # ... and try to receive it (should yield None, because of the non-matching subject)
-                        received_msg = frombus.receive(timeout=0.1)
+                        received_msg = frombus.receive(timeout=1)
                         logger.info("received message: %s", received_msg)
                         self.assertEqual(None, received_msg)
 
-            # let's see if we can receive the left-over messages from the tmp-queue
-            with tmp_queue.create_frombus() as frombus:
-                # there should still be messages in the queue with non-matching subjects from above
-                for i in range(NUM_MESSAGES_TO_SEND):
-                    # ...receive the message...
-                    received_msg = frombus.receive(timeout=0.1)
-                    logger.info("received message: %s", received_msg)
-                    self.assertEqual(SUBJECT2, received_msg.subject)
-
 
 # ========  FromBus unit tests  ======== #
 
@@ -141,18 +184,18 @@ class FromBusInitFailed(unittest.TestCase):
 
     def test_no_broker_address(self):
         """
-        Connecting to non-existent broker address must raise MessageBusError
+        Connecting to non-existent broker address must raise MessagingError
         """
-        with self.assertRaisesRegex(MessageBusError, "No address associated with hostname"):
-            with FromBus(self.test_queue.address, broker="foo.bar", auto_reconnect=False):
+        with self.assertRaisesRegex(MessagingError, ".*failed to resolve broker hostname"):
+            with FromBus(self.test_queue.address, broker="foo.bar"):
                 pass
 
     def test_connection_refused(self):
         """
-        Connecting to broker on wrong port must raise MessageBusError
+        Connecting to broker on wrong port must raise MessagingError
         """
-        with self.assertRaisesRegex(MessageBusError, "recv: errno: 111"):
-            with FromBus("fake" + self.test_queue.address, broker="localhost:4", auto_reconnect=False):
+        with self.assertRaisesRegex(MessagingError, ".*failed to resolve broker hostname"):
+            with FromBus("fake" + self.test_queue.address, broker="localhost:4"):
                 pass
 
 
@@ -193,72 +236,20 @@ class ToBusInitFailed(unittest.TestCase):
 
     def test_no_broker_address(self):
         """
-        Connecting to non-existent broker address must raise MessageBusError
+        Connecting to non-existent broker address must raise MessagingError
         """
-        with self.assertRaisesRegex(MessageBusError, "No address associated with hostname"):
-            with ToBus(self.test_queue.address, broker="foo.bar", auto_reconnect=False):
+        with self.assertRaisesRegex(MessagingError, ".*failed to resolve broker hostname"):
+            with ToBus(self.test_queue.address, broker="foo.bar"):
                 pass
 
     def test_connection_refused(self):
         """
-        Connecting to broker on wrong port must raise MessageBusError
+        Connecting to broker on wrong port must raise MessagingError
         """
-        with self.assertRaisesRegex(MessageBusError, "recv: errno: 111"):
-            with ToBus(self.test_queue.address, broker="localhost:4", auto_reconnect=False):
+        with self.assertRaisesRegex(MessagingError, ".*failed to resolve broker hostname"):
+            with ToBus(self.test_queue.address, broker="localhost:4"):
                 pass
 
-# ========  Combined FromBus/ToBus unit tests  ======== #
-
-class QueueIntrospection(unittest.TestCase):
-    """
-    Test sending and receiving messages, and introspecting the in-between queue
-    """
-
-    def setUp(self):
-        self.test_queue = TemporaryQueue(__class__.__name__)
-        self.test_queue.open()
-        self.addCleanup(self.test_queue.close)
-
-        self.frombus = FromBus(self.test_queue.address)
-        self.tobus = ToBus(self.test_queue.address)
-
-        # if there are any dangling messages in the self.test_queue.address, they hold state between the individual tests
-        # make sure the queue is empty by receiving any dangling messages
-        with self.frombus:
-            self.frombus.drain()
-
-    def test_drain_non_empty_queue(self):
-        with self.tobus, self.frombus:
-            self.tobus.send(EventMessage(content="foo"))
-            self.tobus.send(EventMessage(content="foo"))
-            self.assertGreater(self.frombus.nr_of_messages_in_queue(), 0)
-
-            self.frombus.drain()
-            self.assertEqual(0, self.frombus.nr_of_messages_in_queue())
-
-
-    def test_counting_one_message_in_queue(self):
-        with self.tobus, self.frombus:
-            self.tobus.send(EventMessage(content="foo"))
-            self.assertEqual(1, self.frombus.nr_of_messages_in_queue())
-
-            self.frombus.receive()
-            self.assertEqual(0, self.frombus.nr_of_messages_in_queue())
-
-    def test_counting_multiple_messages_in_queue(self):
-        # DEFAULT_RECEIVER_CAPACITY should be >= 1 otherwise we cannot even store multiple messages in the local queue
-        self.assertGreaterEqual(DEFAULT_RECEIVER_CAPACITY, 1)
-
-        with self.tobus, self.frombus:
-            MAX_NR_OF_MESSAGES = min(10, DEFAULT_RECEIVER_CAPACITY)
-            for i in range(MAX_NR_OF_MESSAGES):
-                self.tobus.send(EventMessage(content="foo"))
-                self.assertEqual(i+1, self.frombus.nr_of_messages_in_queue())
-
-            for i in range(MAX_NR_OF_MESSAGES):
-                self.assertEqual(MAX_NR_OF_MESSAGES-i, self.frombus.nr_of_messages_in_queue())
-                self.frombus.receive()
-                self.assertEqual(MAX_NR_OF_MESSAGES-i-1, self.frombus.nr_of_messages_in_queue())
 
 class SendReceiveMessage(unittest.TestCase):
     """
@@ -281,12 +272,12 @@ class SendReceiveMessage(unittest.TestCase):
         """
         with self.tobus, self.frombus:
             self.tobus.send(send_msg)
-            recv_msg = self.frombus.receive(timeout=TIMEOUT, logDebugMessages=True)
+            recv_msg = self.frombus.receive(timeout=TIMEOUT)
 
-        self.assertEqual(
-            (send_msg.SystemName, send_msg.MessageId, send_msg.MessageType),
-            (recv_msg.SystemName, recv_msg.MessageId, recv_msg.MessageType))
-        self.assertEqual(send_msg.body, recv_msg.body)
+        self.assertEqual(type(send_msg), type(recv_msg))
+        self.assertEqual(send_msg.id, recv_msg.id)
+        self.assertEqual(send_msg.subject, recv_msg.subject)
+        self.assertEqual(send_msg.content, recv_msg.content)
         return recv_msg
 
     def test_sendrecv_event_message(self):
@@ -296,20 +287,6 @@ class SendReceiveMessage(unittest.TestCase):
         content = "An event message"
         self._test_sendrecv(EventMessage(content))
 
-    def test_sendrecv_monitoring_message(self):
-        """
-        Test send/receive of an MonitoringMessage, containing a python list.
-        """
-        content = ["A", "monitoring", "message"]
-        self._test_sendrecv(MonitoringMessage(content))
-
-    def test_sendrecv_progress_message(self):
-        """
-        Test send/receive of an ProgressMessage, containing a python dict.
-        """
-        content = {"Progress": "Message"}
-        self._test_sendrecv(ProgressMessage(content))
-
     def test_sendrecv_request_message(self):
         """
         Test send/receive of an RequestMessage, containing a byte array.
@@ -372,31 +349,8 @@ class SendReceiveMessage(unittest.TestCase):
         """
         content = { 0: 'foo',
                     1: 'bar' }
-        recv_msg = self._test_sendrecv(RequestMessage(convertIntKeysToString(content), reply_to=self.test_queue.address))
-        self.assertEqual(content, convertStringDigitKeysToInt(recv_msg.body))
-
-    def test_sendrecv_reconnect_in_send(self):
-        """
-        Test send/receive of an RequestMessage even when sending the message raises a ProtonException.
-        """
-        from proton.utils import BlockingSender
-        from proton import ProtonException
-
-        # use fancy code injection into proton to enforce a ProtonException at the correct moment
-        original_send = BlockingSender.send
-        def mock_send(self, msg, timeout=False, error_states=None):
-            # restore original behaviour into proton...
-            BlockingSender.send = original_send
-            raise ProtonException("mocked ProtonException")
-
-        # inject into proton...
-        BlockingSender.send = mock_send
-
-        # normal sendrecv... even though the exception is raised... reconnecting and retrying should still deliver message!
-        self._test_sendrecv(RequestMessage("foobar", reply_to=self.test_queue.address))
-
-        # check if the injection worked, by checking if BlockingSender.send has been restored to the original
-        self.assertTrue(BlockingSender.send == original_send)
+        recv_msg = self._test_sendrecv(RequestMessage(content, reply_to=self.test_queue.address))
+        self.assertEqual(content, recv_msg.content)
 
 
 class PingPongPlayer(AbstractBusListener):
@@ -404,15 +358,25 @@ class PingPongPlayer(AbstractBusListener):
     Helper class with a simple purpose:
         - listen on one queue,
         - when receiving a message, send answer on second queue, flipping message contents between ping and pong.
-
-
+    This tests the AbstractBusListener's behaviour, and tests if the underlying messaging lib can cope with multithreading.
     """
-    def __init__(self, name, listen_queue_name, response_queue_name, num_threads):
+    def __init__(self, name, opponent_name, pingpong_table_exchange, num_threads):
         self.name = name
+        self.opponent_name = opponent_name
         self.num_turns = 0
-        self.response_queue_name = response_queue_name
+        self.response_bus = ToBus(pingpong_table_exchange)
         self.lock = Lock() # a lock to keep track of self.num_turns in a multithreaded environment
-        super(PingPongPlayer, self).__init__(listen_queue_name, numthreads=num_threads)
+        super(PingPongPlayer, self).__init__(exchange_name=pingpong_table_exchange,
+                                             routing_key=self.name,
+                                             num_threads=num_threads)
+
+    def start_listening(self):
+        self.response_bus.open()
+        super(PingPongPlayer, self).start_listening()
+
+    def stop_listening(self):
+        super(PingPongPlayer, self).stop_listening()
+        self.response_bus.close()
 
     def get_num_turns(self):
         with self.lock:
@@ -422,21 +386,22 @@ class PingPongPlayer(AbstractBusListener):
         """Implementation of AbstractBusListener._handleMessage
         log received message, and send response.
         """
-        logger.info("%s: received %s", self.name, msg.content)
+        logger.info("%s: received %s on %s", self.name, msg.content, self.queue_address)
         self.send_response(msg.content)
 
     def send_response(self, value):
         """
-        Send a response message to the response_queue_name, flipping ping for pong and vice versa
+        Send a response message to the pingpong_table_exchange where it will be routed to the opponent's queue,
+        flipping ping for pong and vice versa
         """
-        with ToBus(self.response_queue_name) as response_bus:
-            response_msg = EventMessage(content="ping" if value == "pong" else "pong")
+        response_msg = EventMessage(content="ping" if value == "pong" else "pong",
+                                    subject=self.opponent_name)
 
-            logger.info("%s: sending %s", self.name, response_msg.content)
-            response_bus.send(response_msg)
+        logger.info("%s: sending %s to %s", self.name, response_msg.content, self.response_bus.address)
+        self.response_bus.send(response_msg)
 
-            with self.lock:
-                self.num_turns += 1
+        with self.lock:
+            self.num_turns += 1
 
 class PingPongTester(unittest.TestCase):
     """Test an event driven message ping/pong game, where two 'players' respond to each other.
@@ -454,23 +419,25 @@ class PingPongTester(unittest.TestCase):
     def _play(self, num_threads_per_player):
         """simulate a ping/pong event driven loop until each player played a given amount of turns, or timeout"""
 
-        # setup tmp forward and return queue
-        with TemporaryQueue("forward") as forward_tmp_queue, TemporaryQueue("return") as return_tmp_queue:
+        # game parameters
+        NUM_TURNS = 10
+        GAME_TIMEOUT = 10
 
-            # create two players, on "both sides of the table" / forward and return queue swapped.
-            with PingPongPlayer("Player1", forward_tmp_queue.address, return_tmp_queue.address, num_threads_per_player) as player1:
-                with PingPongPlayer("Player2", return_tmp_queue.address, forward_tmp_queue.address, num_threads_per_player) as player2:
-
-                    # game parameters
-                    NUM_TURNS = 10
-                    GAME_TIMEOUT = 5
+        # setup temporary exchange, on which the player can publish their messages (ping/pong balls)
+        with TemporaryExchange("PingPongTable") as tmp_exchange:
+            # create two players, on "both sides of the table"
+            # i.e.: they each play on the tmp_exchange, but have the auto-generated designated listen queues for incoming balls
+            with BusListenerJanitor(PingPongPlayer("Player1", "Player2", tmp_exchange.address, num_threads_per_player)) as player1:
+                 with BusListenerJanitor(PingPongPlayer("Player2", "Player1", tmp_exchange.address, num_threads_per_player)) as player2:
                     start_timestamp = datetime.utcnow()
+                    return
+
 
-                    # first serve, referee throws a ping ball on the table in the direction of player2
-                    with forward_tmp_queue.create_tobus() as first_pinger:
-                        first_msg = EventMessage(content="ping")
-                        logger.info("first message: sending %s", first_msg.content)
-                        first_pinger.send(first_msg)
+                    # first serve, referee throws a ping ball on the table in the direction of player1
+                    with tmp_exchange.create_tobus() as referee:
+                        first_msg = EventMessage(content="ping", subject="Player1")
+                        logger.info("first message: sending %s to %s", first_msg.content, referee.address)
+                        referee.send(first_msg)
 
                     # play the game!
                     # run the "event loop". Actually there are multiple loops: num_threads per player
@@ -480,7 +447,7 @@ class PingPongTester(unittest.TestCase):
                         player2_num_turns = player2.get_num_turns()
                         time_remaining = GAME_TIMEOUT - (datetime.utcnow() - start_timestamp).total_seconds()
 
-                        logger.info("player1_num_turns=%d/%d player2_num_turns=%d/%d time_remaining=%ssec",
+                        logger.info("PingPongTester STATUS: player1_num_turns=%d/%d player2_num_turns=%d/%d time_remaining=%.1fsec",
                                     player1_num_turns, NUM_TURNS, player2_num_turns, NUM_TURNS, time_remaining)
 
                         # assert on deadlocked game (should never happen!)
@@ -489,13 +456,17 @@ class PingPongTester(unittest.TestCase):
                         if player1_num_turns >= NUM_TURNS and player2_num_turns >= NUM_TURNS :
                             break
 
-                        sleep(0.1)
+                        sleep(0.01)
 
                     # assert on players who did not finish the game
                     self.assertGreaterEqual(player1.get_num_turns(), NUM_TURNS)
                     self.assertGreaterEqual(player2.get_num_turns(), NUM_TURNS)
 
+                    logger.info("SUCCESS! player1_num_turns=%d/%d player2_num_turns=%d/%d num_threads_per_player=%d #msg_per_sec=%.1f",
+                                player1_num_turns, NUM_TURNS, player2_num_turns, NUM_TURNS,
+                                num_threads_per_player, 2*NUM_TURNS/(datetime.utcnow() - start_timestamp).total_seconds())
 
 if __name__ == '__main__':
     logging.basicConfig(format='%(asctime)s %(thread)d %(threadName)s %(levelname)s %(message)s', level=logging.INFO)
     unittest.main()
+
diff --git a/LCS/Messaging/python/messaging/test/t_messages.py b/LCS/Messaging/python/messaging/test/t_messages.py
index ddb8a091b47..08a9ac0a8fe 100644
--- a/LCS/Messaging/python/messaging/test/t_messages.py
+++ b/LCS/Messaging/python/messaging/test/t_messages.py
@@ -28,7 +28,7 @@ import unittest
 import uuid
 import struct
 import proton
-from lofar.messaging.messages import LofarMessage, InvalidMessage
+from lofar.messaging.messages import LofarMessage
 
 
 class DefaultLofarMessage(unittest.TestCase):
@@ -42,242 +42,14 @@ class DefaultLofarMessage(unittest.TestCase):
         """
         self.message = LofarMessage()
 
-    def test_system_name(self):
-        """
-        Object attribute SystemName must be set to 'LOFAR'
-        """
-        self.assertEqual(self.message.SystemName, "LOFAR")
-
     def test_message_id(self):
         """
         Object attribute MessageId must be a valid UUID string
         """
-        self.assertIsNotNone(uuid.UUID(self.message.MessageId))
-
-
-class QpidLofarMessage(unittest.TestCase):
-    """
-    Class to test LofarMessage constructed from a Qpid message
-    """
-
-    def setUp(self):
-        """
-        Create Qpid message with all required properties set
-        """
-        self.qmsg = proton.Message()
-        self.qmsg.properties = {
-            "SystemName": "LOFAR",
-            "MessageType": None,
-            "MessageId": str(uuid.uuid4())
-        }
-
-    def test_invalid_properties_type(self):
-        """
-        Test that exception is raised if Qpid message properties attribute is
-        of incorrect type (i.e. not 'dict').
-        """
-        self.qmsg.properties = 42
-        self.assertRaisesRegex(InvalidMessage,
-                                "^Invalid message properties type:",
-                                LofarMessage, self.qmsg)
-
-    def test_illegal_properties(self):
-        """
-        Test that exception is raised if a Qpid-reserved attribute (like
-        'body', 'content_type', etc.) is used as property.
-        """
-        self.qmsg.properties['body'] = 'blah blah blah'
-        self.assertRaisesRegex(InvalidMessage,
-                                "^Illegal message propert(y|ies).*:",
-                                LofarMessage, self.qmsg)
-
-    def test_missing_properties(self):
-        """
-        Test that exception is raised if required properties for constructing
-        an LofarMessage are missing.
-        """
-        self.qmsg.properties = {}
-        self.assertRaisesRegex(InvalidMessage,
-                                "^Missing message propert(y|ies):",
-                                LofarMessage, self.qmsg)
-
-    def test_missing_property_systemname(self):
-        """
-        Test that exception is raised if required property 'SystemName' is
-        missing.
-        """
-        self.qmsg.properties.pop("SystemName")
-        self.assertRaisesRegex(InvalidMessage,
-                                "^Missing message property: SystemName",
-                                LofarMessage, self.qmsg)
-
-    def test_missing_property_messageid(self):
-        """
-        Test that exception is raised if required property 'MessageId' is
-        missing.
-        """
-        self.qmsg.properties.pop("MessageId")
-        self.assertRaisesRegex(InvalidMessage,
-                                "^Missing message property: MessageId",
-                                LofarMessage, self.qmsg)
-
-    def test_missing_property_messagetype(self):
-        """
-        Test that exception is raised if required property 'MessageType' is
-        missing.
-        """
-        self.qmsg.properties.pop("MessageType")
-        self.assertRaisesRegex(InvalidMessage,
-                                "^Missing message property: MessageType",
-                                LofarMessage, self.qmsg)
-
-    def test_invalid_property_systemname(self):
-        """
-        Test that exception is raised if 'SystemName' has wrong value (i.e.
-        not equal to 'LOFAR')
-        """
-        self.qmsg.properties["SystemName"] = "NOTLOFAR"
-        self.assertRaisesRegex(InvalidMessage,
-                                "^Invalid message property 'SystemName':",
-                                LofarMessage, self.qmsg)
-
-    def test_invalid_property_messageid(self):
-        """
-        Test that exception is raised if 'MessageId' contains an invalid
-        UUID-string.
-        """
-        self.qmsg.properties["MessageId"] = "Invalid-UUID-string"
-        self.assertRaisesRegex(InvalidMessage,
-                                "^Invalid message property 'MessageId':",
-                                LofarMessage, self.qmsg)
-
-    def test_getattr_raises(self):
-        """
-        Test that exception is raised if a non-existent attribute is read.
-        """
-        msg = LofarMessage(self.qmsg)
-        with self.assertRaisesRegex(AttributeError, "object has no attribute"):
-            _ = msg.non_existent
-
-    def test_getattr_raises_on_properties(self):
-        """
-        Test that exception is raised if attribute 'properties' is read.
-        This attribute should not be visible.
-        """
-        msg = LofarMessage(self.qmsg)
-        with self.assertRaisesRegex(AttributeError, "object has no attribute"):
-            _ = msg.properties
-
-    def test_setattr_raises_on_properties(self):
-        """
-        Test that exception is raised if attribute 'properties' is written.
-        This attribute should not be visible.
-        """
-        msg = LofarMessage(self.qmsg)
-        with self.assertRaisesRegex(AttributeError, "object has no attribute"):
-            msg.properties = {}
-
-    def test_getattr_qpid_field(self):
-        """
-        Test that a Qpid message field becomes an LofarMessage attribute.
-        """
-        msg = LofarMessage(self.qmsg)
-        msg.qpid_msg.ttl = 100
-        self.assertEqual(self.qmsg.ttl, msg.ttl)
-        self.assertEqual(msg.ttl, 100)
-
-    def test_setattr_qpid_field(self):
-        """
-        Test that an LofarMessage attribute becomes a Qpid message field.
-        """
-        msg = LofarMessage(self.qmsg)
-        msg.ttl = 100
-        self.assertEqual(self.qmsg.ttl, msg.ttl)
-        self.assertEqual(self.qmsg.ttl, 100)
-
-    def test_getattr_qpid_property(self):
-        """
-        Test that a Qpid message property becomes an LofarMessage attribute.
-        """
-        self.qmsg.properties["NewProperty"] = "New Property"
-        msg = LofarMessage(self.qmsg)
-        self.assertEqual(msg.qpid_msg.properties["NewProperty"],
-                         msg.NewProperty)
-
-    def test_setattr_qpid_property(self):
-        """
-        Test that an LofarMessage attribute becomes a Qpid message property.
-        """
-        msg = LofarMessage(self.qmsg)
-        msg.NewProperty = "New Property"
-        self.assertEqual(msg.qpid_msg.properties["NewProperty"],
-                         msg.NewProperty)
-
-    def test_propname_not_contains_properties(self):
-        """
-        Test that prop_names() does not return the property 'properties'.
-        This attribute should not be visible.
-        """
-        msg = LofarMessage(self.qmsg)
-        self.assertNotIn('properties', msg.prop_names())
-
+        self.assertIsNotNone(self.message.id)
+        self.assertTrue(isinstance(self.message.id, uuid.UUID))
 
-class BodyLofarMessage(unittest.TestCase):
-    """
-    Class to test that an LofarMessage can be constructed from different types
-    of body. The body is used to initialize a Qpid Message object.
-    """
-
-    def test_construct_from_string(self):
-        """
-        Test that an LofarMessage can be constructed from an ASCII string.
-        """
-        body = b"Byte string"
-        msg = LofarMessage(body)
-        self.assertEqual(msg.body, body)
-
-    def test_construct_from_unicode(self):
-        """
-        Test that an LofarMessage can be constructed from a Unicode string.
-        :return:
-        """
-        body = "Unicode string"
-        msg = LofarMessage(body)
-        self.assertEqual(msg.body, body)
-
-    def test_construct_from_list(self):
-        """
-        Test that an LofarMessage can be constructed from a python list.
-        """
-        body = list(range(10))
-        msg = LofarMessage(body)
-        self.assertEqual(msg.body, body)
-
-    def test_construct_from_dict(self):
-        """
-        Test that an LofarMessage can be constructed from a python dict.
-        """
-        body = {1: 'one', 2: 'two', 3: 'three'}
-        msg = LofarMessage(body)
-        self.assertEqual(msg.body, body)
-
-    def test_construct_from_binary(self):
-        """
-        Test that an LofarMessage can be constructed from binary data.
-        Use struct.pack() to create a byte array
-        """
-        body = struct.pack("<256B", *range(256))
-        msg = LofarMessage(body)
-        self.assertEqual(msg.body, body)
-
-    def test_construct_from_unsupported(self):
-        """
-        Test that an LofarMessage cannot be constructed from unsupported
-        data type like 'int'.
-        """
-        body = 42
-        self.assertRaisesRegex(InvalidMessage, "^Unsupported content type:",
-                                LofarMessage, body)
+#TODO: add more tests
 
 
 if __name__ == '__main__':
diff --git a/LCS/Messaging/python/messaging/test/t_service_message_handler.py b/LCS/Messaging/python/messaging/test/t_service_message_handler.py
index 671118471e1..12e29899ce6 100644
--- a/LCS/Messaging/python/messaging/test/t_service_message_handler.py
+++ b/LCS/Messaging/python/messaging/test/t_service_message_handler.py
@@ -13,6 +13,7 @@ from lofar.messaging import *
 class UserException(Exception):
     "Always thrown in one of the functions"
     pass
+
 class InvalidArgType(Exception):
     "Thrown when the input is wrong for one of the functions"
     pass
@@ -20,7 +21,7 @@ class InvalidArgType(Exception):
 # create several function:
 def ErrorFunc(input_value):
     " Always thrown a predefined exception"
-    raise UserException("Exception thrown by the user")
+    raise UserException("Intentional exception for testing")
 
 def ExceptionFunc(input_value):
     "Generate a exception not caught by the function"
@@ -36,162 +37,185 @@ def StringFunc(input_value):
 class OnlyMessageHandling(MessageHandlerInterface):
     def __init__(self, **kwargs):
         MessageHandlerInterface.__init__(self)
-        print("Creation of OnlyMessageHandling class: %s" % kwargs)
+        logger.info("Creation of OnlyMessageHandling class: %s" % kwargs)
         self.handle_message = kwargs.pop("function")
         self.args = kwargs
 
 class FullMessageHandling(MessageHandlerInterface):
     def __init__(self, **kwargs):
         MessageHandlerInterface.__init__(self)
-        print("Creation of FullMessageHandling class: %s" % kwargs)
+        logger.info("Creation of FullMessageHandling class: %s" % kwargs)
         self.handle_message = kwargs.pop("function")
         self.args = kwargs
     def prepare_loop(self):
-        print("FullMessageHandling prepare_loop: %s" % self.args)
+        logger.info("FullMessageHandling prepare_loop: %s" % self.args)
     def prepare_receive(self):
-        print("FullMessageHandling prepare_receive: %s" % self.args)
+        logger.info("FullMessageHandling prepare_receive: %s" % self.args)
     def finalize_handling(self, successful):
-        print("FullMessageHandling finalize_handling: %s" % self.args)
+        logger.info("FullMessageHandling finalize_handling: %s" % self.args)
     def finalize_loop(self):
-        print("FullMessageHandling finalize_loop: %s" % self.args)
+        logger.info("FullMessageHandling finalize_loop: %s" % self.args)
 
 class FailingMessageHandling(MessageHandlerInterface):
     def __init__(self, **kwargs):
         MessageHandlerInterface.__init__(self)
-        print("Creation of FailingMessageHandling class: %s" % kwargs)
+        logger.info("Creation of FailingMessageHandling class: %s" % kwargs)
         self.handle_message = kwargs.pop("function")
         self.args = kwargs
         self.counter = 0
     def prepare_loop(self):
-        print("FailingMessageHandling prepare_loop: %s" % self.args)
+        logger.info("FailingMessageHandling prepare_loop: %s" % self.args)
         #raise UserException("oops in prepare_loop()")   # todo: this is freezing the test. Why is this necessary?
     def prepare_receive(self):
         # allow one succesfull call otherwise the main loop never accepts the message :-)
-        print("FailingMessageHandling prepare_receive: %s" % self.args)
+        logger.info("FailingMessageHandling prepare_receive: %s" % self.args)
         if self.counter:
             time.sleep(1)  # Prevent running around too fast
             raise UserException("FailingMessageHandling: intentional oops in prepare_receive counter=%d" % self.counter)
         else:
             self.counter = self.counter + 1
     def finalize_handling(self, successful):
-        print("FailingMessageHandling finalize_handling: %s, %s" % (self.args, successful))
+        logger.info("FailingMessageHandling finalize_handling: %s, %s" % (self.args, successful))
         raise UserException("oops in finalize_handling()")
     def finalize_loop(self):
-        print("FailingMessageHandling finalize_loop: %s" % self.args)
+        logger.info("FailingMessageHandling finalize_loop: %s" % self.args)
         raise UserException("oops in finalize_loop()")
 
-if __name__ == '__main__':
-    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
-
-    with TemporaryQueue(__name__) as tmp_queue:
-        busname = tmp_queue.address
+def main():
+    with TemporaryExchange("TEST") as tmp_exchange:
+        busname = tmp_exchange.address
 
-        # Register functs as a service handler listening at busname and ServiceName
-        serv1_plain         = Service("String1Service", StringFunc,             busname=busname, numthreads=1)
-        serv1_minimal_class = Service("String2Service", OnlyMessageHandling,    busname=busname, numthreads=1,
+        # Register StringFunc functions as a service handler listening at busname and ServiceName
+        serv1_plain         = Service("String1Service", StringFunc,             exchange_name=busname)
+        serv1_minimal_class = Service("String2Service", OnlyMessageHandling,    exchange_name=busname,
                                       handler_args={"function" : StringFunc})
-        serv1_full_class    = Service("String3Service", FullMessageHandling,    busname=busname, numthreads=1,
+        serv1_full_class    = Service("String3Service", FullMessageHandling,    exchange_name=busname,
                                       handler_args={"function" : StringFunc})
-        serv1_failing_class = Service("String4Service", FailingMessageHandling, busname=busname, numthreads=1,
+        serv1_failing_class = Service("String4Service", FailingMessageHandling, exchange_name=busname,
                                       handler_args={"function" : StringFunc})
 
         # 'with' sets up the connection context and defines the scope of the service.
-        with serv1_plain, serv1_minimal_class, serv1_full_class, serv1_failing_class:
+        # also use each service inside a BusListenerJanitor context to auto-cleanup auto-generated listener queues
+        with BusListenerJanitor(serv1_plain), BusListenerJanitor(serv1_minimal_class), \
+             BusListenerJanitor(serv1_full_class), BusListenerJanitor(serv1_failing_class):
+
             # Redo string tests via RPC
-            with RPC("String1Service", ForwardExceptions=True, busname=busname) as rpc:
+            with RPC("String1Service", busname=busname) as rpc:
                 result = rpc("aap noot mies")
                 if result[0] != "AAP NOOT MIES":
                     raise Exception("String function failed of String1Service:{}".format(result))
-                print("string1Service is OK")
+                logger.info("string1Service is OK")
 
-            with RPC("String2Service", ForwardExceptions=True, busname=busname) as rpc:
+            with RPC("String2Service", busname=busname) as rpc:
                 result = rpc("aap noot mies")
                 if result[0] != "AAP NOOT MIES":
                     raise Exception("String function failed of String2Service:{}".format(result))
-                print("string2Service is OK")
+                logger.info("string2Service is OK")
 
-            with RPC("String3Service", ForwardExceptions=True, busname=busname) as rpc:
+            with RPC("String3Service", busname=busname) as rpc:
                 result = rpc("aap noot mies")
                 if result[0] != "AAP NOOT MIES":
                     raise Exception("String function failed of String3Service:{}".format(result))
-                print("string3Service is OK")
+                logger.info("string3Service is OK")
 
-            with RPC("String4Service", ForwardExceptions=True, busname=busname) as rpc:
+            with RPC("String4Service", busname=busname) as rpc:
                 result = rpc("aap noot mies")
                 if result[0] != "AAP NOOT MIES":
                     raise Exception("String function failed of String4Service:{}".format(result))
-                print("string4Service is OK")
+                logger.info("string4Service is OK")
+
+        logger.info("******************************************************************************")
 
-        # Register functs as a service handler listening at busname and ServiceName
-        serv2_plain         = Service("Error1Service", ErrorFunc,              busname=busname, numthreads=1)
-        serv2_minimal_class = Service("Error2Service", OnlyMessageHandling,    busname=busname, numthreads=1,
+        # Register ErrorFunc function as a service handler listening at busname and ServiceName
+        serv2_plain         = Service("Error1Service", ErrorFunc,              exchange_name=busname)
+        serv2_minimal_class = Service("Error2Service", OnlyMessageHandling,    exchange_name=busname,
                                       handler_args={"function" : ErrorFunc})
-        serv2_full_class    = Service("Error3Service", FullMessageHandling,    busname=busname, numthreads=1,
+        serv2_full_class    = Service("Error3Service", FullMessageHandling,    exchange_name=busname,
                                       handler_args={"function" : ErrorFunc})
-        serv2_failing_class = Service("Error4Service", FailingMessageHandling, busname=busname, numthreads=1,
+        serv2_failing_class = Service("Error4Service", FailingMessageHandling, exchange_name=busname,
                                       handler_args={"function" : ErrorFunc})
 
         # 'with' sets up the connection context and defines the scope of the service.
-        with serv2_plain, serv2_minimal_class, serv2_full_class, serv2_failing_class:
-            # Redo Error tests via RPC
-            with RPC("Error1Service", ForwardExceptions=True, busname=busname) as rpc:
+        # also use each service inside a BusListenerJanitor context to auto-cleanup auto-generated listener queues
+        with BusListenerJanitor(serv2_plain), BusListenerJanitor(serv2_minimal_class), \
+             BusListenerJanitor(serv2_full_class), BusListenerJanitor(serv2_failing_class):
+
+            # do Error tests via RPC
+            with RPC("Error1Service", busname=busname) as rpc:
                 try:
                     result = rpc("aap noot mies")
                 except RPCException as e:
-                    print("Error1Service is OK")
+                    logger.info("Error1Service is OK")
 
-            with RPC("Error2Service", ForwardExceptions=True, busname=busname) as rpc:
+            with RPC("Error2Service", busname=busname) as rpc:
                 try:
                     result = rpc("aap noot mies")
                 except RPCException as e:
-                    print("Error2Service is OK")
+                    logger.info("Error2Service is OK")
 
-            with RPC("Error3Service", ForwardExceptions=True, busname=busname) as rpc:
+            with RPC("Error3Service", busname=busname) as rpc:
                 try:
                     result = rpc("aap noot mies")
                 except RPCException as e:
-                    print("Error3Service is OK")
+                    logger.info("Error3Service is OK")
 
-            with RPC("Error4Service", ForwardExceptions=True, busname=busname) as rpc:
+            with RPC("Error4Service", busname=busname) as rpc:
                 try:
                     result = rpc("aap noot mies")
                 except Exception as e:
-                    print("Error4Service is OK")
+                    logger.info("Error4Service is OK")
+
+        logger.info("******************************************************************************")
 
-        # Register functs as a service handler listening at busname and ServiceName
-        serv3_plain         = Service("Except1Service", ExceptionFunc,          busname=busname, numthreads=1)
-        serv3_minimal_class = Service("Except2Service", OnlyMessageHandling,    busname=busname, numthreads=1,
+        # Register ExceptionFunc functions as a service handler listening at busname and ServiceName
+        serv3_plain         = Service("Except1Service", ExceptionFunc,          exchange_name=busname)
+        serv3_minimal_class = Service("Except2Service", OnlyMessageHandling,    exchange_name=busname,
                                       handler_args={"function" : ExceptionFunc})
-        serv3_full_class    = Service("Except3Service", FullMessageHandling,    busname=busname, numthreads=1,
+        serv3_full_class    = Service("Except3Service", FullMessageHandling,    exchange_name=busname,
                                       handler_args={"function" : ExceptionFunc})
-        serv3_failing_class = Service("Except4Service", FailingMessageHandling, busname=busname, numthreads=1,
+        serv3_failing_class = Service("Except4Service", FailingMessageHandling, exchange_name=busname,
                                       handler_args={"function" : ExceptionFunc})
 
         # 'with' sets up the connection context and defines the scope of the service.
-        with serv3_plain, serv3_minimal_class, serv3_full_class, serv3_failing_class:
-            # Redo exception tests via RPC
-            with RPC("Except1Service", ForwardExceptions=True, busname=busname) as rpc:
+        # also use each service inside a BusListenerJanitor context to auto-cleanup auto-generated listener queues
+        with BusListenerJanitor(serv3_plain), BusListenerJanitor(serv3_minimal_class), \
+             BusListenerJanitor(serv3_full_class), BusListenerJanitor(serv3_failing_class):
+            
+            # do exception tests via RPC
+            with RPC("Except1Service", busname=busname) as rpc:
                 try:
                     result = rpc("aap noot mies")
-                except IndexError as e:
-                    print("Except1Service is OK")
+                except RPCException as e:
+                    if 'IndexError' not in str(e):
+                        raise
+                    logger.info("Except1Service is OK")
 
-            with RPC("Except2Service", ForwardExceptions=True, busname=busname) as rpc:
+            with RPC("Except2Service", busname=busname) as rpc:
                 try:
                     result = rpc("aap noot mies")
-                except IndexError as e:
-                    print("Except2Service is OK")
+                except RPCException as e:
+                    if 'IndexError' not in str(e):
+                        raise
+                    logger.info("Except2Service is OK")
 
-            with RPC("Except3Service", ForwardExceptions=True, busname=busname) as rpc:
+            with RPC("Except3Service", busname=busname) as rpc:
                 try:
                     result = rpc("aap noot mies")
-                except IndexError as e:
-                    print("Except3Service is OK")
+                except RPCException as e:
+                    if 'IndexError' not in str(e):
+                        raise
+                    logger.info("Except3Service is OK")
 
-            with RPC("Except4Service", ForwardExceptions=True, busname=busname) as rpc:
+            with RPC("Except4Service", busname=busname) as rpc:
                 try:
                     result = rpc("aap noot mies")
-                except IndexError as e:
-                    print("Except4Service is OK")
+                except RPCException as e:
+                    if 'IndexError' not in str(e):
+                        raise
+                    logger.info("Except4Service is OK")
 
-        print("Functions tested with RPC: All OK")
+        logger.info("Functions tested with RPC: All OK")
+
+if __name__ == '__main__':
+    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
+    main()
\ No newline at end of file
-- 
GitLab