From 1385e53e2dffea9c83ac5521f95e356692c4e1e4 Mon Sep 17 00:00:00 2001
From: Jan David Mol <mol@astron.nl>
Date: Wed, 20 Sep 2017 12:24:02 +0000
Subject: [PATCH] Taks #10118: Added timers for TABTranspose, and use 1 thread
 for data transpose in outputproc

---
 RTCP/Cobalt/CoInterface/src/BudgetTimer.cc  |  2 +-
 RTCP/Cobalt/CoInterface/src/TABTranspose.cc | 35 ++++++++++++++++-----
 RTCP/Cobalt/CoInterface/src/TABTranspose.h  |  4 ++-
 RTCP/Cobalt/OutputProc/src/GPUProcIO.cc     |  2 +-
 4 files changed, 32 insertions(+), 11 deletions(-)

diff --git a/RTCP/Cobalt/CoInterface/src/BudgetTimer.cc b/RTCP/Cobalt/CoInterface/src/BudgetTimer.cc
index f2772ca9e95..33f95ef6efd 100644
--- a/RTCP/Cobalt/CoInterface/src/BudgetTimer.cc
+++ b/RTCP/Cobalt/CoInterface/src/BudgetTimer.cc
@@ -41,7 +41,7 @@ namespace LOFAR {
 
     BudgetTimer::~BudgetTimer()
     {
-      if (print_on_destruction) {
+      if (print_on_destruction && budget > 0.0) {
         const double realTimePerc = 100.0 * getAverage() / budget;
 
         if (log_on_destruction) {
diff --git a/RTCP/Cobalt/CoInterface/src/TABTranspose.cc b/RTCP/Cobalt/CoInterface/src/TABTranspose.cc
index 3b6fbfea096..e77c3c5f397 100644
--- a/RTCP/Cobalt/CoInterface/src/TABTranspose.cc
+++ b/RTCP/Cobalt/CoInterface/src/TABTranspose.cc
@@ -30,6 +30,7 @@
 #include <Common/Timer.h>
 #include <ApplCommon/PVSSDatapointDefs.h>
 #include <CoInterface/TimeFuncs.h>
+#include <CoInterface/BudgetTimer.h>
 
 using namespace std;
 using boost::format;
@@ -156,7 +157,14 @@ void Block::write( BeamformedData &output ) {
    * The implemented strategy coalesces the writes in batches. Large batches
    * will cause TLB misses due to the larger number of arrays we need to index.
    */
-  const size_t MAXBATCHSIZE = 4;
+
+  // On CEP2, optimal is MAXBATCHSIZE=4 and using 4 threads (in the for loop)
+  // On CEP4, optimal is MAXBATCHSIZE=488 and using 1 thread.
+  //
+  // We want real-time performance, (see tTABTranspose | grep write, should produce
+  // numbers <1s), but if we do not need multiple threads, doing the work in one
+  // is more efficient.
+  const size_t MAXBATCHSIZE = 488;
 
   // Stride between samples in output
   const ptrdiff_t dst_sample_stride = output.samples.strides()[0];
@@ -165,8 +173,6 @@ void Block::write( BeamformedData &output ) {
 
   const vector<float> zeroes(nrChannels * nrSamples, 0.0f);
 
-  // Four threads gives the best performance on CEP2, see figures given by 'tTABTranspose | grep write'
-# pragma omp parallel for num_threads(4)
   for (size_t subbandBase = 0; subbandBase < subbandCache.size(); subbandBase += MAXBATCHSIZE) {
     // Determine actual batch size
     const size_t BATCHSIZE = std::min(MAXBATCHSIZE, subbandCache.size() - subbandBase);
@@ -235,7 +241,7 @@ bool Block::complete() const {
 
 // The BlockCollector collects blocks from different rtcp processes for a TAB.
 // More precisely, we have one BlockCollector per file (i.e. part).
-BlockCollector::BlockCollector( Pool<BeamformedData> &outputPool, size_t fileIdx, size_t nrSubbands, size_t nrChannels, size_t nrSamples, size_t nrBlocks, size_t maxBlocksInFlight, const std::string &logPrefix )
+BlockCollector::BlockCollector( Pool<BeamformedData> &outputPool, size_t fileIdx, size_t nrSubbands, size_t nrChannels, size_t nrSamples, size_t nrBlocks, size_t maxBlocksInFlight, const std::string &logPrefix, double blockDuration )
 :
   logPrefix(logPrefix + "[BlockCollector] "),
 
@@ -256,6 +262,8 @@ BlockCollector::BlockCollector( Pool<BeamformedData> &outputPool, size_t fileIdx
   nrChannels(nrChannels),
   nrSamples(nrSamples),
 
+  blockDuration(blockDuration),
+
   maxBlocksInFlight(maxBlocksInFlight),
   canDrop(maxBlocksInFlight > 0),
   lastEmitted(-1),
@@ -285,7 +293,14 @@ void BlockCollector::addSubband( SmartPtr<Subband> &subband ) {
 void BlockCollector::inputLoop() {
   SmartPtr<Subband> subband;
 
+  BudgetTimer processTimer(
+    "BlockCollector::inputLoop: weaving subbands",
+    blockDuration / nrSubbands,
+    true, true);
+
   while ((subband = inputQueue.remove()) != NULL) {
+    BudgetTimer::StartStop ss(processTimer);
+
     processSubband(subband);
   }
 }
@@ -294,14 +309,18 @@ void BlockCollector::inputLoop() {
 void BlockCollector::outputLoop() {
   SmartPtr<Block> block;
 
-  NSTimer writeTimer("Block: data transpose/zeroing", true, true);
+  BudgetTimer writeTimer(
+    "BlockCollector::outputLoop: data transpose/zeroing",
+    blockDuration,
+    true, true);
 
   while ((block = outputQueue.remove()) != NULL) {
     SmartPtr<BeamformedData> output = outputPool.free.remove();
 
-    writeTimer.start();
-    block->write(*output);
-    writeTimer.stop();
+    {
+      BudgetTimer::StartStop ss(writeTimer);
+      block->write(*output);
+    }
 
     outputPool.filled.append(output);
   }
diff --git a/RTCP/Cobalt/CoInterface/src/TABTranspose.h b/RTCP/Cobalt/CoInterface/src/TABTranspose.h
index 46f9466d346..0204787a68a 100644
--- a/RTCP/Cobalt/CoInterface/src/TABTranspose.h
+++ b/RTCP/Cobalt/CoInterface/src/TABTranspose.h
@@ -182,7 +182,7 @@ namespace LOFAR
          * maxBlocksInFlight: the maximum number of blocks to process in
          *                    parallel (or 0 for no limit).
          */
-        BlockCollector( Pool<BeamformedData> &outputPool, size_t fileIdx, size_t nrSubbands, size_t nrChannels, size_t nrSamples, size_t nrBlocks = 0, size_t maxBlocksInFlight = 0, const std::string &logPrefix = "" );
+        BlockCollector( Pool<BeamformedData> &outputPool, size_t fileIdx, size_t nrSubbands, size_t nrChannels, size_t nrSamples, size_t nrBlocks = 0, size_t maxBlocksInFlight = 0, const std::string &logPrefix = "", double blockDuration = 0.0 );
 
         ~BlockCollector();
 
@@ -227,6 +227,8 @@ namespace LOFAR
         const size_t nrChannels;
         const size_t nrSamples;
 
+        const double blockDuration;
+
         // upper limit for blocks.size(), or 0 if unlimited
         const size_t maxBlocksInFlight;
 
diff --git a/RTCP/Cobalt/OutputProc/src/GPUProcIO.cc b/RTCP/Cobalt/OutputProc/src/GPUProcIO.cc
index b577f14322a..cac2493dd18 100644
--- a/RTCP/Cobalt/OutputProc/src/GPUProcIO.cc
+++ b/RTCP/Cobalt/OutputProc/src/GPUProcIO.cc
@@ -233,7 +233,7 @@ bool process(Stream &controlStream)
         // #blocks here is the number of blocks that can be collected in parallel on
         // the input side (what the correlator sends)
         collectors[fileIdx] = new TABTranspose::BlockCollector(
-          *outputPools[fileIdx], fileIdx, nrSubbands, nrChannels, nrSamples, parset.settings.nrBlocks(), parset.settings.realTime ? 5 : 0, logPrefix);
+          *outputPools[fileIdx], fileIdx, nrSubbands, nrChannels, nrSamples, parset.settings.nrBlocks(), parset.settings.realTime ? 5 : 0, logPrefix, parset.settings.blockDuration());
 
         LOG_DEBUG_STR("Setting up writer for " << file.location.filename);
 
-- 
GitLab