diff --git a/CEP/Pipeline/recipes/CMakeLists.txt b/CEP/Pipeline/recipes/CMakeLists.txt index baa1a584c238a28d459c3950a6fbc086cc549c56..608dee21588636f4e94c01abcd39c75adca3f453 100644 --- a/CEP/Pipeline/recipes/CMakeLists.txt +++ b/CEP/Pipeline/recipes/CMakeLists.txt @@ -2,4 +2,8 @@ lofar_package(Pipeline-Recipes 0.1) +# The pipeline.cfg needs to know whether QPID is installed +include(LofarFindPackage) +lofar_find_package(QPID) + add_subdirectory(sip) diff --git a/CEP/Pipeline/recipes/sip/CMakeLists.txt b/CEP/Pipeline/recipes/sip/CMakeLists.txt index b93dd298113dd18c5d97cccbdcf9c91b9c24062f..f12038ba6a4d6ebc60eb7967c2fd3aba30fc4373 100644 --- a/CEP/Pipeline/recipes/sip/CMakeLists.txt +++ b/CEP/Pipeline/recipes/sip/CMakeLists.txt @@ -96,6 +96,13 @@ install(FILES ${CMAKE_CURRENT_BINARY_DIR}/tasks.cfg DESTINATION share/pipeline) +# PIPELINE_FEEDBACK_METHOD is used in pipeline.cfg.in to enable/disable qpid feedback +if(HAVE_QPID) + set(PIPELINE_FEEDBACK_METHOD "messagebus") +else(HAVE_QPID) + set(PIPELINE_FEEDBACK_METHOD "none") +endif(HAVE_QPID) + configure_file( ${CMAKE_CURRENT_SOURCE_DIR}/pipeline.cfg.in ${CMAKE_CURRENT_BINARY_DIR}/pipeline.cfg) diff --git a/CEP/Pipeline/recipes/sip/pipeline.cfg.in b/CEP/Pipeline/recipes/sip/pipeline.cfg.in index 2b311047dd96f45bd74332b997240099414d4d28..a45a23a6b83e360ca8782e1acdabf610892cb34e 100644 --- a/CEP/Pipeline/recipes/sip/pipeline.cfg.in +++ b/CEP/Pipeline/recipes/sip/pipeline.cfg.in @@ -29,4 +29,4 @@ xml_stat_file = %(runtime_directory)s/%(job_name)s/logs/%(start_time)s/statistic # Valid options: # messagebus Send feedback and status using LCS/MessageBus # none Do NOT send feedback and status -method = messagebus +method = @PIPELINE_FEEDBACK_METHOD@ diff --git a/LCS/MessageBus/include/MessageBus/Message.h b/LCS/MessageBus/include/MessageBus/Message.h index 02989063493115ee59fa4d8d0d0f6503d3b08351..f526f9cefd4938d83580382ebc9a1f53f8beb0cf 100644 --- a/LCS/MessageBus/include/MessageBus/Message.h +++ b/LCS/MessageBus/include/MessageBus/Message.h @@ -27,7 +27,7 @@ #include <qpid/messaging/Message.h> #include <qpid/types/Variant.h> #else -#include <MessageBus/QpidMock.h> +#include <MessageBus/NoQpidFallback.h> #endif #include <string> diff --git a/LCS/MessageBus/include/MessageBus/MsgBus.h b/LCS/MessageBus/include/MessageBus/MsgBus.h index 29ff84ba6e7b7cefae6b21af8fb9165bb1d04081..c2cdecc841bbd37e59f488f0245354504a4c3d04 100644 --- a/LCS/MessageBus/include/MessageBus/MsgBus.h +++ b/LCS/MessageBus/include/MessageBus/MsgBus.h @@ -31,7 +31,7 @@ #include <qpid/messaging/Session.h> #include <qpid/messaging/Address.h> #else -#include <MessageBus/QpidMock.h> +#include <MessageBus/NoQpidFallback.h> #endif #include <Common/Exception.h> diff --git a/LCS/MessageBus/include/MessageBus/QpidMock.h b/LCS/MessageBus/include/MessageBus/NoQpidFallback.h similarity index 96% rename from LCS/MessageBus/include/MessageBus/QpidMock.h rename to LCS/MessageBus/include/MessageBus/NoQpidFallback.h index 5b7c428d49895b37892e988c6a043b0bbb839ae3..1e5fc9936556cb859b2f6102ac1576c2c3036879 100644 --- a/LCS/MessageBus/include/MessageBus/QpidMock.h +++ b/LCS/MessageBus/include/MessageBus/NoQpidFallback.h @@ -1,4 +1,4 @@ -//# QpidMock.h: A fake implementation of the QPID API +//# NoQpidFallback.h: A fake implementation of the QPID API in case QPID is not installed //# //# Copyright (C) 2015 //# ASTRON (Netherlands Institute for Radio Astronomy) @@ -20,8 +20,8 @@ //# //# $Id$ -#ifndef LOFAR_MESSAGEBUS_QPIDMUCK_H -#define LOFAR_MESSAGEBUS_QPIDMUCK_H +#ifndef LOFAR_MESSAGEBUS_NO_QPID_FALLBACK_H +#define LOFAR_MESSAGEBUS_NO_QPID_FALLBACK_H #ifndef HAVE_QPID #include <Common/LofarLogger.h> diff --git a/LCS/MessageBus/src/CMakeLists.txt b/LCS/MessageBus/src/CMakeLists.txt index c7177cf992cf3e7abf31c84cbff17d0274bf9f57..eeb33d453afb23533c9bd379e437e34a300b14d9 100644 --- a/LCS/MessageBus/src/CMakeLists.txt +++ b/LCS/MessageBus/src/CMakeLists.txt @@ -8,7 +8,7 @@ set(messagebus_LIB_SRCS LogSink.cc MsgBus.cc Message.cc - QpidMock.cc) + NoQpidFallback.cc) set(messagebus_PROGRAMS ) @@ -23,6 +23,7 @@ python_install( __init__.py msgbus.py message.py + noqpidfallback.py DESTINATION lofar/messagebus) add_subdirectory(protocols) diff --git a/LCS/MessageBus/src/QpidMock.cc b/LCS/MessageBus/src/NoQpidFallback.cc similarity index 90% rename from LCS/MessageBus/src/QpidMock.cc rename to LCS/MessageBus/src/NoQpidFallback.cc index 45e3d5e48e66d1b72dea9239f60e104dcacf9d68..c0bebd09fd59225929fb8fe1c04084f7cd455b82 100644 --- a/LCS/MessageBus/src/QpidMock.cc +++ b/LCS/MessageBus/src/NoQpidFallback.cc @@ -1,4 +1,4 @@ -//# QpidMock.cc: A fake implementation of the QPID API +//# NoQpidFallback.cc: A fake implementation of the QPID API in case QPID is not installed //# //# Copyright (C) 2015 //# ASTRON (Netherlands Institute for Radio Astronomy) @@ -23,7 +23,7 @@ #include <lofar_config.h> #ifndef HAVE_QPID -#include <MessageBus/QpidMock.h> +#include <MessageBus/NoQpidFallback.h> std::ostream& operator<<(std::ostream& out, const qpid::types::Variant& value) { (void)value; diff --git a/LCS/MessageBus/src/message.py b/LCS/MessageBus/src/message.py index 8ef65ec399637cfc8b55707072b3000d5c8796bf..12986ff18ff30c945920c907dca040203dca7004 100644 --- a/LCS/MessageBus/src/message.py +++ b/LCS/MessageBus/src/message.py @@ -17,10 +17,11 @@ # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. try: - import qpid.messaging - enabled = True + import qpid.messaging as messaging + MESSAGING_ENABLED = True except ImportError: - enabled = False + import noqpidfallback as messaging + MESSAGING_ENABLED = False import xml.dom.minidom as xml import xml.parsers.expat as expat @@ -61,7 +62,7 @@ def _uuid(): """ Return an UUID """ - return str(qpid.messaging.uuid4()) + return str(messaging.uuid4()) class MessageException(Exception): pass @@ -145,7 +146,7 @@ class MessageContent(object): def qpidMsg(self): """ Construct a NEW QPID message. """ - msg = qpid.messaging.Message(content_type="text/plain", durable=True) + msg = messaging.Message(content_type="text/plain", durable=True) msg.content = self.content() return msg diff --git a/LCS/MessageBus/src/msgbus.py b/LCS/MessageBus/src/msgbus.py index 932680d7658be2e017e1382eeada234480f94a57..77b07749c1dce2ff458bc04cc83e1d6dbf9eac65 100644 --- a/LCS/MessageBus/src/msgbus.py +++ b/LCS/MessageBus/src/msgbus.py @@ -19,10 +19,11 @@ # id.. TDB try: - import qpid.messaging - enabled = True + import qpid.messaging as messaging + MESSAGING_ENABLED = True except ImportError: - enabled = False + import noqpidfallback as messaging + MESSAGING_ENABLED = False import os import signal @@ -47,14 +48,14 @@ class Session: self.closed = False logger.info("[Bus] Connecting to broker %s", broker) - self.connection = qpid.messaging.Connection(broker) + self.connection = messaging.Connection(broker) self.connection.reconnect = True logger.info("[Bus] Connected to broker %s", broker) try: self.connection.open() self.session = self.connection.session() - except qpid.messaging.MessagingError, m: + except messaging.MessagingError, m: raise BusException(m) # NOTE: We cannuot use: @@ -87,7 +88,7 @@ class Session: # to data loss if the stall was legit. try: self.connection.close(5.0) - except qpid.messaging.exceptions.Timeout, t: + except messaging.exceptions.Timeout, t: logger.error("[Bus] Could not close connection: %s", t) def __enter__(self): @@ -110,7 +111,7 @@ class ToBus(Session): try: self.sender = self.session.sender(self.address(queue, options)) - except qpid.messaging.MessagingError, m: + except messaging.MessagingError, m: raise BusException(m) def send(self, msg): @@ -121,11 +122,11 @@ class ToBus(Session): # Send Message or MessageContent object self.sender.send(msg.qpidMsg()) except AttributeError: - # Send string or qpid.messaging.Message object + # Send string or messaging.Message object self.sender.send(msg) logger.info("[ToBus] Message sent to queue %s", self.queue) - except qpid.messaging.SessionError, m: + except messaging.SessionError, m: raise BusException(m) class FromBus(Session): @@ -140,7 +141,7 @@ class FromBus(Session): # Need capacity >=1 for 'self.session.next_receiver' to function across multiple queues receiver.capacity = 1 #32 - except qpid.messaging.MessagingError, m: + except messaging.MessagingError, m: raise BusException(m) def get(self, timeout=None): @@ -156,7 +157,7 @@ class FromBus(Session): logger.error("[FromBus] Could not retrieve available message on queue %s", receiver.source) else: logger.info("[FromBus] Message received on queue %s", receiver.source) - except qpid.messaging.exceptions.Empty, e: + except messaging.exceptions.Empty, e: return None if msg is None: diff --git a/LCS/MessageBus/src/noqpidfallback.py b/LCS/MessageBus/src/noqpidfallback.py new file mode 100644 index 0000000000000000000000000000000000000000..26c6257a4c8643225051af32ebc457cee4004112 --- /dev/null +++ b/LCS/MessageBus/src/noqpidfallback.py @@ -0,0 +1,79 @@ +#!/usr/bin/python + +import sys +print >>sys.stderr, "QPID support NOT enabled! Will NOT connect to any broker, and messages will be lost!" + +def uuid4(): + return "<uuid>" + + +""" + Exceptions. +""" + +class MessagingError(Exception): + pass + +class SessionError(Exception): + pass + +class exceptions: + class Timeout(Exception): + pass + + class Empty(Exception): + pass + +""" + Messages. +""" + +class Message(object): + def __init__(self, content_type, durable): + self.content = "" + +""" + Communication. +""" + +class Sender(object): + def __init__(self, dest): + self.dest = dest + + def send(self, msg): + pass + +class Receiver(object): + def __init__(self, source): + self.capacity = 0 + self.source = source + + def fetch(self): + return None + +class Session(object): + def sender(self, address): + return Sender(address) + + def receiver(self, address): + return Receiver(address) + + def next_receiver(self, timeout=0): + return Receiver("unknown") + + def acknowledge(self, msg): + pass + +class Connection(object): + def __init__(self, broker): + self.reconnect = False + + def open(self): + pass + + def close(self, timeout=0): + pass + + def session(self): + return Session() + diff --git a/LCS/MessageBus/test/tPyMsgBus.py b/LCS/MessageBus/test/tPyMsgBus.py index 8b88c1a98190fd82f9ca71794881f93092b73b4f..6b66a10b4ca5296a711dbd704973da6e79ede828 100644 --- a/LCS/MessageBus/test/tPyMsgBus.py +++ b/LCS/MessageBus/test/tPyMsgBus.py @@ -1,5 +1,13 @@ #!/usr/bin/python -from lofar.messagebus.msgbus import FromBus, ToBus +# +# Test the basic functionality of FromBus and ToBus, both +# to send and to forward messages. +# +# Note that without QPID support, the classes are still usable, +# but messages won't arrive. We consider that case also in this test. +# + +from lofar.messagebus.msgbus import FromBus, ToBus, MESSAGING_ENABLED from lofar.messagebus.message import Message, MessageContent # Send a message (send MessageContent) @@ -12,14 +20,16 @@ tbus.send(tmsg) fbus = FromBus("test") fmsg = fbus.get(1) -# Verify the content -assert fmsg != None -assert fmsg.content().payload == "foo" +# Verify the content (only if QPID support is enabled) +if MESSAGING_ENABLED: + assert fmsg != None + assert fmsg.content().payload == "foo" # Resend it! (send Message) tbus.send(fmsg) rmsg = fbus.get(1) -# Verify the content -assert rmsg != None -assert rmsg.content().payload == "foo" +# Verify the content (only if QPID support is enabled) +if MESSAGING_ENABLED: + assert rmsg != None + assert rmsg.content().payload == "foo" diff --git a/LCS/MessageDaemons/src/MessageRouter b/LCS/MessageDaemons/src/MessageRouter index 335c65ada1450ead3456466d3ae0b54432433d5f..6b49bcfae6c815d03665819e428aa2404e56048f 100644 --- a/LCS/MessageDaemons/src/MessageRouter +++ b/LCS/MessageDaemons/src/MessageRouter @@ -62,10 +62,11 @@ class BusMulticast(threading.Thread): try: content = msg.content() + log("INFO","[%s] [%s] Message received" % (self.source, content)) except message.MessageException, e: content = "<unknown>" + log("WARN","[%s] Non-compliant message received" % (self.source,)) - log("INFO","[%s] [%s] Message received" % (self.source, content)) outdump.send(msg) for outbus in outbusses: