From 9c6efa443fed4ca2761581476246aee4a698c9b5 Mon Sep 17 00:00:00 2001
From: zwart <sdos@astron.nl>
Date: Wed, 31 May 2006 15:19:15 +0000
Subject: [PATCH] BugID: 719

Added timers to WH_RSPInput and WH_SBCollect
several other (small) changes
---
 .../include/CS1_InputSection/WH_RSPInput.h    |  9 ++-
 .../include/CS1_InputSection/WH_SBCollect.h   |  9 +++
 .../CS1_InputSection/src/AH_InputSection.cc   |  3 +-
 .../CS1/CS1_InputSection/src/WH_RSPInput.cc   | 78 ++++++++++++++++++-
 .../CS1/CS1_InputSection/src/WH_SBCollect.cc  | 77 +++++++++++++++++-
 5 files changed, 166 insertions(+), 10 deletions(-)

diff --git a/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/WH_RSPInput.h b/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/WH_RSPInput.h
index e119e55ca41..91e4f4eb344 100644
--- a/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/WH_RSPInput.h
+++ b/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/WH_RSPInput.h
@@ -103,7 +103,6 @@ namespace LOFAR
       int itsNSubbandsPerCell;
       int itsNSamplesPerSec;
       int itsNHistorySamples;
-      int itsStationID;
      
       BeamletBuffer* itsBBuffer;
 
@@ -112,6 +111,14 @@ namespace LOFAR
       NSTimer* itsProcessTimer;
       NSTimer* itsGetElemTimer;
       
+
+      //handle timer alarm
+      static void timerSignal(int signal);    
+      double itsFrequency;
+      static int theirNoRunningWHs;
+      static int theirNoAlarms;
+      static bool theirTimerSet;
+
     };
     
     // @}
diff --git a/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/WH_SBCollect.h b/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/WH_SBCollect.h
index 6ec6184be33..22bf2520d1d 100644
--- a/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/WH_SBCollect.h
+++ b/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/WH_SBCollect.h
@@ -58,6 +58,7 @@ namespace LOFAR
 				   const int noutputs);
       virtual WH_SBCollect* make(const string& name);
       
+      virtual void preprocess();
       virtual void process();
       virtual void postprocess();
 
@@ -70,6 +71,14 @@ namespace LOFAR
       ACC::APS::ParameterSet itsPS;
       unsigned itsNStations;
       unsigned itsNSubbandsPerCell;
+
+      //handle timer alarm
+      static void timerSignal(int signal);    
+      double itsFrequency;
+      static int theirNoRunningWHs;
+      static int theirNoAlarms;
+      static bool theirTimerSet;
+
     };
 
     // @}
diff --git a/Appl/CEP/CS1/CS1_InputSection/src/AH_InputSection.cc b/Appl/CEP/CS1/CS1_InputSection/src/AH_InputSection.cc
index dcbf849e78c..b7e48c6ed00 100644
--- a/Appl/CEP/CS1/CS1_InputSection/src/AH_InputSection.cc
+++ b/Appl/CEP/CS1/CS1_InputSection/src/AH_InputSection.cc
@@ -113,7 +113,6 @@ namespace LOFAR {
 	  // Connect the Delay Controller
 	  itsInputStub->connect(ic * nStations + station, (RSPSteps.back())->getInDataManager(0), 0);
 	}
-	
 
 	LOG_TRACE_FLOW_STR("Create the Subband merger workholders");
 	vector<Step*> collectSteps;
