diff --git a/LCS/MessageBus/include/MessageBus/Message.h b/LCS/MessageBus/include/MessageBus/Message.h index 32d001f6c7e6492dad633403cfac8d58082bf367..09bbff90813e1030a617a4f10e573c04c4183ec3 100644 --- a/LCS/MessageBus/include/MessageBus/Message.h +++ b/LCS/MessageBus/include/MessageBus/Message.h @@ -42,6 +42,9 @@ class Message { public: // Construct a message + Message() {}; + + // With header info Message( // Name of the service or process producing this message const std::string &from, @@ -60,7 +63,7 @@ public: ); // Parse a message - Message(const qpid::messaging::Message &qpidMsg); + Message(const qpid::messaging::Message qpidMsg) : itsQpidMsg(qpidMsg) {}; // Read a message from disk (header + payload) Message(const std::string &rawContent); @@ -86,20 +89,26 @@ public: std::string toVersion() const { return (getXMLvalue("message.header.service.version")); } // Construct the given fields as a QPID message - qpid::messaging::Message getQpidMsg() const { return (itsQpidMsg); } + qpid::messaging::Message& qpidMsg() { return (itsQpidMsg); } // Return the raw message (header + payload) - std::string getRawContent() const { return (itsQpidMsg.getContent()); } + std::string rawContent() const { return (itsQpidMsg.getContent()); } + + // function for printing + std::ostream& print (std::ostream& os) const; private: // Internal very simple XML parser to get a key from the XML content. - string getXMLvalue(const string& key) const; + std::string getXMLvalue(const std::string& key) const; - // datamembers + // -- datamembers -- qpid::messaging::Message itsQpidMsg; }; -std::ostream &operator<<(std::ostream &s, const Message &msg); +inline std::ostream &operator<<(std::ostream &os, const Message &msg) +{ + return (msg.print(os)); +} } // namespace LOFAR diff --git a/LCS/MessageBus/include/MessageBus/MsgBus.h b/LCS/MessageBus/include/MessageBus/MsgBus.h index 31ca96b38dca9b11d774f999b3b0fe27552f3bea..512e59287f6646fd838c373b49455324751c3a4c 100644 --- a/LCS/MessageBus/include/MessageBus/MsgBus.h +++ b/LCS/MessageBus/include/MessageBus/MsgBus.h @@ -32,6 +32,7 @@ #include <qpid/messaging/Address.h> #include <Common/Exception.h> +#include <MessageBus/Message.h> #include <map> @@ -41,36 +42,40 @@ EXCEPTION_CLASS(MessageBusException, LOFAR::Exception); class FromBus { - qpid::messaging::Connection itsConnection; - qpid::messaging::Session itsSession; - - int itsNrMissingACKs; - public: FromBus(const std::string &address="testqueue" , const std::string &options="; {create: always}", const std::string &broker = "amqp:tcp:127.0.0.1:5672") ; ~FromBus(void); + bool addQueue(const std::string &address="testqueue", const std::string &options="; {create: always}"); - bool getMessage(qpid::messaging::Message &msg, double timeout = 0.0); // timeout 0.0 means blocking + bool getMessage(LOFAR::Message &msg, double timeout = 0.0); // timeout 0.0 means blocking - void nack(qpid::messaging::Message &msg); + void ack (LOFAR::Message &msg); + void nack (LOFAR::Message &msg); + void reject(LOFAR::Message &msg); + +private: + qpid::messaging::Connection itsConnection; + qpid::messaging::Session itsSession; + + int itsNrMissingACKs; - void ack(qpid::messaging::Message &msg); - void reject(qpid::messaging::Message &msg); - bool addQueue(const std::string &address="testqueue", const std::string &options="; {create: always}"); }; class ToBus { - qpid::messaging::Connection itsConnection; - qpid::messaging::Session itsSession; - qpid::messaging::Sender itsSender; - public: ToBus(const std::string &address="testqueue" , const std::string &options="; {create: always}", const std::string &broker = "amqp:tcp:127.0.0.1:5672") ; ~ToBus(void); void send(const std::string &msg); - void send(const qpid::messaging::Message &msg); + void send(LOFAR::Message &msg); + +private: + // -- datamambers -- + qpid::messaging::Connection itsConnection; + qpid::messaging::Session itsSession; + qpid::messaging::Sender itsSender; + }; } // namespace LOFAR diff --git a/LCS/MessageBus/src/Message.cc b/LCS/MessageBus/src/Message.cc index 8962f9653c3ccbdd46b6d681457395edeb34666a..85633153d5fe559dcabc75247998cda97da8f718 100644 --- a/LCS/MessageBus/src/Message.cc +++ b/LCS/MessageBus/src/Message.cc @@ -66,18 +66,10 @@ Message::Message(const std::string &from, cout << itsQpidMsg.getContent() << endl; } -Message::Message(const qpid::messaging::Message &qpidMsg) : - itsQpidMsg(qpidMsg) -{ - cout << itsQpidMsg.getContent() << endl; -} - - // Read a message from disk (header + payload) Message::Message(const std::string &rawContent) { itsQpidMsg.setContent(rawContent); - cout << itsQpidMsg.getContent() << endl; } Message::~Message() @@ -85,13 +77,12 @@ Message::~Message() void Message::setXMLPayload (const std::string &payload) { - + itsQpidMsg.setContent(formatString(itsQpidMsg.getContent().c_str(), payload.c_str())); } void Message::setTXTPayload (const std::string &payload) { itsQpidMsg.setContent(formatString(itsQpidMsg.getContent().c_str(), payload.c_str())); - cout << itsQpidMsg.getContent() << endl; } void Message::setMapPayload (const qpid::types::Variant::Map &payload) @@ -104,6 +95,27 @@ void Message::setListPayload(const qpid::types::Variant::List &payload) } +// +// print +// +std::ostream& Message::print (std::ostream& os) const +{ + os << "system : " << system() << endl; + os << "systemversion : " << headerVersion() << endl; + os << "serviceName : " << toService() << endl; + os << "serviceVersion : " << toVersion() << endl; + os << "summary : " << summary() << endl; + os << "timestamp : " << timestamp() << endl; + os << "source : " << from() << endl; + os << "user : " << forUser() << endl; + os << "uuid : " << uuid() << endl; + os << "payload : " << payload() << endl; + return (os); +} + +// +// getXMLvalue(tag) +// string Message::getXMLvalue(const string& key) const { // get copy of content diff --git a/LCS/MessageBus/src/MsgBus.cc b/LCS/MessageBus/src/MsgBus.cc index a0f2516f516de4fc241d69a244d1770a9a141882..933a6df2d50f2ea8dbed7f72d4acc78c3d4a0a44 100644 --- a/LCS/MessageBus/src/MsgBus.cc +++ b/LCS/MessageBus/src/MsgBus.cc @@ -46,34 +46,37 @@ namespace LOFAR { } } - bool FromBus::getMessage(Message &msg, double timeout) // timeout 0.0 means blocking + bool FromBus::getMessage(LOFAR::Message &msg, double timeout) // timeout 0.0 means blocking { Receiver next; + qpid::messaging::Message qmsg; cout << "waiting for message..." << endl; - if (itsSession.nextReceiver(next,TimeOutDuration(timeout))) - { + if (itsSession.nextReceiver(next,TimeOutDuration(timeout))) { cout << "message available on queue: " << next.getName() << endl; itsNrMissingACKs++; - return next.get(msg); + if (next.get(qmsg)) { + msg = LOFAR::Message(qmsg); + return true; + } } return false; } - void FromBus::nack(Message &msg) + void FromBus::ack(LOFAR::Message &msg) { - itsSession.release(msg); - itsNrMissingACKs--; + itsSession.acknowledge(msg.qpidMsg()); + itsNrMissingACKs --; } - void FromBus::ack(Message &msg) + void FromBus::nack(LOFAR::Message &msg) { - itsSession.acknowledge(msg); - itsNrMissingACKs --; + itsSession.release(msg.qpidMsg()); + itsNrMissingACKs--; } - void FromBus::reject(Message &msg) + void FromBus::reject(LOFAR::Message &msg) { - itsSession.reject(msg); + itsSession.reject(msg.qpidMsg()); itsNrMissingACKs --; } @@ -112,13 +115,13 @@ namespace LOFAR { void ToBus::send(const std::string &msg) { - Message tosend(msg); - itsSender.send(tosend,true); + LOFAR::Message tosend(msg); + itsSender.send(tosend.qpidMsg(), true); } - void ToBus::send(const Message& msg) + void ToBus::send(LOFAR::Message& msg) { - itsSender.send(msg,true); + itsSender.send(msg.qpidMsg(), true); } } // namespace LOFAR diff --git a/LCS/MessageBus/test/tMessage.cc b/LCS/MessageBus/test/tMessage.cc index e1d6f288e418dd631afa989f9d4f2cefc96b53cd..38077c04773098a1625b1c0aa6e55baef23cf49b 100644 --- a/LCS/MessageBus/test/tMessage.cc +++ b/LCS/MessageBus/test/tMessage.cc @@ -65,8 +65,8 @@ void compareMessages(Message& lhm, Message& rhm) #endif int main(int argc, char* argv[]) { - if (argc != 2) { - cout << "Syntax: " << argv[0] << " messagebus" << endl; + if (argc != 1) { + cout << "Syntax: " << argv[0] << endl; return (1); } @@ -78,18 +78,7 @@ int main(int argc, char* argv[]) { string KVmapje("abc=[aap,noot,mies]\nmyInteger=5\nmyDouble=3.14"); msg1.setTXTPayload(KVmapje); - cout << "system : " << msg1.system() << endl; - cout << "systemversion : " << msg1.headerVersion() << endl; - cout << "serviceName : " << msg1.toService() << endl; - cout << "serviceVersion : " << msg1.toVersion() << endl; - cout << "summary : " << msg1.summary() << endl; - cout << "timestamp : " << msg1.timestamp() << endl; - cout << "source : " << msg1.from() << endl; - cout << "user : " << msg1.forUser() << endl; - cout << "uuid : " << msg1.uuid() << endl; - cout << "payload : " << msg1.payload() << endl; - - + cout << msg1; return (0); } diff --git a/LCS/MessageBus/test/tMsgBus.cc b/LCS/MessageBus/test/tMsgBus.cc index a82e611ad09fb02aebdff19679d47d70300e4b8c..675639ee62c972ce3da864cafed65982098f06e5 100644 --- a/LCS/MessageBus/test/tMsgBus.cc +++ b/LCS/MessageBus/test/tMsgBus.cc @@ -29,7 +29,7 @@ using namespace qpid::messaging; using namespace LOFAR; -void showMessage(Message& msg) +void showMessage(qpid::messaging::Message& msg) { cout << "Message ID : " << msg.getMessageId() << endl; cout << "User ID : " << msg.getUserId() << endl; @@ -46,7 +46,7 @@ void showMessage(Message& msg) cout << "Content : " << msg.getContent() << endl; } -void compareMessages(Message& lhm, Message& rhm) +void compareMessages(qpid::messaging::Message& lhm, qpid::messaging::Message& rhm) { ASSERTSTR(lhm.getMessageId() == rhm.getMessageId(), "messageIDs differ"); ASSERTSTR(lhm.getUserId() == rhm.getUserId(), "UserIDs differ"); @@ -72,7 +72,7 @@ int main(int argc, char* argv[]) { cout << "--- Drain the queue ---" << endl; FromBus fb(argv[1]); - Message receivedMsg; + LOFAR::Message receivedMsg; while (fb.getMessage(receivedMsg, 0.01)) { fb.ack(receivedMsg); } @@ -83,16 +83,16 @@ int main(int argc, char* argv[]) { tb.send(someText); fb.getMessage(receivedMsg); fb.ack(receivedMsg); - showMessage(receivedMsg); + showMessage(receivedMsg.qpidMsg()); cout << "--- TEST 2: create a message by hand, send it, receive it, print it, compare them. --- " << endl; - Message msg2Send; - msg2Send.setContent("Manually constructed message"); + LOFAR::Message msg2Send; + msg2Send.setTXTPayload("Manually constructed message"); tb.send(msg2Send); fb.getMessage(receivedMsg); fb.ack(receivedMsg); - showMessage(receivedMsg); - compareMessages(msg2Send, receivedMsg); + showMessage(receivedMsg.qpidMsg()); + compareMessages(msg2Send.qpidMsg(), receivedMsg.qpidMsg()); cout << "--- TEST 3: add an extra queue, send messages to both queues, receive them. --- " << endl; ToBus tbExtra("extraTestQ"); @@ -102,10 +102,10 @@ int main(int argc, char* argv[]) { fb.addQueue("extraTestQ"); fb.getMessage(receivedMsg); fb.ack(receivedMsg); - showMessage(receivedMsg); + showMessage(receivedMsg.qpidMsg()); fb.getMessage(receivedMsg); fb.ack(receivedMsg); - showMessage(receivedMsg); + showMessage(receivedMsg.qpidMsg()); cout << "--- All test successful! ---" << endl; return (0);