-
Jorrit Schaap authored
SW-516: remove 'options' from messagebus classes, as these were qpid options, and we're using proton now. At this moment there is no need for such options. These can be added later when needed.
Jorrit Schaap authoredSW-516: remove 'options' from messagebus classes, as these were qpid options, and we're using proton now. At this moment there is no need for such options. These can be added later when needed.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
messagebus.py 31.61 KiB
#!/usr/bin/env python3
# messagebus.py: Provide an easy way to exchange messages on the message bus.
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: messagebus.py 1580 2015-09-30 14:18:57Z loose $
"""
Provide an easy way to exchange messages on the message bus.
"""
from lofar.messaging.exceptions import MessageBusError, MessageFactoryError
from lofar.messaging.messages import to_qpid_message, MESSAGE_FACTORY
from lofar.common.util import raise_exception
from lofar.common.util import convertStringValuesToBuffer, convertBufferValuesToString
import proton
import proton.utils
import proton.reactor
import logging
import sys
import uuid
import threading
from copy import deepcopy
import re
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Default settings for often used parameters.
# Broker URL used by FromBus/ToBus when no broker is given.
DEFAULT_BROKER = "localhost:5672"
# Broker options used by FromBus/ToBus when no broker_options are given.
# FromBus/ToBus strip the 'reconnect' key before connecting.
DEFAULT_BROKER_OPTIONS = {'reconnect': True}
# Passed as `credit` to create_receiver() in FromBus._add_queue.
DEFAULT_RECEIVER_CAPACITY = 128
# Default timeout (in seconds) of FromBus.receive() and ToBus.send().
DEFAULT_TIMEOUT = 5
class FromBus(object):
    """
    Fetch messages from the message bus.

    The connection to the broker is made in the initializer. The receiver is
    created by the first call to :meth:`open` (or on entering a ``with``
    context) and closed again by the matching last call to :meth:`close`;
    nested open/close pairs are reference counted in ``self.opened``.
    Nothing in this class closes ``self.connection`` itself.

    *** The following was true for the Py2 qpid library, not necessarily for Proton ***

    Note that most methods require that a FromBus object is used *inside* a
    context. When entering the context, the connection with the broker is
    opened, and a session and a receiver are created. When exiting the context,
    the connection to the broker is closed; as a side-effect the receiver(s)
    and session are destroyed.

    note:: The rationale behind using a context is that this is (unfortunately)
    the *only* way that we can guarantee proper resource management. If there
    were a __deinit__() as counterpart to __init__(), we could have used that.
    We cannot use __del__(), because it is not the counterpart of __init__(),
    but that of __new__().
    """

    def __init__(self, address, broker=None, broker_options=None):
        """
        Initializer. Connects to the broker right away.

        :param address: valid Qpid address, optionally followed by '/<subject>'
        :param broker: valid Qpid broker URL, e.g. "localhost:5672"
        :param broker_options: valid Qpid broker options, e.g. {'reconnect': True}
        :raise MessageBusError: if the connection to the broker could not be made
        """
        self.address = address
        self.broker = broker if broker else DEFAULT_BROKER
        # Work on a private copy: 'reconnect' is removed below, and popping it
        # from the caller's dict or from the shared DEFAULT_BROKER_OPTIONS
        # would silently change the options of every later instance.
        self.broker_options = dict(broker_options if broker_options else DEFAULT_BROKER_OPTIONS)
        # Number of outstanding open() calls; the receiver exists while > 0.
        self.opened = 0
        self.receiver = None

        try:
            logger.debug("[FromBus] Connecting to broker: %s", self.broker)
            if 'reconnect' in self.broker_options:
                self.broker_options.pop('reconnect')
                logger.info('[FromBus] Ignoring duplicate reconnect option in connection init')
            self.connection = proton.utils.BlockingConnection(self.broker, **self.broker_options)
            logger.debug("[FromBus] Connected to broker: %s", self.broker)
        except proton.ConnectionException as ex:
            logger.exception('[FromBus] Initialization failed')
            raise MessageBusError('[FromBus] Initialization failed (%s)' % ex) from ex

    def isConnected(self):
        """Return True while at least one open() has not been matched by a close()."""
        return self.opened > 0

    def open(self):
        """
        Create the receiver on the first call; later calls only increase the
        reference count.

        :raise MessageBusError: if the receiver could not be created. The
                                reference count is left unchanged in that case.
        """
        if self.opened == 0:
            # create receiver
            try:
                self._add_queue(self.address)
            except proton.ProtonException:
                # close() is a no-op while the counter is still 0, so this
                # cleanup hook cannot drive the counter negative.
                self.__exit__(*sys.exc_info())
                raise_exception(MessageBusError, "[FromBus] Receiver initialization failed")
            except MessageBusError:
                self.__exit__(*sys.exc_info())
                raise
        self.opened += 1

    def __enter__(self):
        """Open the bus and return self, for use in a ``with`` statement."""
        self.open()
        return self

    def close(self):
        """
        Undo one open() call. The receiver is closed when the last outstanding
        open() is undone. Calling close() on a bus that is not open is a no-op.

        :raise MessageBusError: if closing the receiver failed. The bus is
                                considered closed afterwards regardless.
        """
        if self.opened <= 0:
            # Never opened, or open() failed: nothing to release.
            return

        if self.opened > 1:
            # Other users still hold the bus open.
            self.opened -= 1
            return

        try:
            if self.receiver is not None:
                self.receiver.close()
            logger.debug("[FromBus] Disconnected receiver from broker: %s", self.broker)
        except proton.ProtonException:
            raise_exception(MessageBusError,
                            "[FromBus] Failed to disconnect receiver from broker: %s" %
                            self.broker)
        finally:
            # Keep the counter and the receiver in sync, also when closing failed.
            self.receiver = None
            self.opened = 0

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Close the bus on leaving a ``with`` block.

        :param exc_type: type of exception thrown in context
        :param exc_val: value of exception thrown in context
        :param exc_tb: traceback of exception thrown in context
        """
        self.close()

    def _check_session(self):
        """
        Check if there's an active receiver.

        :raise MessageBusError: if there's no active receiver
        """
        if not self.isConnected() or getattr(self, 'receiver', None) is None:
            raise MessageBusError(
                "[FromBus] No active receiver (broker: %s)" % self.broker)

    def _add_queue(self, address):
        """
        Create the receiver for the queue that you want to receive messages from.

        :param address: valid Qpid address. Everything after the first '/' is
                        used as subject filter.
        :raise MessageBusError: if the receiver could not be created
        """
        if address and '/' in address:
            # Split on the first '/' only, so an address holding more than one
            # '/' does not fail on unpacking.
            address, subject = address.split('/', 1)
        else:
            subject = None

        logger.debug("[FromBus] Receiving from bus: %s with subject: %s", address, subject)
        what = "receiver for source: %s (broker: %s, session: %s)" % (address, self.broker, 'unknown')

        # helper class for filtering by subject
        class ProtonSubjectFilter(proton.reactor.Filter):
            def __init__(self, value):
                filter_dict = {proton.symbol('subject-filter'):
                               proton.Described(proton.symbol('apache.org:legacy-amqp-topic-binding:string'), value)}
                super(ProtonSubjectFilter, self).__init__(filter_dict)

        try:
            self.receiver = self.connection.create_receiver(address=address,
                                                            credit=DEFAULT_RECEIVER_CAPACITY,
                                                            options=ProtonSubjectFilter(subject) if subject else None)
        except proton.ProtonException as pe:
            raise_exception(MessageBusError,
                            "[FromBus] Failed to create %s: %s" % (what, pe))
        logger.debug("[FromBus] Created %s", what)

    def receive(self, timeout=DEFAULT_TIMEOUT, logDebugMessages=True):
        """
        Receive the next message from the queue we're listening on. The
        message is acknowledged before it is returned.

        :param timeout: maximum time in seconds to wait for a message.
        :param logDebugMessages: also log the waiting/timeout and the raw message at debug level.
        :raise MessageBusError: if there is no active receiver, if fetching
                                failed, or if the message could not be converted
        :return: received message, None if timeout occurred.
        """
        self._check_session()
        if logDebugMessages:
            logger.debug("[FromBus] Waiting %s seconds for next message", timeout)

        try:
            while True:  # break when message is acceptable
                msg = self.receiver.receive(timeout=timeout)
                if msg is not None:
                    break  # handle this message
        except proton.Timeout:
            if logDebugMessages:
                logger.debug("[FromBus] No message received within %s seconds", timeout)
            return None
        except proton.ProtonException:
            raise_exception(MessageBusError,
                            "[FromBus] Failed to fetch message from: "
                            "%s" % self.address)
        except Exception as e:
            # FIXME: what if another exception is raised? should we reconnect?
            logger.error(e)
            raise_exception(MessageBusError,
                            "[FromBus] unknown exception while receiving message on %s: %s" % (self.address, e))

        try:
            if isinstance(msg.body, dict):
                # qpid cannot handle strings longer than 64k within dicts
                # so each string was converted to a buffer which qpid can fit in 2^32-1 bytes
                # and now we convert it back on this end
                msg.body = convertBufferValuesToString(msg.body)
        except MessageFactoryError:
            self.reject(msg)
            raise_exception(MessageBusError, "[FromBus] Message rejected")

        logger.debug("[FromBus] Message received on: %s subject: %s", self.address, msg.subject)
        if logDebugMessages:
            logger.debug("[FromBus] %s", msg)

        try:
            amsg = MESSAGE_FACTORY.create(msg)
        except MessageFactoryError:
            self.reject(msg)
            raise_exception(MessageBusError, "[FromBus] Message rejected")

        self.ack(msg)
        return amsg

    def ack(self, msg):
        """
        Acknowledge a message. This will inform Qpid that the message can
        safely be removed from the queue.

        Acknowledging is best effort: a failing accept is logged at debug
        level and otherwise ignored.

        :param msg: message to be acknowledged
        :raise MessageBusError: if there is no active receiver
        """
        self._check_session()
        qmsg = to_qpid_message(msg)
        try:
            self.receiver.accept()  # with proton, we can only accept unspecifically for the receiver...
        except Exception as ex:
            # This seems to happen quite often, so it is deliberately not
            # treated as an error. Catch Exception rather than everything, so
            # KeyboardInterrupt and SystemExit still get through.
            logger.debug("[FromBus] Could not acknowledge message, but will go on... (%s)", ex)
        else:
            logger.debug("[FromBus] acknowledged message: %s", qmsg)

    def nack(self, msg):
        """
        Do not acknowledge a message. This will inform Qpid that the message
        has to be redelivered. You cannot nack a message that has already
        been acknowledged.

        :param msg: message to not be acknowledged

        .. attention::
            We should call qpid.messaging.Session.release() here, but that is
            not available in Qpid-Python 0.32. We therefore use
            qpid.messaging.Session.acknowledge() instead.
        """
        logger.warning("[FromBus] nack() is not supported, using ack() instead")
        self.ack(msg)

    def reject(self, msg):
        """
        Reject a message. This will inform Qpid that the message should not be
        redelivered. You cannot reject a message that has already been
        acknowledged.

        :param msg: message to be rejected

        .. attention::
            We should call qpid.messaging.Session.reject() here, but that is
            not available in Qpid-Python 0.32. We therefore use
            qpid.messaging.Session.acknowledge() instead.
        """
        logger.warning(
            "[FromBus] reject() is not supported, using ack() instead")
        self.ack(msg)

    def drain(self, timeout=0.1):
        """
        Read and ack all messages until queue/exchange is empty.

        :param timeout: time in seconds to wait for each next message
        :raise MessageBusError: if there is no active receiver
        """
        self._check_session()
        while True:
            try:
                if self.receiver.receive(timeout=timeout) is None:
                    break
                self.receiver.accept()
            except proton.Timeout:
                break

    def nr_of_messages_in_queue(self, timeout=0.1):
        """
        Get the current number of messages in this FromBus's local queue, which is at most DEFAULT_RECEIVER_CAPACITY
        Please note that this is not per se equal to the number of messages in the queue at the broker!
        A proton receiver can and will prefetch messages from a broker-queue, and store them in an internal (client-side) queue.
        If-and-only-if a message is handled and ack'ed at the client, then the message truly disappears from the broker-queue.

        :param timeout: time out in (fractional) seconds or None
        :raise MessageBusError: if there is no active receiver, or if polling the connection failed
        :return: the current number of messages in this FromBus's local receiver queue
        """
        self._check_session()

        if timeout is not None and timeout > 0:
            try:
                # allow the fetcher to receive some message(s):
                # half of the timeout is spent on processing pending work,
                # the other half on waiting for the local queue size to change.
                current_nr_of_messages_in_queue = len(self.receiver.fetcher.incoming)
                self.connection.container.do_work(timeout=0.5*timeout)
                self.receiver.connection.wait(lambda: len(self.receiver.fetcher.incoming) != current_nr_of_messages_in_queue,
                                              timeout=0.5*timeout)
            except proton.Timeout:
                pass
            except Exception:
                raise_exception(MessageBusError,
                                "[FromBus] Failed to get number of messages available in queue: %s" % self.address)

        # return the current number of queued incoming messages
        return len(self.receiver.fetcher.incoming)
class ToBus(object):
    """
    This class provides an easy way to post messages onto the message bus.

    The connection to the broker is made in the initializer. The sender is
    created by the first call to :meth:`open` (or on entering a ``with``
    context) and closed again by the matching last call to :meth:`close`;
    nested open/close pairs are reference counted in ``self.opened``.
    Nothing in this class closes ``self.connection`` itself.

    *** The following was true for the Py2 qpid library, not necessarily for Proton ***

    Note that most methods require that a ToBus object is used *inside* a
    context. When entering the context, the connection with the broker is
    opened, and a session and a sender are created. When exiting the context,
    the connection to the broker is closed; as a side-effect the sender and
    session are destroyed.

    note:: The rationale behind using a context is that this is (unfortunately)
    the *only* way that we can guarantee proper resource management. If there
    were a __deinit__() as counterpart to __init__(), we could have used that.
    We cannot use __del__(), because it is not the counterpart of __init__(),
    but that of __new__().
    """

    def __init__(self, address, broker=None, broker_options=None):
        """
        Initializer. Connects to the broker right away.

        :param address: valid Qpid address, optionally followed by '/<subject>'
        :param broker: valid Qpid broker URL, e.g. "localhost:5672"
        :param broker_options: valid Qpid broker options, e.g. {'reconnect': True}
        :raise MessageBusError: if the connection to the broker could not be made
        """
        self.address = address
        self.broker = broker if broker else DEFAULT_BROKER
        # Work on a private copy: 'reconnect' is removed below, and popping it
        # from the caller's dict or from the shared DEFAULT_BROKER_OPTIONS
        # would silently change the options of every later instance.
        self.broker_options = dict(broker_options if broker_options else DEFAULT_BROKER_OPTIONS)
        # Number of outstanding open() calls; the sender exists while > 0.
        self.opened = 0
        self.sender = None
        # Subject taken from the address by _add_queue(); stamped on sent messages.
        self.subject = None

        try:
            logger.debug("[ToBus] Connecting to broker: %s", self.broker)
            if 'reconnect' in self.broker_options:
                self.broker_options.pop('reconnect')
                logger.info('[ToBus] Ignoring duplicate reconnect option in connection init')
            self.connection = proton.utils.BlockingConnection(self.broker, **self.broker_options)
            logger.debug("[ToBus] Connected to broker: %s", self.broker)
        except proton.ConnectionException as ex:
            logger.exception('[ToBus] Initialization failed')
            raise MessageBusError('[ToBus] Initialization failed (%s)' % ex) from ex

    def open(self):
        """
        Create the sender on the first call; later calls only increase the
        reference count.

        :raise MessageBusError: if the sender could not be created. The
                                reference count is left unchanged in that case.
        """
        if self.opened == 0:
            try:
                self._add_queue(self.address)
            except proton.ProtonException:
                # close() is a no-op while the counter is still 0, so this
                # cleanup hook cannot drive the counter negative.
                self.__exit__(*sys.exc_info())
                raise_exception(MessageBusError, "[ToBus] Sender initialization failed")
            except MessageBusError:
                self.__exit__(*sys.exc_info())
                raise
        self.opened += 1

    def __enter__(self):
        """
        Open the bus on entering a context (adds a sender on first use).

        :raise MessageBusError: if the sender could not be created.
        :return: self
        """
        self.open()
        return self

    def close(self):
        """
        Undo one open() call. The sender is closed and `sender` is set to None
        when the last outstanding open() is undone. Calling close() on a bus
        that is not open is a no-op.

        :raise MessageBusError: if closing the sender timed out. The bus is
                                considered closed afterwards regardless.
        """
        if self.opened <= 0:
            # Never opened, or open() failed: nothing to release.
            return

        if self.opened > 1:
            # Other users still hold the bus open.
            self.opened -= 1
            return

        try:
            if self.sender is not None:
                self.sender.close()
            logger.debug("[ToBus] Disconnected sender from broker: %s", self.broker)
        except proton.Timeout:
            raise_exception(MessageBusError,
                            "[ToBus] Failed to disconnect sender from broker %s" %
                            self.broker)
        finally:
            # Keep the counter and the sender in sync, also when closing failed.
            self.sender = None
            self.opened = 0

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Close the bus on leaving a ``with`` block.

        :param exc_type: type of exception thrown in context
        :param exc_val: value of exception thrown in context
        :param exc_tb: traceback of exception thrown in context
        """
        self.close()

    def _check_session(self):
        """
        Check if there's an active sender.

        :raise MessageBusError: if there's no active sender
        """
        if not self.opened or getattr(self, 'sender', None) is None:
            raise MessageBusError("[ToBus] No active sender (broker: %s)" % self.broker)

    def _get_sender(self):
        """
        Get the sender of this bus.

        :raise MessageBusError: if there's no active sender
        :return: sender object
        """
        self._check_session()
        return self.sender

    def _add_queue(self, address):
        """
        Create the sender for the queue that you want to send messages to.

        :param address: valid Qpid address. Everything after the first '/' is
                        kept as the subject that send() stamps on each message.
        :raise MessageBusError: if sender could not be created, or if there already is one
        """
        if address and '/' in address:
            # Split on the first '/' only, so an address holding more than one
            # '/' does not fail on unpacking.
            address, self.subject = address.split('/', 1)
        else:
            self.subject = None

        what = "sender for source: %s (broker: %s, session: %s)" % (address, self.broker, 'unknown')

        try:
            if getattr(self, 'sender', None) is not None:
                raise_exception(MessageBusError, "[ToBus] More than one sender")
            self.sender = self.connection.create_sender(address=address)
        except proton.ProtonException:
            raise_exception(MessageBusError,
                            "[ToBus] Failed to create %s" % (what,))
        logger.debug("[ToBus] Created %s", what)

    def send(self, message, timeout=DEFAULT_TIMEOUT):
        """
        Send a message to the exchange (target) we're connected to.

        :param message: message to be sent
        :param timeout: maximum time in seconds to wait for send action
        :raise MessageBusError: if there is no active sender, or if sending failed
        :return:
        """
        sender = self._get_sender()
        qmsg = to_qpid_message(message)

        if isinstance(qmsg.body, dict):
            # qpid cannot handle strings longer than 64k within dicts
            # so convert each string to a buffer which qpid can fit in 2^32-1 bytes
            # convert it back on the other end
            #
            # NOTE: the body of the given message is replaced in place. A
            # deepcopy of the message was made here before the switch to
            # Python 3, but deepcopy fails there with:
            #   TypeError: object.__new__(SwigPyObject) is not safe, use SwigPyObject.__new__()
            qmsg.body = convertStringValuesToBuffer(qmsg.body, 65535)

        logger.debug("[ToBus] Sending message to: %s (%s)", self.address, qmsg)

        try:
            if getattr(self, 'subject', None):
                qmsg.subject = self.subject
            sender.send(qmsg, timeout=timeout)
        except proton.ProtonException:
            raise_exception(MessageBusError,
                            "[ToBus] Failed to send message to: %s" %
                            sender.target)
        logger.debug("[ToBus] Message sent to: %s subject: %s", self.address, qmsg.subject)
