diff --git a/.gitattributes b/.gitattributes index 3038ecdc2d7ee6e6132ab2cff23a8d4a109dd15a..639951db95fab7cc20ddc4e4d947d0b420e1c2af 100644 --- a/.gitattributes +++ b/.gitattributes @@ -2438,6 +2438,8 @@ LCS/MessageBus/src/__init__.py -text LCS/MessageBus/src/msgbus.py -text LCS/MessageBus/test/tMessage.cc -text LCS/MessageBus/test/tMsgBus.cc -text +LCS/MessageBus/test/tMsgBus.run eol=lf +LCS/MessageBus/test/tMsgBus.sh eol=lf LCS/MessageDaemons/src/MessageRouter -text LCS/MessageDaemons/src/MessageRouter.conf.ccu001 -text LCS/MessageDaemons/src/MessageRouter.conf.ccu099 -text @@ -4609,7 +4611,7 @@ SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.outp SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.output/SB3.cfloat.raw -text SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.output/SB4.cfloat.raw -text SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.output/queues/lofar.task.feedback.processing -text -SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.output/queues/lofar.task.feedback.status -text +SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.output/queues/lofar.task.feedback.state -text SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.parset -text SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.run eol=lf SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.sh eol=lf diff --git a/LCS/MessageBus/include/MessageBus/MsgBus.h b/LCS/MessageBus/include/MessageBus/MsgBus.h index f170c65c13a06b4a23c26a7a0125d17a943aecd4..ac080aa4cb270b9417b084bae4ab726a21113812 100644 --- a/LCS/MessageBus/include/MessageBus/MsgBus.h +++ b/LCS/MessageBus/include/MessageBus/MsgBus.h @@ -48,9 +48,9 @@ namespace MessageBus { class FromBus { public: - FromBus(const std::string &address="testqueue" , const std::string &options="; {create: always}", const std::string &broker = "amqp:tcp:127.0.0.1:5672") ; + FromBus(const std::string &address="testqueue" , const std::string &options="; {create: never}", const std::string &broker = "amqp:tcp:127.0.0.1:5672") ; ~FromBus(void); - bool addQueue(const std::string &address="testqueue", const std::string &options="; {create: always}"); + bool addQueue(const std::string &address="testqueue", const std::string &options="; {create: never}"); bool getMessage(LOFAR::Message &msg, double timeout = 0.0); // timeout 0.0 means blocking @@ -69,7 +69,7 @@ private: class ToBus { public: - ToBus(const std::string &address="testqueue" , const std::string &options="; {create: always}", const std::string &broker = "amqp:tcp:127.0.0.1:5672") ; + ToBus(const std::string &address="testqueue" , const std::string &options="; {create: never}", const std::string &broker = "amqp:tcp:127.0.0.1:5672") ; ~ToBus(void); void send(const std::string &msg); diff --git a/LCS/MessageBus/src/msgbus.py b/LCS/MessageBus/src/msgbus.py index 480a32c347dc63f5815b71fe4089d598f9d27a71..06042adff2632e721ea85b083c134e6d6074630b 100644 --- a/LCS/MessageBus/src/msgbus.py +++ b/LCS/MessageBus/src/msgbus.py @@ -24,7 +24,7 @@ import lofar.messagebus.message as message # Candidate for a config file broker="127.0.0.1" -options="create:always" +options="create:never" class BusException(Exception): pass diff --git a/LCS/MessageBus/test/CMakeLists.txt b/LCS/MessageBus/test/CMakeLists.txt index df821987d9dd3e4ef05677092bad5b0670cd6c4b..0f8f56a78780c14db8f6efddfbe82d300753b60c 100644 --- a/LCS/MessageBus/test/CMakeLists.txt +++ b/LCS/MessageBus/test/CMakeLists.txt @@ -2,5 +2,9 @@ include(LofarCTest) +configure_file( + ${CMAKE_CURRENT_SOURCE_DIR}/MessageFuncs.sh.in + ${CMAKE_BINARY_DIR}/bin/MessageFuncs.sh @ONLY) + lofar_add_test(tMsgBus tMsgBus.cc) lofar_add_test(tMessage tMessage.cc) diff --git a/LCS/MessageBus/test/MessageFuncs.sh.in b/LCS/MessageBus/test/MessageFuncs.sh.in new file mode 100644 index 0000000000000000000000000000000000000000..1ddf2d292647cb76ab8edb723d21b9757a0e7222 --- /dev/null +++ b/LCS/MessageBus/test/MessageFuncs.sh.in @@ -0,0 +1,149 @@ +#!/bin/bash +# +# Usage: source MessageFuncs.sh +# + +function _on_exit() { + # Calls $1 on exit, while preserving existing EXIT traps + + # Get existing EXIT traps + TRAPLINE=`trap -p EXIT` + ONEXITS=`echo $TRAPLINE | perl -ne 'print "$1\n" if /trap -- .(.*). EXIT/;'` + + # Expand traps + trap -- "$1;$ONEXITS" EXIT +} + +function _generate_prefix() { + # Generate a unique prefix for this test + NOW=`date +"%FT%T"` + mktemp -u test-$NOW-XXXXXXXXXX. +} + +# Generate an unique prefix for all queues in this test +export QUEUE_PREFIX=`_generate_prefix` + +function have_qpid() { + [ "@HAVE_QPID@" == "TRUE" ] +} + +# A list of all queues we created +CREATED_QUEUES="" + +function create_queue() { + # Creates an empty queue + # + # Usage: + # create_queue queue + QUEUE_NAME="$1" + + if have_qpid; then + @QPID_RECEIVE_EXECUTABLE@ \ + -b 127.0.0.1 \ + -a "$QUEUE_PREFIX$QUEUE_NAME; { create: always }" \ + --print-content no --ignore-reply-to + fi + + # Update the list of queues we created + CREATED_QUEUES="$CREATED_QUEUES $QUEUE_NAME" +} + +function delete_queue() { + # Empties and deletes a queue + # + # Usage: + # delete_queue queue + # + # Will not remove non-empty or used queues + QUEUE_NAME="$1" + + if have_qpid; then + @QPID_RECEIVE_EXECUTABLE@ \ + -b 127.0.0.1 \ + -a "$QUEUE_PREFIX$QUEUE_NAME; { delete: always }" \ + --print-content no --ignore-reply-to + fi +} + +function delete_all_queues() { + # Empties and deletes all queues created by create_queue + # + # Usage: + # delete_all_queues + + for QUEUE_NAME in $CREATED_QUEUES; do + delete_queue "$QUEUE_NAME" + done +} + +# Automatically delete all empty, non-used queues on EXIT +_on_exit delete_all_queues + +function recv_msg() { + # Retrieves one message from a queue + # + # Usage: + # recv_msg queue > message + # + # Returns an empty message if none was available + QUEUE_NAME="$1" + + if have_qpid; then + @QPID_RECEIVE_EXECUTABLE@ \ + -b 127.0.0.1 \ + -a "$QUEUE_PREFIX$QUEUE_NAME" \ + --ignore-reply-to \ + -m 1 + fi +} + +function recv_all_msgs() { + # Retrieves all messages from a queue + # + # Usage: + # recv_all_msgs queue > messages + # + # Returns an empty message if none was available + QUEUE_NAME="$1" + + if have_qpid; then + @QPID_RECEIVE_EXECUTABLE@ \ + -b 127.0.0.1 \ + -a "$QUEUE_PREFIX$QUEUE_NAME" \ + --ignore-reply-to \ + -m 0 + fi +} + +function send_msg() { + # Send one message to a queue + # + # Usage: + # send_msg queue message + QUEUE_NAME="$1" + MESSAGE="$2" + + if have_qpid; then + @QPID_SEND_EXECUTABLE@ \ + -b 127.0.0.1 \ + -a "$QUEUE_PREFIX$QUEUE_NAME" \ + --content-string "$MESSAGE" + fi +} + +function compare_msg() { + # Compare two messages, ignoring uncontrollable fields (UUID, timestamp). + # Both messages are expected to be in files (or filedescriptors). + # + # Usage: + # compare_msg reference_message.txt generated_message.txt + REFERENCE="$1" + GENERATED="$2" + + function compare_msg_filter() { + fgrep -v '<uuid>' | fgrep -v '<timestamp>' + } + + diff -w <(<"$REFERENCE" compare_msg_filter) <(<"$GENERATED" compare_msg_filter) +} + diff --git a/LCS/MessageBus/test/tMsgBus.cc b/LCS/MessageBus/test/tMsgBus.cc index 50096068e3825092d999afaab240b1819ab8c937..14688926fa777573978e0b36f474feaab3270829 100644 --- a/LCS/MessageBus/test/tMsgBus.cc +++ b/LCS/MessageBus/test/tMsgBus.cc @@ -69,7 +69,7 @@ int main(int argc, char* argv[]) { MessageBus::init(); - std::string queue(argc == 2 ? argv[1] : "tMsgBus-test-queue"); + std::string queue(argc == 2 ? argv[1] : "tMsgBus"); cout << "Using queue " << queue << " (Syntax: " << argv[0] << " messagebus)" << endl; @@ -98,11 +98,11 @@ int main(int argc, char* argv[]) { compareMessages(msg2Send.qpidMsg(), receivedMsg.qpidMsg()); cout << "--- TEST 3: add an extra queue, send messages to both queues, receive them. --- " << endl; - ToBus tbExtra("extraTestQ"); + ToBus tbExtra("tMsgBus-extraTestQ"); tbExtra.send("Message send to extra queue"); tb.send("Message send to original queue"); - fb.addQueue("extraTestQ"); + fb.addQueue("tMsgBus-extraTestQ"); fb.getMessage(receivedMsg); fb.ack(receivedMsg); showMessage(receivedMsg.qpidMsg()); diff --git a/LCS/MessageBus/test/tMsgBus.run b/LCS/MessageBus/test/tMsgBus.run new file mode 100755 index 0000000000000000000000000000000000000000..52872de3543ef87e064670700f9bcd882843fb17 --- /dev/null +++ b/LCS/MessageBus/test/tMsgBus.run @@ -0,0 +1,8 @@ +#!/bin/bash + +source MessageFuncs.sh + +create_queue tMsgBus +create_queue tMsgBus-extraTestQ + +./tMsgBus diff --git a/LCS/MessageBus/test/tMsgBus.sh b/LCS/MessageBus/test/tMsgBus.sh new file mode 100755 index 0000000000000000000000000000000000000000..5dbd572389fbed059f2b93fe10ca70f52458f7f5 --- /dev/null +++ b/LCS/MessageBus/test/tMsgBus.sh @@ -0,0 +1,2 @@ +#!/bin/bash +./runctest.sh tMsgBus diff --git a/RTCP/Cobalt/GPUProc/test/Storage/tStorageProcesses.queue b/RTCP/Cobalt/GPUProc/test/Storage/tStorageProcesses.queue index 9d52007975f3ffa63fb1966f05a094ccd9a2460d..4ec0ed39e87645060dab7300cda59f0907848b20 100644 --- a/RTCP/Cobalt/GPUProc/test/Storage/tStorageProcesses.queue +++ b/RTCP/Cobalt/GPUProc/test/Storage/tStorageProcesses.queue @@ -3,7 +3,7 @@ <system>LOFAR</system> <version>1.0.0</version> <protocol> - <name>lofar.task.feedback.dataproducts</name> + <name>task.feedback.dataproducts</name> <version>1.0.0</version> </protocol> <source> diff --git a/RTCP/Cobalt/GPUProc/test/Storage/tStorageProcesses.run2.in b/RTCP/Cobalt/GPUProc/test/Storage/tStorageProcesses.run2.in index d6bd221dbcfcb1d8c686468a372c780ee9f3bfda..4491adeaa30cc3022f0e8a033cfc3fa41f621c29 100755 --- a/RTCP/Cobalt/GPUProc/test/Storage/tStorageProcesses.run2.in +++ b/RTCP/Cobalt/GPUProc/test/Storage/tStorageProcesses.run2.in @@ -5,20 +5,22 @@ DummyStorage 12345 0& # Run the command PID_ID=$! # get the PID +source MessageFuncs.sh + # Empty queues (if any) -if [ "@HAVE_QPID@" == "TRUE" ]; then - @QPID_RECEIVE_EXECUTABLE@ -b 127.0.0.1 -a lofar.task.feedback.dataproducts --print-content no --ignore-reply-to >/dev/null 2>&1 || true -fi +create_queue lofar.task.feedback.dataproducts # Start the test tStorageProcesses || exit 1 wait $PID_ID # Validate queues -if [ "@HAVE_QPID@" == "TRUE" ]; then - @QPID_RECEIVE_EXECUTABLE@ -b 127.0.0.1 -a lofar.task.feedback.dataproducts --ignore-reply-to > tStorageProcesses.queue || exit 1 +if have_qpid; then + recv_msg lofar.task.feedback.dataproducts > tStorageProcesses.queue - diff $srcdir/tStorageProcesses.queue tStorageProcesses.queue || exit 1 + compare_msg $srcdir/tStorageProcesses.queue tStorageProcesses.queue || exit 1 fi -exit 0 # do not exit with DymmyStorage exit. If we are waiting the test should succeed +# Signal success to parent +exit 0 + diff --git a/SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.output/queues/lofar.task.feedback.processing b/SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.output/queues/lofar.task.feedback.processing index 076b9f2b6eea664a0a3136e0d043e502d212c4ba..21dafb3099c1f3dcb20bc810853ade3d8f99cbec 100644 --- a/SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.output/queues/lofar.task.feedback.processing +++ b/SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.output/queues/lofar.task.feedback.processing @@ -3,7 +3,7 @@ <system>LOFAR</system> <version>1.0.0</version> <protocol> - <name>lofar.task.feedback.processing</name> + <name>task.feedback.processing</name> <version>1.0.0</version> </protocol> <source> diff --git a/SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.output/queues/lofar.task.feedback.status b/SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.output/queues/lofar.task.feedback.state similarity index 76% rename from SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.output/queues/lofar.task.feedback.status rename to SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.output/queues/lofar.task.feedback.state index 87c4cea6530b3fe1a8a720046e9621844fafce83..be60b26610df342f9d804c1ee123b3517b503d13 100644 --- a/SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.output/queues/lofar.task.feedback.status +++ b/SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.output/queues/lofar.task.feedback.state @@ -3,15 +3,15 @@ <system>LOFAR</system> <version>1.0.0</version> <protocol> - <name>lofar.task.feedback.status</name> + <name>task.feedback.state</name> <version>1.0.0</version> </protocol> <source> - <name>Cobalt/GPUProc/sendStatus</name> + <name>Cobalt/GPUProc/send_state</name> <user></user> <uuid></uuid> <timestamp></timestamp> - <summary>Status feedback</summary> + <summary>State feedback</summary> </source> <ids> <momid>0</momid> diff --git a/SubSystems/Online_Cobalt/test/runtest.sh.in b/SubSystems/Online_Cobalt/test/runtest.sh.in index 7287a02c518f6e31e7c3fef807e74286d2912742..04e5845e1ae4447d6dc566597ae5dedfda074c17 100755 --- a/SubSystems/Online_Cobalt/test/runtest.sh.in +++ b/SubSystems/Online_Cobalt/test/runtest.sh.in @@ -60,7 +60,7 @@ echo " in directory $(pwd)" done done - if [ $HAVE_QPID == "TRUE" ]; then + if have_qpid; then # validate the contents of the queues mkdir "queues" || error "Failed to create temporary directory ${OUTDIR}/queues" echo "[ -d ${PWD}/queues ] && cp -r ${PWD}/queues ${REFDIR}" >> accept_output @@ -68,8 +68,8 @@ echo " in directory $(pwd)" INCORRECT_QUEUES="" for Q in $QUEUES; do echo "Comparing output of queue $Q" - @QPID_RECEIVE_EXECUTABLE@ -b 127.0.0.1 -a $Q --ignore-reply-to > queues/$Q || error "Could not read output of queue $Q" - diff ${REFDIR}/queues/$Q queues/$Q || INCORRECT_QUEUES+="$Q " + recv_all_msgs "$Q" > queues/$Q + compare_msg ${REFDIR}/queues/$Q queues/$Q || INCORRECT_QUEUES+="$Q " done if [ "$INCORRECT_QUEUES" != "" ]; then diff --git a/SubSystems/Online_Cobalt/test/testFuncs.sh.in b/SubSystems/Online_Cobalt/test/testFuncs.sh.in index ed86afa29a6e252f6f844c7444a19051a0174b07..51e146d427b31edcbfa49d1042334e3c5b09872f 100755 --- a/SubSystems/Online_Cobalt/test/testFuncs.sh.in +++ b/SubSystems/Online_Cobalt/test/testFuncs.sh.in @@ -13,7 +13,8 @@ error() # Set LOFARROOT and other LOFAR env vars into install directory (var is always set). . "@CMAKE_INSTALL_PREFIX@/lofarinit.sh" || error "Could not load our lofarinit.sh -- did you run 'make install'?" -HAVE_QPID="@HAVE_QPID@" +# MessageFuncs.sh provides QPID functionality, and is present in the build directory +. "MessageFuncs.sh" || error "Could not load MessageFuncs.sh" # Create runtime output directories if not exists. # Not done at build, because it is a post-install setting. Different in production. @@ -26,10 +27,10 @@ export LC_ALL="C" # (if they exist, so ignore any errors regarding that) QUEUES=" - lofar.task.feedback.status + lofar.task.feedback.state lofar.task.feedback.dataproducts lofar.task.feedback.processing" for Q in $QUEUES; do - @QPID_RECEIVE_EXECUTABLE@ --print-content no --ignore-reply-to -b 127.0.0.1 -a $Q >/dev/null 2>&1 + create_queue "$Q" done