-
Auke Klazema authoredAuke Klazema authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
messagebus.py 29.53 KiB
#!/usr/bin/env python
# messagebus.py: Provide an easy way to exchange messages on the message bus.
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: messagebus.py 1580 2015-09-30 14:18:57Z loose $
"""
Provide an easy way to exchange messages on the message bus.
"""
from lofar.messaging.exceptions import MessageBusError, MessageFactoryError
from lofar.messaging.messages import to_qpid_message, MESSAGE_FACTORY
from lofar.common.util import raise_exception
from lofar.common.util import convertStringValuesToBuffer, convertBufferValuesToString
import proton
import proton.utils
import proton.reactor
import logging
import sys
import uuid
import threading
from copy import deepcopy
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Default settings for often used parameters.
# Address options used when the caller supplies none.
DEFAULT_ADDRESS_OPTIONS = dict(create='always')
# Broker URL used when the caller supplies none.
DEFAULT_BROKER = "localhost:5672"
# Broker (connection) options used when the caller supplies none.
DEFAULT_BROKER_OPTIONS = dict(reconnect=True)
# Default receiver capacity (only referenced from commented-out code in this file).
DEFAULT_RECEIVER_CAPACITY = 1
# Default timeout, in seconds, for receive() and send().
DEFAULT_TIMEOUT = 5
def address_options_to_str(opt):
    """
    Construct an address options string from an options value.

    An address options object is not supported well in Python, so the options
    are rendered as text instead:

    * a dict becomes ``{key: value, ...}``,
    * a list becomes ``[value, ...]``,
    * integers and booleans are rendered bare,
    * anything else is rendered between double quotes.

    Dict and list values are rendered recursively.

    :param opt: option value (dict, list, int, bool or any printable object)
    :return: the string representation of ``opt``
    """
    if isinstance(opt, dict):
        return "{%s}" % (", ".join('%s: %s' % (key, address_options_to_str(value))
                                   for (key, value) in opt.items()))
    if isinstance(opt, list):
        return "[%s]" % (", ".join(address_options_to_str(value) for value in opt))
    # bool is a subclass of int, so this single check covers both; a separate
    # bool branch placed after an int check can never be reached.
    if isinstance(opt, int):
        return '%s' % (opt,)
    return '"%s"' % (opt,)
class FromBus(object):
    """
    This class provides an easy way to fetch messages from the message bus.

    In this Proton based implementation the connection to the broker is made
    in the initializer. open() / close() (and entering / leaving a context)
    only create and close the receiver. Calls to open() are counted: the
    receiver is created by the first open() and closed by the matching last
    close(). The connection itself is not closed by close().

    *** The following was true for the Py2 qpid library, not necessarily for Proton ***
    Note that most methods require that a FromBus object is used *inside* a
    context. When entering the context, the connection with the broker is
    opened, and a session and a receiver are created. When exiting the context,
    the connection to the broker is closed; as a side-effect the receiver(s)
    and session are destroyed.

    note:: The rationale behind using a context is that this is (unfortunately)
    the *only* way that we can guarantee proper resource management. If there
    were a __deinit__() as counterpart to __init__(), we could have used that.
    We cannot use __del__(), because it is not the counterpart of __init__(),
    but that of __new__().
    """

    def __init__(self, address, options=None, broker=None, broker_options=None, dynamic=False):
        """
        Initializer. Connects to the broker.

        :param address: valid Qpid address, optionally followed by "/<subject>"
        :param options: valid Qpid address options, e.g. {'create': 'never'}
        :param broker: valid Qpid broker URL, e.g. "localhost:5672"
        :param broker_options: valid Qpid broker options, e.g. {'reconnect': True}
        :param dynamic: passed on as the `dynamic` argument when the receiver is created
        :raise MessageBusError: if the connection to the broker could not be made
        """
        self.address = address
        self.options = options if options else DEFAULT_ADDRESS_OPTIONS
        self.broker = broker if broker else DEFAULT_BROKER
        # Work on a private copy: 'reconnect' is removed below, and that must
        # not modify the caller's dict nor the shared module-level defaults.
        self.broker_options = dict(broker_options if broker_options else DEFAULT_BROKER_OPTIONS)
        self.dynamic = dynamic
        self.receiver = None
        self.subject = None
        self.opened = 0  # number of outstanding open() calls

        try:
            logger.debug("[FromBus] Connecting to broker: %s", self.broker)
            if 'reconnect' in self.broker_options:
                self.broker_options.pop('reconnect')
                logger.info('[FromBus] Ignoring duplicate reconnect option in connection init')
            self.connection = proton.utils.BlockingConnection(self.broker, **self.broker_options)
            logger.debug("[FromBus] Connected to broker: %s", self.broker)
        except proton.ConnectionException as ex:
            logger.exception('[FromBus] Initialization failed')
            raise MessageBusError('[FromBus] Initialization failed (%s)' % ex)

    def isConnected(self):
        """
        :return: True if there is at least one outstanding open() call.
        """
        return self.opened > 0

    def open(self):
        """
        Create the receiver on the first call; later calls only increase the
        open counter.

        The counter is only increased when the receiver was created
        successfully, so a failed open() leaves the object closed and a later
        open() will try again.

        :raise MessageBusError: if the receiver could not be created.
        """
        if self.opened == 0:
            try:
                self._add_queue(self.address, self.options)
            except proton.ProtonException:
                # NOTE(review): raise_exception (lofar.common.util) is expected to
                # raise the given exception type -- confirm.
                raise_exception(MessageBusError, "[FromBus] Receiver initialization failed")
        self.opened += 1

    def __enter__(self):
        """
        Open the bus when entering a context.

        :return: self
        """
        self.open()
        return self

    def close(self):
        """
        Undo one open() call. The receiver is closed when the last outstanding
        open() is undone. Calling close() on an object that is not open is a
        no-op.

        :raise MessageBusError: if closing the receiver fails
        """
        if self.opened <= 0:
            # Never opened or already fully closed: keep the counter from going negative.
            return

        # Decrease first, so that a failing receiver.close() cannot leave the
        # object counted as open while its receiver has already been dropped.
        self.opened -= 1
        if self.opened > 0:
            return

        try:
            self.receiver.close()
            logger.debug("[FromBus] Disconnected receiver from broker: %s", self.broker)
        except proton.ProtonException:
            raise_exception(MessageBusError,
                            "[FromBus] Failed to disconnect receiver from broker: %s" %
                            self.broker)
        finally:
            self.receiver = None

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Close the bus when leaving a context.

        :param exc_type: type of exception thrown in context (unused)
        :param exc_val: value of exception thrown in context (unused)
        :param exc_tb: traceback of exception thrown in context (unused)
        """
        self.close()

    def _check_session(self):
        """
        Check if there's an active receiver.

        :raise MessageBusError: if the bus is not open or there is no receiver
        """
        if not self.isConnected() or self.receiver is None:
            raise MessageBusError(
                "[FromBus] No active receiver (broker: %s)" % self.broker)

    def _add_queue(self, address, options=None):
        """
        Add a queue that you want to receive messages from.

        :param address: valid Qpid address; an optional "/<subject>" suffix is
                        split off and used to filter messages in receive()
        :param options: dict containing valid Qpid address options
                        (currently ignored, only logged)
        :raise MessageBusError: if the receiver could not be created
        """
        subject = None
        if address and '/' in address:
            # Split on the first '/' only, so a subject containing '/' does not
            # break the unpacking.
            address, subject = address.split('/', 1)

        logger.debug("[FromBus] Receiving from bus: %s with subject: %s dynamic queue: %s",
                     address, subject, self.dynamic)

        options = options if options else self.options
        optstr = address_options_to_str(options)
        what = "receiver for source: %s (broker: %s, session: %s, options: %s)" % \
               (address, self.broker, 'unknown', optstr)

        try:
            if options:
                # todo: options=optstr) # "%s; %s" % (address, optstr), capacity=capacity)
                logger.warning('[FromBus] Options are currently ignored since the switch to Proton!')
            # todo: get a selector to work, e.g. options=proton.reactor.Selector("subject = %s" % subject)
            self.receiver = self.connection.create_receiver(address=address, dynamic=self.dynamic)
            # todo: when the selector works, get rid of the message rejection on wrong subject in receive()
            self.subject = subject
        except proton.ProtonException:
            raise_exception(MessageBusError,
                            "[FromBus] Failed to create %s" % (what,))
        logger.debug("[FromBus] Created %s", what)

    def receive(self, timeout=DEFAULT_TIMEOUT, logDebugMessages=True):
        """
        Receive the next message from the queue we're listening on.

        If the address contained a subject, messages with a different subject
        are skipped. The timeout applies to each individual wait, so the total
        waiting time can exceed `timeout` when messages are skipped.

        :param timeout: maximum time in seconds to wait for a message.
        :param logDebugMessages: if True, also log the waiting/time-out events
                                 and the received message at debug level
        :raise MessageBusError: if the bus is not open, if receiving failed, or
                                if the message could not be converted
        :return: received message, None if timeout occurred.
        """
        self._check_session()
        if logDebugMessages:
            logger.debug("[FromBus] Waiting %s seconds for next message", timeout)

        try:
            while True:  # break when message is acceptable
                msg = self.receiver.receive(timeout=timeout)
                if self.subject is None:
                    break  # no subject filter: accept anything
                logger.debug("got subject: %s | filter for subject: %s", msg.subject, self.subject)
                if msg.subject == self.subject:
                    break  # handle this message
                # wrong subject: ignore, and receive next one
        except proton.Timeout:
            if logDebugMessages:
                logger.debug("[FromBus] No message received within %s seconds", timeout)
            return None
        except proton.ProtonException:
            raise_exception(MessageBusError,
                            "[FromBus] Failed to fetch message from: "
                            "%s" % self.address)
        except Exception as e:
            # FIXME: what if another exception is raised? should we reconnect?
            logger.error(e)
            raise_exception(MessageBusError,
                            "[FromBus] unknown exception while receiving message on %s: %s" % (self.address, e))

        try:
            if isinstance(msg.body, dict):
                # qpid cannot handle strings longer than 64k within dicts
                # so each string was converted to a buffer which qpid can fit in 2^32-1 bytes
                # and now we convert it back on this end
                msg.body = convertBufferValuesToString(msg.body)

            logger.debug("[FromBus] Message received on: %s subject: %s", self.address, msg.subject)
            if logDebugMessages:
                logger.debug("[FromBus] %s", msg)

            amsg = MESSAGE_FACTORY.create(msg)
        except MessageFactoryError:
            self.reject(msg)
            raise_exception(MessageBusError, "[FromBus] Message rejected")

        self.ack(msg)
        return amsg

    def ack(self, msg):
        """
        Acknowledge a message. This will inform Qpid that the message can
        safely be removed from the queue.

        Acknowledging is best effort: a failure to accept is logged at debug
        level and otherwise ignored.

        :param msg: message to be acknowledged
        :raise MessageBusError: if the bus is not open
        """
        self._check_session()
        qmsg = to_qpid_message(msg)
        try:
            # with proton, we can only accept unspecifically for the receiver,
            # not for one particular message
            self.receiver.accept()
        except Exception as ex:
            # This seems to happen quite often, so keep it out of the regular
            # logs; still leave a trace for debugging instead of hiding it.
            logger.debug("[FromBus] Could not acknowledge message (%s), but will go on...", ex)
        else:
            logger.debug("[FromBus] acknowledged message: %s", qmsg)

    def nack(self, msg):
        """
        Do not acknowledge a message. This should inform Qpid that the message
        has to be redelivered, but that is not supported: the message is
        acknowledged instead.

        :param msg: message to not be acknowledged

        .. attention::
            We should call qpid.messaging.Session.release() here, but that is
            not available in Qpid-Python 0.32. We therefore use
            acknowledge instead.
        """
        logger.warning("[FromBus] nack() is not supported, using ack() instead")
        self.ack(msg)

    def reject(self, msg):
        """
        Reject a message. This should inform Qpid that the message should not
        be redelivered, but that is not supported: the message is acknowledged
        instead.

        :param msg: message to be rejected

        .. attention::
            We should call qpid.messaging.Session.reject() here, but that is
            not available in Qpid-Python 0.32. We therefore use
            acknowledge instead.
        """
        logger.warning(
            "[FromBus] reject() is not supported, using ack() instead")
        self.ack(msg)

    # todo: required? The qpid based nr_of_messages_in_queue(timeout=1.0), which
    # returned the receiver's available() count, has no Proton alternative yet.
class ToBus(object):
    """
    This class provides an easy way to post messages onto the message bus.

    In this Proton based implementation the connection to the broker is made
    in the initializer. open() / close() (and entering / leaving a context)
    only create and close the sender. Calls to open() are counted: the sender
    is created by the first open() and closed by the matching last close().
    The connection itself is not closed by close().

    *** The following was true for the Py2 qpid library, not necessarily for Proton ***
    Note that most methods require that a ToBus object is used *inside* a
    context. When entering the context, the connection with the broker is
    opened, and a session and a sender are created. When exiting the context,
    the connection to the broker is closed; as a side-effect the sender and
    session are destroyed.

    note:: The rationale behind using a context is that this is (unfortunately)
    the *only* way that we can guarantee proper resource management. If there
    were a __deinit__() as counterpart to __init__(), we could have used that.
    We cannot use __del__(), because it is not the counterpart of __init__(),
    but that of __new__().
    """

    def __init__(self, address, options=None, broker=None, broker_options=None):
        """
        Initializer. Connects to the broker.

        :param address: valid Qpid address, optionally followed by "/<subject>"
        :param options: valid Qpid address options, e.g. {'create': 'never'}
        :param broker: valid Qpid broker URL, e.g. "localhost:5672"
        :param broker_options: valid Qpid broker options, e.g. {'reconnect': True}
        :raise MessageBusError: if the connection to the broker could not be made
        """
        self.address = address
        self.options = options if options else DEFAULT_ADDRESS_OPTIONS
        self.broker = broker if broker else DEFAULT_BROKER
        # Work on a private copy: 'reconnect' is removed below, and that must
        # not modify the caller's dict nor the shared module-level defaults.
        self.broker_options = dict(broker_options if broker_options else DEFAULT_BROKER_OPTIONS)
        self.sender = None
        self.subject = None
        self.opened = 0  # number of outstanding open() calls

        try:
            logger.debug("[ToBus] Connecting to broker: %s", self.broker)
            if 'reconnect' in self.broker_options:
                self.broker_options.pop('reconnect')
                logger.info('[ToBus] Ignoring duplicate reconnect option in connection init')
            self.connection = proton.utils.BlockingConnection(self.broker, **self.broker_options)
            logger.debug("[ToBus] Connected to broker: %s", self.broker)
        except proton.ConnectionException as ex:
            logger.exception('[ToBus] Initialization failed')
            raise MessageBusError('[ToBus] Initialization failed (%s)' % ex)

    def open(self):
        """
        Create the sender on the first call; later calls only increase the
        open counter.

        The counter is only increased when the sender was created
        successfully, so a failed open() leaves the object closed and a later
        open() will try again.

        :raise MessageBusError: if the sender could not be created.
        """
        if self.opened == 0:
            try:
                self._add_queue(self.address, self.options)
            except proton.ProtonException:
                # NOTE(review): raise_exception (lofar.common.util) is expected to
                # raise the given exception type -- confirm.
                raise_exception(MessageBusError, "[ToBus] Sender initialization failed")
        self.opened += 1

    def __enter__(self):
        """
        Open the bus (i.e. add a sender) when entering a context.

        :raise MessageBusError: if the sender could not be created.
        :return: self
        """
        self.open()
        logger.debug("[ToBus] enter complete")
        return self

    def close(self):
        """
        Undo one open() call. When the last outstanding open() is undone:
        * close the sender
        * set `sender` to None
        Calling close() on an object that is not open is a no-op.

        :raise MessageBusError: if closing the sender fails
        """
        if self.opened <= 0:
            # Never opened or already fully closed: keep the counter from going negative.
            return

        # Decrease first, so that a failing sender.close() cannot leave the
        # object counted as open while its sender has already been dropped.
        self.opened -= 1
        if self.opened > 0:
            return

        try:
            self.sender.close()
            logger.debug("[ToBus] Disconnected sender from broker: %s", self.broker)
        except proton.ProtonException:
            # Catches proton.Timeout as well; same handling as FromBus.close().
            raise_exception(MessageBusError,
                            "[ToBus] Failed to disconnect sender from broker %s" %
                            self.broker)
        finally:
            self.sender = None

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Close the bus when leaving a context.

        :param exc_type: type of exception thrown in context (unused)
        :param exc_val: value of exception thrown in context (unused)
        :param exc_tb: traceback of exception thrown in context (unused)
        """
        self.close()

    def _check_session(self):
        """
        Check if there's an active sender.

        :raise MessageBusError: if the bus is not open or there is no sender
        """
        if self.opened <= 0 or self.sender is None:
            raise MessageBusError("[ToBus] No active sender (broker: %s)" % self.broker)

    def _get_sender(self):
        """
        Get the sender of this bus.

        :raise MessageBusError: if the bus is not open or there is no sender
        :return: sender object
        """
        self._check_session()
        return self.sender

    def _add_queue(self, address, options):
        """
        Add a queue that you want to send messages to.

        :param address: valid Qpid address; an optional "/<subject>" suffix is
                        split off and set as subject on every message sent
        :param options: dict containing valid Qpid address options
                        (currently ignored, only logged)
        :raise MessageBusError: if sender could not be created, or if there
                                already is a sender
        """
        subject = None
        if address and '/' in address:
            # Split on the first '/' only, so a subject containing '/' does not
            # break the unpacking.
            address, subject = address.split('/', 1)
        self.subject = subject

        optstr = address_options_to_str(options)
        what = "sender for source: %s (broker: %s, session: %s, options: %s)" % \
               (address, self.broker, 'unknown', optstr)

        if self.sender is not None:
            raise MessageBusError("[ToBus] More than one sender")

        try:
            if options:
                # todo: create sender with options -> "%s; %s" % (address, optstr))
                logger.warning('[ToBus] Options are currently ignored since the switch to Proton!')
            self.sender = self.connection.create_sender(address=address)
        except proton.ProtonException:
            raise_exception(MessageBusError,
                            "[ToBus] Failed to create %s" % (what,))
        logger.debug("[ToBus] Created %s", what)

    def send(self, message, timeout=DEFAULT_TIMEOUT):
        """
        Send a message to the exchange (target) we're connected to.

        If the address contained a subject, it is set on the message before
        sending.

        :param message: message to be sent
        :param timeout: maximum time in seconds to wait for send action
        :raise MessageBusError: if the bus is not open or if sending failed
        :return: None
        """
        sender = self._get_sender()
        qmsg = to_qpid_message(message)

        if isinstance(qmsg.body, dict):
            # qpid cannot handle strings longer than 64k within dicts
            # so convert each string to a buffer which qpid can fit in 2^32-1 bytes
            # convert it back on the other end
            # todo: the body used to be converted on a deepcopy of qmsg to avoid
            # side effects, but deepcopy now raises
            # (TypeError: object.__new__(SwigPyObject) is not safe, use SwigPyObject.__new__())
            qmsg.body = convertStringValuesToBuffer(qmsg.body, 65535)

        logger.debug("[ToBus] Sending message to: %s (%s)", self.address, qmsg)
        try:
            if self.subject:
                qmsg.subject = self.subject
            sender.send(qmsg, timeout=timeout)
        except proton.ProtonException:
            raise_exception(MessageBusError,
                            "[ToBus] Failed to send message to: %s" %
                            sender.target)
        logger.debug("[ToBus] Message sent to: %s subject: %s", self.address, qmsg.subject)
class AbstractBusListener(object):
    """
    AbstractBusListener class for handling messages which are received on a message bus.
    Typical usage is to derive from this class and implement the _handleMessage method with concrete logic.

    Messages are received from a FromBus and handled in one or more background
    threads, which are started by start_listening() (or by entering a context)
    and stopped by stop_listening() (or by leaving the context).
    """

    def __init__(self, address, broker=None, **kwargs):
        """
        Initialize AbstractBusListener object with address (str).

        :param address: valid Qpid address
        :param broker: valid Qpid broker URL, or None for the FromBus default
        additional parameters in kwargs:
            options=     <dict>  Dictionary of options passed to QPID
            exclusive=   <bool>  Create an exclusive binding so no other listeners can consume duplicate messages (default: False)
            numthreads=  <int>   Number of parallel threads processing messages (default: 1)
            verbose=     <bool>  Output extra logging over stdout (default: False)
        :raise AttributeError: if an unknown keyword argument is given
        """
        self.address = address
        self.broker = broker
        self._running = threading.Event()
        self._listening = False
        # Defined up front so that stop_listening() is safe before start_listening().
        self._bus_listener = None
        self._threads = {}

        self.exclusive = kwargs.pop("exclusive", False)
        self._numthreads = kwargs.pop("numthreads", 1)
        self.verbose = kwargs.pop("verbose", False)
        self.frombus_options = {"capacity": self._numthreads*20}
        options = kwargs.pop("options", None)

        if kwargs:
            raise AttributeError("Unexpected argument passed to AbstractBusListener constructor: %s" % (kwargs,))

        # Set appropriate flags for exclusive binding
        if self.exclusive:
            binding_key = address.split('/')[-1]
            self.frombus_options["link"] = {"name": str(uuid.uuid4()),
                                            "x-bindings": [{"key": binding_key,
                                                            "arguments": {"\"qpid.exclusive-binding\"": True}
                                                            }
                                                           ]
                                            }

        # only add options if it is given as a dictionary
        if isinstance(options, dict):
            self.frombus_options.update(options)

    def _debug(self, txt):
        """
        Internal use only. Log txt at debug level, but only in verbose mode.
        """
        if self.verbose:
            logger.debug("[%s: %s]", self.__class__.__name__, txt)

    def isRunning(self):
        """
        :return: True while the listener threads are supposed to keep running.
        """
        return self._running.is_set()

    def isListening(self):
        """
        :return: True if start_listening() completed and stop_listening() has not been called since.
        """
        return self._listening

    def start_listening(self, numthreads=None):
        """
        Start the background threads and process incoming messages.
        Does nothing if the listener is already listening.

        :param numthreads: number of threads to start; None to use the number
                           given to the constructor
        """
        if self._listening:
            return

        # frombus_options holds address options (capacity, link), so they are
        # passed as address options and not as broker connection options.
        self._bus_listener = FromBus(self.address, broker=self.broker, options=self.frombus_options)
        self._bus_listener.open()

        if numthreads is not None:
            self._numthreads = numthreads

        self._running.set()
        self._threads = {}
        for i in range(self._numthreads):
            thread = threading.Thread(target=self._loop)
            # register the per-thread bookkeeping before the thread starts, because _loop looks it up
            self._threads[thread] = self._create_thread_args(i)
            thread.start()
        self._listening = True

    def _create_thread_args(self, index):
        """
        Internal use only.

        :param index: sequence number of the thread
        :return: dict with the bookkeeping (index and message counters) of one listener thread
        """
        return {'index': index,
                'num_received_messages': 0,
                'num_processed_messages': 0}

    def stop_listening(self):
        """
        Stop the background threads that listen to incoming messages, and
        close the bus. Safe to call when the listener was never started.
        """
        # stop all running threads
        if self.isRunning():
            self._running.clear()
            for thread, args in list(self._threads.items()):
                logger.debug("Thread %2d: STOPPING Listening for messages on %s at broker %s",
                             args['index'], self.address, self.broker if self.broker else 'localhost')
                thread.join()
                logger.info("Thread %2d: STOPPED Listening for messages on %s", args['index'], self.address)
                logger.info("           %d messages received and %d processed OK.",
                            args['num_received_messages'], args['num_processed_messages'])
        self._listening = False

        # close the listeners
        if self._bus_listener is not None and self._bus_listener.isConnected():
            self._bus_listener.close()

    def __enter__(self):
        """
        Internal use only. Handles scope with keyword 'with'
        """
        self.start_listening()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Internal use only. Handles scope with keyword 'with'
        """
        self.stop_listening()

    def _onListenLoopBegin(self):
        "Called before main processing loop is entered."
        pass

    def _onBeforeReceiveMessage(self):
        "Called in main processing loop just before a blocking wait for messages is done."
        pass

    def _handleMessage(self, msg):
        "Implement this method in your subclass to handle a received message"
        raise NotImplementedError("Please implement the _handleMessage method in your subclass to handle a received message")

    def _onAfterReceiveMessage(self, successful):
        """
        Called in the main loop after the message was handled.
        @successful@ reflects the state of the handling: true/false
        """
        pass

    def _onListenLoopEnd(self):
        "Called after main processing loop is finished."
        pass

    def _notifyAfterReceive(self, successful):
        """
        Internal use only. Call the _onAfterReceiveMessage hook and log, but do not propagate, its failure.
        """
        try:
            self._onAfterReceiveMessage(successful)
        except Exception as e:
            logger.error("onAfterReceiveMessage() failed with %s", e)

    def _loop(self):
        """
        Internal use only. Message listener loop that receives messages and calls _handleMessage with each received message.
        Runs in each listener thread until stop_listening() is called.
        """
        args = self._threads[threading.current_thread()]
        thread_idx = args['index']
        logger.info("Thread %d START Listening for messages on %s at broker %s",
                    thread_idx, self.address, self.broker if self.broker else 'localhost')
        try:
            self._onListenLoopBegin()
        except Exception as e:
            logger.error("onListenLoopBegin() failed with %s", e)
            return

        while self.isRunning():
            try:
                self._onBeforeReceiveMessage()
            except Exception as e:
                logger.error("onBeforeReceiveMessage() failed with %s", e)
                continue

            try:
                # get the next message; the short timeout lets the loop notice
                # a stop request regularly
                lofar_msg = self._bus_listener.receive(1, self.verbose)
                # retry if timed-out
                if lofar_msg is None:
                    continue

                # Keep track of number of received messages
                args['num_received_messages'] += 1

                # Execute the handler function
                try:
                    self._debug("Running handler")
                    self._handleMessage(lofar_msg)
                    self._debug("Finished handler")
                    # NOTE(review): FromBus.receive() already acknowledges the
                    # message before returning it, so this looks like a second
                    # acknowledge -- confirm whether it is still needed.
                    self._bus_listener.ack(lofar_msg)
                    args['num_processed_messages'] += 1
                except Exception as e:
                    import traceback
                    # Any exception thrown during the execution of the handler is caught here.
                    logger.warning("Handling of message failed with %s: %s\nMessage: %s",
                                   e, traceback.format_exc(), lofar_msg.body)
                    self._debug(str(e))
                    self._notifyAfterReceive(False)
                else:
                    self._notifyAfterReceive(True)

            except Exception as e:
                # Unknown problem in the library. Report this and continue.
                logger.error("[%s:] ERROR during processing of incoming message.\n%s",
                             self.__class__.__name__, str(e))
                logger.info("Thread %d: Resuming listening on %s ", thread_idx, self.address)

        try:
            self._onListenLoopEnd()
        except Exception as e:
            logger.error("onListenLoopEnd() failed with %s", e)
# Names exported by "from <this module> import *".
__all__ = ["FromBus", "ToBus", "AbstractBusListener"]