class TemporaryQueue(object):
    """
    A TemporaryQueue instance can be used to set up a dynamic temporary queue which is closed and deleted automagically when leaving context.
    Together with the factory methods create_frombus and/or create_tobus it gives us the following simple but often used use case:

    with TemporaryQueue("MyTestQueue") as tmp_queue:
        with tmp_queue.create_tobus() as tobus, tmp_queue.create_frombus() as frombus:
            # send a message...
            original_msg = EventMessage(content="foobar")
            tobus.send(original_msg)

            # ...receive the message.
            received_msg = frombus.receive()

    Alternative use cases with only a tobus or only a frombus on the tmp_queue are also possible.
    """

    def __init__(self, name=None, broker="localhost"):
        """
        Create a TemporaryQueue instance with an optional name on the given broker.
        No connection is made until open() is called.

        :param name: Optional name, passed on as the name of the dynamic receiver.
        :param broker: the qpid broker to connect to.
        """
        self.name = name
        self.broker = broker
        # Receiver that keeps the dynamic queue alive; None while not open.
        self._dynamic_receiver = None
        # Address of the queue as reported by the broker; None while not open.
        self.address = None

    def __enter__(self):
        """
        Opens/creates the temporary queue. It is automatically closed when leaving context in __exit__.

        :return: self.
        """
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Close/remove the temporary queue.
        """
        self.close()

    def open(self):
        """
        Open/create the temporary queue.
        It is advised to use the TemporaryQueue instance in a 'with' context, which guarantees the close call.

        If the receiver cannot be created, the freshly made connection is
        closed again and the exception is re-raised.
        """
        logger.info("Creating TemporaryQueue...")
        connection = proton.utils.BlockingConnection(self.broker)
        try:
            self._dynamic_receiver = connection.create_receiver(address=None, dynamic=True, name=self.name)
            self.address = self._dynamic_receiver.link.remote_source.address
        except Exception:
            # Do not leak the connection when the queue could not be set up.
            self._dynamic_receiver = None
            self.address = None
            connection.close()
            raise
        logger.info("Created TemporaryQueue at %s", self.address)

    def close(self):
        """
        Close/remove the temporary queue.
        It is advised to use the TemporaryQueue instance in a 'with' context, which guarantees the close call.

        Calling close() on a queue that is not open is a no-op.
        """
        receiver = self._dynamic_receiver
        if receiver is None:
            return

        address = self.address
        logger.debug("Closing TemporaryQueue at %s", address)
        try:
            receiver.close()
        finally:
            # Always close the connection and reset our state, even when
            # closing the receiver failed.
            try:
                receiver.connection.close()
            finally:
                self._dynamic_receiver = None
                self.address = None
        logger.info("Closed TemporaryQueue at %s", address)

    def __str__(self):
        """Return a short description holding the current queue address."""
        return "TemporaryQueue address=%s" % (self.address,)

    def create_frombus(self, subject=None):
        """
        Factory method to create a FromBus instance which is connected to this TemporaryQueue

        :param subject: Optional subject string to filter for. Only messages which match this subject are received.
        :return: FromBus
        """
        return FromBus(broker=self.broker,
                       address="%s/%s" % (self.address, subject) if subject else self.address)

    def create_tobus(self):
        """
        Factory method to create a ToBus instance which is connected to this TemporaryQueue

        :return: ToBus
        """
        return ToBus(broker=self.broker, address=self.address)
class AbstractBusListener(object):
    """
    AbstractBusListener class for handling messages which are received on a message bus.
    Typical usage is to derive from this class and implement the _handleMessage method with concrete logic.
    """

    def __init__(self, address, broker=None, **kwargs):
        """
        Initialize AbstractBusListener object with address (str).

        :param address: valid Qpid address
        :param broker: valid Qpid broker URL; passed on to the FromBus that is listened on
        additional parameters in kwargs:
            exclusive= <bool>  Create an exclusive binding so no other listeners can consume duplicate messages (default: False)
            numthreads= <int>  Number of parallel threads processing messages (default: 1)
            verbose= <bool>  Output extra logging over stdout (default: False)
        :raise AttributeError: if any other keyword argument is given
        """
        self.address = address
        self.broker = broker
        self._running = threading.Event()
        self._listening = False
        # Created in start_listening(); defined here so that stop_listening()
        # is safe to call on a listener that was never started.
        self._bus_listener = None
        self._threads = {}
        self.exclusive = kwargs.pop("exclusive", False)
        self._numthreads = kwargs.pop("numthreads", 1)
        self.verbose = kwargs.pop("verbose", False)
        self.frombus_options = {}

        if kwargs:
            raise AttributeError("Unexpected argument passed to AbstractBusListener constructor: %s" % (kwargs,))

        # Set appropriate flags for exclusive binding
        if self.exclusive:
            logger.warning("exclusive binding is not supported yet since our switch to proton")
            binding_key = address.split('/')[-1]
            self.frombus_options["link"] = {"name": str(uuid.uuid4()),
                                            "x-bindings": [{"key": binding_key,
                                                            "arguments": {"\"qpid.exclusive-binding\"": True}
                                                            }
                                                           ]
                                            }

    def _debug(self, txt):
        """
        Internal use only. Log txt at debug level, but only in verbose mode.
        """
        if self.verbose:
            logger.debug("[%s: %s]", self.__class__.__name__, txt)

    def isRunning(self):
        """Return True while the listener threads are supposed to keep running."""
        return self._running.is_set()

    def isListening(self):
        """Return True once start_listening() has completed, until stop_listening() stopped the threads."""
        return self._listening

    def start_listening(self, numthreads=None):
        """
        Start the background threads and process incoming messages.
        Does nothing if the listener is already listening.

        :param numthreads: if given, overrides the number of threads set at construction
        """
        if self._listening:
            return

        self._bus_listener = FromBus(self.address, broker=self.broker, broker_options=self.frombus_options)
        self._bus_listener.open()

        if numthreads is not None:
            self._numthreads = numthreads

        self._running.set()
        self._threads = {}
        for i in range(self._numthreads):
            thread = threading.Thread(target=self._loop)
            # Register the bookkeeping before starting the thread:
            # _loop() looks itself up in self._threads.
            self._threads[thread] = self._create_thread_args(i)
            thread.start()
        self._listening = True

    def _create_thread_args(self, index):
        """Return the initial per-thread bookkeeping dict for the thread with the given index."""
        return {'index': index,
                'num_received_messages': 0,
                'num_processed_messages': 0}

    def stop_listening(self):
        """
        Stop the background threads that listen to incoming messages.
        Safe to call on a listener that was never started.
        """
        # stop all running threads
        if self.isRunning():
            self._running.clear()

            for thread, args in list(self._threads.items()):
                logger.debug("Thread %2d: STOPPING Listening for messages on %s at broker %s",
                             args['index'], self.address, self.broker if self.broker else 'localhost')
                thread.join()
                logger.info("Thread %2d: STOPPED Listening for messages on %s", args['index'], self.address)
                logger.info("           %d messages received and %d processed OK.",
                            args['num_received_messages'], args['num_processed_messages'])
            self._listening = False

        # close the listeners
        if self._bus_listener is not None and self._bus_listener.isConnected():
            self._bus_listener.close()

    def __enter__(self):
        """
        Internal use only. Handles scope with keyword 'with'
        """
        self.start_listening()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Internal use only. Handles scope with keyword 'with'
        """
        self.stop_listening()

    def _onListenLoopBegin(self):
        "Called before main processing loop is entered."
        pass

    def _onBeforeReceiveMessage(self):
        "Called in main processing loop just before a blocking wait for messages is done."
        pass

    def _handleMessage(self, msg):
        "Implement this method in your subclass to handle a received message"
        raise NotImplementedError("Please implement the _handleMessage method in your subclass to handle a received message")

    def _onAfterReceiveMessage(self, successful):
        """
        Called in the main loop after a received message was handled.
        @successful@ reflects the state of the handling: true/false
        """
        pass

    def _onListenLoopEnd(self):
        "Called after main processing loop is finished."
        pass

    def _loop(self):
        """
        Internal use only. Message listener loop that receives messages and calls _handleMessage for each of them.
        Runs in each of the threads started by start_listening(), until stop_listening() is called.
        """
        currentThread = threading.current_thread()
        args = self._threads[currentThread]
        thread_idx = args['index']
        logger.info("Thread %d START Listening for messages on %s at broker %s",
                    thread_idx, self.address, self.broker if self.broker else 'localhost')
        try:
            self._onListenLoopBegin()
        except Exception as e:
            logger.error("onListenLoopBegin() failed with %s", e)
            return

        while self.isRunning():
            try:
                self._onBeforeReceiveMessage()
            except Exception as e:
                logger.error("onBeforeReceiveMessage() failed with %s", e)
                continue

            try:
                # get the next message; the 1 second timeout keeps the loop
                # responsive to stop_listening()
                lofar_msg = self._bus_listener.receive(1, self.verbose)
                # retry if timed-out
                if lofar_msg is None:
                    continue

                # Keep track of number of received messages
                args['num_received_messages'] += 1

                # Execute the handler function
                try:
                    self._debug("Running handler")
                    self._handleMessage(lofar_msg)
                    self._debug("Finished handler")

                    # NOTE(review): FromBus.receive() already calls ack() on
                    # the message it returns; confirm whether this second ack
                    # is still wanted.
                    self._bus_listener.ack(lofar_msg)

                    args['num_processed_messages'] += 1

                    try:
                        self._onAfterReceiveMessage(True)
                    except Exception as e:
                        logger.error("onAfterReceiveMessage() failed with %s", e)
                        continue

                except Exception as e:
                    import traceback
                    logger.warning("Handling of message failed with %s: %s\nMessage: %s", e, traceback.format_exc(), lofar_msg.body)

                    # Any thrown exceptions either Service exception or unhandled exception
                    # during the execution of the service handler is caught here.
                    self._debug(str(e))
                    try:
                        self._onAfterReceiveMessage(False)
                    except Exception as e:
                        logger.error("onAfterReceiveMessage() failed with %s", e)
                        continue

            except Exception as e:
                # Unknown problem in the library. Report this and continue.
                logger.error("[%s:] ERROR during processing of incoming message.\n%s", self.__class__.__name__, str(e))
                logger.info("Thread %d: Resuming listening on %s ", thread_idx, self.address)

        try:
            self._onListenLoopEnd()
        except Exception as e:
            logger.error("onListenLoopEnd() failed with %s", e)
# Names exported by `from <this module> import *`.
__all__ = ["FromBus", "ToBus", "TemporaryQueue", "AbstractBusListener"]