Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
messagebus.py 31.61 KiB
#!/usr/bin/env python3

# messagebus.py: Provide an easy way to exchange messages on the message bus.
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: messagebus.py 1580 2015-09-30 14:18:57Z loose $

"""
Provide an easy way to exchange messages on the message bus.
"""

from lofar.messaging.exceptions import MessageBusError, MessageFactoryError
from lofar.messaging.messages import to_qpid_message, MESSAGE_FACTORY
from lofar.common.util import raise_exception
from lofar.common.util import convertStringValuesToBuffer, convertBufferValuesToString

import proton
import proton.utils
import proton.reactor
import logging
import sys
import uuid
import threading
from copy import deepcopy
import re

# Module-wide logger, named after this module.
logger = logging.getLogger(__name__)

# Fallback values used by the bus classes when the caller does not supply
# an explicit broker, broker options, receiver capacity or timeout.
DEFAULT_BROKER = "localhost:5672"
DEFAULT_BROKER_OPTIONS = dict(reconnect=True)
DEFAULT_RECEIVER_CAPACITY = 128
DEFAULT_TIMEOUT = 5

class FromBus(object):
    """
    Receive messages from the message bus through a blocking Proton receiver.

    The connection to the broker is made in the initializer. The receiver is
    created by :meth:`open` (or by entering a ``with`` context) and closed by
    :meth:`close` (or by leaving the context). Calls to open/close are
    reference counted, so nested use creates and closes the receiver only once.

    note:: :meth:`close` only closes the receiver; the broker connection that
    was made in the initializer is not closed by this class.
    """

    def __init__(self, address, broker=None, broker_options=None):
        """
        Initializer. Connects to the broker right away.
        :param address: valid Qpid address, optionally followed by '/<subject>'
        :param broker: valid Qpid broker URL, e.g. "localhost:5672"
        :param broker_options: valid Qpid broker options, e.g. {'reconnect': True}
        :raise MessageBusError: if the connection to the broker cannot be made
        """
        self.address = address
        self.broker = broker if broker else DEFAULT_BROKER
        # Work on a private copy: the 'reconnect' key is removed below, and that
        # must neither alter the caller's dict nor the module-level default.
        self.broker_options = dict(broker_options if broker_options else DEFAULT_BROKER_OPTIONS)
        self.receiver = None
        self.opened = 0

        try:
            logger.debug("[FromBus] Connecting to broker: %s", self.broker)
            if 'reconnect' in self.broker_options:
                self.broker_options.pop('reconnect')
                logger.info('[FromBus] Ignoring duplicate reconnect option in connection init')
            self.connection = proton.utils.BlockingConnection(self.broker, **self.broker_options)
            logger.debug("[FromBus] Connected to broker: %s", self.broker)
        except proton.ConnectionException as ex:
            logger.exception('[FromBus] Initialization failed')
            raise MessageBusError('[FromBus] Initialization failed (%s)' % ex)

    def isConnected(self):
        """
        :return: True if this bus has been opened at least once more than it was closed.
        """
        return self.opened > 0

    def open(self):
        """
        Create the receiver on first use and increase the open-count.
        The open-count is only increased when the receiver is available, so a
        failed open() can simply be retried.
        :raise MessageBusError: if the receiver could not be created.
        """
        if self.opened == 0:
            try:
                self._add_queue(self.address)
            except proton.ProtonException:
                raise_exception(MessageBusError, "[FromBus] Receiver initialization failed")
        self.opened += 1

    def __enter__(self):
        """
        Open the bus when entering a context.
        :return: self
        """
        self.open()
        return self

    def close(self):
        """
        Decrease the open-count and close the receiver when the count drops to zero.
        Calling close() on a bus that is not open is a no-op.
        :raise MessageBusError: if closing the receiver fails
        """
        if self.opened <= 0:
            # never let the open-count become negative, or a later open() would
            # skip the creation of the receiver.
            return

        self.opened -= 1
        if self.opened > 0:
            return

        # detach the receiver first, so the state is consistent even if close fails
        receiver, self.receiver = self.receiver, None
        if receiver is None:
            return

        try:
            receiver.close()
            logger.debug("[FromBus] Disconnected receiver from broker: %s", self.broker)
        except proton.ProtonException:
            raise_exception(MessageBusError,
                            "[FromBus] Failed to disconnect receiver from broker: %s" %
                            self.broker)

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Close the bus when leaving a context.
        :param exc_type: type of exception thrown in context (unused)
        :param exc_val: value of exception thrown in context (unused)
        :param exc_tb: traceback of exception thrown in context (unused)
        """
        self.close()

    def _check_session(self):
        """
        Check if there's an active receiver.
        :raise MessageBusError: if the bus is not open or has no receiver
        """
        if not self.isConnected() or getattr(self, 'receiver', None) is None:
            raise MessageBusError(
                "[FromBus] No active receiver (broker: %s)" % self.broker)

    def _add_queue(self, address):
        """
        Create the receiver for the queue that you want to receive messages from.
        :param address: valid Qpid address; everything after the first '/' is
                        used as subject filter.
        :raise MessageBusError: if the receiver could not be created
        """
        if address and '/' in address:
            # split on the first slash only, so a subject containing a '/'
            # does not break the unpacking
            address, subject = address.split('/', 1)
        else:
            subject = None
        logger.debug("[FromBus] Receiving from bus: %s  with subject: %s", address, subject)

        what = "receiver for source: %s (broker: %s, session: %s)" % (address, self.broker, 'unknown')

        try:
            link_options = None
            if subject:
                # helper class for filtering by subject
                class ProtonSubjectFilter(proton.reactor.Filter):
                    def __init__(self, value):
                        filter_dict = {proton.symbol('subject-filter'):
                                       proton.Described(proton.symbol('apache.org:legacy-amqp-topic-binding:string'), value)}
                        super(ProtonSubjectFilter, self).__init__(filter_dict)

                link_options = ProtonSubjectFilter(subject)

            self.receiver = self.connection.create_receiver(address=address,
                                                            credit=DEFAULT_RECEIVER_CAPACITY,
                                                            options=link_options)
        except proton.ProtonException as pe:
            raise_exception(MessageBusError,
                            "[FromBus] Failed to create %s: %s" % (what, pe))
        logger.debug("[FromBus] Created %s", what)

    def receive(self, timeout=DEFAULT_TIMEOUT, logDebugMessages=True):
        """
        Receive the next message from the queue we're listening on.
        A successfully converted message is acknowledged before it is returned.
        :param timeout: maximum time in seconds to wait for a message.
        :param logDebugMessages: log the waiting/timeout/message details at debug level.
        :return: received message, None if timeout occurred.
        :raise MessageBusError: if receiving or converting the message failed.
        """
        self._check_session()
        if logDebugMessages:
            logger.debug("[FromBus] Waiting %s seconds for next message", timeout)
        try:
            while True:  # break when message is acceptable
                msg = self.receiver.receive(timeout=timeout)
                if msg is not None:
                    break  # handle this message

        except proton.Timeout:
            if logDebugMessages:
                logger.debug("[FromBus] No message received within %s seconds", timeout)
            return None
        except proton.ProtonException:
            raise_exception(MessageBusError,
                            "[FromBus] Failed to fetch message from: "
                            "%s" % self.address)
        except Exception as e:
            #FIXME: what if another exception is raised? should we reconnect?
            logger.error(e)
            raise_exception(MessageBusError,
                            "[FromBus] unknown exception while receiving message on %s: %s" % (self.address, e))

        try:
            if isinstance(msg.body, dict):
                # qpid cannot handle strings longer than 64k within dicts
                # so each string was converted to a buffer which qpid can fit in 2^32-1 bytes
                # and now we convert it back on this end
                msg.body = convertBufferValuesToString(msg.body)
        except MessageFactoryError:
            self.reject(msg)
            raise_exception(MessageBusError, "[FromBus] Message rejected")

        logger.debug("[FromBus] Message received on: %s subject: %s", self.address, msg.subject)
        if logDebugMessages:
            logger.debug("[FromBus] %s", msg)
        try:
            amsg = MESSAGE_FACTORY.create(msg)
        except MessageFactoryError:
            self.reject(msg)
            raise_exception(MessageBusError, "[FromBus] Message rejected")
        self.ack(msg)
        return amsg

    def ack(self, msg):
        """
        Acknowledge a message. This will inform Qpid that the message can
        safely be removed from the queue.
        A failing acknowledge is logged at debug level and otherwise ignored.
        :param msg: message to be acknowledged
        :raise MessageBusError: if there is no active receiver
        """
        self._check_session()
        qmsg = to_qpid_message(msg)
        try:
            self.receiver.accept()    # with proton, we can only unspecifically for the receiver...
        except Exception:
            # Best effort: this seems to happen quite often, so do not fail, but
            # leave a trace. Only Exception is caught, so that KeyboardInterrupt
            # and SystemExit are no longer swallowed.
            logger.debug('[FromBus] Could not acknowledge message, but will go on...', exc_info=True)
        else:
            logger.debug("[FromBus] acknowledged message: %s", qmsg)

    def nack(self, msg):
        """
        Do not acknowledge a message. Currently not supported: a warning is
        logged and the message is acknowledged instead.
        :param msg: message to not be acknowledged
        """
        logger.warning("[FromBus] nack() is not supported, using ack() instead")
        self.ack(msg)

    def reject(self, msg):
        """
        Reject a message. Currently not supported: a warning is logged and the
        message is acknowledged instead.
        :param msg: message to be rejected
        """
        logger.warning(
            "[FromBus] reject() is not supported, using ack() instead")
        self.ack(msg)

    def drain(self, timeout=0.1):
        """
        Read and ack all messages until queue/exchange is empty.
        :param timeout: maximum time in seconds to wait for each next message
        :raise MessageBusError: if there is no active receiver
        """
        self._check_session()
        while True:
            try:
                if self.receiver.receive(timeout=timeout) is None:
                    break
                self.receiver.accept()
            except proton.Timeout:
                break

    def nr_of_messages_in_queue(self, timeout=0.1):
        """
        Get the current number of messages in this FromBus's local queue, which is at most DEFAULT_RECEIVER_CAPACITY
        Please note that this is not per se equal to the number of messages in the queue at the broker!
        A proton receiver can and will prefetch messages from a broker-queue, and store them in an internal (client-side) queue.
        If-and-only-if a message is handled and ack'ed at the client, then the message truly disappears from the broker-queue.
        :param timeout: time out in (fractional) seconds or None
        :return: the current number of messages in this FromBus's local receiver queue
        :raise MessageBusError: if there is no active receiver, or if polling the connection failed
        """
        self._check_session()

        if timeout is not None and timeout > 0:
            try:
                # allow the fetcher to receive some message(s):
                # spend half of the timeout on processing pending I/O, and the
                # other half on waiting for the local queue size to change.
                current_nr_of_messages_in_queue = len(self.receiver.fetcher.incoming)
                self.connection.container.do_work(timeout=0.5*timeout)
                self.receiver.connection.wait(lambda: len(self.receiver.fetcher.incoming) != current_nr_of_messages_in_queue,
                                              timeout=0.5*timeout)
            except proton.Timeout:
                pass
            except Exception:
                raise_exception(MessageBusError,
                                "[FromBus] Failed to get number of messages available in queue: %s" % self.address)

        # return the current number of queued incoming messages
        return len(self.receiver.fetcher.incoming)


class ToBus(object):
    """
    Post messages onto the message bus through a blocking Proton sender.

    The connection to the broker is made in the initializer. The sender is
    created by :meth:`open` (or by entering a ``with`` context) and closed by
    :meth:`close` (or by leaving the context). Calls to open/close are
    reference counted, so nested use creates and closes the sender only once.

    note:: :meth:`close` only closes the sender; the broker connection that
    was made in the initializer is not closed by this class.
    """

    def __init__(self, address, broker=None, broker_options=None):
        """
        Initializer. Connects to the broker right away.
        :param address: valid Qpid address, optionally followed by '/<subject>'
        :param broker: valid Qpid broker URL, e.g. "localhost:5672"
        :param broker_options: valid Qpid broker options, e.g. {'reconnect': True}
        :raise MessageBusError: if the connection to the broker cannot be made
        """
        self.address = address
        self.broker = broker if broker else DEFAULT_BROKER
        # Work on a private copy: the 'reconnect' key is removed below, and that
        # must neither alter the caller's dict nor the module-level default.
        self.broker_options = dict(broker_options if broker_options else DEFAULT_BROKER_OPTIONS)
        self.sender = None
        self.subject = None
        self.opened = 0

        try:
            logger.debug("[ToBus] Connecting to broker: %s", self.broker)
            if 'reconnect' in self.broker_options:
                self.broker_options.pop('reconnect')
                logger.info('[ToBus] Ignoring duplicate reconnect option in connection init')
            self.connection = proton.utils.BlockingConnection(self.broker, **self.broker_options)
            logger.debug("[ToBus] Connected to broker: %s", self.broker)
        except proton.ConnectionException as ex:
            logger.exception('[ToBus] Initialization failed')
            raise MessageBusError('[ToBus] Initialization failed (%s)' % ex)

    def isConnected(self):
        """
        :return: True if this bus has been opened at least once more than it was closed.
        """
        return self.opened > 0

    def open(self):
        """
        Create the sender on first use and increase the open-count.
        The open-count is only increased when the sender is available, so a
        failed open() can simply be retried.
        :raise MessageBusError: if the sender could not be created.
        """
        if self.opened == 0:
            try:
                self._add_queue(self.address)
            except proton.ProtonException:
                raise_exception(MessageBusError, "[ToBus] Sender initialization failed")
        self.opened += 1

    def __enter__(self):
        """
        Open the bus when entering a context.
        :raise MessageBusError: if the sender could not be created.
        :return: self
        """
        self.open()
        return self

    def close(self):
        """
        Decrease the open-count and close the sender when the count drops to zero.
        Calling close() on a bus that is not open is a no-op.
        :raise MessageBusError: if closing the sender fails
        """
        if self.opened <= 0:
            # never let the open-count become negative, or a later open() would
            # skip the creation of the sender.
            return

        self.opened -= 1
        if self.opened > 0:
            return

        # detach the sender first, so the state is consistent even if close fails
        sender, self.sender = self.sender, None
        if sender is None:
            return

        try:
            sender.close()
            logger.debug("[ToBus] Disconnected sender from broker: %s", self.broker)
        except proton.ProtonException:
            # proton.Timeout is a ProtonException as well; catch the base class
            # like FromBus.close does, so any proton failure is reported uniformly.
            raise_exception(MessageBusError,
                            "[ToBus] Failed to disconnect sender from broker %s" %
                            self.broker)

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Close the bus when leaving a context.
        :param exc_type: type of exception thrown in context (unused)
        :param exc_val: value of exception thrown in context (unused)
        :param exc_tb: traceback of exception thrown in context (unused)
        """
        self.close()

    def _check_session(self):
        """
        Check if there's an active sender.
        :raise MessageBusError: if the bus is not open or has no sender
        """
        if not self.opened or getattr(self, 'sender', None) is None:
            raise MessageBusError("[ToBus] No active sender (broker: %s)" % self.broker)

    def _get_sender(self):
        """
        Get the sender of this bus.
        :raise MessageBusError: if there's no active sender
        :return: sender object
        """
        self._check_session()
        return self.sender

    def _add_queue(self, address):
        """
        Create the sender for the queue that you want to send messages to.
        :param address: valid Qpid address; everything after the first '/' is
                        used as subject for the messages that are sent.
        :raise MessageBusError: if sender could not be created
        """
        if address and '/' in address:
            # split on the first slash only, so a subject containing a '/'
            # does not break the unpacking
            address, self.subject = address.split('/', 1)
        else:
            self.subject = None

        what = "sender for source: %s (broker: %s, session: %s)" % (address, self.broker, 'unknown')

        try:
            if getattr(self, 'sender', None) is not None:
                raise_exception(MessageBusError, "[ToBus] More than one sender")
            self.sender = self.connection.create_sender(address=address)
        except proton.ProtonException:
            raise_exception(MessageBusError,
                            "[ToBus] Failed to create %s" % (what,))
        logger.debug("[ToBus] Created %s", what)

    def send(self, message, timeout=DEFAULT_TIMEOUT):
        """
        Send a message to the exchange (target) we're connected to.
        If the address of this bus contains a subject, then that subject is set
        on the message before it is sent.
        :param message: message to be sent
        :param timeout: maximum time in seconds to wait for send action
        :raise MessageBusError: if there is no active sender, or if sending failed
        """
        sender = self._get_sender()
        qmsg = to_qpid_message(message)

        if isinstance(qmsg.body, dict):
            # qpid cannot handle strings longer than 64k within dicts
            # so convert each string to a buffer which qpid can fit in 2^32-1 bytes
            # convert it back on the other end
            # NOTE: the body is replaced on qmsg itself. A deepcopy of qmsg was
            # made here in the Py2 version, but in Py3 that raises
            # TypeError: object.__new__(SwigPyObject) is not safe, use SwigPyObject.__new__()
            qmsg.body = convertStringValuesToBuffer(qmsg.body, 65535)

        logger.debug("[ToBus] Sending message to: %s (%s)", self.address, qmsg)
        try:
            if getattr(self, 'subject', None):
                qmsg.subject = self.subject
            sender.send(qmsg, timeout=timeout)
        except proton.ProtonException:
            raise_exception(MessageBusError,
                            "[ToBus] Failed to send message to: %s" %
                            sender.target)
        logger.debug("[ToBus] Message sent to: %s subject: %s", self.address, qmsg.subject)


