From b8b8bc8c83d5a5f33206db54cf52b1d5deaf0b59 Mon Sep 17 00:00:00 2001
From: Jan David Mol <mol@astron.nl>
Date: Tue, 11 Jul 2017 20:34:29 +0000
Subject: [PATCH] Task #11059: Added better loss statistics for CorrelatedData
 and improved log lines

---
 RTCP/Cobalt/CoInterface/src/TABTranspose.cc | 19 +++++++++++++------
 RTCP/Cobalt/OutputProc/src/GPUProcIO.cc     |  2 +-
 RTCP/Cobalt/OutputProc/src/InputThread.cc   | 17 +++++++++++++++--
 RTCP/Cobalt/OutputProc/src/InputThread.h    |  8 +++++++-
 RTCP/Cobalt/OutputProc/src/OutputThread.cc  |  2 +-
 5 files changed, 37 insertions(+), 11 deletions(-)

diff --git a/RTCP/Cobalt/CoInterface/src/TABTranspose.cc b/RTCP/Cobalt/CoInterface/src/TABTranspose.cc
index 680fc0c6696..f1737320fdd 100644
--- a/RTCP/Cobalt/CoInterface/src/TABTranspose.cc
+++ b/RTCP/Cobalt/CoInterface/src/TABTranspose.cc
@@ -368,12 +368,19 @@ void BlockCollector::finish() {
   // Update loss statistics
   nrFullBlocksLost += nrBlocks - lastEmitted + 1;
   if (nrBlocks > 0) {
-    if (nrBlockSubbandsReceived > 0)
-      LOG_ERROR_STR(logPrefix << "Did not receive " << (100.0 - 100.0*nrBlockSubbandsReceived/nrSubbands/nrBlocks) << "% of the data");
-    if (nrFullBlocksLost > 0 || nrBlockSubbandsLost > 0)
-      LOG_ERROR_STR(logPrefix << "Did not send " << (100.0*nrFullBlocksLost + 100.0*nrBlockSubbandsLost/nrSubbands)/nrBlocks << "% of the data");
-    if (nrBlockSubbandsTooLate > 0)
-      LOG_ERROR_STR(logPrefix << "Received " << (100.0*nrBlockSubbandsTooLate/nrSubbands/nrBlocks) << "% of the data too late. Consider increasing maxBlocksInFlight.");
+    const float didNotReceivePerc = 100.0 - 100.0*nrBlockSubbandsReceived/nrSubbands/nrBlocks;
+    const float didNotSendPerc = (100.0*nrFullBlocksLost + 100.0*nrBlockSubbandsLost/nrSubbands) / nrBlocks;
+    const float lostPerc = didNotSendPerc - didNotReceivePerc;
+    const float tooLatePerc = 100.0 * nrBlockSubbandsTooLate / nrSubbands / nrBlocks;
+
+    if (didNotReceivePerc > 0)
+      LOG_ERROR_STR(logPrefix << "Did not receive " << didNotReceivePerc << "% of the data");
+    if (lostPerc > 0)
+      LOG_ERROR_STR(logPrefix << "I lost " << lostPerc << "% of the data");
+    if (didNotSendPerc > 0)
+      LOG_ERROR_STR(logPrefix << "Did not send " << didNotSendPerc << "% of the data");
+    if (tooLatePerc > 0)
+      LOG_ERROR_STR(logPrefix << "Received " << tooLatePerc << "% of the data too late. Consider increasing maxBlocksInFlight.");
   }
 
   if (!canDrop) {
diff --git a/RTCP/Cobalt/OutputProc/src/GPUProcIO.cc b/RTCP/Cobalt/OutputProc/src/GPUProcIO.cc
index 2de0dd6ea53..649be8f7002 100644
--- a/RTCP/Cobalt/OutputProc/src/GPUProcIO.cc
+++ b/RTCP/Cobalt/OutputProc/src/GPUProcIO.cc
@@ -229,7 +229,7 @@ bool process(Stream &controlStream)
         }
 
         // Create a collector for this fileIdx
-        // #blocks here is the number of blocks that can be constructed in parallel on
+        // #blocks here is the number of blocks that can be collected in parallel on
         // the input side (what the correlator sends)
         collectors[fileIdx] = new TABTranspose::BlockCollector(
           *outputPools[fileIdx], fileIdx, nrSubbands, nrChannels, nrSamples, parset.settings.nrBlocks(), parset.settings.realTime ? 5 : 0);
diff --git a/RTCP/Cobalt/OutputProc/src/InputThread.cc b/RTCP/Cobalt/OutputProc/src/InputThread.cc
index f520d26ddbe..d53c81bac9d 100644
--- a/RTCP/Cobalt/OutputProc/src/InputThread.cc
+++ b/RTCP/Cobalt/OutputProc/src/InputThread.cc
@@ -39,10 +39,13 @@ namespace LOFAR
                              const std::string &logPrefix)
       :
       itsLogPrefix(logPrefix + "[InputThread] "),
