diff --git a/LCS/MessageBus/src/MsgBus.cc b/LCS/MessageBus/src/MsgBus.cc index 3ac56901f788b6f068afacde586a270021d82a67..7cda238d52ec57a75b706ac700454314b46aedd2 100644 --- a/LCS/MessageBus/src/MsgBus.cc +++ b/LCS/MessageBus/src/MsgBus.cc @@ -9,11 +9,9 @@ #include <qpid/messaging/Logger.h> using namespace qpid::messaging; -#endif namespace LOFAR { -#ifdef HAVE_QPID static Duration TimeOutDuration(double secs) { if (secs > 0.0) @@ -22,55 +20,8 @@ namespace LOFAR { return Duration::FOREVER; } - class QpidLogSink: qpid::messaging::LoggerOutput { - public: - virtual void log(Level level, bool user, const char* file, int line, const char* function, const std::string& message) { - (void)user; - - switch(level) { - case trace: - case debug: - LOG_DEBUG_STR("[QPID] " << function << " @ " << file << ":" << line << " " << message); - break; - - case info: - case notice: - LOG_INFO_STR("[QPID] " << function << " @ " << file << ":" << line << " " << message); - break; - - case warning: - LOG_WARN_STR("[QPID] " << function << " @ " << file << ":" << line << " " << message); - break; - - case error: - LOG_ERROR_STR("[QPID] " << function << " @ " << file << ":" << line << " " << message); - break; - - case critical: - LOG_FATAL_STR("[QPID] " << function << " @ " << file << ":" << line << " " << message); - break; - } - } - }; - - static QpidLogSink qpidLogSink; -#if 0 - void init() { - Logger::setOutput(qpidLogSink); - } - - void done() { - } -#endif - -#endif - - FromBus::FromBus(const std::string &address, const std::string &options, const std::string &broker) -#ifdef HAVE_QPID try: - itsBrokerName(broker), - itsQueueName(address), itsConnection(broker), itsNrMissingACKs(0) { @@ -79,18 +30,10 @@ namespace LOFAR { itsSession = itsConnection.createSession(); Address addr(address+options); - receiver = itsSession.createReceiver(addr); + Receiver receiver = itsSession.createReceiver(addr); } catch(const qpid::types::Exception &ex) { THROW(MessageBusException, ex.what()); } -#else - : - itsBrokerName(broker), - itsQueueName(address), - itsNrMissingACKs(0) - { - } -#endif FromBus::~FromBus(void) @@ -99,160 +42,69 @@ namespace LOFAR { LOG_ERROR_STR("Queue " << itsQueueName << " on broker " << itsBrokerName << " has " << itsNrMissingACKs << " messages not ACK'ed "); } - bool FromBus::getString(std::string &str, double timeout) // timeout 0.0 means blocking + bool FromBus::getMessage(Message &msg, double timeout) // timeout 0.0 means blocking { -#ifdef HAVE_QPID - Message msg; - - bool ret = getMessage(msg, timeout); - if (ret) { - itsNrMissingACKs ++; - str = msg.getContent(); + Receiver next; + if (session.nextReceiver(next,TimeOutDuration(timeout))) + { + itsNrMissingACKs++; + return next.get(msg); } - - return ret; -#else return false; -#endif } -#ifdef HAVE_QPID - bool FromBus::getMessage(Message &msg, double timeout) // timeout 0.0 means blocking + void FromBus::nack(Message &msg) { - bool ret= receiver.fetch(msg, TimeOutDuration(timeout)); - if (ret) itsNrMissingACKs++; - return ret; + itsSession.release(msg); + itsNrMissingACKs--; } - void FromBus::nack(Message &msg) + void FromBus::ack(Message &msg) { - itsSession.release(msg); - - itsNrMissingACKs--; + itsSession.acknowledge(msg); + itsNrMissingACKs --; } -#endif - void FromBus::ack(void) + void FromBus::reject(Message &msg) { -#ifdef HAVE_QPID - itsSession.acknowledge(); -#endif + itsSession.reject(msg); + itsNrMissingACKs --; + } - // acknowlegde covers ALL messages received so far - itsNrMissingACKs = 0; + bool FromBus::addQueue(const std::string &address, const std::string &options) + { + try + { + Address addr(address+options); + Receiver receiver = itsSession.createReceiver(addr); + } catch(const qpid::types::Exception &ex) { + //THROW(MessageBusException, ex.what()); + return false; + } + return true; } - + ToBus::ToBus(const std::string &address, const std::string &options, const std::string &broker) -#ifdef HAVE_QPID try: - itsBrokerName(broker), - itsQueueName(address), itsConnection(broker) { itsConnection.open(); - itsSession = itsConnection.createSession(); - Address addr(address+options); sender = itsSession.createSender(addr); } catch(const qpid::types::Exception &ex) { THROW(MessageBusException, ex.what()); } -#else - : - itsBrokerName(broker), - itsQueueName(address) - { - } -#endif - ToBus::~ToBus(void) { } void ToBus::send(const std::string &msg) { -#ifdef HAVE_QPID Message tosend(msg); sender.send(tosend,true); -#endif - } - - MultiBus::MultiBus(const std::string &broker) -#ifdef HAVE_QPID - try: - itsBrokerName(broker), - itsConnection(broker), - itsNrMissingACKs(0) - { - itsConnection.open(); - itsSession = itsConnection.createSession(); - } catch(const qpid::types::Exception &ex) { - THROW(MessageBusException, ex.what()); - } -#else - : - itsBrokerName(broker), - itsNrMissingACKs(0) - { - } -#endif - - MultiBus::~MultiBus() - { - // fixme: memory leak for workers. Also infinite loop needs a fix. - } - - void MultiBus::addQueue(const std::string &address, const std::string &options) - { -#ifdef HAVE_QPID - Address addr(address + options); - Receiver receiver = itsSession.createReceiver(addr); - receiver.setCapacity(1); - - itsReceivers[address] = receiver; -#endif - } - -#ifdef HAVE_QPID - bool MultiBus::getMessage(Message &msg, double timeout) - { - try { - Receiver nrec = itsSession.nextReceiver(TimeOutDuration(timeout)); - nrec.get(msg); - - return true; - } catch(NoMessageAvailable&) { - return false; - } - } - - void MultiBus::nack(Message &msg) - { - itsSession.release(msg); - } -#endif - - void MultiBus::ack() - { -#ifdef HAVE_QPID - itsSession.acknowledge(); -#endif - } - - bool MultiBus::getString(std::string &str, double timeout) - { -#ifdef HAVE_QPID - Message msg; - bool ret = getMessage(msg, timeout); - - if (ret) - str = msg.getContent(); - return ret; -#else - return false; -#endif } } // namespace LOFAR +#endif \ No newline at end of file