class TemporaryQueue(object):
    """
    A TemporaryQueue instance can be used to setup a dynamic temporary queue which is closed and deleted automagically when leaving context.
    Together with the factory methods create_frombus and/or create_tobus it gives us the following simple but often used use case:

    with TemporaryQueue("MyTestQueue") as tmp_queue:
        with tmp_queue.create_tobus() as tobus, tmp_queue.create_frombus() as frombus:
            # send a message...
            original_msg = EventMessage(content="foobar")
            tobus.send(original_msg)

            # ...receive the message.
            received_msg = frombus.receive()

    Alternative use cases with only a tobus or only a frombus on the tmp_queue are also possible.
    """
    def __init__(self, name=None, broker="localhost"):
        """
        Create a TemporaryQueue instance with an optional name on the given broker.
        The queue itself is not created until open() is called (or the context is entered).
        :param name: Optional name, which is passed as link name when the dynamic receiver is created.
        :param broker: the qpid broker to connect to.
        """
        self.name = name
        self.broker = broker
        self._dynamic_receiver = None
        self.address = None

    def __enter__(self):
        """
        Opens/creates the temporary queue. It is automatically closed when leaving context in __exit__.
        :return: self.
        """
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Close/remove the temporary queue.
        """
        self.close()

    def open(self):
        """
        Open/create the temporary queue.
        It is advised to use the TemporaryQueue instance in a 'with' context, which guarantees the close call.
        If the dynamic receiver cannot be created, the connection that was made
        for it is closed again and the original exception is re-raised.
        """
        logger.info("Creating TemporaryQueue...")
        connection = proton.utils.BlockingConnection(self.broker)
        try:
            self._dynamic_receiver = connection.create_receiver(address=None, dynamic=True, name=self.name)
            # the address of the dynamic queue is taken from the link's remote source
            self.address = self._dynamic_receiver.link.remote_source.address
        except Exception:
            # do not leak the connection when the receiver could not be set up
            self._dynamic_receiver = None
            self.address = None
            try:
                connection.close()
            except Exception:
                logger.exception("Could not close connection to %s after failing to create TemporaryQueue", self.broker)
            raise
        logger.info("Created TemporaryQueue at %s", self.address)

    def close(self):
        """
        Close/remove the temporary queue.
        It is advised to use the TemporaryQueue instance in a 'with' context, which guarantees the close call.
        Calling close() on a queue that is not open is a no-op.
        """
        receiver = self._dynamic_receiver
        if receiver is None:
            # never opened, or already closed
            return

        address = self.address
        logger.debug("Closing TemporaryQueue at %s", address)
        connection = receiver.connection
        try:
            receiver.close()
        finally:
            # reset our state and close the connection, even if closing the receiver failed
            self._dynamic_receiver = None
            self.address = None
            connection.close()
        logger.info("Closed TemporaryQueue at %s", address)

    def __str__(self):
        """
        :return: human readable description including the current address (None when not open).
        """
        return "TemporaryQueue address=%s" % (self.address,)

    def create_frombus(self, subject=None):
        """
        Factory method to create a FromBus instance which is connected to this TemporaryQueue
        :param subject: Optional subject string to filter for. Only messages which match this subject are received.
        :return: FromBus
        """
        return FromBus(broker=self.broker,
                       address="%s/%s" % (self.address, subject) if subject else self.address)

    def create_tobus(self):
        """
        Factory method to create a ToBus instance which is connected to this TemporaryQueue
        :return: ToBus
        """
        return ToBus(broker=self.broker, address=self.address)


class AbstractBusListener(object):
    """
    AbstractBusListener class for handling messages which are received on a message bus.
    Typical usage is to derive from this class and implement the _handleMessage method with concrete logic.
    """

    def __init__(self, address, broker=None, **kwargs):
        """
        Initialize AbstractBusListener object with address (str).
        :param address: valid Qpid address
        :param broker: valid Qpid broker URL, or None to use the FromBus default
        additional parameters in kwargs:
            exclusive=    <bool>  Create an exclusive binding so no other listeners can consume duplicate messages (default: False)
            numthreads=   <int>  Number of parallel threads processing messages (default: 1)
            verbose=      <bool>  Output extra logging over stdout (default: False)
        :raise AttributeError: if any other keyword argument is given
        """
        self.address          = address
        self.broker           = broker
        self._running         = threading.Event()
        self._listening       = False
        self._bus_listener    = None   # created in start_listening
        self._threads         = {}     # maps each listener thread to its bookkeeping dict
        self.exclusive        = kwargs.pop("exclusive", False)
        self._numthreads      = kwargs.pop("numthreads", 1)
        self.verbose          = kwargs.pop("verbose", False)
        self.frombus_options  = {}

        if kwargs:
            raise AttributeError("Unexpected argument passed to AbstractBusListener constructor: %s" % (kwargs,))

        # Set appropriate flags for exclusive binding
        if self.exclusive:
            logger.warning("exclusive binding is not supported yet since our switch to proton")
            binding_key = address.split('/')[-1]
            self.frombus_options["link"] = { "name": str(uuid.uuid4()),
                                              "x-bindings": [ { "key": binding_key,
                                                                "arguments": { "\"qpid.exclusive-binding\"": True }
                                                              }
                                                            ]
                                            }

    def _debug(self, txt):
        """
        Internal use only. Log txt at debug level, but only when verbose is set.
        """
        if self.verbose:
            logger.debug("[%s: %s]", self.__class__.__name__, txt)

    def isRunning(self):
        """
        :return: True while the listener threads are supposed to keep running.
        """
        return self._running.is_set()

    def isListening(self):
        """
        :return: True after start_listening has completed, until stop_listening is called.
        """
        return self._listening

    def start_listening(self, numthreads=None):
        """
        Start the background threads and process incoming messages.
        Does nothing when the listener is already listening.
        :param numthreads: optional number of threads, overrides the value given to the constructor.
        """
        if self._listening:
            return

        self._bus_listener = FromBus(self.address, broker=self.broker, broker_options=self.frombus_options)
        self._bus_listener.open()

        if numthreads is not None:
            self._numthreads = numthreads

        self._running.set()
        self._threads = {}
        for i in range(self._numthreads):
            thread = threading.Thread(target=self._loop)
            # register the bookkeeping before the thread starts: _loop looks it up right away
            self._threads[thread] = self._create_thread_args(i)
            thread.start()
        self._listening = True

    def _create_thread_args(self, index):
        """
        :param index: index of the listener thread
        :return: fresh bookkeeping dict (index and message counters) for one listener thread.
        """
        return {'index':index,
                'num_received_messages':0,
                'num_processed_messages':0}

    def stop_listening(self):
        """
        Stop the background threads that listen to incoming messages.
        It is safe to call this method when listening was never started.
        """
        # stop all running threads
        if self.isRunning():
            self._running.clear()

            for thread, args in list(self._threads.items()):
                logger.debug("Thread %2d: STOPPING Listening for messages on %s at broker %s",
                             args['index'], self.address, self.broker if self.broker else 'localhost')
                thread.join()
                logger.info("Thread %2d: STOPPED Listening for messages on %s", args['index'], self.address)
                logger.info("           %d messages received and %d processed OK.", args['num_received_messages'], args['num_processed_messages'])
        self._listening = False

        # close the listeners
        bus_listener = getattr(self, '_bus_listener', None)
        if bus_listener is not None and bus_listener.isConnected():
            bus_listener.close()

    def __enter__(self):
        """
        Internal use only. Handles scope with keyword 'with'
        """
        self.start_listening()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Internal use only. Handles scope with keyword 'with'
        """
        self.stop_listening()

    def _onListenLoopBegin(self):
        "Called before main processing loop is entered."
        pass

    def _onBeforeReceiveMessage(self):
        "Called in main processing loop just before a blocking wait for messages is done."
        pass

    def _handleMessage(self, msg):
        "Implement this method in your subclass to handle a received message"
        raise NotImplementedError("Please implement the _handleMessage method in your subclass to handle a received message")

    def _onAfterReceiveMessage(self, successful):
        """
        Called in the main loop after the message was handled.
        @successful@ reflects the state of the handling: true/false
        """
        pass

    def _onListenLoopEnd(self):
        "Called after main processing loop is finished."
        pass

    def _loop(self):
        """
        Internal use only. Message listener loop that receives messages and calls _handleMessage for each received message.
        Runs in each listener thread until the running-flag is cleared by stop_listening.
        """
        args = self._threads[threading.current_thread()]
        thread_idx = args['index']
        logger.info("Thread %d START Listening for messages on %s at broker %s",
                    thread_idx, self.address, self.broker if self.broker else 'localhost')
        try:
            self._onListenLoopBegin()
        except Exception as e:
            logger.error("onListenLoopBegin() failed with %s", e)
            return

        while self.isRunning():
            try:
                self._onBeforeReceiveMessage()
            except Exception as e:
                logger.error("onBeforeReceiveMessage() failed with %s", e)
                continue

            try:
                # get the next message, wait at most 1 second so the
                # running-flag is checked regularly
                lofar_msg = self._bus_listener.receive(1, self.verbose)
                # retry if timed-out
                if lofar_msg is None:
                    continue

                # Keep track of number of received messages
                args['num_received_messages'] += 1

                # Execute the handler function
                try:
                    self._debug("Running handler")

                    self._handleMessage(lofar_msg)

                    self._debug("Finished handler")

                    # No ack here: FromBus.receive() already acknowledged the
                    # message. The receiver can only accept unspecifically, so a
                    # second ack could settle a delivery of another thread.

                    args['num_processed_messages'] += 1

                    try:
                        self._onAfterReceiveMessage(True)
                    except Exception as e:
                        logger.error("onAfterReceiveMessage() failed with %s", e)
                        continue

                except Exception as handler_error:
                    import traceback
                    logger.warning("Handling of message failed with %s: %s\nMessage: %s", handler_error, traceback.format_exc(), lofar_msg.body)

                    # Any thrown exceptions either Service exception or unhandled exception
                    # during the execution of the service handler is caught here.
                    self._debug(str(handler_error))
                    try:
                        self._onAfterReceiveMessage(False)
                    except Exception as e:
                        logger.error("onAfterReceiveMessage() failed with %s", e)
                    continue

            except Exception as e:
                # Unknown problem in the library. Report this and continue.
                logger.error("[%s:] ERROR during processing of incoming message.\n%s", self.__class__.__name__, str(e))
                logger.info("Thread %d: Resuming listening on %s ", thread_idx, self.address)

        try:
            self._onListenLoopEnd()
        except Exception as e:
            logger.error("onListenLoopEnd() failed with %s", e)


# Public API of this module.
__all__ = [
    "FromBus",
    "ToBus",
    "TemporaryQueue",
    "AbstractBusListener",
]