@@ -136,7 +135,7 @@ namespace LOFAR {
 	  // connect outputs to Subband stub
 	  vector<int> channels;
 	  for (int core = 0; core < nNodesPerCell; core++) {
-	    //collectSteps.back()->getOutDataManager(0).setOutBuffer(core, false, 10);
+	    collectSteps.back()->getOutDataManager(0).setOutBuffer(core, false, 10);
 #if 1
 	    itsOutputStub->connect(cell + ic * nCells / inputCells,
 				   core,
diff --git a/Appl/CEP/CS1/CS1_InputSection/src/WH_RSPInput.cc b/Appl/CEP/CS1/CS1_InputSection/src/WH_RSPInput.cc
index b29bfd2b8e3..58e5d3503da 100644
--- a/Appl/CEP/CS1/CS1_InputSection/src/WH_RSPInput.cc
+++ b/Appl/CEP/CS1/CS1_InputSection/src/WH_RSPInput.cc
@@ -37,9 +37,21 @@
 #include <CS1_InputSection/BeamletBuffer.h>
 #include <CS1_InputSection/InputThread.h>
 
+// for timer
+#include <signal.h>
+#include <sys/time.h>
+// for rand
+#include <stdlib.h>
+// for sleep (yield)
+#include <boost/thread.hpp>
+
 namespace LOFAR {
   namespace CS1 {
 
+    int WH_RSPInput::theirNoRunningWHs = 0;
+    int WH_RSPInput::theirNoAlarms = 0;
+    bool WH_RSPInput::theirTimerSet = 0;
+
     WH_RSPInput::WH_RSPInput(const string& name, 
 			     ACC::APS::ParameterSet& ps,
 			     TransportHolder& th,
@@ -152,6 +164,34 @@ namespace LOFAR {
       cout<<"Starting buffer at "<<itsSyncedStamp<<endl;cout.flush();
       itsBBuffer->startBufferRead(itsSyncedStamp);
       cout<<"end of WH_RSPInput::preprocess"<<endl;cout.flush();
+
+
+      if (!theirTimerSet) {
+#define USE_TIMER 0
+#if USE_TIMER
+	sighandler_t ret = signal(SIGALRM, *WH_RSPInput::timerSignal);
+	ASSERTSTR(ret != SIG_ERR, "WH_RSPInput couldn't set signal handler for timer");    
+	struct itimerval value;
+	memset (&value, 0, sizeof(itimerval));
+
+	__time_t secs = 1;
+	__time_t usecs = 0;
+	// this means 1MHz is the highest frequency
+	value.it_interval.tv_sec = secs;
+	value.it_interval.tv_usec = usecs;
+	value.it_value.tv_sec = sec;
+	value.it_value.tv_usec = usecs;
+	cout << "Setting timer interval to " << secs << "secs and " << usecs << "ms" << endl;
+
+	setitimer(ITIMER_REAL, &value, 0);
+#else
+	theirNoAlarms = -1; //This will make sure data is written at maximum speed
+#endif
+      
+	theirTimerSet = true;
+      }
+      theirNoRunningWHs++;
+
     }
 
     void WH_RSPInput::process() 
@@ -197,19 +237,45 @@ namespace LOFAR {
     
 	// fill in the data
 	rspDHp->getFlags() = flags;
-	rspDHp->setStationID(itsStationID);
+	rspDHp->setStationID(itsStationNr);
 	rspDHp->setTimeStamp(delayedstamp);
 	rspDHp->fillExtraData();
-	rspDHp->setFineDelayAtBegin(delayDHp->getFineDelayAtBegin(itsStationID));
-	rspDHp->setFineDelayAfterEnd(delayDHp->getFineDelayAfterEnd(itsStationID));
+	rspDHp->setFineDelayAtBegin(delayDHp->getFineDelayAtBegin(itsStationNr));
+	rspDHp->setFineDelayAfterEnd(delayDHp->getFineDelayAfterEnd(itsStationNr));
       }    
 
       itsSyncedStamp += itsNSamplesPerSec;
       itsProcessTimer->stop();
+      while (theirNoAlarms == 0) 
+	{
+	  // wait for alarm to go off
+	  boost::thread::yield();
+	};
+      
+      // we handled one alarm, so decrease it
+      theirNoAlarms--;
     }
 
     void WH_RSPInput::postprocess()
     {
+      theirNoRunningWHs--;
+      if (theirNoRunningWHs == 0)
+	{
+	  theirTimerSet = false;
+	  if (itsFrequency != 0) {
+#if USE_TIMER
+	    // unset timer
+	    struct itimerval value;
+	    memset (&value, 0, sizeof(itimerval));
+	    setitimer(ITIMER_REAL, &value, 0);
+	    // remove sig handler
+	    sighandler_t ret = signal(SIGALRM, SIG_DFL);
+	    ASSERTSTR(ret != SIG_ERR, "WH_RSPInput couldn't unset signal handler for timer");    
+#endif
+	  }
+	}
+
+
       sleep(1);
       itsPrePostTimer->stop();
 
@@ -244,5 +310,11 @@ namespace LOFAR {
     {
     }
 
+    void WH_RSPInput::timerSignal(int sig)
+    {
+      // set the number of frames that can be sent
+      theirNoAlarms += theirNoRunningWHs;
+    }
+
   } // namespace CS1
 } // namespace LOFAR
diff --git a/Appl/CEP/CS1/CS1_InputSection/src/WH_SBCollect.cc b/Appl/CEP/CS1/CS1_InputSection/src/WH_SBCollect.cc
index b009e02ff8e..2a4baf8c3b5 100644
--- a/Appl/CEP/CS1/CS1_InputSection/src/WH_SBCollect.cc
+++ b/Appl/CEP/CS1/CS1_InputSection/src/WH_SBCollect.cc
@@ -32,11 +32,24 @@
 #include <tinyCEP/Sel_RoundRobin.h>
 #include <CEPFrame/DataManager.h>
 
+// for timer
+#include <signal.h>
+#include <sys/time.h>
+// for rand
+#include <stdlib.h>
+// for sleep (yield)
+#include <boost/thread.hpp>
+#include <mpi.h>
+
 namespace LOFAR
 {
   namespace CS1
   {
 
+    int WH_SBCollect::theirNoRunningWHs = 0;
+    int WH_SBCollect::theirNoAlarms = 0;
+    bool WH_SBCollect::theirTimerSet = 0;
+
     WH_SBCollect::WH_SBCollect(const string& name,
 			       const ACC::APS::ParameterSet pset,
 			       const int noutputs) 
@@ -80,24 +93,51 @@ namespace LOFAR
       return new WH_SBCollect(name, itsPS, itsNoutputs);
     }
 
+    void WH_SBCollect::preprocess() {
+      sleep(60);
+      if (!theirTimerSet) {
+#define USE_TIMER 0
+#if USE_TIMER
+	sighandler_t ret = signal(SIGALRM, *WH_SBCollect::timerSignal);
+	ASSERTSTR(ret != SIG_ERR, "WH_SBCollect couldn't set signal handler for timer");    
+	struct itimerval value;
+	memset (&value, 0, sizeof(itimerval));
+
+	double interval = 1 / itsNSubbandsPerCell;
+	__time_t secs = static_cast<__time_t>(floor(interval));
+	__time_t usecs = static_cast<__time_t>(1e6 * (interval - secs));
+	// this means 1MHz is the highest frequency
+	value.it_interval.tv_sec = secs;
+	value.it_interval.tv_usec = usecs;
+	value.it_value.tv_sec = secs;
+	value.it_value.tv_usec = usecs;
+	cout << "Setting timer interval to " << secs << "secs and " << usecs << "ms" << endl;
+
+	setitimer(ITIMER_REAL, &value, 0);
+#else
+	theirNoAlarms = -1; //This will make sure data is written at maximum speed
+#endif
+	theirTimerSet = true;
+      }
+      theirNoRunningWHs++;
+    }
+
     void WH_SBCollect::process() 
     { 
+      
       vector<DH_RSP*> inHolders;
       vector<RectMatrix<DH_RSP::BufferType> *> inMatrices;
       vector<RectMatrix<DH_RSP::BufferType>::cursorType> inCursors;
       RectMatrix<DH_Subband::SampleType>::cursorType outCursor;
 
-      cerr<<"Creating cursors to incoming data"<<endl;
       // create cursors to incoming data
       for (unsigned station = 0; station < itsNStations; station++) {
-	cerr<<"getting DH of station "<<station<<endl;
 	inHolders.push_back(dynamic_cast<DH_RSP *>(getDataManager().getInHolder(station)));
 	inMatrices.push_back(&(inHolders.back()->getDataMatrix()));
 	dimType inSubbandDim = inMatrices[0]->getDim("Subbands");
 	inCursors.push_back(inHolders.back()->getDataMatrix().getCursor(0*inSubbandDim));
       }
 
-      cerr<<"Start loop over subbands"<<endl;
       // Copy every subband to one BG/L core
       for (unsigned subband = 0; subband < itsNSubbandsPerCell; subband++) {
 	// ask the round robin selector for the next output
@@ -108,7 +148,6 @@ namespace LOFAR
 	outCursor = outMatrix->getCursor( 0 * outStationDim);
 
 	// Copy one subbands from every input
-	cerr<<"Start loop over stations"<<endl;
 	for (unsigned station = 0; station < itsNStations; station++) {
 	  inMatrices[station]->cpy2Matrix(inCursors[station], inSubbandDim, *outMatrix, outCursor, outStationDim, 1);
 	  inMatrices[station]->moveCursor(&inCursors[station], inSubbandDim);
@@ -122,6 +161,14 @@ namespace LOFAR
 	  outHolder->getFlags(station) = inHolders[station]->getFlags();
 	}
 	outHolder->fillExtraData();
+	while (theirNoAlarms == 0) 
+	  {
+	    // wait for alarm to go off
+	    boost::thread::yield();
+	  };
+	// we handled one alarm, so decrease it
+	theirNoAlarms--;
+	getDataManager().readyWithOutHolder(getDataManager().getOutputSelector()->getCurrentSelection());
       }
 
 #if 0
@@ -140,13 +187,35 @@ namespace LOFAR
 	      matrixSize * sizeof(DH_Subband::SampleType));
       cout << "WH_SBCollect output done " << endl;
 #endif
+
     }
 
     void WH_SBCollect::postprocess() 
     {
+      theirNoRunningWHs--;
+      if (theirNoRunningWHs == 0)
+	{
+	  theirTimerSet = false;
+	  if (itsFrequency != 0) {
+	    // unset timer
+#if USE_TIMER
+	    struct itimerval value;
+	    memset (&value, 0, sizeof(itimerval));
+	    setitimer(ITIMER_REAL, &value, 0);
+	    // remove sig handler
+	    sighandler_t ret = signal(SIGALRM, SIG_DFL);
+	    ASSERTSTR(ret != SIG_ERR, "WH_SBCollect couldn't unset signal handler for timer");    
+#endif
+	  }
+	}
       sleep(10);
     }  
 
+    void WH_SBCollect::timerSignal(int sig)
+    {
+      // set the number of frames that can be sent
+      theirNoAlarms += theirNoRunningWHs;
+    }
   } // namespace CS1
 
 } // namespace LOFAR
-- 
GitLab