From 7ce586d29e484c99e523a7ac9274e08cf3dbfe1f Mon Sep 17 00:00:00 2001
From: John Romein <romein@astron.nl>
Date: Tue, 15 May 2007 08:04:25 +0000
Subject: [PATCH] BugID: 225 Fixed PsetsPerStorage bug.

---
 .../include/CS1_Storage/WH_SubbandWriter.h    |  4 +-
 Appl/CEP/CS1/CS1_Storage/src/AH_Storage.cc    | 23 +++---
 .../CS1/CS1_Storage/src/WH_SubbandWriter.cc   | 81 +++++++------------
 3 files changed, 47 insertions(+), 61 deletions(-)

diff --git a/Appl/CEP/CS1/CS1_Storage/include/CS1_Storage/WH_SubbandWriter.h b/Appl/CEP/CS1/CS1_Storage/include/CS1_Storage/WH_SubbandWriter.h
index 2a3dede9181..a2729a88b2b 100644
--- a/Appl/CEP/CS1/CS1_Storage/include/CS1_Storage/WH_SubbandWriter.h
+++ b/Appl/CEP/CS1/CS1_Storage/include/CS1_Storage/WH_SubbandWriter.h
@@ -77,7 +77,6 @@ namespace LOFAR
       const ACC::APS::ParameterSet itsPS;
       uint  itsNStations;
       uint  itsNBaselines;
-      uint  itsNInputsPerSubband;
       uint  itsNChannels;
       uint  itsNBeams;
       uint  itsNPolSquared;
@@ -86,6 +85,9 @@ namespace LOFAR
       MSWriter* itsWriter;
 
       uint itsNrSubbandsPerCell; ///< Number of subbands per BG/L cell
+      uint itsNrSubbandsPerStorage;
+      uint itsNrNodesPerCell;
+      vector<uint> itsCurrentInputs;
       vector<uint> itsBandIDs;   ///< MS IDs of the frequency bands
       uint itsFieldID;           ///< MS ID of the field, i.e. the beam.
       uint itsTimeCounter;       ///< Counts the time
diff --git a/Appl/CEP/CS1/CS1_Storage/src/AH_Storage.cc b/Appl/CEP/CS1/CS1_Storage/src/AH_Storage.cc
index a08d85ecfbb..3f87af2e9c6 100644
--- a/Appl/CEP/CS1/CS1_Storage/src/AH_Storage.cc
+++ b/Appl/CEP/CS1/CS1_Storage/src/AH_Storage.cc
@@ -88,16 +88,21 @@ namespace LOFAR
         // Each writer will run on a separate node.
         step.runOnNode(nw);
 	
-	vector<int> channels;
         // Connect to BG output
-	for (int core = 0; core < nNodesPerCell * nrPsetsPerStorage; core++) {
-	  step.getInDataManager(core).setInBuffer(core, false, 10);
-	  itsStub->connect(nw, core, step.getInDataManager(core), core);
-	  channels.push_back(core);
-	}	
-
-	// limit the number of concurrent incoming connections
-	step.getInDataManager(0).setInRoundRobinPolicy(channels, maxConcurrent);
+	for (unsigned pset = 0; pset < nrPsetsPerStorage; pset ++) {
+	  vector<int> channels;
+	  for (unsigned core = 0; core < nNodesPerCell; core++) {
+	    int channel = pset * nNodesPerCell + core;
+	    step.getInDataManager(channel).setInBuffer(channel, false, 10);
+	    itsStub->connect(nw, channel, step.getInDataManager(channel), channel);
+	    channels.push_back(channel);
+	  }
+	  // limit the number of concurrent incoming connections
+	  // actually, we would like to set the number of concurrent
+	  // communications for all psets together, but we cannot express this
+	  // thus we do this for each pset
+	  step.getInDataManager(0).setInRoundRobinPolicy(channels, maxConcurrent);
+	}
       }
 #ifdef HAVE_MPI
       ASSERTSTR (TH_MPI::getNumberOfNodes() ==  nrWriters,
diff --git a/Appl/CEP/CS1/CS1_Storage/src/WH_SubbandWriter.cc b/Appl/CEP/CS1/CS1_Storage/src/WH_SubbandWriter.cc
index 1d76bc02e2c..51abb5da1e4 100644
--- a/Appl/CEP/CS1/CS1_Storage/src/WH_SubbandWriter.cc
+++ b/Appl/CEP/CS1/CS1_Storage/src/WH_SubbandWriter.cc
@@ -75,7 +75,6 @@ namespace LOFAR
       itsNBaselines = itsNStations * (itsNStations +1)/2;
       itsNChannels = itsPS.getUint32("Observation.NChannels");
       itsNBeams = itsPS.getUint32("Observation.NBeams");
-      itsNInputsPerSubband = itsNinputs;
       uint pols = itsPS.getUint32("Observation.NPolarisations");
       itsNPolSquared = pols*pols;
 
@@ -94,9 +93,6 @@ namespace LOFAR
         getDataManager().addInDataHolder(i, new DH_Visibilities(str, pset));
         getDataManager().setAutoTriggerIn(i, false);
       }
-
-      // Set a round robin input selector
-      getDataManager().setInputSelector(new Sel_RoundRobin(itsNinputs));
     }
 
     WH_SubbandWriter::~WH_SubbandWriter() 
@@ -171,6 +167,13 @@ namespace LOFAR
       ASSERTSTR(antPos.size() == 3 * itsNStations,
                 antPos.size() << " == " << 3 * itsNStations);
       
+      itsNrSubbandsPerCell    = itsPS.getUint32("General.SubbandsPerPset") * itsPS.getUint32("BGLProc.PsetsPerCell");
+      itsNrSubbandsPerStorage = itsNrSubbandsPerCell * itsPS.getUint32("Storage.PsetsPerStorage");
+      itsNrNodesPerCell       = itsPS.getUint32("BGLProc.NodesPerPset") * itsPS.getUint32("BGLProc.PsetsPerCell");
+      itsCurrentInputs.resize(itsNrSubbandsPerStorage / itsNrSubbandsPerCell, 0);
+
+      LOG_TRACE_VAR_STR("SubbandsPerStorage = " << itsNrSubbandsPerStorage);
+
       vector<string> storageStationNames = itsPS.getStringVector("Storage.StorageStationNames");
 #if defined HAVE_MPI
       itsWriter = new MSWriter(msNames[TH_MPI::getCurrentRank()].c_str(),
@@ -190,17 +193,11 @@ namespace LOFAR
       
       vector<double> refFreqs= itsPS.getDoubleVector("Observation.RefFreqs");
 
-      // Here we should (somehow) derive which subband we're going to write.
-      // At least we know how many subbands we can expect, because that's in
-      // the parameter set file.
-      itsNrSubbandsPerCell = itsPS.getUint32("General.SubbandsPerPset") * itsPS.getUint32("BGLProc.PsetsPerCell") * itsPS.getUint32("Storage.PsetsPerStorage");
-      LOG_TRACE_VAR_STR("SubbandsPerCell = " << itsNrSubbandsPerCell);
-
-      // Now we must add \a itsNrSubbandsPerCell to the measurement set. The
+      // Now we must add \a itsNrSubbandsPerStorage to the measurement set. The
       // correct indices for the reference frequencies are in the vector of
       // subbandIDs.
-      itsBandIDs.resize(itsNrSubbandsPerCell);
-      for (uint sb = 0; sb < itsNrSubbandsPerCell; ++sb) {
+      itsBandIDs.resize(itsNrSubbandsPerStorage);
+      for (uint sb = 0; sb < itsNrSubbandsPerStorage; ++sb) {
 	// compensate for the half-channel shift introduced by the PPF
 	double refFreq = refFreqs[itsSubbandIDs[sb]] - chanWidth / 2;
         itsBandIDs[sb] = itsWriter->addBand (itsNPolSquared, itsNChannels,
@@ -216,17 +213,17 @@ namespace LOFAR
       itsFieldID = itsWriter->addField (RA, DEC);
 
       // Allocate buffers
-      itsFlagsBuffers   = new bool[itsNrSubbandsPerCell * itsNVisibilities];
-      itsWeightsBuffers = new float[itsNrSubbandsPerCell * itsNBaselines * itsNChannels];
-      itsVisibilities   = new DH_Visibilities::VisibilityType[itsNrSubbandsPerCell * itsNVisibilities];
+      itsFlagsBuffers   = new bool[itsNrSubbandsPerStorage * itsNVisibilities];
+      itsWeightsBuffers = new float[itsNrSubbandsPerStorage * itsNBaselines * itsNChannels];
+      itsVisibilities   = new DH_Visibilities::VisibilityType[itsNrSubbandsPerStorage * itsNVisibilities];
 
       clearAllSums();
     }
 
     void WH_SubbandWriter::clearAllSums(){
-      memset(itsWeightsBuffers, 0, itsNrSubbandsPerCell * itsNBaselines * itsNChannels * sizeof(float));
-      memset(itsVisibilities, 0, itsNrSubbandsPerCell * itsNVisibilities * sizeof(DH_Visibilities::VisibilityType));
-      for (uint i = 0; i < itsNrSubbandsPerCell * itsNVisibilities; i++) {
+      memset(itsWeightsBuffers, 0, itsNrSubbandsPerStorage * itsNBaselines * itsNChannels * sizeof(float));
+      memset(itsVisibilities, 0, itsNrSubbandsPerStorage * itsNVisibilities * sizeof(DH_Visibilities::VisibilityType));
+      for (uint i = 0; i < itsNrSubbandsPerStorage * itsNVisibilities; i++) {
 	itsFlagsBuffers[i] = true;
       }
     }
@@ -250,21 +247,18 @@ namespace LOFAR
       }
 
       // Write the visibilities for all subbands per cell.
-      for (uint sb = 0; sb < itsNrSubbandsPerCell; ++sb) {
-        
-        // Select the next input
-	int inHolderNr = getDataManager().getInputSelector()->getCurrentSelection();
-	//cerr<<"Current selection "<<inHolderNr<<endl;
-	DH_Visibilities* inputDH = (DH_Visibilities*)getDataManager().getInHolder(inHolderNr);
+      for (uint sb = 0; sb < itsNrSubbandsPerStorage; ++sb) {
+        // find out from which input channel we should read
+	unsigned cell	      = sb / itsNrSubbandsPerCell;
+	unsigned inputChannel = itsCurrentInputs[cell] + cell * itsNrNodesPerCell;
 
-	//Dump input
+	DH_Visibilities			    *inputDH	= static_cast<DH_Visibilities *>(getDataManager().getInHolder(inputChannel));
+        DH_Visibilities::NrValidSamplesType *valSamples = &inputDH->getNrValidSamples(0, 0);
+	DH_Visibilities::VisibilityType     *newVis	= &inputDH->getVisibility(0, 0, 0, 0);
 
         // Write 1 DH_Visibilities of size
-        // fcomplex[nbaselines][nsubbandchannesl][npol][npol]
-        DH_Visibilities::NrValidSamplesType *valSamples = 
-          &inputDH->getNrValidSamples(0, 0);
+        // fcomplex[itsNBaselines][itsNChannels][npol][npol]
 
-	DH_Visibilities::VisibilityType* newVis = &inputDH->getVisibility(0, 0, 0, 0);
         for (uint i = 0; i < itsNBaselines * itsNChannels; i ++) {
           itsWeightsBuffers[sb * itsNBaselines * itsNChannels + i] += itsWeightFactor * valSamples[i] / itsTimesToIntegrate;
           bool flagged = valSamples[i] == 0;
@@ -279,11 +273,7 @@ namespace LOFAR
 	  itsVisibilities[sb * itsNVisibilities + 4 * i + 2] += newVis[4 * i + 2];
 	  itsVisibilities[sb * itsNVisibilities + 4 * i + 3] += newVis[4 * i + 3];
         }
-#if 0
-        for (uint b = 0; b < itsNBaselines; b ++) {
-	  cout<<"baseLine: "<<b<<" valid samples: "<<valSamples[b*itsNChannels]<<endl;
-	}
-#endif
+
 	if ((itsTimeCounter + 1) % itsTimesToIntegrate == 0) {
 	  itsWriteTimer.start();
 	  itsWriter->write (itsBandIDs[sb], itsFieldID, 0, itsNChannels,
@@ -294,22 +284,11 @@ namespace LOFAR
 	  itsWriteTimer.stop();
 	}
 
-#if 0
-	bool found = false;
-	for (uint b = 0; b<itsNBaselines; b++) {
-	  for (uint c = 0; c<itsNChannels; c++) {
-	    found = found || (inputDH->getVisibility(b, c, 0, 0) != makefcomplex(0,0));
-	  }
-	}
-	if (found){
-	  cout<<"writing non-zero data for subband ";
-	} else {
-	  cout<<"writing zeros for subband ";
-	}
-	cout <<sb<<" from inputholder "<<getDataManager().getInputSelector()->getCurrentSelection()<<endl;
-#endif
-	getDataManager().readyWithInHolder(inHolderNr);
-	getDataManager().getInputSelector()->selectNext();
+	getDataManager().readyWithInHolder(inputChannel);
+
+	// select next channel
+	if (++ itsCurrentInputs[cell] == itsNrNodesPerCell)
+	  itsCurrentInputs[cell] = 0;
       }
 
       // Update the time counter.
-- 
GitLab