From 44fc8591479b64f22bd129b9b5b4461b3a7b4542 Mon Sep 17 00:00:00 2001
From: John Romein <romein@astron.nl>
Date: Wed, 27 Feb 2008 10:55:37 +0000
Subject: [PATCH] bug 225: enable writing of raw data (needs recompilation with
 #define DUMP_RAW_DATA)

---
 Appl/CEP/CS1/CS1_IONProc/src/BeamletBuffer.cc |  15 ++
 Appl/CEP/CS1/CS1_IONProc/src/BeamletBuffer.h  |   1 +
 .../CS1/CS1_IONProc/src/WH_InputSection.cc    | 131 +++++++++++++-----
 .../CEP/CS1/CS1_IONProc/src/WH_InputSection.h |   4 -
 4 files changed, 115 insertions(+), 36 deletions(-)

diff --git a/Appl/CEP/CS1/CS1_IONProc/src/BeamletBuffer.cc b/Appl/CEP/CS1/CS1_IONProc/src/BeamletBuffer.cc
index c563c325003..c0fd30fa2a4 100644
--- a/Appl/CEP/CS1/CS1_IONProc/src/BeamletBuffer.cc
+++ b/Appl/CEP/CS1/CS1_IONProc/src/BeamletBuffer.cc
@@ -140,6 +140,20 @@ void BeamletBuffer::sendSubband(TransportHolder *th, unsigned subband) /*const*/
 }
 
 
+void BeamletBuffer::sendUnalignedSubband(TransportHolder *th, unsigned subband) /*const*/
+{
+  if (itsEndI < itsStartI) {
+    // the data wraps around the allocated memory, so copy in two parts
+    unsigned firstChunk = itsSize - itsStartI;
+
+    th->sendBlocking(itsSBBuffers[subband][itsStartI].origin(), sizeof(SampleType[firstChunk][NR_POLARIZATIONS]), 0, 0);
+    th->sendBlocking(itsSBBuffers[subband][0].origin(),		sizeof(SampleType[itsEndI][NR_POLARIZATIONS]), 0, 0);
+  } else {
+    th->sendBlocking(itsSBBuffers[subband][itsStartI].origin(), sizeof(SampleType[itsEndI - itsStartI][NR_POLARIZATIONS]), 0, 0);
+  }
+}
+
+
 void BeamletBuffer::readFlags(SparseSet<unsigned> &flags)
 {
   pthread_mutex_lock(&itsValidDataMutex);
@@ -153,6 +167,7 @@ void BeamletBuffer::readFlags(SparseSet<unsigned> &flags)
 		  static_cast<unsigned>(it->end - itsBegin));
 }
 
+
 void BeamletBuffer::stopReadTransaction()
 {
   itsLockedRanges.unlock(itsStartI, itsEndI, itsSize);
diff --git a/Appl/CEP/CS1/CS1_IONProc/src/BeamletBuffer.h b/Appl/CEP/CS1/CS1_IONProc/src/BeamletBuffer.h
index 68ad3193af3..3c9e897e960 100644
--- a/Appl/CEP/CS1/CS1_IONProc/src/BeamletBuffer.h
+++ b/Appl/CEP/CS1/CS1_IONProc/src/BeamletBuffer.h
@@ -62,6 +62,7 @@ class BeamletBuffer
 
     void     startReadTransaction(const TimeStamp &begin, unsigned nrElements);
     void     sendSubband(TransportHolder *, unsigned subband) /*const*/;
+    void     sendUnalignedSubband(TransportHolder *, unsigned subband) /*const*/;
     unsigned alignmentShift() const;
     void     readFlags(SparseSet<unsigned> &flags);
     void     stopReadTransaction();
diff --git a/Appl/CEP/CS1/CS1_IONProc/src/WH_InputSection.cc b/Appl/CEP/CS1/CS1_IONProc/src/WH_InputSection.cc
index a8f732cf6e4..f1fa78b13a1 100644
--- a/Appl/CEP/CS1/CS1_IONProc/src/WH_InputSection.cc
+++ b/Appl/CEP/CS1/CS1_IONProc/src/WH_InputSection.cc
@@ -24,9 +24,8 @@
 #include <lofar_config.h>
 
 //# Includes
-#include <Common/LofarLogger.h>
+#include <Common/Timer.h>
 #include <Common/PrettyUnits.h>
-//#include <AMCBase/Epoch.h>
 #include <BGL_Personality.h>
 #include <WH_InputSection.h>
 #include <BeamletBuffer.h>
@@ -39,14 +38,23 @@
 #include <CS1_Interface/CS1_Parset.h>
 #include <CS1_Interface/RSPTimeStamp.h>
 #include <Transport/TransportHolder.h>
-//#include <tinyCEP/Sel_RoundRobin.h>
 
 #include <sys/time.h>
 
+#undef DUMP_RAW_DATA
+
+#if defined DUMP_RAW_DATA
+#include <Transport/TH_Socket.h>
+#endif
 
 namespace LOFAR {
 namespace CS1 {
 
+#if defined DUMP_RAW_DATA
+static TransportHolder *rawDataTH;
+#endif
+
+
 WH_InputSection::WH_InputSection(const string &name, 
 				 unsigned stationNumber,
 				 CS1_Parset *ps,
@@ -56,19 +64,17 @@ WH_InputSection::WH_InputSection(const string &name,
   itsInputThread(0),
   itsInputTH(inputTH),
   itsStationNr(stationNumber),
-  itsStationName(ps->stationName(stationNumber)),
   itsCS1PS(ps),
-  itsBBuffer(0),
-  itsPrePostTimer("pre/post"),
-  itsProcessTimer("process"),
-  itsGetElemTimer("getElem")
+  itsBBuffer(0)
 {
-  LOG_TRACE_FLOW_STR("WH_InputSection constructor");    
-
   // get parameters
   itsNSubbandsPerPset = itsCS1PS->nrSubbandsPerPset();
   itsNSamplesPerSec   = itsCS1PS->nrSubbandSamples();
+#if defined DUMP_RAW_DATA
+  itsNHistorySamples  = 0;
+#else
   itsNHistorySamples  = itsCS1PS->nrHistorySamples();
+#endif
 
   // create incoming dataholder holding the delay information 
   getDataManager().addInDataHolder(0, new DH_Delay("DH_Delay", itsCS1PS->nrStations()));
@@ -91,19 +97,17 @@ void WH_InputSection::startThread()
 {
   /* start up thread which writes RSP data from ethernet link
      into cyclic buffers */
-  LOG_TRACE_FLOW_STR("WH_InputSection starting thread");   
 
   ThreadArgs args;
+
   args.BBuffer            = itsBBuffer;
   args.th                 = itsInputTH;
   args.ipHeaderSize       = itsCS1PS->getInt32("OLAP.IPHeaderSize");
   args.frameHeaderSize    = itsCS1PS->getInt32("OLAP.EPAHeaderSize");
   args.nTimesPerFrame     = itsCS1PS->getInt32("OLAP.nrTimesInFrame");
   args.nSubbandsPerFrame  = itsCS1PS->getInt32("OLAP.nrSubbandsPerFrame");
-
   args.frameSize          = args.frameHeaderSize + args.nSubbandsPerFrame * args.nTimesPerFrame * sizeof(Beamlet);
 
-
 #if 0
   if (itsInputTH->getType() == "TH_File" || itsInputTH->getType() == "TH_Null") {
     // if we are reading from file, overwriting the buffer should not be allowed
@@ -118,8 +122,6 @@ void WH_InputSection::startThread()
 
 void WH_InputSection::preprocess()
 {
-  itsPrePostTimer.start();
-
   itsCurrentComputeCore = 0;
   itsNrCoresPerPset	= itsCS1PS->nrCoresPerPset();
   itsPsetNumber		= getBGLpersonality()->getPsetNum();
@@ -146,6 +148,20 @@ void WH_InputSection::preprocess()
 
   itsSampleDuration = itsCS1PS->sampleDuration();
 
+#if defined DUMP_RAW_DATA
+  vector<string> rawDataServers = itsCS1PS->getStringVector("OLAP.OLAP_Conn.rawDataServers");
+  vector<string> rawDataPorts = itsCS1PS->getStringVector("OLAP.OLAP_Conn.rawDataPorts");
+
+  if (itsStationNr >= rawDataServers.size() || itsStationNr >= rawDataPorts.size()) {
+    std::cerr << "too many stations for too few raw data servers/ports" << std::endl;
+    exit(1);
+  }
+
+  std::clog << "trying to connect raw data stream to " << rawDataServers[itsStationNr] << ':' << rawDataPorts[itsStationNr] << std::endl;
+  rawDataTH = new TH_Socket(rawDataServers[itsStationNr], std::string(rawDataPorts[itsStationNr]));
+  std::clog << "raw data stream connected" << std::endl;
+#endif
+
   startThread();
 }
 
@@ -160,11 +176,9 @@ void WH_InputSection::limitFlagsLength(SparseSet<unsigned> &flags)
 
 
 void WH_InputSection::process() 
-{ 
+{
   BGL_Command command(BGL_Command::PROCESS);
 
-  itsProcessTimer.start();
-
   TimeStamp delayedStamp = itsSyncedStamp - itsNHistorySamples;
   itsSyncedStamp += itsNSamplesPerSec;
   int samplesDelay;
@@ -196,12 +210,72 @@ void WH_InputSection::process()
   double currentTime  = tv.tv_sec + tv.tv_usec / 1e6;
   double expectedTime = (delayedStamp + (itsNSamplesPerSec + itsNHistorySamples + itsMaxNetworkDelay)) * itsSampleDuration;
   
-  std::clog << itsStationName
-	    << ' ' << delayedStamp
+  std::clog << delayedStamp
 	    << " late: " << PrettyTime(currentTime - expectedTime)
 	    << ", delay: " << samplesDelay
-	    << ", flags: " << itsIONtoCNdata.flags() << " (" << (100.0 * itsIONtoCNdata.flags().count() / (itsNSamplesPerSec + itsNHistorySamples)) << "%)" << std::endl;
+	    << ", flags: " << itsIONtoCNdata.flags() << " (" << std::setprecision(3) << (100.0 * itsIONtoCNdata.flags().count() / (itsNSamplesPerSec + itsNHistorySamples)) << "%)" << std::endl;
+
+  NSTimer timer;
+  timer.start();
+
+#if defined DUMP_RAW_DATA
+  static bool fileHeaderWritten = false;
+
+  if (!fileHeaderWritten) {
+    struct FileHeader {
+      uint32	magic;		// 0x3F8304EC, also determines endianness
+      uint8	bitsPerSample;
+      uint8	nrPolarizations;
+      uint16	nrBeamlets;
+      uint32	nrSamplesPerBeamlet;
+      char	station[16];
+      double	sampleRate;	// 156250.0 or 195312.5
+      double	subbandFrequencies[54];
+      double	beamDirections[54][2];
+    } fileHeader;  
+
+    fileHeader.magic = 0x3F8304EC;
+    fileHeader.bitsPerSample = 16;
+    fileHeader.nrPolarizations = 2;
+    fileHeader.nrBeamlets = itsCS1PS->nrSubbands();
+    fileHeader.nrSamplesPerBeamlet = itsNSamplesPerSec;
+    strncpy(fileHeader.station, itsCS1PS->stationName(itsStationNr).c_str(), 16);
+    fileHeader.sampleRate = itsCS1PS->sampleRate();
+    memcpy(fileHeader.subbandFrequencies, &itsCS1PS->refFreqs()[0], itsCS1PS->nrSubbands() * sizeof(double));
+    
+    /* TODO: fill in beams/
+    for (unsigned subband = 0; subband < itsCS1PS->nrSubbands(); subband ++)
+      fileHeader.beamDirections[subband][0] = 
+      */
+
+    rawDataTH->sendBlocking(&fileHeader, sizeof fileHeader, 0, 0);
+    fileHeaderWritten = true;
+  }
 
+  struct BlockHeader {
+    uint32	magic; // 0x2913D852
+    int32	coarseDelayApplied;
+    double	fineDelayRemainingAtBegin, fineDelayRemainingAfterEnd;
+    int64	time; // compatible with TimeStamp class.
+    uint32      nrFlagsRanges;
+    struct range {
+      uint32    begin; // inclusive
+      uint32    end;   // exclusive
+    } flagsRanges[16];
+  } blockHeader;  
+
+  blockHeader.magic = 0x2913D852;
+  blockHeader.time = delayedStamp;
+  blockHeader.coarseDelayApplied = samplesDelay;
+  blockHeader.fineDelayRemainingAtBegin = itsIONtoCNdata.delayAtBegin();
+  blockHeader.fineDelayRemainingAfterEnd = itsIONtoCNdata.delayAfterEnd();
+  itsIONtoCNdata.flags().marshall(reinterpret_cast<char *>(&blockHeader.nrFlagsRanges), sizeof blockHeader.nrFlagsRanges + sizeof blockHeader.flagsRanges);
+
+  rawDataTH->sendBlocking(&blockHeader, sizeof blockHeader, 0, 0);
+
+  for (unsigned subband = 0; subband < itsCS1PS->nrSubbands(); subband ++)
+    itsBBuffer->sendUnalignedSubband(rawDataTH, subband);
+#else
   for (unsigned subbandBase = 0; subbandBase < itsNSubbandsPerPset; subbandBase ++) {
     unsigned	    core = BGL_Mapping::mapCoreOnPset(itsCurrentComputeCore, itsPsetNumber);
     TransportHolder *th  = TH_ZoidServer::theirTHs[core];
@@ -209,22 +283,21 @@ void WH_InputSection::process()
     command.write(th);
     itsIONtoCNdata.write(th);
 
-    itsGetElemTimer.start();
-
     for (unsigned pset = 0; pset < itsCS1PS->nrPsets(); pset ++) {
       unsigned subband = itsNSubbandsPerPset * pset + subbandBase;
 
       itsBBuffer->sendSubband(th, subband);
     }
 
-    itsGetElemTimer.stop();
-
     if (++ itsCurrentComputeCore == itsNrCoresPerPset)
       itsCurrentComputeCore = 0;
   }
+#endif
 
   itsBBuffer->stopReadTransaction();
-  itsProcessTimer.stop();
+
+  timer.stop();
+  std::clog << "ION->CN: " << PrettyTime(timer.getElapsed()) << std::endl;
 }
 
 
@@ -234,12 +307,6 @@ void WH_InputSection::postprocess()
 
   delete itsInputThread;	itsInputThread	= 0;
   delete itsBBuffer;		itsBBuffer	= 0;
-
-  itsPrePostTimer.stop();
-
-  itsPrePostTimer.print(clog);
-  itsProcessTimer.print(clog);
-  itsGetElemTimer.print(clog);
 }
 
 } // namespace CS1
diff --git a/Appl/CEP/CS1/CS1_IONProc/src/WH_InputSection.h b/Appl/CEP/CS1/CS1_IONProc/src/WH_InputSection.h
index 2ee2fc8d03a..420d8eb7ef3 100644
--- a/Appl/CEP/CS1/CS1_IONProc/src/WH_InputSection.h
+++ b/Appl/CEP/CS1/CS1_IONProc/src/WH_InputSection.h
@@ -35,7 +35,6 @@
 #include <CS1_Interface/ION_to_CN.h>
 #include <BeamletBuffer.h>
 #include <InputThread.h>
-#include <Common/Timer.h>
 
 #include <boost/multi_array.hpp>
 #include <pthread.h>
@@ -76,7 +75,6 @@ class WH_InputSection: public WorkHolder {
 
     TransportHolder *itsInputTH;
     unsigned itsStationNr;
-    string   itsStationName;
     
     CS1_Parset *itsCS1PS;
     
@@ -94,8 +92,6 @@ class WH_InputSection: public WorkHolder {
    
     BeamletBuffer *itsBBuffer;
     
-    NSTimer itsPrePostTimer, itsProcessTimer, itsGetElemTimer;
-    
     void startThread();
 };
     
-- 
GitLab