From d21bffba668a4b1a92001d12e7106f318b64e7fe Mon Sep 17 00:00:00 2001
From: Jan David Mol <mol@astron.nl>
Date: Tue, 1 Jun 2021 13:47:48 +0200
Subject: [PATCH] TMSS-821: Report the actual last processed block when
 reporting the end time of the observation in the LTA Feedback. This is to be
 able to account the actual telescope use for this observation.

---
 RTCP/Cobalt/CoInterface/src/LTAFeedback.cc |  9 ++++++---
 RTCP/Cobalt/CoInterface/src/LTAFeedback.h  |  4 +++-
 RTCP/Cobalt/CoInterface/src/Parset.cc      | 12 +++++++++++-
 RTCP/Cobalt/CoInterface/src/Parset.h       |  4 ++++
 RTCP/Cobalt/GPUProc/src/MPIReceiver.cc     | 19 ++++++++++---------
 RTCP/Cobalt/GPUProc/src/MPIReceiver.h      |  7 +++++--
 RTCP/Cobalt/GPUProc/src/rtcp.cc            |  7 +++++--
 7 files changed, 44 insertions(+), 18 deletions(-)

diff --git a/RTCP/Cobalt/CoInterface/src/LTAFeedback.cc b/RTCP/Cobalt/CoInterface/src/LTAFeedback.cc
index 93c3d008882..d4d28ed3fe5 100644
--- a/RTCP/Cobalt/CoInterface/src/LTAFeedback.cc
+++ b/RTCP/Cobalt/CoInterface/src/LTAFeedback.cc
@@ -44,8 +44,9 @@ namespace LOFAR
     std::string LTAFeedback::feedback_version(boost::str(boost::format("%02d.%02d.%02d") %
                                         major_version % minor_version % patch_number));
       
-    LTAFeedback::LTAFeedback(const ObservationSettings &settings):
-      settings(settings)
+    LTAFeedback::LTAFeedback(const ObservationSettings &settings, ssize_t last_block):
+      settings(settings),
+      last_block(last_block)
     {
     }
 
@@ -246,7 +247,9 @@ namespace LOFAR
 
       // actual start/end times that we used
       ps.add("Observation.startTime", toUTC(settings.startTime));
-      ps.add("Observation.endTime", toUTC(settings.getRealStopTime()));
+
+      // make sure we don't report a stopTime < startTime if we stopped during block -1.
+      ps.add("Observation.stopTime", toUTC(std::max(settings.startTime, settings.getBlockEndTime(last_block))));
 
       ps.add("Observation.DataProducts.nrOfOutput_Correlated_", 
              str(format("%u") % (settings.correlator.enabled ? settings.correlator.files.size() : 0)));
diff --git a/RTCP/Cobalt/CoInterface/src/LTAFeedback.h b/RTCP/Cobalt/CoInterface/src/LTAFeedback.h
index 8005e83cc86..0d68f085c4e 100644
--- a/RTCP/Cobalt/CoInterface/src/LTAFeedback.h
+++ b/RTCP/Cobalt/CoInterface/src/LTAFeedback.h
@@ -33,7 +33,8 @@ namespace LOFAR
     class LTAFeedback
     {
     public:
-      LTAFeedback(const ObservationSettings &settings);
+      // Generate feedback for these parset settings, given that "last_block" was the last block processed.
+      LTAFeedback(const ObservationSettings &settings, ssize_t last_block);
 
       // Subset name of each correlated and beamFormed file, respectively.
       static std::string correlatedPrefix(size_t fileno);
@@ -66,6 +67,7 @@ namespace LOFAR
 
     private:
       const ObservationSettings settings;
+      const ssize_t last_block;
     };
   } // namespace Cobalt
 } // namespace LOFAR
diff --git a/RTCP/Cobalt/CoInterface/src/Parset.cc b/RTCP/Cobalt/CoInterface/src/Parset.cc
index 6084f1d7724..e629028159f 100644
--- a/RTCP/Cobalt/CoInterface/src/Parset.cc
+++ b/RTCP/Cobalt/CoInterface/src/Parset.cc
@@ -1790,7 +1790,17 @@ namespace LOFAR
     // because we process in blocks.
     double ObservationSettings::getRealStopTime() const 
     {
-      return startTime + nrBlocks() * blockDuration();
+      return getBlockEndTime(nrBlocks() - 1);
+    }
+
+    double ObservationSettings::getBlockStartTime(ssize_t block) const
+    {
+      return startTime + block * blockDuration();
+    }
+
+    double ObservationSettings::getBlockEndTime(ssize_t block) const
+    {
+      return getBlockStartTime(block + 1);
     }
 
     std::string Parset::getHostName(OutputType outputType, unsigned streamNr) const
diff --git a/RTCP/Cobalt/CoInterface/src/Parset.h b/RTCP/Cobalt/CoInterface/src/Parset.h
index 1af3b7af100..16589206364 100644
--- a/RTCP/Cobalt/CoInterface/src/Parset.h
+++ b/RTCP/Cobalt/CoInterface/src/Parset.h
@@ -94,6 +94,10 @@ namespace LOFAR
       // actual stop time, which is different, as we round to block boundaries 
       double getRealStopTime() const;
 
+      // start and stop times for a given block
+      double getBlockStartTime() const;
+      double getBlockEndTime() const;
+
       // The station clock, in MHz (200 or 160)
       //
       // key: Observation.sampleClock
diff --git a/RTCP/Cobalt/GPUProc/src/MPIReceiver.cc b/RTCP/Cobalt/GPUProc/src/MPIReceiver.cc
index 070ae30541c..4b60789c31e 100644
--- a/RTCP/Cobalt/GPUProc/src/MPIReceiver.cc
+++ b/RTCP/Cobalt/GPUProc/src/MPIReceiver.cc
@@ -59,7 +59,7 @@ namespace LOFAR
       nrBitsPerSample(i_nrBitsPerSample)
     {}
 
-    template<typename SampleT> void MPIReceiver::receiveInput()
+    template<typename SampleT> ssize_t MPIReceiver::receiveInput()
     {
       NSTimer receiveTimer("MPI: Receive station data", true, true);
      
@@ -83,12 +83,13 @@ namespace LOFAR
       }
 
       bool allDone = false;
+      ssize_t block;
 
       // Receive input from StationInput::sendInputToPipeline.
       //
       // Start processing from block -1, and don't process anything if the
       // observation is empty.
-      for (ssize_t block = -1; !allDone; block++) 
+      for (block = -1; !allDone; block++) 
       {
         // Receive the samples from all subbands from the ant fields for this block.
         LOG_INFO_STR("[block " << block << "] Collecting input buffers");
@@ -130,21 +131,21 @@ namespace LOFAR
 
       // Signal end of input
       mpiPool.filled.append(NULL, false);
+
+      // This is the last block we received (or -2 if we haven't received any)
+      return block - 1;
     }
 
-    void MPIReceiver::receiveInput()
+    ssize_t MPIReceiver::receiveInput()
     {
       switch (nrBitsPerSample) {
       default:
       case 16:
-        receiveInput< SampleType<i16complex> >();
-        break;
+        return receiveInput< SampleType<i16complex> >();
       case 8:
-        receiveInput< SampleType<i8complex> >();
-        break;
+        return receiveInput< SampleType<i8complex> >();
       case 4:
-        receiveInput< SampleType<i4complex> >();
-        break;
+        return receiveInput< SampleType<i4complex> >();
       }
     }
   }
diff --git a/RTCP/Cobalt/GPUProc/src/MPIReceiver.h b/RTCP/Cobalt/GPUProc/src/MPIReceiver.h
index e13c1783e8f..9212a1db7cd 100644
--- a/RTCP/Cobalt/GPUProc/src/MPIReceiver.h
+++ b/RTCP/Cobalt/GPUProc/src/MPIReceiver.h
@@ -79,11 +79,14 @@ namespace LOFAR
       // raw MPIRecvData     
       // This function is typically started in a seperate thread
       // Internally the type of samples depends on nrBitsPerSample
-      void receiveInput();
+      //
+      // Returns the index of the last block received, or -2 if no
+      // block ever was received.
+      ssize_t receiveInput();
       
     private:
       // The templeted receive function.
-      template<typename SampleT> void receiveInput();
+      template<typename SampleT> ssize_t receiveInput();
 
       Pool<struct MPIRecvData> &mpiPool;
       const std::vector<size_t> subbandIndices;
diff --git a/RTCP/Cobalt/GPUProc/src/rtcp.cc b/RTCP/Cobalt/GPUProc/src/rtcp.cc
index 7adc34409a1..fa8c5ca5b74 100644
--- a/RTCP/Cobalt/GPUProc/src/rtcp.cc
+++ b/RTCP/Cobalt/GPUProc/src/rtcp.cc
@@ -495,6 +495,9 @@ int main(int argc, char **argv)
 
       OMPThread::ScopedName sn("observation");
 
+      // last received block index
+      ssize_t last_block = -2;
+
       if (ps.settings.realTime) {
         // Wait just before the obs starts to allocate resources,
         // both the UDP sockets and the GPU buffers!
@@ -564,7 +567,7 @@ int main(int argc, char **argv)
         {
           OMPThread::ScopedName sn("mpi recv");
 
-          MPI_receiver.receiveInput();
+          last_block = MPI_receiver.receiveInput();
         }
 
         // Retrieve items from pool and process further on
@@ -619,7 +622,7 @@ int main(int argc, char **argv)
     // send processing feedback
     ToBus bus("otdb.task.feedback.processing", broker_feedback());
 
-    LTAFeedback fb(ps.settings);
+    LTAFeedback fb(ps.settings, last_block);
 
     Protocols::TaskFeedbackProcessing msg(
       "Cobalt/GPUProc/rtcp",
-- 
GitLab