From f5c391a57a883246b71b70b742b1f64dc57fa133 Mon Sep 17 00:00:00 2001 From: Jan Rinze Peterzon <peterzon@astron.nl> Date: Fri, 30 Jan 2015 13:22:10 +0000 Subject: [PATCH] Task #7342: New MessageBus.h --- LCS/MessageBus/include/MessageBus/MsgBus.h | 61 ++-------------------- 1 file changed, 5 insertions(+), 56 deletions(-) diff --git a/LCS/MessageBus/include/MessageBus/MsgBus.h b/LCS/MessageBus/include/MessageBus/MsgBus.h index f1f1eac34c4..cd99c88a6e1 100644 --- a/LCS/MessageBus/include/MessageBus/MsgBus.h +++ b/LCS/MessageBus/include/MessageBus/MsgBus.h @@ -42,14 +42,8 @@ EXCEPTION_CLASS(MessageBusException, LOFAR::Exception); class FromBus { - const std::string itsBrokerName; - const std::string itsQueueName; - -#ifdef HAVE_QPID qpid::messaging::Connection itsConnection; qpid::messaging::Session itsSession; - qpid::messaging::Receiver receiver; -#endif int itsNrMissingACKs; @@ -57,72 +51,27 @@ public: FromBus(const std::string &address="testqueue" , const std::string &options="; {create: always}", const std::string &broker = "amqp:tcp:127.0.0.1:5672") ; ~FromBus(void); - bool getString(std::string &str,double timeout = 0.0); // timeout 0.0 means blocking - -#ifdef HAVE_QPID bool getMessage(qpid::messaging::Message &msg, double timeout = 0.0); // timeout 0.0 means blocking void nack(qpid::messaging::Message &msg); -#endif - void ack(void); + void ack(qpid::messaging::Message &msg); + void reject(qpid::messaging::Message &msg); + void addQueue(const std::string &address="testqueue", const std::string &options="; {create: always}"); }; class ToBus { - const std::string itsBrokerName; - const std::string itsQueueName; - -#ifdef HAVE_QPID qpid::messaging::Connection itsConnection; qpid::messaging::Session itsSession; - qpid::messaging::Sender sender; -#endif - + qpid::messaging::Sender itsSender; + public: ToBus(const std::string &address="testqueue" , const std::string &options="; {create: always}", const std::string &broker = "amqp:tcp:127.0.0.1:5672") ; ~ToBus(void); void send(const std::string &msg); }; - -class MultiBus -{ -public: - typedef bool (* MsgHandler)(const std::string &, const std::string &); - -private: - const std::string itsBrokerName; - -#ifdef HAVE_QPID - std::map<std::string, qpid::messaging::Receiver> itsReceivers; - - qpid::messaging::Connection itsConnection; - qpid::messaging::Session itsSession; -#endif - - int itsNrMissingACKs; - -public: - MultiBus(const std::string &broker = "amqp:tcp:127.0.0.1:5672"); - ~MultiBus(); - - void addQueue(const std::string &address="testqueue", const std::string &options="; {create: always}"); - void handleMessages(void); - -#ifdef HAVE_QPID - bool getMessage(qpid::messaging::Message &msg, double timeout = 0.0); // timeout 0.0 means blocking - - void nack(qpid::messaging::Message &msg); #endif - - void ack(void); - - bool getString(std::string &str, double timeout = 0.0); // timeout 0.0 means blocking - -}; - -} // namespace LOFAR - #endif -- GitLab