diff --git a/LCS/MessageBus/include/MessageBus/MsgBus.h b/LCS/MessageBus/include/MessageBus/MsgBus.h index f1f1eac34c4c4cb5ff8f9b08e8781aa886c89ea9..cd99c88a6e1b0873f36b3f0c15a9d2772ce196ec 100644 --- a/LCS/MessageBus/include/MessageBus/MsgBus.h +++ b/LCS/MessageBus/include/MessageBus/MsgBus.h @@ -42,14 +42,8 @@ EXCEPTION_CLASS(MessageBusException, LOFAR::Exception); class FromBus { - const std::string itsBrokerName; - const std::string itsQueueName; - -#ifdef HAVE_QPID qpid::messaging::Connection itsConnection; qpid::messaging::Session itsSession; - qpid::messaging::Receiver receiver; -#endif int itsNrMissingACKs; @@ -57,72 +51,27 @@ public: FromBus(const std::string &address="testqueue" , const std::string &options="; {create: always}", const std::string &broker = "amqp:tcp:127.0.0.1:5672") ; ~FromBus(void); - bool getString(std::string &str,double timeout = 0.0); // timeout 0.0 means blocking - -#ifdef HAVE_QPID bool getMessage(qpid::messaging::Message &msg, double timeout = 0.0); // timeout 0.0 means blocking void nack(qpid::messaging::Message &msg); -#endif - void ack(void); + void ack(qpid::messaging::Message &msg); + void reject(qpid::messaging::Message &msg); + void addQueue(const std::string &address="testqueue", const std::string &options="; {create: always}"); }; class ToBus { - const std::string itsBrokerName; - const std::string itsQueueName; - -#ifdef HAVE_QPID qpid::messaging::Connection itsConnection; qpid::messaging::Session itsSession; - qpid::messaging::Sender sender; -#endif - + qpid::messaging::Sender itsSender; + public: ToBus(const std::string &address="testqueue" , const std::string &options="; {create: always}", const std::string &broker = "amqp:tcp:127.0.0.1:5672") ; ~ToBus(void); void send(const std::string &msg); }; - -class MultiBus -{ -public: - typedef bool (* MsgHandler)(const std::string &, const std::string &); - -private: - const std::string itsBrokerName; - -#ifdef HAVE_QPID - std::map<std::string, qpid::messaging::Receiver> itsReceivers; - - qpid::messaging::Connection itsConnection; - qpid::messaging::Session itsSession; -#endif - - int itsNrMissingACKs; - -public: - MultiBus(const std::string &broker = "amqp:tcp:127.0.0.1:5672"); - ~MultiBus(); - - void addQueue(const std::string &address="testqueue", const std::string &options="; {create: always}"); - void handleMessages(void); - -#ifdef HAVE_QPID - bool getMessage(qpid::messaging::Message &msg, double timeout = 0.0); // timeout 0.0 means blocking - - void nack(qpid::messaging::Message &msg); #endif - - void ack(void); - - bool getString(std::string &str, double timeout = 0.0); // timeout 0.0 means blocking - -}; - -} // namespace LOFAR - #endif