diff --git a/RTCP/Cobalt/CoInterface/src/BestEffortQueue.tcc b/RTCP/Cobalt/CoInterface/src/BestEffortQueue.tcc index 1fa499c00534c96180cc77e704c41281267f02a5..4cbf108924f30bca532f93ffea1e19b41f61dde1 100644 --- a/RTCP/Cobalt/CoInterface/src/BestEffortQueue.tcc +++ b/RTCP/Cobalt/CoInterface/src/BestEffortQueue.tcc @@ -42,7 +42,7 @@ template <typename T> inline BestEffortQueue<T>::~BestEffortQueue() template <typename T> inline bool BestEffortQueue<T>::_overflow() const { - return this->itsSize > maxSize; + return this->itsQueue.size() > maxSize; } diff --git a/RTCP/Cobalt/CoInterface/src/Pool.h b/RTCP/Cobalt/CoInterface/src/Pool.h index c652243be2d4246bb72d94da16c117bb1caae48d..af2ef91bbb2bf521043186909737139d00122493 100644 --- a/RTCP/Cobalt/CoInterface/src/Pool.h +++ b/RTCP/Cobalt/CoInterface/src/Pool.h @@ -31,6 +31,12 @@ namespace LOFAR // The pool operates using a free and a filled queue to cycle through buffers. Producers // move elements free->filled, and consumers move elements filled->free. By // wrapping the elements in a SmartPtr, memory leaks are prevented. + // + // If warn_on_bad_performance == True, the queues will log warnings if they are not filled/freed + // fast enough (see Queue.h for more details). + // + // batch_size is the number of elements appended to "filled" in short repetition, too fast for the consumer + // to reasonably consume between appends. template <typename T> struct Pool { @@ -39,10 +45,10 @@ namespace LOFAR Queue< SmartPtr<element_type> > free; Queue< SmartPtr<element_type> > filled; - Pool(const std::string &name, bool complain_on_empty_free_queue) + Pool(const std::string &name, bool warn_on_bad_performance, int batch_size = 1) : - free(name + " [.free]", complain_on_empty_free_queue), - filled(name + " [.filled]", false) + free(name + " [.free]", warn_on_bad_performance ? true : false, -1), + filled(name + " [.filled]", false, warn_on_bad_performance ? batch_size : -1) { } }; diff --git a/RTCP/Cobalt/CoInterface/src/Queue.h b/RTCP/Cobalt/CoInterface/src/Queue.h index a249643746865a2dcd3de4d87af105aef514e9ab..c2c89ec08e1432664a3acaf3f1d875ddc762261a 100644 --- a/RTCP/Cobalt/CoInterface/src/Queue.h +++ b/RTCP/Cobalt/CoInterface/src/Queue.h @@ -39,18 +39,22 @@ namespace LOFAR { namespace Cobalt { +// Double-ended queue, thread-safe template <typename T> class Queue { public: // Create a named queue - Queue(const std::string &name, bool warnIfEmptyOnRemove = false); + // + // warn_remove_on_empty_queue: Emit a log warning if remove() is called when the queue is empty. + // max_elements_on_append: Emit a warning if append() overflowed the queue. -1 = disable. + Queue(const std::string &name = "", bool warn_remove_on_empty_queue = false, int max_elements_on_append = -1); // Log queue statistics ~Queue(); // Add an element to the back of the queue. // - // If timed, this element is taken into account for timing statistics. + // If timed, this element is taken into account for statistics. // // Untimed items include: // * Elements added to initially fill the queue, waiting for obs start. @@ -69,6 +73,9 @@ template <typename T> class Queue struct timespec oldest() const; std::string name() const; + // Reset all collected statistics. + void reset_statistics(); + private: Queue(const Queue&); Queue& operator=(const Queue&); @@ -76,10 +83,6 @@ template <typename T> class Queue protected: const std::string itsName; - // The number of elements in the queue. We maintain this info - // because itsQueue::size() is O(N), at least until C++11. - size_t itsSize; - // The time an element spent in a queue RunningStatistics retention_time; @@ -92,7 +95,8 @@ template <typename T> class Queue // The average queue size on append() (excluding the inserted element) RunningStatistics queue_size_on_append; - const bool warn_if_empty; + const bool warn_remove_on_empty_queue; + const int max_elements_on_append; struct Element { T value; @@ -117,15 +121,15 @@ template <typename T> class Queue }; -template <typename T> Queue<T>::Queue(const std::string &name, bool warnIfEmptyOnRemove) +template <typename T> Queue<T>::Queue(const std::string &name, bool warn_remove_on_empty_queue, int max_elements_on_append) : itsName(name), - itsSize(0), retention_time("s"), remove_on_empty_queue("%"), remove_wait_time("s"), queue_size_on_append("elements"), - warn_if_empty(warnIfEmptyOnRemove) + warn_remove_on_empty_queue(warn_remove_on_empty_queue), + max_elements_on_append(max_elements_on_append) { } @@ -154,8 +158,33 @@ template <typename T> Queue<T>::~Queue() * Q holding items for processing: 0 * */ - if (itsName != "") + if (itsName != "") { + if (max_elements_on_append >= 0 && queue_size_on_append.mean() >= static_cast<size_t>(max_elements_on_append)) { + // This is a forward-feeding queue of elements holding data to be processed. + // + // Each element should disappear before the next one is offered. If not, the consumer of this queue + // does not process fast enough. Possible reasons include: + // * The producer is blocked for resources (another queue, network, OS task scheduler, etc) + // * The producer processes too slowly (requested load too high, or need more parallellisation) + + LOG_WARN_STR("Queue " << itsName << " should always hold fewer than << max_elements_on_append << elements on append(). Queue had " << queue_size_on_append.mean() << " elements on append() (mean). At remove(), queue was empty " << remove_on_empty_queue.mean() << "% of the time. Time each element spent in queue: " << retention_time << ". In case of problems, look at the consume rate of this queue."); + } + + if (warn_remove_on_empty_queue && remove_on_empty_queue.mean() > 0) { + // This is a back-feeding queue of empty elements to be reused. + // + // If this queue is empty, the consumer wants to process but can't, because it has to wait for this queue. + // This can incidentally happen, but is also a hint if performance is an issue. The thread replenishing this + // queue (through append()) apparently does not do so fast enough. This in turn could be caused by earlier + // problems, so looking for the first WARNING emitted by a remove() on an empty queue is key. Statistics of + // queues logically placed earlier in the pipeline can provide insight as well. + + LOG_WARN_STR("Queue " << itsName << " should never be empty on remove(), always ready with data. Queue was empty " << remove_on_empty_queue.mean() << "% of the time. When empty, caller had to wait: " << remove_wait_time); + } + + // Always log statistics for debugging, since it is invaluable information to debug incidental performance problems. LOG_INFO_STR("Queue " << itsName << ": avg #elements on append = " << queue_size_on_append.mean() << ", queue empty on remove = " << remove_on_empty_queue.mean() << "%, remove wait time = " << remove_wait_time.mean() << " s, element retention time: " << retention_time); + } } @@ -178,7 +207,7 @@ template <typename T> inline void Queue<T>::unlocked_append(const T& element, bo e.arrival_time = timed ? TimeSpec::now() : TimeSpec::big_bang; // Record the queue size - queue_size_on_append.push(itsSize); + if (timed) queue_size_on_append.push(itsQueue.size()); push_back(e); } @@ -205,7 +234,6 @@ template <typename T> inline void Queue<T>::prepend(const T& element) template <typename T> inline void Queue<T>::push_front( const Element &e ) { itsQueue.push_front(e); - itsSize++; itsNewElementAppended.signal(); } @@ -214,7 +242,6 @@ template <typename T> inline void Queue<T>::push_front( const Element &e ) template <typename T> inline void Queue<T>::push_back( const Element &e ) { itsQueue.push_back(e); - itsSize++; itsNewElementAppended.signal(); } @@ -224,7 +251,6 @@ template <typename T> inline typename Queue<T>::Element Queue<T>::pop_front() { Element e = itsQueue.front(); itsQueue.pop_front(); - itsSize--; return e; } @@ -240,10 +266,10 @@ template <typename T> inline T Queue<T>::remove(const struct timespec &deadline, ScopedLock scopedLock(itsMutex); - const bool beganEmpty = itsSize == 0; + const bool beganEmpty = itsQueue.size() == 0; const struct timespec begin = TimeSpec::now(); - if (beganEmpty && warn_if_empty) + if (beganEmpty && warn_remove_on_empty_queue) LOG_WARN_STR("remove() called on empty queue: " << name()); while (itsQueue.empty()) @@ -274,8 +300,7 @@ template <typename T> inline unsigned Queue<T>::size() const { ScopedLock scopedLock(itsMutex); - // Note: list::size() is O(N) - return itsSize; + return itsQueue.size(); } @@ -298,6 +323,14 @@ template <typename T> inline std::string Queue<T>::name() const return itsName; } +template <typename T> inline void Queue<T>::reset_statistics() +{ + retention_time.reset(); + remove_on_empty_queue.reset(); + remove_wait_time.reset(); + queue_size_on_append.reset(); +} + } // namespace Cobalt } // namespace LOFAR diff --git a/RTCP/Cobalt/CoInterface/src/TABTranspose.cc b/RTCP/Cobalt/CoInterface/src/TABTranspose.cc index ecf72bfc0e2267ad1e82307f2a17c1cebba4d4b0..d567bb28ccf4f197dc9f54068e09893386976d9a 100644 --- a/RTCP/Cobalt/CoInterface/src/TABTranspose.cc +++ b/RTCP/Cobalt/CoInterface/src/TABTranspose.cc @@ -325,7 +325,7 @@ void BlockCollector::outputLoop() { outputPool.filled.append(output); } - outputPool.filled.append(NULL); + outputPool.filled.append(NULL, false); } diff --git a/RTCP/Cobalt/CobaltTest/test/tManyPartTABOutput.cc b/RTCP/Cobalt/CobaltTest/test/tManyPartTABOutput.cc index 77eef64d83811113c2fdac34dbc0710bee4bb398..d310005bdb519f0d684263303b419993a66e689d 100644 --- a/RTCP/Cobalt/CobaltTest/test/tManyPartTABOutput.cc +++ b/RTCP/Cobalt/CobaltTest/test/tManyPartTABOutput.cc @@ -126,7 +126,7 @@ int main() data = getTestSbCohData(ps, ctx, blockIdx, i); writePool[i].queue->append(data); - writePool[i].queue->append(NULL); + writePool[i].queue->append(NULL, false); } // Have it push a block of values per sb to outputProc. diff --git a/RTCP/Cobalt/CobaltTest/test/tMultiPartTABOutput.cc b/RTCP/Cobalt/CobaltTest/test/tMultiPartTABOutput.cc index 76ae52c75cf172668edbe76718a6ddb6e65c2e7e..f7cb9a1464c1f1b374cdc877411614da13fdbd07 100644 --- a/RTCP/Cobalt/CobaltTest/test/tMultiPartTABOutput.cc +++ b/RTCP/Cobalt/CobaltTest/test/tMultiPartTABOutput.cc @@ -123,7 +123,7 @@ int main() data = getTestSbIncohData(ps, ctx, blockIdx, i); writePool[i].queue->append(data); - writePool[i].queue->append(NULL); + writePool[i].queue->append(NULL, false); } // Have it push a block of values per sb to outputProc. diff --git a/RTCP/Cobalt/GPUProc/src/MPIReceiver.cc b/RTCP/Cobalt/GPUProc/src/MPIReceiver.cc index 1ee7baae144223111abefef007a95a3448bed3dc..36d154df37c3824cace13e80c0ef727e0ccfb908 100644 --- a/RTCP/Cobalt/GPUProc/src/MPIReceiver.cc +++ b/RTCP/Cobalt/GPUProc/src/MPIReceiver.cc @@ -129,7 +129,7 @@ namespace LOFAR } // Signal end of input - mpiPool.filled.append(NULL); + mpiPool.filled.append(NULL, false); } void MPIReceiver::receiveInput() diff --git a/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc b/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc index 79703891f8cefb79718985c37d39ee08b06f3bc0..81a08c5e52ead40ab677b9c922f20c341c1a330e 100644 --- a/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc +++ b/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc @@ -78,6 +78,9 @@ namespace LOFAR { nrSamples(ps.settings.blockSize), nrBlocks(ps.settings.nrBlocks()), + // Do not warn on bad performance. StationMetaData uses the free queue as its INPUT, so it tends to + // consume all elements in the free queue. This is contrary to the other pools (they wait for input + // on a filled queue), resulting in the wrong analysis. metaDataPool(str(format("StationMetaData::metaDataPool [station %s]") % stationID.name()), false), subbands(values(subbandDistribution)) @@ -101,6 +104,9 @@ namespace LOFAR { for (size_t i = 0; i < 5; ++i) metaDataPool.free.append(new MPIData<SampleT>(startTime, ps.settings.subbands.size(), nrSamples), false); + // These didn't count for performance + metaDataPool.free.reset_statistics(); + /* * Set up delay compensation. * @@ -154,7 +160,7 @@ namespace LOFAR { } // Signal EOD - metaDataPool.filled.append(NULL); + metaDataPool.filled.append(NULL, false); } @@ -539,7 +545,7 @@ namespace LOFAR { } LOG_INFO_STR(logPrefix << "readRSPNonRealTime: sending EOS"); - rspDataPool[0]->filled.append(NULL); + rspDataPool[0]->filled.append(NULL, false); } @@ -701,7 +707,7 @@ namespace LOFAR { } // Signal EOD to output - outputQueue.append(NULL); + outputQueue.append(NULL, false); // Signal EOD to input. It's a free queue, so prepend to avoid // having the reader flush the whole queue first. diff --git a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc index dc8b0f64c46bbc5732cedf5fe25db1eaa4b507dd..3fcafeaf8d0cab3c2fc0fca15402072e2a794e71 100644 --- a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc +++ b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc @@ -247,7 +247,7 @@ namespace LOFAR // run the queue preprocessSubbands(queue); - queue.processPool.filled.append(NULL); + queue.processPool.filled.append(NULL, false); } } @@ -272,7 +272,7 @@ namespace LOFAR processSubbands(queue); // Signal end of output - queue.outputPool.filled.append(NULL); + queue.outputPool.filled.append(NULL, false); } } @@ -297,7 +297,7 @@ namespace LOFAR // Signal end of output for (size_t i = 0; i < writePool.size(); ++i) { - writePool[i].queue->append(NULL); + writePool[i].queue->append(NULL, false); } // Wait for data to propagate towards outputProc, @@ -455,7 +455,7 @@ namespace LOFAR // Signal end of input for (size_t i = 0; i < subbandProcs.size(); ++i) { - subbandProcs[i]->inputPool.filled.append(NULL); + subbandProcs[i]->inputPool.filled.append(NULL, false); } } @@ -602,7 +602,7 @@ namespace LOFAR OMPThread::ScopedName sn(str(format("writeBF %u") % globalSubbandIdx)); writeBeamformedOutput(globalSubbandIdx, inputQueue, queue, outputQueue); - queue.append(NULL); + queue.append(NULL, false); } // Output processing diff --git a/RTCP/Cobalt/GPUProc/src/cuda/SubbandProcs/SubbandProc.cc b/RTCP/Cobalt/GPUProc/src/cuda/SubbandProcs/SubbandProc.cc index 79810a2bcd9272db50967ee5cb82aa344fd76158..79876873667bdf317fabe2c351a177ff123e5b3a 100644 --- a/RTCP/Cobalt/GPUProc/src/cuda/SubbandProcs/SubbandProc.cc +++ b/RTCP/Cobalt/GPUProc/src/cuda/SubbandProcs/SubbandProc.cc @@ -45,7 +45,7 @@ namespace LOFAR size_t nrSubbandsPerSubbandProc) : inputPool("SubbandProc::inputPool", ps.settings.realTime), - processPool("SubbandProc::processPool", ps.settings.realTime), + processPool("SubbandProc::processPool", ps.settings.realTime, nrSubbandsPerSubbandProc), outputPool("SubbandProc::outputPool", ps.settings.realTime), ps(ps), diff --git a/RTCP/Cobalt/GPUProc/src/opencl/Pipelines/CorrelatorPipeline.cc b/RTCP/Cobalt/GPUProc/src/opencl/Pipelines/CorrelatorPipeline.cc index dca433a4d19e0c130bd0a505e174a6a82c5473c9..1161c874ac60915103b63462f454a4c070dfa9d6 100644 --- a/RTCP/Cobalt/GPUProc/src/opencl/Pipelines/CorrelatorPipeline.cc +++ b/RTCP/Cobalt/GPUProc/src/opencl/Pipelines/CorrelatorPipeline.cc @@ -138,7 +138,7 @@ namespace LOFAR queue.timers["CPU - total"]->stop(); // Signal end of output - queue.outputPool.filled.append(NULL); + queue.outputPool.filled.append(NULL, false); } } @@ -296,7 +296,7 @@ namespace LOFAR // Signal end of input for (size_t i = 0; i < workQueues.size(); ++i) { - workQueues[i]->inputPool.filled.append(NULL); + workQueues[i]->inputPool.filled.append(NULL, false); } } diff --git a/RTCP/Cobalt/GPUProc/test/Pipelines/tCorrelatorPipelineProcessObs.cc b/RTCP/Cobalt/GPUProc/test/Pipelines/tCorrelatorPipelineProcessObs.cc index 2878438c8f64270f1978afc5915551b0e5fb4110..478fa951ff3d5a4462d3857dd7b2d9f6c8bd9a65 100644 --- a/RTCP/Cobalt/GPUProc/test/Pipelines/tCorrelatorPipelineProcessObs.cc +++ b/RTCP/Cobalt/GPUProc/test/Pipelines/tCorrelatorPipelineProcessObs.cc @@ -84,7 +84,7 @@ int main(int argc, char *argv[]) { mpi.init(argc, argv); - MPI_receive_pool.filled.append(NULL); + MPI_receive_pool.filled.append(NULL, false); // no data, so no need to run a sender: // receiver(s) from processObservation() will fwd a end of data NULL pool item immediately. diff --git a/RTCP/Cobalt/InputProc/src/Station/filterRSP.cc b/RTCP/Cobalt/InputProc/src/Station/filterRSP.cc index a08a7c694b8873e70fb71e8fbf2258db561d540a..86809656a229a8594b5ce46ba8d4f2e3eda531b7 100644 --- a/RTCP/Cobalt/InputProc/src/Station/filterRSP.cc +++ b/RTCP/Cobalt/InputProc/src/Station/filterRSP.cc @@ -27,7 +27,7 @@ #include <omp.h> #include <boost/date_time/posix_time/posix_time.hpp> -#include <Common/Thread/Queue.h> +#include <CoInterface/Queue.h> #include <Common/Thread/Thread.h> #include <Common/LofarLogger.h> #include <Stream/StreamFactory.h> @@ -174,7 +174,7 @@ int main(int argc, char **argv) } catch(EndOfStreamException&) { } - writeQueue.append(NULL); + writeQueue.append(NULL, false); } # pragma omp section @@ -230,7 +230,7 @@ int main(int argc, char **argv) // Add a NULL if we're done to free up // readQueue.remove(), to prevent race conditions. if (writerDone) { - readQueue.append(NULL); + readQueue.append(NULL, false); break; } } diff --git a/RTCP/Cobalt/InputProc/src/Station/repairRSP.cc b/RTCP/Cobalt/InputProc/src/Station/repairRSP.cc index 26c5d28e608a3608368816a5f8fdc0a2d2cfa605..ca400d6f8caad09cafb4c71174f3bdfe297193b7 100644 --- a/RTCP/Cobalt/InputProc/src/Station/repairRSP.cc +++ b/RTCP/Cobalt/InputProc/src/Station/repairRSP.cc @@ -27,7 +27,7 @@ #include <omp.h> #include <boost/date_time/posix_time/posix_time.hpp> -#include <Common/Thread/Queue.h> +#include <CoInterface/Queue.h> #include <Common/Thread/Thread.h> #include <Common/LofarLogger.h> #include <ApplCommon/PosixTime.h> diff --git a/RTCP/Cobalt/OutputProc/src/InputThread.cc b/RTCP/Cobalt/OutputProc/src/InputThread.cc index a34218e8269108a4361899cbe32b2f26825d26fd..cbc606c6e8d07db97aadbd608aeabdf78d16fce8 100644 --- a/RTCP/Cobalt/OutputProc/src/InputThread.cc +++ b/RTCP/Cobalt/OutputProc/src/InputThread.cc @@ -84,7 +84,7 @@ namespace LOFAR LOG_WARN_STR(itsLogPrefix << "Did not send " << didNotSendPerc << "% of the data"); // Append end-of-stream marker - itsOutputPool.filled.append(NULL); + itsOutputPool.filled.append(NULL, false); } } // namespace Cobalt } // namespace LOFAR diff --git a/RTCP/Cobalt/OutputProc/src/TBB_StreamWriter.cc b/RTCP/Cobalt/OutputProc/src/TBB_StreamWriter.cc index be39e988fd7724d0cd7d24b8f3943387991a497d..d8660c2f05cf75ebae940a7956608306d8375609 100644 --- a/RTCP/Cobalt/OutputProc/src/TBB_StreamWriter.cc +++ b/RTCP/Cobalt/OutputProc/src/TBB_StreamWriter.cc @@ -81,7 +81,7 @@ namespace LOFAR try { itsInputThread.reset(new Thread(this, &TBB_StreamWriter::mainInputLoop, "TBB-in-thr", logPrefix + "InputThread: ")); } catch (exception& ) { - itsReceiveQueue.append(NULL); // tell output thread to stop + itsReceiveQueue.append(NULL, false); // tell output thread to stop throw; } // Don't change any member vars here, as threads have already started @@ -232,7 +232,7 @@ namespace LOFAR ~NotifyOutputThread() { try { - queue.append(NULL); + queue.append(NULL, false); } catch (exception& exc) { LOG_WARN_STR("TBB: may have failed to notify output thread to terminate: " << exc.what()); } diff --git a/RTCP/Cobalt/OutputProc/src/TBB_StreamWriter.h b/RTCP/Cobalt/OutputProc/src/TBB_StreamWriter.h index 8b153290d76f1958aae69aeb3616c73509cc6fb2..ce39c8823f1b15e1901fcc8dbac8a0d940021707 100644 --- a/RTCP/Cobalt/OutputProc/src/TBB_StreamWriter.h +++ b/RTCP/Cobalt/OutputProc/src/TBB_StreamWriter.h @@ -31,7 +31,7 @@ #include <boost/crc.hpp> #include <Common/Thread/Thread.h> -#include <Common/Thread/Queue.h> +#include <CoInterface/Queue.h> #include <Stream/FileStream.h> namespace LOFAR