+      itsNrIntegrationsReceived(0),
+      itsNrIntegrations(parset.settings.correlator.nrIntegrations),
       itsInputDescriptor(getStreamDescriptorBetweenIONandStorage(parset, CORRELATED_DATA, streamNr)),
       itsOutputPool(outputPool),
       itsDeadline(parset.settings.realTime ? parset.settings.stopTime : 0)
     {
+      ASSERT(parset.settings.correlator.enabled);
     }
 
 
@@ -55,17 +58,27 @@ namespace LOFAR
 
         for(SmartPtr<StreamableData> data; (data = itsOutputPool.free.remove()) != NULL; itsOutputPool.filled.append(data)) {
           data->read(streamFromION, true, 1); // Cobalt writes with an alignment of 1
+          ++itsNrIntegrationsReceived;
 
-          LOG_DEBUG_STR(itsLogPrefix << "Read block with seqno = " << data->sequenceNumber());
+          LOG_DEBUG_STR(itsLogPrefix << "Received integration " << data->sequenceNumber());
         }
       } catch (TimeOutException &) {
         LOG_WARN_STR(itsLogPrefix << "Connection from " << itsInputDescriptor << " timed out");
       } catch (EndOfStreamException &) {
-        LOG_INFO_STR(itsLogPrefix << "Connection from " << itsInputDescriptor << " closed");
+        LOG_INFO_STR(itsLogPrefix << "Connection from " << itsInputDescriptor << " closed by foreign host");
       } catch (SystemCallException &ex) {
         LOG_WARN_STR(itsLogPrefix << "Connection from " << itsInputDescriptor << " failed: " << ex.text());
       }
 
+      // Report loss statistics: the percentage of expected integrations that was not received
+      const float didNotReceivePerc = 100.0 - 100.0 * itsNrIntegrationsReceived / itsNrIntegrations;
+      const float didNotSendPerc = didNotReceivePerc;
+
+      if (didNotReceivePerc > 0)
+        LOG_ERROR_STR(itsLogPrefix << "Did not receive " << didNotReceivePerc << "% of the data");
+      if (didNotSendPerc > 0)
+        LOG_ERROR_STR(itsLogPrefix << "Did not send " << didNotSendPerc << "% of the data");
+
       // Append end-of-stream marker
       itsOutputPool.filled.append(NULL);
     }
diff --git a/RTCP/Cobalt/OutputProc/src/InputThread.h b/RTCP/Cobalt/OutputProc/src/InputThread.h
index 30bf2218c91..85b561240d4 100644
--- a/RTCP/Cobalt/OutputProc/src/InputThread.h
+++ b/RTCP/Cobalt/OutputProc/src/InputThread.h
@@ -55,7 +55,13 @@ namespace LOFAR
       virtual void process();
 
     private:
-      const std::string itsLogPrefix, itsInputDescriptor;
+      const std::string itsLogPrefix;
+
+      // Integrations ("blocks") received so far, and the total number expected
+      size_t itsNrIntegrationsReceived;
+      const size_t itsNrIntegrations;
+
+      const std::string itsInputDescriptor;
       Pool<StreamableData> &itsOutputPool;
       const double itsDeadline;
     };
diff --git a/RTCP/Cobalt/OutputProc/src/OutputThread.cc b/RTCP/Cobalt/OutputProc/src/OutputThread.cc
index 689d8396545..98735efc431 100644
--- a/RTCP/Cobalt/OutputProc/src/OutputThread.cc
+++ b/RTCP/Cobalt/OutputProc/src/OutputThread.cc
@@ -94,7 +94,7 @@ namespace LOFAR
       if (droppedBlocks > 0) {
         itsBlocksDropped += droppedBlocks;
 
-        LOG_WARN_STR(itsLogPrefix << "Just dropped " << droppedBlocks << " blocks. Dropped " << itsBlocksDropped << " blocks and written " << itsBlocksWritten << " blocks so far.");
+        LOG_WARN_STR(itsLogPrefix << "Did not receive " << droppedBlocks << " blocks");
 
         itsMdLogger.log(itsMdKeyPrefix + PN_COP_DROPPED + streamNrStr,
                         itsBlocksDropped * static_cast<float>(itsParset.settings.blockDuration()));
-- 
GitLab