Skip to content
Snippets Groups Projects
Commit f5c391a5 authored by Jan Rinze Peterzon's avatar Jan Rinze Peterzon
Browse files

Task #7342: New MessageBus.h

parent 34c0c246
No related branches found
No related tags found
No related merge requests found
...@@ -42,14 +42,8 @@ EXCEPTION_CLASS(MessageBusException, LOFAR::Exception); ...@@ -42,14 +42,8 @@ EXCEPTION_CLASS(MessageBusException, LOFAR::Exception);
class FromBus class FromBus
{ {
const std::string itsBrokerName;
const std::string itsQueueName;
#ifdef HAVE_QPID
qpid::messaging::Connection itsConnection; qpid::messaging::Connection itsConnection;
qpid::messaging::Session itsSession; qpid::messaging::Session itsSession;
qpid::messaging::Receiver receiver;
#endif
int itsNrMissingACKs; int itsNrMissingACKs;
...@@ -57,72 +51,27 @@ public: ...@@ -57,72 +51,27 @@ public:
FromBus(const std::string &address="testqueue" , const std::string &options="; {create: always}", const std::string &broker = "amqp:tcp:127.0.0.1:5672") ; FromBus(const std::string &address="testqueue" , const std::string &options="; {create: always}", const std::string &broker = "amqp:tcp:127.0.0.1:5672") ;
~FromBus(void); ~FromBus(void);
bool getString(std::string &str,double timeout = 0.0); // timeout 0.0 means blocking
#ifdef HAVE_QPID
bool getMessage(qpid::messaging::Message &msg, double timeout = 0.0); // timeout 0.0 means blocking bool getMessage(qpid::messaging::Message &msg, double timeout = 0.0); // timeout 0.0 means blocking
void nack(qpid::messaging::Message &msg); void nack(qpid::messaging::Message &msg);
#endif
void ack(void); void ack(qpid::messaging::Message &msg);
void reject(qpid::messaging::Message &msg);
void addQueue(const std::string &address="testqueue", const std::string &options="; {create: always}");
}; };
class ToBus class ToBus
{ {
const std::string itsBrokerName;
const std::string itsQueueName;
#ifdef HAVE_QPID
qpid::messaging::Connection itsConnection; qpid::messaging::Connection itsConnection;
qpid::messaging::Session itsSession; qpid::messaging::Session itsSession;
qpid::messaging::Sender sender; qpid::messaging::Sender itsSender;
#endif
public: public:
ToBus(const std::string &address="testqueue" , const std::string &options="; {create: always}", const std::string &broker = "amqp:tcp:127.0.0.1:5672") ; ToBus(const std::string &address="testqueue" , const std::string &options="; {create: always}", const std::string &broker = "amqp:tcp:127.0.0.1:5672") ;
~ToBus(void); ~ToBus(void);
void send(const std::string &msg); void send(const std::string &msg);
}; };
class MultiBus
{
public:
typedef bool (* MsgHandler)(const std::string &, const std::string &);
private:
const std::string itsBrokerName;
#ifdef HAVE_QPID
std::map<std::string, qpid::messaging::Receiver> itsReceivers;
qpid::messaging::Connection itsConnection;
qpid::messaging::Session itsSession;
#endif
int itsNrMissingACKs;
public:
MultiBus(const std::string &broker = "amqp:tcp:127.0.0.1:5672");
~MultiBus();
void addQueue(const std::string &address="testqueue", const std::string &options="; {create: always}");
void handleMessages(void);
#ifdef HAVE_QPID
bool getMessage(qpid::messaging::Message &msg, double timeout = 0.0); // timeout 0.0 means blocking
void nack(qpid::messaging::Message &msg);
#endif #endif
void ack(void);
bool getString(std::string &str, double timeout = 0.0); // timeout 0.0 means blocking
};
} // namespace LOFAR
#endif #endif
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment