Skip to content
Snippets Groups Projects
Commit cd64dbc3 authored by Jan David Mol's avatar Jan David Mol
Browse files

Task #7573: Allow compilation and using the MessageBus classes even if QPID is...

Task #7573: Allow compilation and using the MessageBus classes even if QPID is not installed, through the use of a fallback that does not actually send or receive messages.
parents e0c86f35 3e9ffec2
Branches
Tags
No related merge requests found
...@@ -2,4 +2,8 @@ ...@@ -2,4 +2,8 @@
lofar_package(Pipeline-Recipes 0.1) lofar_package(Pipeline-Recipes 0.1)
# The pipeline.cfg needs to know whether QPID is installed
include(LofarFindPackage)
lofar_find_package(QPID)
add_subdirectory(sip) add_subdirectory(sip)
...@@ -96,6 +96,13 @@ install(FILES ...@@ -96,6 +96,13 @@ install(FILES
${CMAKE_CURRENT_BINARY_DIR}/tasks.cfg ${CMAKE_CURRENT_BINARY_DIR}/tasks.cfg
DESTINATION share/pipeline) DESTINATION share/pipeline)
# PIPELINE_FEEDBACK_METHOD is used in pipeline.cfg.in to enable/disable qpid feedback
if(HAVE_QPID)
set(PIPELINE_FEEDBACK_METHOD "messagebus")
else(HAVE_QPID)
set(PIPELINE_FEEDBACK_METHOD "none")
endif(HAVE_QPID)
configure_file( configure_file(
${CMAKE_CURRENT_SOURCE_DIR}/pipeline.cfg.in ${CMAKE_CURRENT_SOURCE_DIR}/pipeline.cfg.in
${CMAKE_CURRENT_BINARY_DIR}/pipeline.cfg) ${CMAKE_CURRENT_BINARY_DIR}/pipeline.cfg)
......
...@@ -29,4 +29,4 @@ xml_stat_file = %(runtime_directory)s/%(job_name)s/logs/%(start_time)s/statistic ...@@ -29,4 +29,4 @@ xml_stat_file = %(runtime_directory)s/%(job_name)s/logs/%(start_time)s/statistic
# Valid options: # Valid options:
# messagebus Send feedback and status using LCS/MessageBus # messagebus Send feedback and status using LCS/MessageBus
# none Do NOT send feedback and status # none Do NOT send feedback and status
method = messagebus method = @PIPELINE_FEEDBACK_METHOD@
...@@ -27,7 +27,7 @@ ...@@ -27,7 +27,7 @@
#include <qpid/messaging/Message.h> #include <qpid/messaging/Message.h>
#include <qpid/types/Variant.h> #include <qpid/types/Variant.h>
#else #else
#include <MessageBus/QpidMock.h> #include <MessageBus/NoQpidFallback.h>
#endif #endif
#include <string> #include <string>
......
...@@ -31,7 +31,7 @@ ...@@ -31,7 +31,7 @@
#include <qpid/messaging/Session.h> #include <qpid/messaging/Session.h>
#include <qpid/messaging/Address.h> #include <qpid/messaging/Address.h>
#else #else
#include <MessageBus/QpidMock.h> #include <MessageBus/NoQpidFallback.h>
#endif #endif
#include <Common/Exception.h> #include <Common/Exception.h>
......
//# QpidMock.h: A fake implementation of the QPID API //# NoQpidFallback.h: A fake implementation of the QPID API in case QPID is not installed
//# //#
//# Copyright (C) 2015 //# Copyright (C) 2015
//# ASTRON (Netherlands Institute for Radio Astronomy) //# ASTRON (Netherlands Institute for Radio Astronomy)
...@@ -20,8 +20,8 @@ ...@@ -20,8 +20,8 @@
//# //#
//# $Id$ //# $Id$
#ifndef LOFAR_MESSAGEBUS_QPIDMUCK_H #ifndef LOFAR_MESSAGEBUS_NO_QPID_FALLBACK_H
#define LOFAR_MESSAGEBUS_QPIDMUCK_H #define LOFAR_MESSAGEBUS_NO_QPID_FALLBACK_H
#ifndef HAVE_QPID #ifndef HAVE_QPID
#include <Common/LofarLogger.h> #include <Common/LofarLogger.h>
......
...@@ -8,7 +8,7 @@ set(messagebus_LIB_SRCS ...@@ -8,7 +8,7 @@ set(messagebus_LIB_SRCS
LogSink.cc LogSink.cc
MsgBus.cc MsgBus.cc
Message.cc Message.cc
QpidMock.cc) NoQpidFallback.cc)
set(messagebus_PROGRAMS set(messagebus_PROGRAMS
) )
...@@ -23,6 +23,7 @@ python_install( ...@@ -23,6 +23,7 @@ python_install(
__init__.py __init__.py
msgbus.py msgbus.py
message.py message.py
noqpidfallback.py
DESTINATION lofar/messagebus) DESTINATION lofar/messagebus)
add_subdirectory(protocols) add_subdirectory(protocols)
//# QpidMock.cc: A fake implementation of the QPID API //# NoQpidFallback.cc: A fake implementation of the QPID API in case QPID is not installed
//# //#
//# Copyright (C) 2015 //# Copyright (C) 2015
//# ASTRON (Netherlands Institute for Radio Astronomy) //# ASTRON (Netherlands Institute for Radio Astronomy)
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
#include <lofar_config.h> #include <lofar_config.h>
#ifndef HAVE_QPID #ifndef HAVE_QPID
#include <MessageBus/QpidMock.h> #include <MessageBus/NoQpidFallback.h>
std::ostream& operator<<(std::ostream& out, const qpid::types::Variant& value) { std::ostream& operator<<(std::ostream& out, const qpid::types::Variant& value) {
(void)value; (void)value;
......
...@@ -17,10 +17,11 @@ ...@@ -17,10 +17,11 @@
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
try: try:
import qpid.messaging import qpid.messaging as messaging
enabled = True MESSAGING_ENABLED = True
except ImportError: except ImportError:
enabled = False import noqpidfallback as messaging
MESSAGING_ENABLED = False
import xml.dom.minidom as xml import xml.dom.minidom as xml
import xml.parsers.expat as expat import xml.parsers.expat as expat
...@@ -61,7 +62,7 @@ def _uuid(): ...@@ -61,7 +62,7 @@ def _uuid():
""" """
Return an UUID Return an UUID
""" """
return str(qpid.messaging.uuid4()) return str(messaging.uuid4())
class MessageException(Exception): class MessageException(Exception):
pass pass
...@@ -145,7 +146,7 @@ class MessageContent(object): ...@@ -145,7 +146,7 @@ class MessageContent(object):
def qpidMsg(self): def qpidMsg(self):
""" Construct a NEW QPID message. """ """ Construct a NEW QPID message. """
msg = qpid.messaging.Message(content_type="text/plain", durable=True) msg = messaging.Message(content_type="text/plain", durable=True)
msg.content = self.content() msg.content = self.content()
return msg return msg
......
...@@ -19,10 +19,11 @@ ...@@ -19,10 +19,11 @@
# id.. TDB # id.. TDB
try: try:
import qpid.messaging import qpid.messaging as messaging
enabled = True MESSAGING_ENABLED = True
except ImportError: except ImportError:
enabled = False import noqpidfallback as messaging
MESSAGING_ENABLED = False
import os import os
import signal import signal
...@@ -47,14 +48,14 @@ class Session: ...@@ -47,14 +48,14 @@ class Session:
self.closed = False self.closed = False
logger.info("[Bus] Connecting to broker %s", broker) logger.info("[Bus] Connecting to broker %s", broker)
self.connection = qpid.messaging.Connection(broker) self.connection = messaging.Connection(broker)
self.connection.reconnect = True self.connection.reconnect = True
logger.info("[Bus] Connected to broker %s", broker) logger.info("[Bus] Connected to broker %s", broker)
try: try:
self.connection.open() self.connection.open()
self.session = self.connection.session() self.session = self.connection.session()
except qpid.messaging.MessagingError, m: except messaging.MessagingError, m:
raise BusException(m) raise BusException(m)
# NOTE: We cannuot use: # NOTE: We cannuot use:
...@@ -87,7 +88,7 @@ class Session: ...@@ -87,7 +88,7 @@ class Session:
# to data loss if the stall was legit. # to data loss if the stall was legit.
try: try:
self.connection.close(5.0) self.connection.close(5.0)
except qpid.messaging.exceptions.Timeout, t: except messaging.exceptions.Timeout, t:
logger.error("[Bus] Could not close connection: %s", t) logger.error("[Bus] Could not close connection: %s", t)
def __enter__(self): def __enter__(self):
...@@ -110,7 +111,7 @@ class ToBus(Session): ...@@ -110,7 +111,7 @@ class ToBus(Session):
try: try:
self.sender = self.session.sender(self.address(queue, options)) self.sender = self.session.sender(self.address(queue, options))
except qpid.messaging.MessagingError, m: except messaging.MessagingError, m:
raise BusException(m) raise BusException(m)
def send(self, msg): def send(self, msg):
...@@ -121,11 +122,11 @@ class ToBus(Session): ...@@ -121,11 +122,11 @@ class ToBus(Session):
# Send Message or MessageContent object # Send Message or MessageContent object
self.sender.send(msg.qpidMsg()) self.sender.send(msg.qpidMsg())
except AttributeError: except AttributeError:
# Send string or qpid.messaging.Message object # Send string or messaging.Message object
self.sender.send(msg) self.sender.send(msg)
logger.info("[ToBus] Message sent to queue %s", self.queue) logger.info("[ToBus] Message sent to queue %s", self.queue)
except qpid.messaging.SessionError, m: except messaging.SessionError, m:
raise BusException(m) raise BusException(m)
class FromBus(Session): class FromBus(Session):
...@@ -140,7 +141,7 @@ class FromBus(Session): ...@@ -140,7 +141,7 @@ class FromBus(Session):
# Need capacity >=1 for 'self.session.next_receiver' to function across multiple queues # Need capacity >=1 for 'self.session.next_receiver' to function across multiple queues
receiver.capacity = 1 #32 receiver.capacity = 1 #32
except qpid.messaging.MessagingError, m: except messaging.MessagingError, m:
raise BusException(m) raise BusException(m)
def get(self, timeout=None): def get(self, timeout=None):
...@@ -156,7 +157,7 @@ class FromBus(Session): ...@@ -156,7 +157,7 @@ class FromBus(Session):
logger.error("[FromBus] Could not retrieve available message on queue %s", receiver.source) logger.error("[FromBus] Could not retrieve available message on queue %s", receiver.source)
else: else:
logger.info("[FromBus] Message received on queue %s", receiver.source) logger.info("[FromBus] Message received on queue %s", receiver.source)
except qpid.messaging.exceptions.Empty, e: except messaging.exceptions.Empty, e:
return None return None
if msg is None: if msg is None:
......
#!/usr/bin/python
import sys
print >>sys.stderr, "QPID support NOT enabled! Will NOT connect to any broker, and messages will be lost!"
def uuid4():
return "<uuid>"
"""
Exceptions.
"""
class MessagingError(Exception):
pass
class SessionError(Exception):
pass
class exceptions:
class Timeout(Exception):
pass
class Empty(Exception):
pass
"""
Messages.
"""
class Message(object):
def __init__(self, content_type, durable):
self.content = ""
"""
Communication.
"""
class Sender(object):
def __init__(self, dest):
self.dest = dest
def send(self, msg):
pass
class Receiver(object):
def __init__(self, source):
self.capacity = 0
self.source = source
def fetch(self):
return None
class Session(object):
def sender(self, address):
return Sender(address)
def receiver(self, address):
return Receiver(address)
def next_receiver(self, timeout=0):
return Receiver("unknown")
def acknowledge(self, msg):
pass
class Connection(object):
def __init__(self, broker):
self.reconnect = False
def open(self):
pass
def close(self, timeout=0):
pass
def session(self):
return Session()
#!/usr/bin/python #!/usr/bin/python
from lofar.messagebus.msgbus import FromBus, ToBus #
# Test the basic functionality of FromBus and ToBus, both
# to send and to forward messages.
#
# Note that without QPID support, the classes are still usable,
# but messages won't arrive. We consider that case also in this test.
#
from lofar.messagebus.msgbus import FromBus, ToBus, MESSAGING_ENABLED
from lofar.messagebus.message import Message, MessageContent from lofar.messagebus.message import Message, MessageContent
# Send a message (send MessageContent) # Send a message (send MessageContent)
...@@ -12,7 +20,8 @@ tbus.send(tmsg) ...@@ -12,7 +20,8 @@ tbus.send(tmsg)
fbus = FromBus("test") fbus = FromBus("test")
fmsg = fbus.get(1) fmsg = fbus.get(1)
# Verify the content # Verify the content (only if QPID support is enabled)
if MESSAGING_ENABLED:
assert fmsg != None assert fmsg != None
assert fmsg.content().payload == "foo" assert fmsg.content().payload == "foo"
...@@ -20,6 +29,7 @@ assert fmsg.content().payload == "foo" ...@@ -20,6 +29,7 @@ assert fmsg.content().payload == "foo"
tbus.send(fmsg) tbus.send(fmsg)
rmsg = fbus.get(1) rmsg = fbus.get(1)
# Verify the content # Verify the content (only if QPID support is enabled)
if MESSAGING_ENABLED:
assert rmsg != None assert rmsg != None
assert rmsg.content().payload == "foo" assert rmsg.content().payload == "foo"
...@@ -62,10 +62,11 @@ class BusMulticast(threading.Thread): ...@@ -62,10 +62,11 @@ class BusMulticast(threading.Thread):
try: try:
content = msg.content() content = msg.content()
log("INFO","[%s] [%s] Message received" % (self.source, content))
except message.MessageException, e: except message.MessageException, e:
content = "<unknown>" content = "<unknown>"
log("WARN","[%s] Non-compliant message received" % (self.source,))
log("INFO","[%s] [%s] Message received" % (self.source, content))
outdump.send(msg) outdump.send(msg)
for outbus in outbusses: for outbus in outbusses:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment