From 4d59d179accf1b151281d30d9c7ad6ac06dd0943 Mon Sep 17 00:00:00 2001
From: Jan David Mol <mol@astron.nl>
Date: Wed, 22 May 2013 11:50:14 +0000
Subject: [PATCH] Task #4471: Fixed various timing issues, and made
 tMPITransfer non-realtime

---
 .../InputProc/src/Buffer/SampleBuffer.h       |  4 +++
 RTCP/Cobalt/InputProc/src/CMakeLists.txt      |  1 +
 .../Cobalt/InputProc/src/Station/Generator.cc |  9 +++---
 RTCP/Cobalt/InputProc/src/Station/Generator.h |  4 ++-
 .../InputProc/src/Station/PacketsToBuffer.cc  | 28 +------------------
 .../InputProc/src/Station/PacketsToBuffer.h   |  2 +-
 RTCP/Cobalt/InputProc/src/Station/generate.cc |  5 +++-
 RTCP/Cobalt/InputProc/test/tBlockReader.cc    |  5 ++--
 RTCP/Cobalt/InputProc/test/tGenerator.cc      |  4 ++-
 RTCP/Cobalt/InputProc/test/tMPITransfer.cc    | 25 ++++++++++++++---
 RTCP/Cobalt/InputProc/test/tPacketWriter.cc   |  1 +
 .../Cobalt/InputProc/test/tPacketsToBuffer.cc |  3 +-
 RTCP/Cobalt/InputProc/test/tSampleBuffer.cc   |  1 +
 .../InputProc/test/tSampleBufferSync.cc       |  1 +
 14 files changed, 51 insertions(+), 42 deletions(-)

diff --git a/RTCP/Cobalt/InputProc/src/Buffer/SampleBuffer.h b/RTCP/Cobalt/InputProc/src/Buffer/SampleBuffer.h
index f99b7d35484..ec6c0a0d3e0 100644
--- a/RTCP/Cobalt/InputProc/src/Buffer/SampleBuffer.h
+++ b/RTCP/Cobalt/InputProc/src/Buffer/SampleBuffer.h
@@ -132,6 +132,10 @@ namespace LOFAR
 
       std::vector<Board> boards;
     };
+
+    // Removes the sample buffers that correspond to settings.dataKey,
+    // as well as any sample buffer that refers to the same station.
+    void removeSampleBuffers( const BufferSettings &settings );
   }
 }
 
diff --git a/RTCP/Cobalt/InputProc/src/CMakeLists.txt b/RTCP/Cobalt/InputProc/src/CMakeLists.txt
index 4b34a30bd59..0c54f3870da 100644
--- a/RTCP/Cobalt/InputProc/src/CMakeLists.txt
+++ b/RTCP/Cobalt/InputProc/src/CMakeLists.txt
@@ -10,6 +10,7 @@ lofar_add_library(inputproc
   RSPBoards.cc
   Buffer/BufferSettings.cc
   Buffer/Ranges.cc
+  Buffer/SampleBuffer.cc
   Buffer/SharedMemory.cc
   Buffer/StationID.cc
   Delays/Delays.cc
diff --git a/RTCP/Cobalt/InputProc/src/Station/Generator.cc b/RTCP/Cobalt/InputProc/src/Station/Generator.cc
index dfcf249f48d..6fec9c57eac 100644
--- a/RTCP/Cobalt/InputProc/src/Station/Generator.cc
+++ b/RTCP/Cobalt/InputProc/src/Station/Generator.cc
@@ -35,13 +35,15 @@ namespace LOFAR
   namespace Cobalt
   {
 
-    Generator::Generator( const BufferSettings &settings, const std::vector< SmartPtr<Stream> > &outputStreams_, PacketFactory &packetFactory )
+    Generator::Generator( const BufferSettings &settings, const std::vector< SmartPtr<Stream> > &outputStreams_, PacketFactory &packetFactory, const TimeStamp &from, const TimeStamp &to )
       :
       RSPBoards(str(boost::format("[station %s %s] [Generator] ") % settings.station.stationName % settings.station.antennaField), outputStreams_.size()),
       settings(settings),
       outputStreams(outputStreams_.size()),
       packetFactory(packetFactory),
-      nrSent(nrBoards, 0)
+      nrSent(nrBoards, 0),
+      from(from),
+      to(to)
     {
       for (size_t i = 0; i < outputStreams.size(); ++i) {
         outputStreams[i] = outputStreams_[i];
@@ -60,8 +62,7 @@ namespace LOFAR
 
         LOG_INFO_STR( logPrefix << "Start" );
 
-        TimeStamp current(time(0L) + 1, 0, settings.station.clockMHz * 1000000);
-        for(;; ) {
+        for(TimeStamp current = from; !to || current < to; /* increment in loop */ ) {
           struct RSP packet;
 
           // generate packet
diff --git a/RTCP/Cobalt/InputProc/src/Station/Generator.h b/RTCP/Cobalt/InputProc/src/Station/Generator.h
index 43360a4f55d..485ed8a74eb 100644
--- a/RTCP/Cobalt/InputProc/src/Station/Generator.h
+++ b/RTCP/Cobalt/InputProc/src/Station/Generator.h
@@ -44,7 +44,7 @@ namespace LOFAR
     class Generator : public RSPBoards
     {
     public:
-      Generator( const BufferSettings &settings, const std::vector< SmartPtr<Stream> > &outputStreams, PacketFactory &packetFactory );
+      Generator( const BufferSettings &settings, const std::vector< SmartPtr<Stream> > &outputStreams, PacketFactory &packetFactory, const TimeStamp &from, const TimeStamp &to );
 
     protected:
       const BufferSettings settings;
@@ -53,6 +53,8 @@ namespace LOFAR
 
       std::vector<size_t> nrSent;
 
+      const TimeStamp from, to;
+
       virtual void processBoard( size_t nr );
       virtual void logStatistics();
     };
diff --git a/RTCP/Cobalt/InputProc/src/Station/PacketsToBuffer.cc b/RTCP/Cobalt/InputProc/src/Station/PacketsToBuffer.cc
index 27ad3ddd517..0ce0336da24 100644
--- a/RTCP/Cobalt/InputProc/src/Station/PacketsToBuffer.cc
+++ b/RTCP/Cobalt/InputProc/src/Station/PacketsToBuffer.cc
@@ -37,7 +37,7 @@ namespace LOFAR
   {
 
 
-    PacketsToBuffer::PacketsToBuffer( Stream &inputStream, const BufferSettings &settings, unsigned boardNr, bool cleanup )
+    PacketsToBuffer::PacketsToBuffer( Stream &inputStream, const BufferSettings &settings, unsigned boardNr )
       :
       logPrefix(str(boost::format("[station %s board %u] [PacketsToBuffer] ") % settings.station.stationName % boardNr)),
       inputStream(inputStream),
@@ -46,32 +46,6 @@ namespace LOFAR
       boardNr(boardNr)
     {
       LOG_INFO_STR( logPrefix << "Initialised" );
-
-      if (cleanup) {
-        /*
-         * Make sure there are no lingering SHM buffers for this
-         * station from previous runs.
-         */
-
-        // Remove the provided dataKey, as it could be a custom setting
-        SharedMemoryArena::remove(settings.dataKey);
-
-        // Remove the keys of all possible configurations
-        StationID station = settings.station;
-
-        const unsigned bitmodes[] = { 4, 8, 16 };
-        const unsigned clocks[]   = { 160, 200 };
-
-        for (size_t b = 0; b < sizeof bitmodes / sizeof bitmodes[0]; ++b) {
-          for (size_t c = 0; c < sizeof clocks / sizeof clocks[0]; ++c) {
-            station.bitMode  = bitmodes[b];
-            station.clockMHz = clocks[c];
-            
-            // Remove any lingering buffer for this mode
-            SharedMemoryArena::remove(station.hash());
-          }
-        }
-      }
     }
 
 
diff --git a/RTCP/Cobalt/InputProc/src/Station/PacketsToBuffer.h b/RTCP/Cobalt/InputProc/src/Station/PacketsToBuffer.h
index a0010453b3f..92cabc57384 100644
--- a/RTCP/Cobalt/InputProc/src/Station/PacketsToBuffer.h
+++ b/RTCP/Cobalt/InputProc/src/Station/PacketsToBuffer.h
@@ -54,7 +54,7 @@ namespace LOFAR
     public:
       // cleanup: if true, erase any existing SHM areas corresponding to our
       // station.
-      PacketsToBuffer( Stream &inputStream, const BufferSettings &settings, unsigned boardNr, bool cleanup = true );
+      PacketsToBuffer( Stream &inputStream, const BufferSettings &settings, unsigned boardNr );
 
       // Process data for this board until interrupted or end of data. Auto-senses
       // mode (bit mode & clock).
diff --git a/RTCP/Cobalt/InputProc/src/Station/generate.cc b/RTCP/Cobalt/InputProc/src/Station/generate.cc
index 0e42a9e81af..662ee885577 100644
--- a/RTCP/Cobalt/InputProc/src/Station/generate.cc
+++ b/RTCP/Cobalt/InputProc/src/Station/generate.cc
@@ -59,8 +59,11 @@ int main( int argc, char **argv )
   struct StationID stationID(stationName, "LBA", 200, 16);
   struct BufferSettings settings(stationID, false);
 
+  const TimeStamp from(time(0), 0, stationID.clockMHz * 1000000);
+  const TimeStamp to(0);
+
   PacketFactory factory(settings);
-  Generator g(settings, outputStreams, factory);
+  Generator g(settings, outputStreams, factory, from, to);
 
   // Generate packets
   g.process();
diff --git a/RTCP/Cobalt/InputProc/test/tBlockReader.cc b/RTCP/Cobalt/InputProc/test/tBlockReader.cc
index 8342765d406..3c22775beb3 100644
--- a/RTCP/Cobalt/InputProc/test/tBlockReader.cc
+++ b/RTCP/Cobalt/InputProc/test/tBlockReader.cc
@@ -1,4 +1,4 @@
-/* PacketsToBuffer.cc
+/* tBlockReader.cc
  * Copyright (C) 2013  ASTRON (Netherlands Institute for Radio Astronomy)
  * P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
  *
@@ -112,7 +112,7 @@ void test( struct BufferSettings &settings, const std::string &filename )
   FileStream fs(filename);
 
   // Set up transfer
-  PacketsToBuffer transfer(fs, settings, 0, false);
+  PacketsToBuffer transfer(fs, settings, 0);
 
   // Do transfer
   transfer.process();
@@ -152,6 +152,7 @@ int main()
 
   // Use a fixed key, so the test suite knows what to clean
   settings.dataKey = 0x10000001;
+  removeSampleBuffers(settings);
 
   // Limit the array in size to work on systems with only 32MB SHM
   settings.nrBoards = 1;
diff --git a/RTCP/Cobalt/InputProc/test/tGenerator.cc b/RTCP/Cobalt/InputProc/test/tGenerator.cc
index 55f5897822c..ee45c92f667 100644
--- a/RTCP/Cobalt/InputProc/test/tGenerator.cc
+++ b/RTCP/Cobalt/InputProc/test/tGenerator.cc
@@ -69,8 +69,10 @@ int main( int, char **argv )
   struct StationID stationID("RS106", "LBA", 200, 16);
   struct BufferSettings settings(stationID, false);
 
+  const TimeStamp from(time(0), 0, stationID.clockMHz * 1000000);
+  const TimeStamp to = from + NUMPACKETS * 16; /* 16 timeslots/packet */
   PacketFactory factory(settings);
-  Generator g(settings, outputStreams, factory);
+  Generator g(settings, outputStreams, factory, from, to);
 
   bool error = false;
 
diff --git a/RTCP/Cobalt/InputProc/test/tMPITransfer.cc b/RTCP/Cobalt/InputProc/test/tMPITransfer.cc
index 87d808d3e8e..9a9e096bf87 100644
--- a/RTCP/Cobalt/InputProc/test/tMPITransfer.cc
+++ b/RTCP/Cobalt/InputProc/test/tMPITransfer.cc
@@ -69,8 +69,8 @@ Exception::TerminateHandler t(Exception::terminate);
 const size_t clockMHz = 200;
 const size_t clockHz = clockMHz * 1000 * 1000;
 typedef SampleType<i16complex> SampleT;
-const TimeStamp from(time(0L) + 10, 0, clockHz);
-const TimeStamp to(time(0L) + 10 + DURATION, 0, clockHz);
+TimeStamp from;
+TimeStamp to;
 const size_t blockSize = BLOCKSIZE * clockHz / 1024;
 map<int, std::vector<size_t> > beamletDistribution;
 const size_t nrHistorySamples = 16;
@@ -94,6 +94,15 @@ void sender()
   struct StationID stationID(str(format("CS%03d") % rank), "LBA", clockMHz, 16);
   struct BufferSettings settings(stationID, false);
 
+  // sync readers and writers to prevent data loss
+  // if the reader is delayed w.r.t. the generator
+
+  SmartPtr<SyncLock> syncLock;
+
+  syncLock = new SyncLock(settings);
+  settings.syncLock = syncLock;
+  settings.sync = true;
+
   omp_set_nested(true);
   //omp_set_num_threads(32);
   OMPThread::init();
@@ -115,9 +124,11 @@ void sender()
     }
   }
 
+  removeSampleBuffers(settings);
+
   MultiPacketsToBuffer station( settings, inputStreams );
   PacketFactory factory( settings );
-  Generator generator( settings, outputStreams, factory );
+  Generator generator( settings, outputStreams, factory, from - nrHistorySamples, to );
 
   #pragma omp parallel sections
   {
@@ -192,7 +203,7 @@ void receiver()
     // validate meta data
     for (int s = 0; s < nrStations; ++s) {
       for (size_t b = 0; b < nrBeamlets; ++b) {
-        ASSERT(blocks[s].beamlets[b].metaData.flags == metaData.flags);
+        ASSERTSTR(blocks[s].beamlets[b].metaData.flags == metaData.flags, "Got flags " << blocks[s].beamlets[b].metaData.flags << " but expected flags " << metaData.flags);
       }
     }
 
@@ -233,6 +244,12 @@ int main( int argc, char **argv )
   // Need at least one sender and one receiver
   ASSERT( nrHosts >= 2 );
 
+  // agree on the start time
+  time_t now = time(0L);
+  MPI_Bcast(&now, sizeof now, MPI_CHAR, 0, MPI_COMM_WORLD);
+  from = TimeStamp(now + 5, 0, clockHz);
+  to   = TimeStamp(now + 5 + DURATION, 0, clockHz);
+
   // Use half of the nodes as stations
   nrStations = nrHosts/2;
   nrReceivers = nrHosts - nrStations;
diff --git a/RTCP/Cobalt/InputProc/test/tPacketWriter.cc b/RTCP/Cobalt/InputProc/test/tPacketWriter.cc
index 4a7019e0d5d..f97c7bea15b 100644
--- a/RTCP/Cobalt/InputProc/test/tPacketWriter.cc
+++ b/RTCP/Cobalt/InputProc/test/tPacketWriter.cc
@@ -111,6 +111,7 @@ int main()
 
   // Use a fixed key, so the test suite knows what to clean
   settings.dataKey = 0x10000003;
+  removeSampleBuffers(settings);
 
   // Limit the array in size to work on systems with only 32MB SHM
   settings.nrBoards = 1;
diff --git a/RTCP/Cobalt/InputProc/test/tPacketsToBuffer.cc b/RTCP/Cobalt/InputProc/test/tPacketsToBuffer.cc
index fe777f80f69..8edd3719f3e 100644
--- a/RTCP/Cobalt/InputProc/test/tPacketsToBuffer.cc
+++ b/RTCP/Cobalt/InputProc/test/tPacketsToBuffer.cc
@@ -47,7 +47,7 @@ void test( struct BufferSettings &settings, const std::string &filename )
   FileStream fs(filename);
 
   // Set up transfer
-  PacketsToBuffer transfer(fs, settings, 0, false);
+  PacketsToBuffer transfer(fs, settings, 0);
 
   // Do transfer
   transfer.process();
@@ -73,6 +73,7 @@ int main()
 
   // Use a fixed key, so the test suite knows what to clean
   settings.dataKey = 0x10000002;
+  removeSampleBuffers(settings);
 
   // Limit the array in size to work on systems with only 32MB SHM
   settings.nrBoards = 1;
diff --git a/RTCP/Cobalt/InputProc/test/tSampleBuffer.cc b/RTCP/Cobalt/InputProc/test/tSampleBuffer.cc
index ea138714391..7c25d529c3d 100644
--- a/RTCP/Cobalt/InputProc/test/tSampleBuffer.cc
+++ b/RTCP/Cobalt/InputProc/test/tSampleBuffer.cc
@@ -57,6 +57,7 @@ int main()
 
   // Use a fixed key, so the test suite knows what to clean
   settings.dataKey = 0x10000004;
+  removeSampleBuffers(settings);
 
   // Limit the array in size to work on systems with only 32MB SHM
   settings.nrBoards = 1;
diff --git a/RTCP/Cobalt/InputProc/test/tSampleBufferSync.cc b/RTCP/Cobalt/InputProc/test/tSampleBufferSync.cc
index 7883a138447..120f7d91e3c 100644
--- a/RTCP/Cobalt/InputProc/test/tSampleBufferSync.cc
+++ b/RTCP/Cobalt/InputProc/test/tSampleBufferSync.cc
@@ -57,6 +57,7 @@ void initBoard()
 
   // Use a fixed key, so the test suite knows what to clean
   settings.dataKey = 0x10000005;
+  removeSampleBuffers(settings);
 
   // Limit the array in size to work on systems with only 32MB SHM
   settings.nrBoards = 1;
-- 
GitLab