Skip to content
Snippets Groups Projects
Commit bde05049 authored by Jan David Mol's avatar Jan David Mol
Browse files

Task #7432: Classes no longer derive from Connection, and put everything in LOFAR namespace

parent 54f52e4a
No related branches found
No related tags found
No related merge requests found
......@@ -32,11 +32,16 @@
#include <map>
class FromBus: private qpid::messaging::Connection
namespace LOFAR {
class FromBus
{
std::string queuename,brokername;
qpid::messaging::Connection connection;
qpid::messaging::Session session;
qpid::messaging::Receiver receiver;
int DiffNumAck;
int state;
void cleanup(void);
......@@ -49,11 +54,14 @@ public:
~FromBus(void);
};
class ToBus: private qpid::messaging::Connection
class ToBus
{
std::string queuename,brokername;
qpid::messaging::Connection connection;
qpid::messaging::Session session;
qpid::messaging::Sender sender;
int state,DiffNumAck;
void cleanup(void);
......@@ -72,11 +80,14 @@ typedef struct
std::string queuename;
} MsgWorker;
class MultiBus: private qpid::messaging::Connection
class MultiBus
{
std::map<qpid::messaging::Receiver, MsgWorker*> handlers;
std::string brokername;
qpid::messaging::Connection connection;
qpid::messaging::Session session;
int state;
int DiffNumAck;
void cleanup(void);
......@@ -92,5 +103,7 @@ public:
~MultiBus();
};
} // namespace LOFAR
#endif
#include "lofar_config.h"
#include <MessageBus/MsgBus.h>
#include <iostream>
......@@ -6,6 +7,8 @@ using namespace qpid::messaging;
using std::string;
namespace LOFAR {
#define S_OPEN 1
#define S_SESSION 2
#define S_SENDER 4
......@@ -28,17 +31,18 @@ static Duration TimeOutSecs(double secs)
FromBus::FromBus(const std::string &address, const std::string &options, const std::string &broker)
:Connection(broker),
DiffNumAck(0)
:
connection(broker),
DiffNumAck(0)
{
try {
open();
session = createSession();
connection.open();
session = connection.createSession();
Address addr(address+options);
receiver = session.createReceiver(addr);
} catch (const std::exception& error) {
std::cout << error.what() << std::endl;
close();
connection.close();
}
}
bool FromBus::GetStr( std::string & Str, double timeout) // timeout 0.0 means blocking
......@@ -68,7 +72,7 @@ static Duration TimeOutSecs(double secs)
FromBus::~FromBus(void)
{
if (DiffNumAck) { std::cout << "Queue " << queuename << " on broker " << brokername << " has " << DiffNumAck << " messages not ack'ed " << std::endl;};
close();
connection.close();
}
void ToBus::cleanup(void)
......@@ -79,16 +83,17 @@ static Duration TimeOutSecs(double secs)
}
ToBus::ToBus(const std::string &address, const std::string &options, const std::string &broker)
: Connection(broker)
:
connection(broker),
DiffNumAck(0)
{
queuename = string(address);
brokername = string( broker);
state = 0;
DiffNumAck= 0;
try {
open();
connection.open();
state |= S_OPEN;
session = createSession();
session = connection.createSession();
state |= S_SESSION;
Address addr(address+options);
sender = session.createSender(addr);
......@@ -116,16 +121,17 @@ static Duration TimeOutSecs(double secs)
}
MultiBus::MultiBus(MsgHandler handler, const std::string &address, const std::string &options, const std::string &broker)
: Connection(broker),
DiffNumAck(0)
:
connection(broker),
DiffNumAck(0)
{
string queuename = string(address);
brokername = string(broker);
state = 0;
try {
open();
connection.open();
state |= S_OPEN;
session = createSession();
session = connection.createSession();
state |= S_SESSION;
Address addr(address+options);
Receiver receiver = session.createReceiver(addr);
......@@ -199,3 +205,5 @@ static Duration TimeOutSecs(double secs)
// fixme: memory leak for workers. Also infinite loop needs a fix.
}
} // namespace LOFAR
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment