From 43a27e1f3d4ff4f0862e042e29d18cfba649e1a1 Mon Sep 17 00:00:00 2001
From: Jan David Mol <mol@astron.nl>
Date: Tue, 4 Dec 2012 21:27:07 +0000
Subject: [PATCH] Task #3696: RSPBoard sub class refactored

---
 RTCP/InputProc/src/Station.h | 239 +++++++++++++++++++----------------
 1 file changed, 132 insertions(+), 107 deletions(-)

diff --git a/RTCP/InputProc/src/Station.h b/RTCP/InputProc/src/Station.h
index 859893fd195..44e2b1476f7 100644
--- a/RTCP/InputProc/src/Station.h
+++ b/RTCP/InputProc/src/Station.h
@@ -30,14 +30,13 @@ protected:
 
   /* Receives input of one RSP board and stores it in shared memory. */
 
-  class RSPBoard {
+  class StreamReader {
   public:
-    RSPBoard( const std::string &streamDescriptor, SampleBuffer<T> &buffer, unsigned boardNr, const struct BufferSettings &settings );
+    StreamReader( const std::string &logPrefix, const std::string &streamDescriptor, SampleBuffer<T> &buffer, const struct BufferSettings &settings );
 
-    // RSP board number
-    const unsigned nr;
-
-    void processBoard();
+    // Reads a packet from the input stream. Returns true if a packet was
+    // succesfully read.
+    bool readPacket( struct RSP &packet );
 
     void logStatistics();
 
@@ -46,24 +45,33 @@ protected:
 
     SmartPtr<Stream> inputStream;
     bool supportPartialReads;
-
-    SampleBuffer<T> &buffer;
-    Ranges &flags;
     const struct BufferSettings settings;
-    const size_t firstBeamlet;
 
     size_t nrReceived, nrBadSize, nrBadTime, nrBadData, nrBadMode;
+  };
 
-    // Reads a packet from the input stream. Returns true if a packet was
-    // succesfully read.
-    bool readPacket( struct RSP &packet );
+  class BufferWriter {
+  public:
+    BufferWriter( const std::string &logPrefix, SampleBuffer<T> &buffer, size_t firstBeamlet, const struct BufferSettings &settings );
 
-    // Write the packet to the SampleBuffer
+    // Write a packet to the SampleBuffer
     void writePacket( const struct RSP &packet );
+
+    void logStatistics();
+
+  private:
+    const std::string logPrefix;
+
+    SampleBuffer<T> &buffer;
+    Ranges &flags;
+    const struct BufferSettings settings;
+    const size_t firstBeamlet;
+
+    size_t nrWritten;
   };
 
-  std::vector< SmartPtr<Stream> > streams;
-  std::vector< SmartPtr< RSPBoard<T> > > boards;
+  std::vector< SmartPtr<StreamReader> > readers;
+  std::vector< SmartPtr<BufferWriter> > writers;
 
   // process data for this board until interrupted or end of data
   virtual void processBoard( size_t nr );
@@ -74,25 +82,34 @@ protected:
 
 template<typename T> Station<T>::Station( const BufferSettings &settings, const std::vector<std::string> &streamDescriptors )
 :
-  StationStreams(str(boost::format("[station %s %s] [Station] ") % settings.station.stationName % settings.station.antennaSet), settings, streamDescriptors),
+  StationStreams(str(boost::format("[station %s %s] ") % settings.station.stationName % settings.station.antennaSet), settings, streamDescriptors),
 
   buffer(settings, true),
-  streams(nrBoards, 0),
-  boards(nrBoards, 0)
+  readers(nrBoards, 0),
+  writers(nrBoards, 0)
 {
   LOG_INFO_STR( logPrefix << "Initialised" );
 }
 
 template<typename T> void Station<T>::processBoard( size_t nr )
 {
-  const std::string logPrefix(str(boost::format("[station %s %s board %u] [Station] ") % settings.station.stationName % settings.station.antennaSet % nr));
+  const std::string logPrefix(str(boost::format("%s [board %u] ") % this->logPrefix % nr));
 
   try {
+    LOG_INFO_STR( logPrefix << "Connecting to " << streamDescriptors[nr] );
+    readers[nr] = new StreamReader<T>(logPrefix, streamDescriptors[nr], settings);
+
     LOG_INFO_STR( logPrefix << "Connecting to shared memory buffer 0x" << std::hex << settings.dataKey );
-    boards[nr] = new RSPBoard<T>(streamDescriptors[nr], buffer, nr, settings);
+    size_t firstBeamlet = settings.nrBeamlets / settings.nrBoards * nr;
+    writers[nr] = new BufferWriter<T>(logPrefix, buffer, firstBeamlet, settings);
 
     LOG_INFO_STR( logPrefix << "Start" );
-    boards[nr]->processBoard();
+
+    struct RSP packet;
+
+    for(;;)
+      if (readers[nr]->readPacket(packet))
+        writers[nr]->writePacket(packet);
 
   } catch (Stream::EndOfStreamException &ex) {
     LOG_INFO_STR( logPrefix << "End of stream");
@@ -111,22 +128,20 @@ template<typename T> void Station<T>::processBoard( size_t nr )
 
 template<typename T> void Station<T>::logStatistics()
 {
-  for (size_t nr = 0; nr < boards.size(); nr++)
-    if (boards[nr].get())
-      boards[nr]->logStatistics();
+  for (size_t nr = 0; nr < boards.size(); nr++) {
+    if (readers[nr].get())
+      readers[nr]->logStatistics();
+
+    if (writers[nr].get())
+      writers[nr]->logStatistics();
+  }
 }
 
 
-template<typename T> Station<T>::RSPBoard::RSPBoard( const std::string &streamDescriptor, SampleBuffer<T> &buffer, unsigned boardNr, const struct BufferSettings &settings )
+template<typename T> Station<T>::StreamReader::StreamReader( const std::string &logPrefix, const std::string &streamDescriptor, const struct BufferSettings &settings )
 :
-  nr(boardNr),
-  logPrefix(str(boost::format("[station %s %s board %u] [RSPBoard] ") % settings.station.stationName % settings.station.antennaSet % nr)),
-  last_logtime(0),
-
-  buffer(buffer),
-  flags(buffer.flags[boardNr]),
+  logPrefix(str(boost::format("%s [StreamReader] ") % logPrefix)),
   settings(settings),
-  firstBeamlet(settings.nrBeamlets / settings.nrBoards * boardNr),
 
   nrReceived(0),
   nrBadSize(0),
@@ -134,9 +149,6 @@ template<typename T> Station<T>::RSPBoard::RSPBoard( const std::string &streamDe
   nrBadData(0),
   nrBadMode(0)
 {
-  ASSERTSTR(boardNr < settings.nrBoards, "Invalid board number: " << boardNr << ", nrBoards: " << settings.nrBoards);
-
-  LOG_INFO_STR( logPrefix << "Connecting to " << streamDescriptor );
   inputStream = createStream(streamDescriptor, true);
 
   SocketStream *asSocket = dynamic_cast<SocketStream *>(inputStream.get());
@@ -145,12 +157,90 @@ template<typename T> Station<T>::RSPBoard::RSPBoard( const std::string &streamDe
   supportPartialReads = !isUDP;
 }
 
-template<typename T> void Station<T>::RSPBoard::writePacket( const struct RSP &packet )
+
+template<typename T> bool Station<T>::StreamReader::readPacket( struct RSP &packet )
+{
+  if (supportPartialReads) {
+    // read header first
+    inputStream->read(&packet, sizeof(struct RSP::Header));
+
+    // read rest of packet
+    inputStream->read(&packet.data, packet.packetSize() - sizeof(struct RSP::Header));
+
+    ++nrReceived;
+  } else {
+    // read full packet at once -- numbytes will tell us how much we've actually read
+    size_t numbytes = inputStream->tryRead(&packet, sizeof packet);
+
+    ++nrReceived;
+
+    if( numbytes < sizeof(struct RSP::Header)
+     || numbytes != packet.packetSize() ) {
+      LOG_WARN_STR( logPrefix << "Packet is " << numbytes << " bytes, but should be " << packet.packetSize() << " bytes" );
+
+      ++nrBadSize;
+      return false;
+    }
+  }
+
+  // illegal timestamp means illegal packet
+  if (packet.header.timestamp == ~0U) {
+    ++nrBadTime;
+    return false;
+  }
+
+  // check sanity of packet
+
+  // discard packets with errors
+  if (packet.payloadError()) {
+    ++nrBadData;
+    return false;
+  }
+
+  // check whether the station configuration matches ours
+  if (packet.clockMHz() * 1000000 != settings.station.clock
+   || packet.bitMode() != settings.station.bitmode) {
+    ++nrBadMode;
+    return false;
+  }
+
+  return true;
+}
+
+
+template<typename T> void Station<T>::StreamReader::logStatistics()
+{
+  LOG_INFO_STR( logPrefix << "Received " << nrReceived << " packets: " << nrBadTime << " bad timestamps, " << nrBadSize << " bad sizes, " << nrBadData << " payload errors, " << nrBadMode << " clock/bitmode errors" );
+
+  nrReceived = 0;
+  nrBadTime = 0;
+  nrBadSize = 0;
+  nrBadData = 0;
+  nrBadMode = 0;
+}
+
+
+template<typename T> Station<T>::BufferWriter::BufferWriter( const std::string &logPrefix, SampleBuffer<T> &buffer, size_t firstBeamlet, const struct BufferSettings &settings )
+:
+  logPrefix(str(boost::format("%s [BufferWriter] ") % logPrefix)),
+
+  buffer(buffer),
+  flags(buffer.flags[boardNr]),
+  settings(settings),
+  firstBeamlet(firstBeamlet),
+
+  nrWritten(0)
+{
+}
+
+
+template<typename T> void Station<T>::BufferWriter::writePacket( const struct RSP &packet )
 {
   const uint8 &nrBeamlets  = packet.header.nrBeamlets;
   const uint8 &nrTimeslots = packet.header.nrBlocks;
 
-  ASSERT( nrBeamlets <= settings.nrBeamlets / settings.nrBoards );
+  // should not exceed the number of beamlets in the buffer
+  ASSERT( firstBeamlet + nrBeamlets < settings.nrBeamlets );
 
   const TimeStamp timestamp(packet.header.timestamp, packet.header.blockSequenceNumber, settings.station.clock);
 
@@ -184,81 +274,16 @@ template<typename T> void Station<T>::RSPBoard::writePacket( const struct RSP &p
 
   // mark as valid
   flags.include(timestamp, timestamp + nrTimeslots);
-}
-
-template<typename T> bool Station<T>::RSPBoard::readPacket( struct RSP &packet )
-{
-  bool valid = true;
 
-  if (supportPartialReads) {
-    // read header first
-    inputStream->read(&packet, sizeof(struct RSP::Header));
-
-    // read rest of packet
-    inputStream->read(&packet.data, packet.packetSize() - sizeof(struct RSP::Header));
-
-    ++nrReceived;
-  } else {
-    // read full packet at once -- numbytes will tell us how much we've actually read
-    size_t numbytes = inputStream->tryRead(&packet, sizeof packet);
-
-    ++nrReceived;
-
-    if( numbytes < sizeof(struct RSP::Header)
-     || numbytes != packet.packetSize() ) {
-      LOG_WARN_STR( logPrefix << "Packet is " << numbytes << " bytes, but should be " << packet.packetSize() << " bytes" );
-
-      ++nrBadSize;
-      valid = false;
-    }
-  }
-
-  // illegal timestamp means illegal packet
-  if (valid && packet.header.timestamp == ~0U) {
-    ++nrBadTime;
-    valid = false;
-  }
-
-  if (valid) {
-    // check sanity of packet
-
-    // discard packets with errors
-    if (packet.payloadError()) {
-      ++nrBadData;
-      valid = false;
-    }
-
-    // check whether the station configuration matches ours
-    if (packet.clockMHz() * 1000000 != settings.station.clock
-     || packet.bitMode() != settings.station.bitmode) {
-      ++nrBadMode;
-      valid = false;
-    }
-  }
-
-  return valid;
-}
-
-
-template<typename T> void Station<T>::RSPBoard::processBoard()
-{
-  struct RSP packet;
-
-  for(;;)
-    if (readPacket(packet))
-      writePacket(packet);
+  ++nrWritten;
 }
 
 
-template<typename T> void Station<T>::RSPBoard::logStatistics()
+template<typename T> void Station<T>::BufferWriter::logStatistics()
 {
-  LOG_INFO_STR( logPrefix << "Received " << nrReceived << " packets: " << nrBadTime << " bad timestamps, " << nrBadSize << " bad sizes, " << nrBadData << " payload errors, " << nrBadMode << " mode errors" );
+  LOG_INFO_STR( logPrefix << "Written " << nrWritten << " packets");
 
-  nrReceived = 0;
-  nrBadTime = 0;
-  nrBadSize = 0;
-  nrBadData = 0;
-  nrBadMode = 0;
+  nrWritten = 0;
 }
 
 
-- 
GitLab