Skip to content
Snippets Groups Projects
Commit 74df2eab authored by Jan Rinze Peterzon's avatar Jan Rinze Peterzon
Browse files

Task #7342: adapted MessageBus.cc to match MessageBus.h

parent aa37cc93
No related branches found
No related tags found
No related merge requests found
......@@ -9,11 +9,9 @@
#include <qpid/messaging/Logger.h>
using namespace qpid::messaging;
#endif
namespace LOFAR {
#ifdef HAVE_QPID
static Duration TimeOutDuration(double secs)
{
if (secs > 0.0)
......@@ -22,55 +20,8 @@ namespace LOFAR {
return Duration::FOREVER;
}
class QpidLogSink: qpid::messaging::LoggerOutput {
public:
virtual void log(Level level, bool user, const char* file, int line, const char* function, const std::string& message) {
(void)user;
switch(level) {
case trace:
case debug:
LOG_DEBUG_STR("[QPID] " << function << " @ " << file << ":" << line << " " << message);
break;
case info:
case notice:
LOG_INFO_STR("[QPID] " << function << " @ " << file << ":" << line << " " << message);
break;
case warning:
LOG_WARN_STR("[QPID] " << function << " @ " << file << ":" << line << " " << message);
break;
case error:
LOG_ERROR_STR("[QPID] " << function << " @ " << file << ":" << line << " " << message);
break;
case critical:
LOG_FATAL_STR("[QPID] " << function << " @ " << file << ":" << line << " " << message);
break;
}
}
};
static QpidLogSink qpidLogSink;
#if 0
void init() {
Logger::setOutput(qpidLogSink);
}
void done() {
}
#endif
#endif
FromBus::FromBus(const std::string &address, const std::string &options, const std::string &broker)
#ifdef HAVE_QPID
try:
itsBrokerName(broker),
itsQueueName(address),
itsConnection(broker),
itsNrMissingACKs(0)
{
......@@ -79,18 +30,10 @@ namespace LOFAR {
itsSession = itsConnection.createSession();
Address addr(address+options);
receiver = itsSession.createReceiver(addr);
Receiver receiver = itsSession.createReceiver(addr);
} catch(const qpid::types::Exception &ex) {
THROW(MessageBusException, ex.what());
}
#else
:
itsBrokerName(broker),
itsQueueName(address),
itsNrMissingACKs(0)
{
}
#endif
FromBus::~FromBus(void)
......@@ -99,160 +42,69 @@ namespace LOFAR {
LOG_ERROR_STR("Queue " << itsQueueName << " on broker " << itsBrokerName << " has " << itsNrMissingACKs << " messages not ACK'ed ");
}
bool FromBus::getString(std::string &str, double timeout) // timeout 0.0 means blocking
bool FromBus::getMessage(Message &msg, double timeout) // timeout 0.0 means blocking
{
#ifdef HAVE_QPID
Message msg;
bool ret = getMessage(msg, timeout);
if (ret) {
itsNrMissingACKs ++;
str = msg.getContent();
Receiver next;
if (session.nextReceiver(next,TimeOutDuration(timeout)))
{
itsNrMissingACKs++;
return next.get(msg);
}
return ret;
#else
return false;
#endif
}
#ifdef HAVE_QPID
bool FromBus::getMessage(Message &msg, double timeout) // timeout 0.0 means blocking
void FromBus::nack(Message &msg)
{
bool ret= receiver.fetch(msg, TimeOutDuration(timeout));
if (ret) itsNrMissingACKs++;
return ret;
itsSession.release(msg);
itsNrMissingACKs--;
}
void FromBus::nack(Message &msg)
void FromBus::ack(Message &msg)
{
itsSession.release(msg);
itsNrMissingACKs--;
itsSession.acknowledge(msg);
itsNrMissingACKs --;
}
#endif
void FromBus::ack(void)
void FromBus::reject(Message &msg)
{
#ifdef HAVE_QPID
itsSession.acknowledge();
#endif
itsSession.reject(msg);
itsNrMissingACKs --;
}
// acknowlegde covers ALL messages received so far
itsNrMissingACKs = 0;
bool FromBus::addQueue(const std::string &address, const std::string &options)
{
try
{
Address addr(address+options);
Receiver receiver = itsSession.createReceiver(addr);
} catch(const qpid::types::Exception &ex) {
//THROW(MessageBusException, ex.what());
return false;
}
return true;
}
ToBus::ToBus(const std::string &address, const std::string &options, const std::string &broker)
#ifdef HAVE_QPID
try:
itsBrokerName(broker),
itsQueueName(address),
itsConnection(broker)
{
itsConnection.open();
itsSession = itsConnection.createSession();
Address addr(address+options);
sender = itsSession.createSender(addr);
} catch(const qpid::types::Exception &ex) {
THROW(MessageBusException, ex.what());
}
#else
:
itsBrokerName(broker),
itsQueueName(address)
{
}
#endif
ToBus::~ToBus(void)
{
}
void ToBus::send(const std::string &msg)
{
#ifdef HAVE_QPID
Message tosend(msg);
sender.send(tosend,true);
#endif
}
MultiBus::MultiBus(const std::string &broker)
#ifdef HAVE_QPID
try:
itsBrokerName(broker),
itsConnection(broker),
itsNrMissingACKs(0)
{
itsConnection.open();
itsSession = itsConnection.createSession();
} catch(const qpid::types::Exception &ex) {
THROW(MessageBusException, ex.what());
}
#else
:
itsBrokerName(broker),
itsNrMissingACKs(0)
{
}
#endif
MultiBus::~MultiBus()
{
// fixme: memory leak for workers. Also infinite loop needs a fix.
}
void MultiBus::addQueue(const std::string &address, const std::string &options)
{
#ifdef HAVE_QPID
Address addr(address + options);
Receiver receiver = itsSession.createReceiver(addr);
receiver.setCapacity(1);
itsReceivers[address] = receiver;
#endif
}
#ifdef HAVE_QPID
bool MultiBus::getMessage(Message &msg, double timeout)
{
try {
Receiver nrec = itsSession.nextReceiver(TimeOutDuration(timeout));
nrec.get(msg);
return true;
} catch(NoMessageAvailable&) {
return false;
}
}
void MultiBus::nack(Message &msg)
{
itsSession.release(msg);
}
#endif
void MultiBus::ack()
{
#ifdef HAVE_QPID
itsSession.acknowledge();
#endif
}
bool MultiBus::getString(std::string &str, double timeout)
{
#ifdef HAVE_QPID
Message msg;
bool ret = getMessage(msg, timeout);
if (ret)
str = msg.getContent();
return ret;
#else
return false;
#endif
}
} // namespace LOFAR
#endif
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment