From 0bccc4c38eb8a400172e8103475f014d10400d31 Mon Sep 17 00:00:00 2001
From: Alexander van Amesfoort <amesfoort@astron.nl>
Date: Thu, 30 Oct 2014 17:45:36 +0000
Subject: [PATCH] Task #5844: COBALT PVSS: zero dynarrays pre-obs (while also
 logging obs ID, data product names, etc). This is nice to have, because some
 events are written conditionally. If a subband has no dropped data, it's
 better to init that value to 0.0f. Some minor cleanups related to PVSS event
 logging is also included.

---
 .../GPUProc/src/cuda/Pipelines/Pipeline.cc    | 49 ++++++++++---------
 RTCP/Cobalt/OutputProc/src/OutputThread.cc    | 43 ++++++++++------
 RTCP/Cobalt/OutputProc/src/OutputThread.h     |  7 ++-
 3 files changed, 59 insertions(+), 40 deletions(-)

diff --git a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc
index aa2bb130cf2..942726e8837 100644
--- a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc
+++ b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc
@@ -150,31 +150,31 @@ namespace LOFAR
       ASSERTSTR(!devices.empty(), "Not bound to any GPU!");
 
       // Write data point(s) for monitoring (PVSS).
-      itsMdLogger.log(itsMdKeyPrefix + PN_CGP_OBSERVATION_NAME, boost::lexical_cast<string>(ps.settings.observationID));
+      itsMdLogger.log(itsMdKeyPrefix + PN_CGP_OBSERVATION_NAME,
+                      boost::lexical_cast<string>(ps.settings.observationID));
       for (unsigned i = 0; i < subbandIndices.size(); ++i) {
-        itsMdLogger.log(itsMdKeyPrefix + PN_CGP_SUBBAND + '[' + boost::lexical_cast<string>(i) + ']',
-                        (int)subbandIndices[i]);
-      }
+        const string sbStr = '[' + boost::lexical_cast<string>(i) + ']';
 
-      string dataProductType;
+        itsMdLogger.log(itsMdKeyPrefix + PN_CGP_SUBBAND + sbStr, (int)subbandIndices[i]);
 
-      switch (1 * (int)ps.settings.beamFormer.enabled
-            + 2 * (int)ps.settings.correlator.enabled) {
-        case 3:
-          dataProductType = "Correlated + Beamformed";
-          break;
-        case 2:
-          dataProductType = "Correlated";
-          break;
-        case 1:
-          dataProductType = "Beamformed";
-          break;
-        case 0:
-        default:
-          dataProductType = "None";
-          break;
+        // After obs start these dynarray data points are written _conditionally_, so init.
+        // While we only have to write the last index (PVSSGateway will zero the rest),
+        // we'd have to find out who has the last subband. Don't bother, just init all.
+        itsMdLogger.log(itsMdKeyPrefix + PN_CGP_DROPPING + sbStr, 0);
+        itsMdLogger.log(itsMdKeyPrefix + PN_CGP_WRITTEN  + sbStr, 0.0f);
+        itsMdLogger.log(itsMdKeyPrefix + PN_CGP_DROPPED  + sbStr, 0.0f);
       }
 
+      string dataProductType;
+      if (ps.settings.correlator.enabled && ps.settings.beamFormer.enabled) {
+        dataProductType = "Correlated + Beamformed";
+      } else if (ps.settings.correlator.enabled) {
+        dataProductType = "Correlated";
+      } else if (ps.settings.beamFormer.enabled) {
+        dataProductType = "Beamformed";
+      } else {
+        dataProductType = "None";
+      }
       itsMdLogger.log(itsMdKeyPrefix + PN_CGP_DATA_PRODUCT_TYPE, dataProductType);
     }
 
@@ -764,16 +764,17 @@ namespace LOFAR
          */
         const double blockDuration = ps.settings.blockDuration();
 
-        // Prevent division by zero for observations without beam former
+        // Prevent division by zero for observations without beamformer
         const size_t nrFiles = std::max(multiSender.nrFiles(), 1UL);
 
-        itsMdLogger.log(itsMdKeyPrefix + PN_CGP_DROPPING + '[' + lexical_cast<string>(id.localSubbandIdx) + ']',
+        const string localSbStr = '[' + lexical_cast<string>(id.localSubbandIdx) + ']';
+        itsMdLogger.log(itsMdKeyPrefix + PN_CGP_DROPPING + localSbStr,
                         correlatorLoss.dropping || beamFormerLoss.dropping);
-        itsMdLogger.log(itsMdKeyPrefix + PN_CGP_WRITTEN  + '[' + lexical_cast<string>(id.localSubbandIdx) + ']',
+        itsMdLogger.log(itsMdKeyPrefix + PN_CGP_WRITTEN  + localSbStr,
                         static_cast<float>(correlatorLoss.blocksWritten * blockDuration) +
                         static_cast<float>(beamFormerLoss.blocksWritten * blockDuration / nrFiles)
                        );
-        itsMdLogger.log(itsMdKeyPrefix + PN_CGP_DROPPED  + '[' + lexical_cast<string>(id.localSubbandIdx) + ']',
+        itsMdLogger.log(itsMdKeyPrefix + PN_CGP_DROPPED  + localSbStr,
                         static_cast<float>(correlatorLoss.blocksDropped * blockDuration) +
                         static_cast<float>(beamFormerLoss.blocksDropped * blockDuration / nrFiles)
                        );
diff --git a/RTCP/Cobalt/OutputProc/src/OutputThread.cc b/RTCP/Cobalt/OutputProc/src/OutputThread.cc
index 5d80db41724..6c04b4b2508 100644
--- a/RTCP/Cobalt/OutputProc/src/OutputThread.cc
+++ b/RTCP/Cobalt/OutputProc/src/OutputThread.cc
@@ -139,21 +139,23 @@ namespace LOFAR
 
       ASSERTSTR(data->sequenceNumber() >= itsNextSequenceNumber, "Received block nr " << data->sequenceNumber() << " out of order! I expected nothing before " << itsNextSequenceNumber);
 
+      const string streamNrStr = '[' + lexical_cast<string>(itsStreamNr) + ']';
+
       if (droppedBlocks > 0) {
         itsBlocksDropped += droppedBlocks;
 
         LOG_WARN_STR(itsLogPrefix << "Just dropped " << droppedBlocks << " blocks. Dropped " << itsBlocksDropped << " blocks and written " << itsBlocksWritten << " blocks so far.");
 
-        itsMdLogger.log(itsMdKeyPrefix + PN_COP_DROPPED  + '[' + lexical_cast<string>(itsStreamNr) + ']',
+        itsMdLogger.log(itsMdKeyPrefix + PN_COP_DROPPED + streamNrStr,
                         itsBlocksDropped * static_cast<float>(itsParset.settings.blockDuration()));
       }
 
       itsNextSequenceNumber = data->sequenceNumber() + 1;
       itsBlocksWritten++;
 
-      itsMdLogger.log(itsMdKeyPrefix + PN_COP_DROPPING + '[' + lexical_cast<string>(itsStreamNr) + ']',
+      itsMdLogger.log(itsMdKeyPrefix + PN_COP_DROPPING + streamNrStr,
                       droppedBlocks > 0); // logged too late if dropping: not anymore...
-      itsMdLogger.log(itsMdKeyPrefix + PN_COP_WRITTEN  + '[' + lexical_cast<string>(itsStreamNr) + ']',
+      itsMdLogger.log(itsMdKeyPrefix + PN_COP_WRITTEN  + streamNrStr,
                       itsBlocksWritten * static_cast<float>(itsParset.settings.blockDuration()));
     }
 
@@ -180,6 +182,27 @@ namespace LOFAR
     }
 
 
+    template<typename T>
+    void OutputThread<T>::logInitialStreamMetadataEvents(const string& dataProductType,
+                                                         const string& fileName,
+                                                         const string& directoryName)
+    {
+      // Write data points wrt @dataProductType output file for monitoring (PVSS).
+      const string streamNrStr = '[' + lexical_cast<string>(itsStreamNr) + ']';
+
+      itsMdLogger.log(itsMdKeyPrefix + PN_COP_DATA_PRODUCT_TYPE + streamNrStr, dataProductType);
+      itsMdLogger.log(itsMdKeyPrefix + PN_COP_FILE_NAME         + streamNrStr, fileName);
+      itsMdLogger.log(itsMdKeyPrefix + PN_COP_DIRECTORY         + streamNrStr, directoryName);
+
+      // After obs start these dynarray data points are written conditionally, so init.
+      // While we only have to write the last index (PVSSGateway will zero the rest),
+      // we'd have to find out who has the last subband. Don't bother, just init all.
+      itsMdLogger.log(itsMdKeyPrefix + PN_COP_DROPPING + streamNrStr, 0);
+      itsMdLogger.log(itsMdKeyPrefix + PN_COP_WRITTEN  + streamNrStr, 0.0f);
+      itsMdLogger.log(itsMdKeyPrefix + PN_COP_DROPPED  + streamNrStr, 0.0f);
+    }
+
+
     template<typename T> void OutputThread<T>::cleanUp() const
     {
       float dropPercent = itsBlocksWritten + itsBlocksDropped == 0 ? 0.0 : (100.0 * itsBlocksDropped) / (itsBlocksWritten + itsBlocksDropped);
@@ -260,18 +283,14 @@ namespace LOFAR
 
       const std::string path = directoryName + "/" + fileName;
 
-   try
+      try
       {
         recursiveMakeDir(directoryName, itsLogPrefix);
         LOG_INFO_STR(itsLogPrefix << "Writing to " << path);
 
         itsWriter = new MSWriterCorrelated(itsLogPrefix, path, itsParset, itsStreamNr);
 
-        // Write data points wrt correlated output file for monitoring (PVSS)
-        // once we know the file could at least be created.
-        itsMdLogger.log(itsMdKeyPrefix + PN_COP_DATA_PRODUCT_TYPE + '[' + lexical_cast<string>(itsStreamNr) + ']', "Correlated");
-        itsMdLogger.log(itsMdKeyPrefix + PN_COP_FILE_NAME         + '[' + lexical_cast<string>(itsStreamNr) + ']', fileName);
-        itsMdLogger.log(itsMdKeyPrefix + PN_COP_DIRECTORY         + '[' + lexical_cast<string>(itsStreamNr) + ']', directoryName);
+        logInitialStreamMetadataEvents("Correlated", fileName, directoryName);
       } 
       catch (Exception &ex) 
       {
@@ -341,11 +360,7 @@ namespace LOFAR
         itsWriter = new MSWriterFile(path);
 #endif
 
-        // Write data points for beamformed output file for monitoring (PVSS)
-        // once we know the file could at least be created.
-        itsMdLogger.log(itsMdKeyPrefix + PN_COP_DATA_PRODUCT_TYPE + '[' + lexical_cast<string>(itsStreamNr) + ']', "Beamformed");
-        itsMdLogger.log(itsMdKeyPrefix + PN_COP_FILE_NAME         + '[' + lexical_cast<string>(itsStreamNr) + ']', fileName);
-        itsMdLogger.log(itsMdKeyPrefix + PN_COP_DIRECTORY         + '[' + lexical_cast<string>(itsStreamNr) + ']', directoryName);
+        logInitialStreamMetadataEvents("Beamformed", fileName, directoryName);
       }
       catch (Exception &ex)
       {
diff --git a/RTCP/Cobalt/OutputProc/src/OutputThread.h b/RTCP/Cobalt/OutputProc/src/OutputThread.h
index efce8fc85ce..d7206ea1fa4 100644
--- a/RTCP/Cobalt/OutputProc/src/OutputThread.h
+++ b/RTCP/Cobalt/OutputProc/src/OutputThread.h
@@ -81,6 +81,9 @@ namespace LOFAR
     protected:
       void checkForDroppedData(StreamableData *);
       void doWork();
+      void logInitialStreamMetadataEvents(const std::string& dataProductType,
+                                          const std::string& fileName,
+                                          const std::string& directoryName);
 
       const Parset &itsParset;
       const unsigned itsStreamNr;
@@ -113,7 +116,7 @@ namespace LOFAR
                           const std::string &logPrefix,
                           const std::string &targetDirectory = "");
 
-      void           createMS();
+      virtual void createMS();
     };
 
 
@@ -131,7 +134,7 @@ namespace LOFAR
                       const std::string &logPrefix,
                       const std::string &targetDirectory = "");
 
-      void           createMS();
+      virtual void createMS();
     };
 
 
-- 
GitLab