diff --git a/LCS/MessageBus/src/MsgBus.cc b/LCS/MessageBus/src/MsgBus.cc index 3ebafe7e3c948a9d9f536f3342b20e29659edd6c..a0f2516f516de4fc241d69a244d1770a9a141882 100644 --- a/LCS/MessageBus/src/MsgBus.cc +++ b/LCS/MessageBus/src/MsgBus.cc @@ -26,12 +26,14 @@ namespace LOFAR { itsNrMissingACKs(0) { itsConnection.open(); + cout << "Connected to: " << itsConnection.getUrl() << endl; itsSession = itsConnection.createSession(); Address addr(address+options); Receiver receiver = itsSession.createReceiver(addr); receiver.setCapacity(1); + cout << "Receiver started at queue: " << receiver.getName() << endl; } catch(const qpid::types::Exception &ex) { THROW(MessageBusException, ex.what()); } @@ -50,7 +52,7 @@ namespace LOFAR { cout << "waiting for message..." << endl; if (itsSession.nextReceiver(next,TimeOutDuration(timeout))) { - cout << "message available..." << endl; + cout << "message available on queue: " << next.getName() << endl; itsNrMissingACKs++; return next.get(msg); } @@ -82,6 +84,7 @@ namespace LOFAR { Address addr(address+options); Receiver receiver = itsSession.createReceiver(addr); receiver.setCapacity(1); + cout << "Receiver started at queue: " << receiver.getName() << endl; } catch(const qpid::types::Exception &ex) { //THROW(MessageBusException, ex.what()); return false; @@ -94,9 +97,12 @@ namespace LOFAR { itsConnection(broker) { itsConnection.open(); + cout << "Connected to: " << itsConnection.getUrl() << endl; + itsSession = itsConnection.createSession(); Address addr(address+options); itsSender = itsSession.createSender(addr); + cout << "Sender created: " << itsSender.getName() << endl; } catch(const qpid::types::Exception &ex) { THROW(MessageBusException, ex.what()); } diff --git a/LCS/MessageBus/test/tMsgBus.cc b/LCS/MessageBus/test/tMsgBus.cc index 7c96ded8a7c5787061c655f7d26cc152cc596776..a82e611ad09fb02aebdff19679d47d70300e4b8c 100644 --- a/LCS/MessageBus/test/tMsgBus.cc +++ b/LCS/MessageBus/test/tMsgBus.cc @@ -46,17 +46,68 @@ void showMessage(Message& msg) cout << "Content : " << msg.getContent() << endl; } +void compareMessages(Message& lhm, Message& rhm) +{ + ASSERTSTR(lhm.getMessageId() == rhm.getMessageId(), "messageIDs differ"); + ASSERTSTR(lhm.getUserId() == rhm.getUserId(), "UserIDs differ"); + ASSERTSTR(lhm.getCorrelationId() == rhm.getCorrelationId(), "CorrelationIDs differ"); + ASSERTSTR(lhm.getSubject() == rhm.getSubject(), "Subjects differ"); + ASSERTSTR(lhm.getReplyTo() == rhm.getReplyTo(), "ReplyTos differ"); + ASSERTSTR(lhm.getContentType() == rhm.getContentType(), "ContentTypes differ"); + ASSERTSTR(lhm.getPriority() == rhm.getPriority(), "Priorities differ"); + ASSERTSTR(lhm.getTtl() == rhm.getTtl(), "TTLs differ"); + ASSERTSTR(lhm.getDurable() == rhm.getDurable(), "Durability differs"); + ASSERTSTR(lhm.getRedelivered() == rhm.getRedelivered(), "Redelivered differs"); +// ASSERTSTR(lhm.getProperties() == rhm.getProperties(), "Properties differ"); + ASSERTSTR(lhm.getContentSize() == rhm.getContentSize(), "ContentSize differs"); + ASSERTSTR(lhm.getContent() == rhm.getContent(), "Content differs"); +} + + int main(int argc, char* argv[]) { if (argc != 2) { - cout << "Syntax: abc messagebus" << endl; + cout << "Syntax: " << argv[0] << " messagebus" << endl; return (1); } + cout << "--- Drain the queue ---" << endl; FromBus fb(argv[1]); - Message msg; - fb.getMessage(msg); - showMessage(msg); + Message receivedMsg; + while (fb.getMessage(receivedMsg, 0.01)) { + fb.ack(receivedMsg); + } + + cout << "--- TEST 1: create a message from a string, send it, receive it, print it. --- " << endl; + ToBus tb(argv[1]); + string someText("An example message constructed from text"); + tb.send(someText); + fb.getMessage(receivedMsg); + fb.ack(receivedMsg); + showMessage(receivedMsg); + + cout << "--- TEST 2: create a message by hand, send it, receive it, print it, compare them. --- " << endl; + Message msg2Send; + msg2Send.setContent("Manually constructed message"); + tb.send(msg2Send); + fb.getMessage(receivedMsg); + fb.ack(receivedMsg); + showMessage(receivedMsg); + compareMessages(msg2Send, receivedMsg); + + cout << "--- TEST 3: add an extra queue, send messages to both queues, receive them. --- " << endl; + ToBus tbExtra("extraTestQ"); + tbExtra.send("Message send to extra queue"); + tb.send("Message send to original queue"); + + fb.addQueue("extraTestQ"); + fb.getMessage(receivedMsg); + fb.ack(receivedMsg); + showMessage(receivedMsg); + fb.getMessage(receivedMsg); + fb.ack(receivedMsg); + showMessage(receivedMsg); + cout << "--- All test successful! ---" << endl; return (0); }