diff --git a/.gitattributes b/.gitattributes index f621ceb6653e55b240da0e3daa1e99b4e30ca8e0..1b330920c8375cc4c1a59b04890b726c51c58cd7 100644 --- a/.gitattributes +++ b/.gitattributes @@ -4123,7 +4123,9 @@ RTCP/Cobalt/GPUProc/test/Kernels/tKernelPerformance.run eol=lf RTCP/Cobalt/GPUProc/test/Kernels/tKernelPerformance.sh eol=lf RTCP/Cobalt/GPUProc/test/Kernels/visualizeBeamformer.py eol=lf RTCP/Cobalt/GPUProc/test/Pipelines/tCorrelatorPipelineProcessObs.sh eol=lf +RTCP/Cobalt/GPUProc/test/Storage/tStorageProcesses.queue -text RTCP/Cobalt/GPUProc/test/Storage/tStorageProcesses.run eol=lf +RTCP/Cobalt/GPUProc/test/Storage/tStorageProcesses.run2.in eol=lf RTCP/Cobalt/GPUProc/test/Storage/tStorageProcesses.sh eol=lf RTCP/Cobalt/GPUProc/test/SubbandProcs/plot_arrays.py eol=lf RTCP/Cobalt/GPUProc/test/SubbandProcs/tBeamFormerSubbandProcProcessSb.parset -text @@ -4594,6 +4596,8 @@ SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.outp SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.output/SB2.cfloat.raw -text SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.output/SB3.cfloat.raw -text SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.output/SB4.cfloat.raw -text +SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.output/queues/lofar.task.feedback.processing -text +SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.output/queues/lofar.task.feedback.status -text SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.parset -text SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.run eol=lf SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.sh eol=lf @@ -4613,7 +4617,6 @@ SubSystems/Online_Cobalt/test/Correlator/tCorrelate_3sec_2st_5sb.output/SB4.cflo SubSystems/Online_Cobalt/test/Correlator/tCorrelate_3sec_2st_5sb.parset -text SubSystems/Online_Cobalt/test/Correlator/tCorrelate_3sec_2st_5sb.run eol=lf SubSystems/Online_Cobalt/test/Correlator/tCorrelate_3sec_2st_5sb.sh eol=lf -SubSystems/Online_Cobalt/test/MockOnlineControl.sh eol=lf SubSystems/Online_Cobalt/test/tMACfeedback.in_parset_failure_1 -text SubSystems/Online_Cobalt/test/tMACfeedback.in_parset_success_1 -text SubSystems/Online_Cobalt/test/tMACfeedback.run eol=lf diff --git a/RTCP/Cobalt/CoInterface/src/LTAFeedback.cc b/RTCP/Cobalt/CoInterface/src/LTAFeedback.cc index 8928df3d4e4e513b45f52b5e072bf82107ff95da..7396924d513ac9e7122b7269aaa00f3f6103e251 100644 --- a/RTCP/Cobalt/CoInterface/src/LTAFeedback.cc +++ b/RTCP/Cobalt/CoInterface/src/LTAFeedback.cc @@ -192,7 +192,7 @@ namespace LOFAR } - ParameterSet LTAFeedback::allFeedback() const + ParameterSet LTAFeedback::processingFeedback() const { ParameterSet ps; @@ -209,11 +209,6 @@ namespace LOFAR str(format("%u") % settings.correlator.nrChannels)); ps.add("Observation.Correlator.channelWidth", str(format("%.16g") % (settings.subbandWidth() / settings.correlator.nrChannels))); - - // add the feedback for the individual files - for (size_t i = 0; i < settings.correlator.files.size(); ++i) { - ps.adoptCollection(correlatedFeedback(i)); - } } ps.add("Observation.DataProducts.nrOfOutput_Beamformed_", @@ -314,9 +309,25 @@ namespace LOFAR ps.add("Observation.IncoherentStokes.stationList", settings.rawStationList); } + + return ps; + } + + + ParameterSet LTAFeedback::allFeedback() const + { + ParameterSet ps; + + ps.adoptCollection(processingFeedback()); + + // add the feedback for the individual files + if (settings.correlator.enabled) { + for (size_t i = 0; i < settings.correlator.files.size(); ++i) { + ps.adoptCollection(correlatedFeedback(i)); + } + } if (settings.beamFormer.enabled) { - // add the feedback for the individual files for (size_t i = 0; i < settings.beamFormer.files.size(); ++i) { ps.adoptCollection(beamFormedFeedback(i)); } diff --git a/RTCP/Cobalt/CoInterface/src/LTAFeedback.h b/RTCP/Cobalt/CoInterface/src/LTAFeedback.h index 3fbe662591407bc996a41a0bea3f388f58bc1a80..e8be4dd8d389581c12d45bc4eb991ddd247f6ab6 100644 --- a/RTCP/Cobalt/CoInterface/src/LTAFeedback.h +++ b/RTCP/Cobalt/CoInterface/src/LTAFeedback.h @@ -45,6 +45,14 @@ namespace LOFAR ParameterSet correlatedFeedback(size_t fileno) const; ParameterSet beamFormedFeedback(size_t fileno) const; + // Return the generic processing LTA feedback parameters. + // (Non data-product specific). + // \note Details about the meaning of the different meta-data parameters + // can be found in the XSD that describes the Submission Information + // Package (sip) for the LTA. + // \see http://proposal.astron.nl/schemas/LTA-SIP.xsd + ParameterSet processingFeedback() const; + // Return the LTA feedback parameters. // \note Details about the meaning of the different meta-data parameters // can be found in the XSD that describes the Submission Information diff --git a/RTCP/Cobalt/GPUProc/CMakeLists.txt b/RTCP/Cobalt/GPUProc/CMakeLists.txt index 6b420ab50d288c5be17f15fd2a571887f6e4e4a1..3a8a7c8ad69c4f4ce5cf3b79a5c3502cdaccdadb 100644 --- a/RTCP/Cobalt/GPUProc/CMakeLists.txt +++ b/RTCP/Cobalt/GPUProc/CMakeLists.txt @@ -2,7 +2,7 @@ # Handle options USE_CUDA and USE_OPENCL. if(USE_CUDA AND NOT USE_OPENCL) - set(_gpuproc_deps Common Stream ApplCommon CoInterface InputProc MACIO BrokenAntennaInfo) + set(_gpuproc_deps Common Stream ApplCommon CoInterface InputProc MACIO BrokenAntennaInfo MessageBus) lofar_find_package(CUDA 4.1 REQUIRED) lofar_find_package(CUDADriver REQUIRED) @@ -34,7 +34,7 @@ if(USE_CUDA AND NOT USE_OPENCL) endif() elseif(USE_OPENCL AND NOT USE_CUDA) - set(_gpuproc_deps Common Stream ApplCommon CoInterface InputProc MACIO BrokenAntennaInfo OpenCL_FFT) + set(_gpuproc_deps Common Stream ApplCommon CoInterface InputProc MACIO BrokenAntennaInfo OpenCL_FFT MessageBus) lofar_find_package(OpenCL REQUIRED) add_definitions(-DUSE_OPENCL) else() diff --git a/RTCP/Cobalt/GPUProc/etc/parset-additions.d/default/MAC-feedback.parset b/RTCP/Cobalt/GPUProc/etc/parset-additions.d/default/MAC-feedback.parset deleted file mode 100644 index 14e6a2cedc84950869b3e4829e7c993c85a2e2c9..0000000000000000000000000000000000000000 --- a/RTCP/Cobalt/GPUProc/etc/parset-additions.d/default/MAC-feedback.parset +++ /dev/null @@ -1,15 +0,0 @@ -# $Id$ - -# The host that is running OnlineControl -# The feedback port is derived through -# 21000 + (Observation.ObsID % 1000) -Cobalt.Feedback.host = ccu001 - -# The remote directory in which the LTA feedback needs to be stored -Cobalt.Feedback.remotePath = /opt/lofar/var/run - -# The host that is running PVSSGateway. -# If empty, data points are never sent. -# One can also start a PVSSGateway_Stub -# on localhost, which writes to a file. -Cobalt.PVSSGateway.host = ccu001 diff --git a/RTCP/Cobalt/GPUProc/etc/parset-additions.d/default/PVSS-feedback.parset b/RTCP/Cobalt/GPUProc/etc/parset-additions.d/default/PVSS-feedback.parset new file mode 100644 index 0000000000000000000000000000000000000000..4e062233e380cd0bc57b388f0ecec42bf1ccfe33 --- /dev/null +++ b/RTCP/Cobalt/GPUProc/etc/parset-additions.d/default/PVSS-feedback.parset @@ -0,0 +1,7 @@ +# $Id: MAC-feedback.parset 30525 2014-11-28 12:21:15Z mol $ + +# The host that is running PVSSGateway. +# If empty, data points are never sent. +# One can also start a PVSSGateway_Stub +# on localhost, which writes to a file. +Cobalt.PVSSGateway.host = ccu001 diff --git a/RTCP/Cobalt/GPUProc/src/CMakeLists.txt b/RTCP/Cobalt/GPUProc/src/CMakeLists.txt index b511d42b9b92e42c54929bff6b07a00bb52308a6..62d944e74d3f8553863ed907fa51425084417b53 100644 --- a/RTCP/Cobalt/GPUProc/src/CMakeLists.txt +++ b/RTCP/Cobalt/GPUProc/src/CMakeLists.txt @@ -119,6 +119,7 @@ endif() lofar_add_bin_program(mpi_node_list Station/mpi_node_list.cc) lofar_add_bin_program(station_stream Station/station_stream.cc) +lofar_add_bin_program(send_status send_status.cc) # install scripts used to run an observation under bin install(PROGRAMS diff --git a/RTCP/Cobalt/GPUProc/src/Storage/StorageProcess.cc b/RTCP/Cobalt/GPUProc/src/Storage/StorageProcess.cc index 5b602f491460b70ca486630f8d6e87c5529bb75d..7f756ff25ba1901a035d7ec06398ddf62602ea7d 100644 --- a/RTCP/Cobalt/GPUProc/src/Storage/StorageProcess.cc +++ b/RTCP/Cobalt/GPUProc/src/Storage/StorageProcess.cc @@ -28,8 +28,11 @@ #include <Common/LofarLogger.h> #include <Common/Thread/Thread.h> +#include <MessageBus/MsgBus.h> +#include <MessageBus/Protocols/TaskFeedbackDataproducts.h> #include <Stream/PortBroker.h> #include <CoInterface/Stream.h> +#include <CoInterface/LTAFeedback.h> namespace LOFAR { @@ -45,7 +48,9 @@ namespace LOFAR itsParset(parset), itsLogPrefix(str(boost::format("%s [StorageWriter rank %2d host %s] ") % logPrefix % rank % hostname)), itsRank(rank), - itsHostname(hostname) + itsHostname(hostname), + itsSentFeedback(false), + itsSuccessful(false) { } @@ -57,6 +62,42 @@ namespace LOFAR // stop immediately struct timespec immediately = { 0, 0 }; stop(immediately); + + if (!itsSentFeedback) { + // send default LTA feedback for this host + ToBus bus("lofar.task.feedback.dataproducts"); + + const std::string myName = "Cobalt/GPUProc/Storage/StorageProcess"; + + LTAFeedback feedback(itsParset.settings); + + if (itsParset.settings.correlator.enabled) + for (size_t i = 0; i < itsParset.settings.correlator.files.size(); ++i) { + LOG_INFO_STR(itsParset.settings.correlator.files[i].location.host << " == " << itsHostname); + + if (itsParset.settings.correlator.files[i].location.host == itsHostname) { + Protocols::TaskFeedbackDataproducts msg( + myName, + "", + str(boost::format("Feedback for Correlated Data, subband %s") % i), + feedback.correlatedFeedback(i)); + + bus.send(msg); + } + } + + if (itsParset.settings.beamFormer.enabled) + for (size_t i = 0; i < itsParset.settings.beamFormer.files.size(); ++i) + if (itsParset.settings.beamFormer.files[i].location.host == itsHostname) { + Protocols::TaskFeedbackDataproducts msg( + myName, + "", + str(boost::format("Feedback for Beamformed Data, file nr %s") % i), + feedback.beamFormedFeedback(i)); + + bus.send(msg); + } + } } @@ -82,6 +123,12 @@ namespace LOFAR } + bool StorageProcess::isSuccesful() const + { + return itsSuccessful; + } + + bool StorageProcess::isDone() const { return itsThread->isDone(); @@ -95,15 +142,6 @@ namespace LOFAR } - ParameterSet StorageProcess::feedbackLTA() const - { - // Prevent read/write conflicts - ASSERT(isDone()); - - return itsFeedbackLTA; - } - - void StorageProcess::controlThread() { // Connect control stream @@ -123,12 +161,11 @@ namespace LOFAR itsFinalMetaData.write(stream); LOG_DEBUG_STR(itsLogPrefix << "[ControlThread] sent final meta data"); - // Wait for LTA feedback - LOG_DEBUG_STR(itsLogPrefix << "[ControlThread] reading LTA feedback"); - ParameterSet feedbackLTA; - readParameterSet(stream, feedbackLTA); - itsFeedbackLTA.adoptCollection(feedbackLTA); - LOG_DEBUG_STR(itsLogPrefix << "[ControlThread] read LTA feedback"); + // Wait for OutputProc to finish + LOG_DEBUG_STR(itsLogPrefix << "[ControlThread] waiting to finish"); + stream.read(&itsSentFeedback, sizeof itsSentFeedback); + stream.read(&itsSuccessful, sizeof itsSuccessful); + LOG_DEBUG_STR(itsLogPrefix << "[ControlThread] finished"); } } diff --git a/RTCP/Cobalt/GPUProc/src/Storage/StorageProcess.h b/RTCP/Cobalt/GPUProc/src/Storage/StorageProcess.h index f982196a70cd7b318984aac26cc08f2ebe6c3b89..a6e49c8429a9e9d48739861f26eab0b678568c0f 100644 --- a/RTCP/Cobalt/GPUProc/src/Storage/StorageProcess.h +++ b/RTCP/Cobalt/GPUProc/src/Storage/StorageProcess.h @@ -67,11 +67,9 @@ namespace LOFAR void start(); void stop( struct timespec deadline ); - bool isDone() const; - // Returns feedback for the LTA -- only access this once the - // StorageProcess has finished! - ParameterSet feedbackLTA() const; + bool isSuccesful() const; // return whether communication with OutputProc went perfect + bool isDone() const; void setFinalMetaData( const FinalMetaData &finalMetaData ); @@ -84,11 +82,12 @@ namespace LOFAR const int itsRank; const std::string itsHostname; + bool itsSentFeedback; + bool itsSuccessful; + FinalMetaData itsFinalMetaData; Semaphore itsFinalMetaDataAvailable; - ParameterSet itsFeedbackLTA; - SmartPtr<Thread> itsThread; }; diff --git a/RTCP/Cobalt/GPUProc/src/Storage/StorageProcesses.cc b/RTCP/Cobalt/GPUProc/src/Storage/StorageProcesses.cc index 402e43a5b95babbe6cb4323121fbbff80d8357b0..148c18dc8cf6bac77f56f03d69811122da6fed1b 100644 --- a/RTCP/Cobalt/GPUProc/src/Storage/StorageProcesses.cc +++ b/RTCP/Cobalt/GPUProc/src/Storage/StorageProcesses.cc @@ -58,12 +58,6 @@ namespace LOFAR } - ParameterSet StorageProcesses::feedbackLTA() const - { - return itsFeedbackLTA; - } - - void StorageProcesses::start() { const vector<string> &hostnames = itsParset.settings.outputProcHosts; @@ -91,9 +85,6 @@ namespace LOFAR // stop storage process itsStorageProcesses[rank]->stop(deadline_ts); - // obtain feedback for LTA - itsFeedbackLTA.adoptCollection(itsStorageProcesses[rank]->feedbackLTA()); - // free the StorageProcess object itsStorageProcesses[rank] = 0; } diff --git a/RTCP/Cobalt/GPUProc/src/Storage/StorageProcesses.h b/RTCP/Cobalt/GPUProc/src/Storage/StorageProcesses.h index 04151848e90d6697d6297fb44e06c1136cf15f63..2e59471ed5cbec36b60917a4f8ef265dc33a7fde 100644 --- a/RTCP/Cobalt/GPUProc/src/Storage/StorageProcesses.h +++ b/RTCP/Cobalt/GPUProc/src/Storage/StorageProcesses.h @@ -80,17 +80,12 @@ namespace LOFAR // stop the processes and control threads, given an absolute time out. void stop( time_t deadline ); - ParameterSet feedbackLTA() const; - private: const Parset &itsParset; const std::string itsLogPrefix; std::vector<SmartPtr<StorageProcess> > itsStorageProcesses; - // All feedback for the LTA obtained by the storage processes - ParameterSet itsFeedbackLTA; - // start the processes and control threads void start(); diff --git a/RTCP/Cobalt/GPUProc/src/rtcp.cc b/RTCP/Cobalt/GPUProc/src/rtcp.cc index ee93600810e45156acec705f623fa00114cc9f95..cd296975df3dcf600dcbf6ae6db9f761b9502c71 100644 --- a/RTCP/Cobalt/GPUProc/src/rtcp.cc +++ b/RTCP/Cobalt/GPUProc/src/rtcp.cc @@ -52,6 +52,8 @@ #include <Common/SystemUtil.h> #include <Common/StringUtil.h> #include <Common/Thread/Trigger.h> +#include <MessageBus/MsgBus.h> +#include <MessageBus/Protocols/TaskFeedbackProcessing.h> #include <ApplCommon/PVSSDatapointDefs.h> #include <ApplCommon/StationInfo.h> #include <MACIO/RTmetadata.h> @@ -593,31 +595,18 @@ int main(int argc, char **argv) // graceful exit storageProcesses->stop(time(0) + outputProcTimeout); - LOG_INFO("Writing LTA feedback to disk"); + // send processing feedback + ToBus bus("lofar.task.feedback.processing"); - // obtain LTA feedback LTAFeedback fb(ps.settings); - Parset feedbackLTA; - // augment LTA feedback with global information - feedbackLTA.adoptCollection(fb.allFeedback()); + Protocols::TaskFeedbackProcessing msg( + "Cobalt/GPUProc/rtcp", + "", + "Processing feedback", + fb.processingFeedback()); - // process updates from outputProc - feedbackLTA.adoptCollection(storageProcesses->feedbackLTA()); - - // write LTA feedback to disk - const char *LOFARROOT = getenv("LOFARROOT"); - if (LOFARROOT != NULL) { - string feedbackFilename = str(format("%s/var/run/Observation%s_feedback") % LOFARROOT % ps.settings.observationID); - - try { - feedbackLTA.writeFile(feedbackFilename, false); - } catch (APSException &ex) { - LOG_ERROR_STR("Could not write feedback file " << feedbackFilename << ": " << ex); - } - } else { - LOG_WARN("Could not write feedback file: $LOFARROOT not set."); - } + bus.send(msg); // final cleanup storageProcesses = 0; diff --git a/RTCP/Cobalt/GPUProc/src/scripts/runObservation.sh b/RTCP/Cobalt/GPUProc/src/scripts/runObservation.sh index d903cebebd7e4095958fa25e1d7f6baff2536ab1..9686496c82cbc91c2f64d179c449d845aadbdf37 100755 --- a/RTCP/Cobalt/GPUProc/src/scripts/runObservation.sh +++ b/RTCP/Cobalt/GPUProc/src/scripts/runObservation.sh @@ -51,7 +51,7 @@ function usage { "\n -B: do NOT add broken antenna information"\ "\n -C: run with check tool specified in environment variable"\ "LOFAR_CHECKTOOL"\ - "\n -F: do NOT send feedback to OnlineControl and do NOT send data points to a PVSS gateway"\ + "\n -F: do NOT send data points to a PVSS gateway"\ "\n -P: create PID file"\ "\n -l: run solely on localhost using 'nprocs' MPI processes (isolated test)"\ "\n -p: enable profiling" \ @@ -85,7 +85,7 @@ function command_retry { done } -# Send the result status back to OnlineControl. +# Send the result status back to LOFAR (MAC, MoM) # # to report success: # sendback_status 0 @@ -94,55 +94,17 @@ function command_retry { function sendback_status { OBSRESULT="$1" - if [ -z "$PARSET" ] + if [ $OBSRESULT -eq 0 ] then - echo "Not communicating back to OnlineControl (no parset)" - return 0 - fi - - if [ "$ONLINECONTROL_FEEDBACK" -eq "0" ] - then - echo "Not communicating back to OnlineControl (disabled on command line)" - return 0 + echo "Signalling success" + STATUS=0 + else + # ***** Observation or sending feedback failed for some reason + echo "Signalling failure" + STATUS=1 fi - if [ "$ONLINECONTROL_FEEDBACK" -eq "1" ] - then - ONLINECONTROL_USER=`getkey Cobalt.Feedback.userName $USER` - ONLINECONTROL_HOST=`getkey Cobalt.Feedback.host` - - if [ $OBSRESULT -eq 0 ] - then - # ***** Observation ran successfully - - # Copy LTA feedback file to ccu001 - FEEDBACK_DEST="$ONLINECONTROL_USER@$ONLINECONTROL_HOST:`getkey Cobalt.Feedback.remotePath`" - - echo "Copying feedback to $FEEDBACK_DEST" - timeout $KILLOPT 30s scp "$FEEDBACK_FILE" "$FEEDBACK_DEST" - FEEDBACK_RESULT=$? - if [ $FEEDBACK_RESULT -ne 0 ] - then - echo "Failed to copy file $FEEDBACK_FILE to $FEEDBACK_DEST (status: $FEEDBACK_RESULT)" - OBSRESULT=$FEEDBACK_RESULT - fi - fi - - # Communicate result back to OnlineControl - ONLINECONTROL_RESULT_PORT=$((21000 + $OBSID % 1000)) - - if [ $OBSRESULT -eq 0 ] - then - # Signal success to OnlineControl - echo "Signalling success to $ONLINECONTROL_HOST" - echo -n "FINISHED" > /dev/tcp/$ONLINECONTROL_HOST/$ONLINECONTROL_RESULT_PORT - else - # ***** Observation or sending feedback failed for some reason - # Signal failure to OnlineControl - echo "Signalling failure to $ONLINECONTROL_HOST" - echo -n "ABORT" > /dev/tcp/$ONLINECONTROL_HOST/$ONLINECONTROL_RESULT_PORT - fi - fi + send_status $STATUS return 1 } @@ -155,8 +117,8 @@ echo "Called as: $0 $@" # Set default options # ****************************** -# Provide feedback to OnlineControl and data points to PVSS? -ONLINECONTROL_FEEDBACK=1 +# Provide data points to PVSS? +STATUS_FEEDBACK=1 # Augment the parset with etc/parset-additions.d/* ? AUGMENT_PARSET=1 @@ -192,7 +154,7 @@ while getopts ":ABCFP:l:o:p" opt; do ;; C) CHECK_TOOL="$LOFAR_CHECKTOOL" ;; - F) ONLINECONTROL_FEEDBACK=0 + F) STATUS_FEEDBACK=0 ;; P) PIDFILE="$OPTARG" ;; @@ -295,8 +257,6 @@ then setkey Cobalt.OutputProc.executable "$LOFARROOT/bin/outputProc" setkey Cobalt.OutputProc.StaticMetaDataDirectory "$LOFARROOT/etc" setkey Cobalt.FinalMetaDataGatherer.database.host localhost - setkey Cobalt.Feedback.host localhost - setkey Cobalt.Feedback.remotePath "$LOFARROOT/var/run" setkey Cobalt.PVSSGateway.host "" # Redirect UDP/TCP input streams to any interface on the local machine @@ -309,7 +269,7 @@ then setkey Cobalt.FinalMetaDataGatherer.enabled false fi - if [ "$ONLINECONTROL_FEEDBACK" -eq "0" ] + if [ "$STATUS_FEEDBACK" -eq "0" ] then setkey Cobalt.PVSSGateway.host "" fi diff --git a/RTCP/Cobalt/GPUProc/src/send_status.cc b/RTCP/Cobalt/GPUProc/src/send_status.cc new file mode 100644 index 0000000000000000000000000000000000000000..f49e86880e2d9753dbaabf32c263ebab4711b295 --- /dev/null +++ b/RTCP/Cobalt/GPUProc/src/send_status.cc @@ -0,0 +1,82 @@ +//# send_status.cc: Send lofar.task.feedback.status information +//# Copyright (C) 2012-2013 ASTRON (Netherlands Institute for Radio Astronomy) +//# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands +//# +//# This file is part of the LOFAR software suite. +//# The LOFAR software suite is free software: you can redistribute it and/or +//# modify it under the terms of the GNU General Public License as published +//# by the Free Software Foundation, either version 3 of the License, or +//# (at your option) any later version. +//# +//# The LOFAR software suite is distributed in the hope that it will be useful, +//# but WITHOUT ANY WARRANTY; without even the implied warranty of +//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//# GNU General Public License for more details. +//# +//# You should have received a copy of the GNU General Public License along +//# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. +//# +//# $Id$ + +#include <lofar_config.h> + +#include <MessageBus/MsgBus.h> +#include <MessageBus/Protocols/TaskFeedbackStatus.h> + +#include <boost/format.hpp> + +#include <unistd.h> + +using namespace LOFAR; +using namespace std; +using boost::format; + +static void usage(const char *argv0) +{ + cerr << "Usage: " << argv0 << " status" << endl; + cerr << endl; + cerr << " -h: print this message" << endl; +} + +int main(int argc, char **argv) +{ + /* + * Parse command-line options + */ + + int opt; + while ((opt = getopt(argc, argv, "h")) != -1) { + switch (opt) { + + case 'h': + usage(argv[0]); + return EXIT_SUCCESS; + + default: /* '?' */ + usage(argv[0]); + return EXIT_FAILURE; + } + } + + // we expect a parset filename as an additional parameter + if (optind >= argc) { + usage(argv[0]); + return EXIT_FAILURE; + } + + int status = atoi(argv[optind]); + + // send status feedback + ToBus bus("lofar.task.feedback.status"); + + Protocols::TaskFeedbackStatus msg( + "Cobalt/GPUProc/sendStatus", + "", + "Status feedback", + status); + + bus.send(msg); + + return 0; +} + diff --git a/RTCP/Cobalt/GPUProc/test/Storage/CMakeLists.txt b/RTCP/Cobalt/GPUProc/test/Storage/CMakeLists.txt index d27b53f7a22f1cf7738edb2441c6762c2de6dd1a..a01cc37bebb1f2974e6ef4e4341bd022715b6708 100644 --- a/RTCP/Cobalt/GPUProc/test/Storage/CMakeLists.txt +++ b/RTCP/Cobalt/GPUProc/test/Storage/CMakeLists.txt @@ -2,6 +2,10 @@ include(LofarCTest) +configure_file( + ${CMAKE_CURRENT_SOURCE_DIR}/tStorageProcesses.run2.in + ${CMAKE_CURRENT_BINARY_DIR}/tStorageProcesses.run2 @ONLY) + # DummyStorage is started by tStorageProcesses to emulate an OutputProc process lofar_add_executable(DummyStorage DummyStorage.cc) lofar_add_test(tStorageProcesses tStorageProcesses.cc DEPENDS DummyStorage) diff --git a/RTCP/Cobalt/GPUProc/test/Storage/DummyStorage.cc b/RTCP/Cobalt/GPUProc/test/Storage/DummyStorage.cc index 7a8a1307b47f9050ce5b7fd59cf93dc17f3d8072..b246f19a15b8d111155473f90823f2637b97119d 100644 --- a/RTCP/Cobalt/GPUProc/test/Storage/DummyStorage.cc +++ b/RTCP/Cobalt/GPUProc/test/Storage/DummyStorage.cc @@ -43,10 +43,6 @@ unsigned rank; Exception::TerminateHandler th(Exception::terminate); -FinalMetaData origFinalMetaData; - -Mutex logMutex; - void emulateStorage() { // establish control connection @@ -55,52 +51,22 @@ void emulateStorage() // read and print parset Parset parset(&stream); - { - ScopedLock sl(logMutex); - cout << "Storage: Parset received." << endl; - } + cout << "Storage: Parset received." << endl; // read and print meta data FinalMetaData finalMetaData; finalMetaData.read(stream); - { - ScopedLock sl(logMutex); - - ASSERT(finalMetaData.brokenRCUsAtBegin == origFinalMetaData.brokenRCUsAtBegin); - ASSERT(finalMetaData.brokenRCUsDuring == origFinalMetaData.brokenRCUsDuring); - - cout << "Storage: FinalMetaData received and matches." << endl; - } - - // write LTA feedback - Parset feedbackLTA; - feedbackLTA.add("foo", "bar"); - feedbackLTA.write(&stream); -} -void emulateFinalMetaDataGatherer() -{ - // establish control connection - string resource = getStorageControlDescription(observationID, -1); - PortBroker::ServerStream stream(resource); - - // read and print parset - Parset parset(&stream); - { - ScopedLock sl(logMutex); - cout << "FinalMetaDataGatherer: Parset received." << endl; - } + //ASSERT(finalMetaData.brokenRCUsAtBegin == origFinalMetaData.brokenRCUsAtBegin); + //ASSERT(finalMetaData.brokenRCUsDuring == origFinalMetaData.brokenRCUsDuring); - // set and write meta data - origFinalMetaData.brokenRCUsAtBegin.push_back( FinalMetaData::BrokenRCU("CS001", "LBA", 2, "2012-01-01 12:34") ); - origFinalMetaData.brokenRCUsAtBegin.push_back( FinalMetaData::BrokenRCU("RS205", "HBA", 1, "2012-01-01 12:34") ); - origFinalMetaData.brokenRCUsDuring.push_back( FinalMetaData::BrokenRCU("DE601", "RCU", 3, "2012-01-01 12:34") ); - origFinalMetaData.write(stream); + cout << "Storage: FinalMetaData received and matches." << endl; - { - ScopedLock sl(logMutex); - cout << "FinalMetaDataGatherer: FinalMetaData sent." << endl; - } + // write completion signal + bool sentFeedback = false; + stream.write(&sentFeedback, sizeof sentFeedback); + bool success = true; + stream.write(&success, sizeof success); } int main(int argc, char **argv) @@ -118,7 +84,7 @@ int main(int argc, char **argv) // Make sure DummyStorage always dies, even if the test // malfunctions. - alarm(10); + alarm(20); observationID = boost::lexical_cast<int>(argv[1]); rank = boost::lexical_cast<unsigned>(argv[2]); @@ -126,21 +92,10 @@ int main(int argc, char **argv) // set up broker server PortBroker::createInstance(storageBrokerPort(observationID)); -#pragma omp parallel sections - { -# pragma omp section - try { - emulateStorage(); - } catch (Exception &ex) { - cout << "Storage caught exception: " << ex << endl; - } - -# pragma omp section - try { - emulateFinalMetaDataGatherer(); - } catch (Exception &ex) { - cout << "FinalMetaDataGatherer caught exception: " << ex << endl; - } + try { + emulateStorage(); + } catch (Exception &ex) { + cout << "Storage caught exception: " << ex << endl; } } diff --git a/RTCP/Cobalt/GPUProc/test/Storage/tStorageProcesses.cc b/RTCP/Cobalt/GPUProc/test/Storage/tStorageProcesses.cc index 306ba933a95bcfead49498c0f89b84f422931259..0c1d9870b511cfd5362238f80f8969a07d620f3c 100644 --- a/RTCP/Cobalt/GPUProc/test/Storage/tStorageProcesses.cc +++ b/RTCP/Cobalt/GPUProc/test/Storage/tStorageProcesses.cc @@ -56,6 +56,8 @@ void test_protocol() THROW_SYSCALL("getcwd"); p.add("Observation.ObsID", "12345"); + p.add("Observation.startTime", "2015-01-01 00:00:00"); + p.add("Observation.stopTime", "2015-01-01 00:01:00"); p.add("Cobalt.OutputProc.userName", USER); p.add("Cobalt.OutputProc.sshPublicKey", pubkey); p.add("Cobalt.OutputProc.sshPrivateKey", privkey); @@ -90,11 +92,6 @@ void test_protocol() // Give 10 seconds to wrap up sp.stop(time(0) + 10); - - // Obtain LTA feedback - ParameterSet feedbackLTA(sp.feedbackLTA()); - - ASSERT(feedbackLTA.getString("foo","") == "bar"); } } diff --git a/RTCP/Cobalt/GPUProc/test/Storage/tStorageProcesses.queue b/RTCP/Cobalt/GPUProc/test/Storage/tStorageProcesses.queue new file mode 100644 index 0000000000000000000000000000000000000000..49ab095c9f38a39e8b9712e0b5ce4945bcceac52 --- /dev/null +++ b/RTCP/Cobalt/GPUProc/test/Storage/tStorageProcesses.queue @@ -0,0 +1,34 @@ +<message> + <header> + <system>LOFAR</system> + <version>1.0.0</version> + <service> + <name>lofar.task.feedback.dataproducts</name> + <version>1.0.0</version> + </service> + <request> + <source>lofar.task.feedback.dataproducts</source> + <user></user> + <uuid></uuid> + <timestamp></timestamp> + <summary>Feedback for Correlated Data, subband 0</summary> + </request> + </header> + <payload> +Observation.DataProducts.Output_Correlated_[0].SAP=0 +Observation.DataProducts.Output_Correlated_[0].centralFrequency=0.000000 +Observation.DataProducts.Output_Correlated_[0].channelWidth=3051.757812 +Observation.DataProducts.Output_Correlated_[0].channelsPerSubband=64 +Observation.DataProducts.Output_Correlated_[0].duration=0 +Observation.DataProducts.Output_Correlated_[0].fileFormat=AIPS++/CASA +Observation.DataProducts.Output_Correlated_[0].filename=SB0.MS +Observation.DataProducts.Output_Correlated_[0].integrationInterval=1.006633 +Observation.DataProducts.Output_Correlated_[0].location=localhost:. +Observation.DataProducts.Output_Correlated_[0].percentageWritten=0 +Observation.DataProducts.Output_Correlated_[0].size=0 +Observation.DataProducts.Output_Correlated_[0].startTime=2015-01-01 00:00:00 +Observation.DataProducts.Output_Correlated_[0].stationSubband=0 +Observation.DataProducts.Output_Correlated_[0].subband=0 + + </payload> +</message> diff --git a/RTCP/Cobalt/GPUProc/test/Storage/tStorageProcesses.run b/RTCP/Cobalt/GPUProc/test/Storage/tStorageProcesses.run index 474574333959f95d9e89aaee548627a21e85faea..855a8c12020b14cd2cc2c5e717edbf8c240ffe1b 100755 --- a/RTCP/Cobalt/GPUProc/test/Storage/tStorageProcesses.run +++ b/RTCP/Cobalt/GPUProc/test/Storage/tStorageProcesses.run @@ -1,8 +1,4 @@ #!/bin/bash -# -DummyStorage 12345 0& # Run the command -PID_ID=$! # get the PID -tStorageProcesses || exit 1 -wait $PID_ID -exit 0 # do not exit with DymmyStorage exit. If we are waiting the test should succeed +# Defer to the .run2, which is constructed by CMake but won't be cleaned up by assay +bash ./tStorageProcesses.run2 diff --git a/RTCP/Cobalt/GPUProc/test/Storage/tStorageProcesses.run2.in b/RTCP/Cobalt/GPUProc/test/Storage/tStorageProcesses.run2.in new file mode 100755 index 0000000000000000000000000000000000000000..d6bd221dbcfcb1d8c686468a372c780ee9f3bfda --- /dev/null +++ b/RTCP/Cobalt/GPUProc/test/Storage/tStorageProcesses.run2.in @@ -0,0 +1,24 @@ +#!/bin/bash +# + +# Run a program to talk to +DummyStorage 12345 0& # Run the command +PID_ID=$! # get the PID + +# Empty queues (if any) +if [ "@HAVE_QPID@" == "TRUE" ]; then + @QPID_RECEIVE_EXECUTABLE@ -b 127.0.0.1 -a lofar.task.feedback.dataproducts --print-content no --ignore-reply-to >/dev/null 2>&1 || true +fi + +# Start the test +tStorageProcesses || exit 1 +wait $PID_ID + +# Validate queues +if [ "@HAVE_QPID@" == "TRUE" ]; then + @QPID_RECEIVE_EXECUTABLE@ -b 127.0.0.1 -a lofar.task.feedback.dataproducts --ignore-reply-to > tStorageProcesses.queue || exit 1 + + diff $srcdir/tStorageProcesses.queue tStorageProcesses.queue || exit 1 +fi + +exit 0 # do not exit with DymmyStorage exit. If we are waiting the test should succeed diff --git a/RTCP/Cobalt/OutputProc/CMakeLists.txt b/RTCP/Cobalt/OutputProc/CMakeLists.txt index 2aed652557937b472bcd4b9640f6ccb4b1a73650..d12b4addfe84de38b51621ed4e1b1c37c73bf75e 100644 --- a/RTCP/Cobalt/OutputProc/CMakeLists.txt +++ b/RTCP/Cobalt/OutputProc/CMakeLists.txt @@ -1,6 +1,6 @@ # $Id$ -lofar_package(OutputProc 1.0 DEPENDS Common Stream CoInterface MSLofar LofarStMan MACIO) +lofar_package(OutputProc 1.0 DEPENDS Common Stream CoInterface MSLofar LofarStMan MACIO MessageBus) include(LofarFindPackage) lofar_find_package(Casacore COMPONENTS casa ms tables REQUIRED) diff --git a/RTCP/Cobalt/OutputProc/src/GPUProcIO.cc b/RTCP/Cobalt/OutputProc/src/GPUProcIO.cc index d3c6720f26bb3c7d86b60c91a81f411f88608ffc..514359372ea68d92d19766da3be918e61e8e2b8f 100644 --- a/RTCP/Cobalt/OutputProc/src/GPUProcIO.cc +++ b/RTCP/Cobalt/OutputProc/src/GPUProcIO.cc @@ -32,6 +32,8 @@ #include <Common/LofarLogger.h> #include <Common/StringUtil.h> #include <Common/Exceptions.h> +#include <MessageBus/MsgBus.h> +#include <MessageBus/Protocols/TaskFeedbackDataproducts.h> #include <Stream/PortBroker.h> #include <ApplCommon/PVSSDatapointDefs.h> #include <ApplCommon/StationInfo.h> @@ -249,22 +251,41 @@ bool process(Stream &controlStream, unsigned myRank) * LTA FEEDBACK */ - LOG_DEBUG_STR("Retrieving LTA feedback"); - Parset feedbackLTA; + LOG_DEBUG_STR("Forwarding LTA feedback"); - for (size_t i = 0; i < subbandWriters.size(); ++i) - feedbackLTA.adoptCollection(subbandWriters[i]->feedbackLTA()); - for (size_t i = 0; i < tabWriters.size(); ++i) - feedbackLTA.adoptCollection(tabWriters[i]->feedbackLTA()); + ToBus bus("lofar.task.feedback.dataproducts"); - LOG_DEBUG_STR("Forwarding LTA feedback"); - try { - feedbackLTA.write(&controlStream); - } catch (LOFAR::Exception &err) { - success = false; - LOG_ERROR_STR("Failed to forward LTA feedback information: " << err); + const std::string myName = str(boost::format("Cobalt/OutputProc on %s") % myHostName); + + for (size_t i = 0; i < subbandWriters.size(); ++i) { + Protocols::TaskFeedbackDataproducts msg( + myName, + "", + str(boost::format("Feedback for Correlated Data, subband %s") % subbandWriters[i]->streamNr()), + subbandWriters[i]->feedbackLTA()); + + bus.send(msg); + } + + for (size_t i = 0; i < tabWriters.size(); ++i) { + Protocols::TaskFeedbackDataproducts msg( + myName, + "", + str(boost::format("Feedback for Beamformed Data, file nr %s") % tabWriters[i]->streamNr()), + tabWriters[i]->feedbackLTA()); + + bus.send(msg); } + /* + * SIGN OFF + */ + + bool sentFeedback = true; + + controlStream.write(&sentFeedback, sizeof sentFeedback); + controlStream.write(&success, sizeof success); + return success; } } diff --git a/SubSystems/Offline/CMakeLists.txt b/SubSystems/Offline/CMakeLists.txt index f11dad6519c8ead9b279515f1a33952fb0577b33..f11418570ff5891292fe2deb87bfe63fa7004afe 100644 --- a/SubSystems/Offline/CMakeLists.txt +++ b/SubSystems/Offline/CMakeLists.txt @@ -1,4 +1,4 @@ # $Id$ -lofar_package(Offline DEPENDS CEP MSLofar StaticMetaData) +lofar_package(Offline DEPENDS CEP MSLofar StaticMetaData MessageBus) diff --git a/SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.output/queues/lofar.task.feedback.processing b/SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.output/queues/lofar.task.feedback.processing new file mode 100644 index 0000000000000000000000000000000000000000..93520711799b47d6709d90e8dc141740043f7dd6 --- /dev/null +++ b/SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.output/queues/lofar.task.feedback.processing @@ -0,0 +1,26 @@ +<message> + <header> + <system>LOFAR</system> + <version>1.0.0</version> + <service> + <name>lofar.task.feedback.processing</name> + <version>1.0.0</version> + </service> + <request> + <source>Cobalt/GPUProc/rtcp</source> + <user></user> + <uuid></uuid> + <timestamp></timestamp> + <summary>Processing feedback</summary> + </request> + </header> + <payload> +Observation.Correlator.channelWidth=12207.03125 +Observation.Correlator.channelsPerSubband=16 +Observation.Correlator.integrationInterval=0.25165824 +Observation.DataProducts.nrOfOutput_Beamformed_=0 +Observation.DataProducts.nrOfOutput_Correlated_=5 +_isCobalt=T + + </payload> +</message> diff --git a/SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.output/queues/lofar.task.feedback.status b/SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.output/queues/lofar.task.feedback.status new file mode 100644 index 0000000000000000000000000000000000000000..b926865d7c45e60f2cf81abb4c7b3d16840d90a2 --- /dev/null +++ b/SubSystems/Online_Cobalt/test/Correlator/tCorrelate_1sec_1st_5sb_noflagging.output/queues/lofar.task.feedback.status @@ -0,0 +1,40 @@ +<message> + <header> + <system>LOFAR</system> + <version>1.0.0</version> + <service> + <name>lofar.task.feedback.status</name> + <version>1.0.0</version> + </service> + <request> + <source>Cobalt/GPUProc/sendStatus</source> + <user></user> + <uuid></uuid> + <timestamp></timestamp> + <summary>Status feedback</summary> + </request> + </header> + <payload> +<task>ABORT</task> + </payload> +</message> +<message> + <header> + <system>LOFAR</system> + <version>1.0.0</version> + <service> + <name>lofar.task.feedback.status</name> + <version>1.0.0</version> + </service> + <request> + <source>Cobalt/GPUProc/sendStatus</source> + <user></user> + <uuid></uuid> + <timestamp></timestamp> + <summary>Status feedback</summary> + </request> + </header> + <payload> +<task>ABORT</task> + </payload> +</message> diff --git a/SubSystems/Online_Cobalt/test/MockOnlineControl.sh b/SubSystems/Online_Cobalt/test/MockOnlineControl.sh deleted file mode 100755 index d22f2c3ea63e2c27b6535f33de7d863a9a77aa00..0000000000000000000000000000000000000000 --- a/SubSystems/Online_Cobalt/test/MockOnlineControl.sh +++ /dev/null @@ -1,52 +0,0 @@ -#!/bin/bash - -# A mock OnlineControl to receive the observation status -# and the LTA feedback. - -# 1. Read parset -PARSET="$1" - -function error { - echo "$@" - exit 1 -} - -function getkey { - KEY=$1 - <$PARSET perl -ne '/^'$KEY'\s*=\s*"?(.*?)"?\s*$/ || next; print "$1";' | tail -n 1 -} - -[ -n "$PARSET" ] || error "No parset specified" -[ -r "$PARSET" ] || error "Cannot read parset: $PARSET" - -# 2. Open port @ 21000 + 1000 % obsid -OBSID=`getkey Observation.ObsID` -RESULT_PORT=$((21000 + $OBSID % 1000)) - -STATUSSTR=`timeout 120s nc -l $RESULT_PORT` - -# 3. Read string: ABORT or FINISHED -if [ "$STATUSSTR" == "FINISHED" ] -then - echo "Observation reported success" - - # 4. If finished, check for existence of feedback file - FEEDBACK_FILE=`getkey Cobalt.Feedback.remotePath`/Observation${OBSID}_feedback - - # Check existence and access rights - [ -e $FEEDBACK_FILE ] || error "Feedback file not found: $FEEDBACK_FILE" - [ -r $FEEDBACK_FILE ] || error "Feedback file not readable: $FEEDBACK_FILE" - [ -f $FEEDBACK_FILE ] || error "Feedback file not a regular file: $FEEDBACK_FILE" - - # Check file size - FILESIZE=`stat -c %s $FEEDBACK_FILE` - [ $FILESIZE -ne 0 ] || error "Feedback file empty: $FEEDBACK_FILE" -elif [ "$STATUSSTR" == "ABORT" ] -then - echo "Observation reported failure" -else - error "Invalid status string: '$STATUSSTR'" -fi - -exit 0 - diff --git a/SubSystems/Online_Cobalt/test/runtest.sh.in b/SubSystems/Online_Cobalt/test/runtest.sh.in index d3fc83b64248ebdd4151599c9ec71a1fb1a23acb..3050c1ada71c63030ba2d3533e57fe39702ca7ca 100755 --- a/SubSystems/Online_Cobalt/test/runtest.sh.in +++ b/SubSystems/Online_Cobalt/test/runtest.sh.in @@ -36,7 +36,7 @@ echo " in directory $(pwd)" [ -n "$(echo *.raw)" ] || error "runObservation.sh produced no output files" # create script to accept output (ie. copy it to the source dir for check in) - echo "#!/bin/sh + echo "#!/bin/bash cp ${PWD}/*.raw ${REFDIR}" > accept_output chmod a+x accept_output @@ -60,5 +60,16 @@ echo " in directory $(pwd)" done done + if [ $HAVE_QPID == "TRUE" ]; then + # validate the contents of the queues + mkdir "queues" || error "Failed to create temporary directory ${OUTDIR}/queues" + echo "[ -d ${PWD}/queues ] && cp -r ${PWD}/queues ${REFDIR}" >> accept_output + QUEUES=`cd ${REFDIR}/queues 2>/dev/null && ls` + for Q in $QUEUES; do + echo "Comparing output of queue $Q" + @QPID_RECEIVE_EXECUTABLE@ -b 127.0.0.1 -a $Q --ignore-reply-to > queues/$Q || error "Could not read output of queue $Q" + diff ${REFDIR}/queues/$Q queues/$Q || error "Output of queue $Q does not match" + done + fi ) || exit 1 diff --git a/SubSystems/Online_Cobalt/test/tMACfeedback.run b/SubSystems/Online_Cobalt/test/tMACfeedback.run index 620ee8e3575357353286b735a09be3c3984c0dd8..7d601ff1b37952b085f8ff105a783787a1fb2a5c 100755 --- a/SubSystems/Online_Cobalt/test/tMACfeedback.run +++ b/SubSystems/Online_Cobalt/test/tMACfeedback.run @@ -10,31 +10,22 @@ function test_parset { echo "Testing $PARSET, expecting result $EXPECTED_OBSRESULT" # Add the connection information for this test - echo "Cobalt.Feedback.host=localhost" >> $PARSET - echo "Cobalt.Feedback.remotePath=$LOFARROOT/var/run" >> $PARSET echo "Cobalt.FinalMetaDataGatherer.host=localhost" >> $PARSET - # Start a mock OnlineControl to verify the communications of runObservation.sh - $srcdir/MockOnlineControl.sh $PARSET & - ONLINECONTROL_PID=$! - # Run the observation runObservation.sh -B -C -l 1 $EXTRA_PARAMS $PARSET OBSRESULT=$? - # Wait for OnlineControl to finish - wait $ONLINECONTROL_PID - ONLINECONTROLRESULT=$? - if [ $OBSRESULT -ne $EXPECTED_OBSRESULT ]; then echo "runObservation.sh failed (status: $OBSRESULT)" exit 1 fi - if [ $ONLINECONTROLRESULT -gt 0 ]; then - echo "MockOnlineControl.sh failed (status: $ONLINECONTROLRESULT)" - exit 1 - fi + # Pull the message from the status queue + qpid-receive -b 127.0.0.1 -q lofar.task.feedback.status || error 'Could not pull status message from bus' + + # Check the message against the expected result + # <TODO> } test_parset $PWD/tMACfeedback.in_parset_success_1 0 "" @@ -42,4 +33,3 @@ test_parset $PWD/tMACfeedback.in_parset_failure_1 1 "-o Cobalt.Nodes=[unreachabl # Everything went ok exit 0 - diff --git a/SubSystems/Online_Cobalt/test/testFuncs.sh.in b/SubSystems/Online_Cobalt/test/testFuncs.sh.in index 7744152fef5983ed55bba2846b79005e6f27b404..9ef8cfba14698aeed61ec64dd119aa354f6ca26c 100755 --- a/SubSystems/Online_Cobalt/test/testFuncs.sh.in +++ b/SubSystems/Online_Cobalt/test/testFuncs.sh.in @@ -13,9 +13,23 @@ error() # Set LOFARROOT and other LOFAR env vars into install directory (var is always set). . "@CMAKE_INSTALL_PREFIX@/lofarinit.sh" || error "Could not load our lofarinit.sh -- did you run 'make install'?" +HAVE_QPID="@HAVE_QPID@" + # Create runtime output directories if not exists. # Not done at build, because it is a post-install setting. Different in production. mkdir -p "$LOFARROOT/var/log" "$LOFARROOT/var/run" || error "Failed to create runtime output directories" # Set all locales to "C" to avoid problems with, e.g., perl. export LC_ALL="C" + +# Clear all used QPID queues to prevent leakage from previous tests +echo "Clearing QPID queues (if they exist, so ignore any errors regarding that)..." + +QUEUES=" + lofar.task.feedback.status + lofar.task.feedback.dataproducts + lofar.task.feedback.processing" +for Q in $QUEUES; do + qpid-receive --print-content no --ignore-reply-to -b 127.0.0.1 -a $Q >/dev/null 2>&1 +done + diff --git a/SubSystems/Online_Cobalt/test/tstartBGL.run b/SubSystems/Online_Cobalt/test/tstartBGL.run index 5718de6be95091cf5c0745a5cde2d88036d9d61b..75e8e3ca24ae45148a8de2e20f8c6305334ab0e0 100755 --- a/SubSystems/Online_Cobalt/test/tstartBGL.run +++ b/SubSystems/Online_Cobalt/test/tstartBGL.run @@ -59,8 +59,6 @@ echo "Test 5: normal run" echo " (expects success)" echo "***************************" # Add the connection information for this test -echo "Cobalt.Feedback.host=localhost" >> tstartBGL.in_parset -echo "Cobalt.Feedback.remotePath=$LOFARROOT/var/run" >> tstartBGL.in_parset echo "Cobalt.FinalMetaDataGatherer.host=localhost" >> tstartBGL.in_parset startBGL.sh 1 2 3 tstartBGL.in_parset 1000 || error "startBGL.sh failed" @@ -88,8 +86,6 @@ echo "Test 6: stop a run" echo " (expects success)" echo "***************************" # Add the connection information for this test -echo "Cobalt.Feedback.host=localhost" >> tstartBGL.in_parset -echo "Cobalt.Feedback.remotePath=$LOFARROOT/var/run" >> tstartBGL.in_parset echo "Cobalt.FinalMetaDataGatherer.host=localhost" >> tstartBGL.in_parset # Run forever