From 796067d42f638346f7ad9663941fc8625257b40a Mon Sep 17 00:00:00 2001
From: John Romein <romein@astron.nl>
Date: Fri, 26 Jan 2007 21:06:56 +0000
Subject: [PATCH] BugID: 225 Use MPI_Alltoallv instead of CEPframe's
 point-to-point communication.

---
 Appl/CEP/CS1/CS1_BGLProc/test/transpose.cc | 233 ++++++++++++++-------
 1 file changed, 163 insertions(+), 70 deletions(-)

diff --git a/Appl/CEP/CS1/CS1_BGLProc/test/transpose.cc b/Appl/CEP/CS1/CS1_BGLProc/test/transpose.cc
index 5dcb199a061..afb2a16e11b 100644
--- a/Appl/CEP/CS1/CS1_BGLProc/test/transpose.cc
+++ b/Appl/CEP/CS1/CS1_BGLProc/test/transpose.cc
@@ -31,14 +31,23 @@
 #include <Transport/DataHolder.h>
 #include <Transport/TH_MPI.h>
 
+#if defined HAVE_BGL
 #include <bglpersonality.h>
 #include <rts.h>
+#endif
 
 #include <exception>
 #include <iostream>
 #include <string>
 #include <vector>
 
+#include <boost/multi_array.hpp>
+
+
+#define SIMULATE_PSETS
+
+#define NR_POLARIZATIONS 2
+
 
 namespace LOFAR {
 namespace CS1 {
@@ -94,6 +103,12 @@ class Position
     Position(unsigned x, unsigned y, unsigned z, unsigned t);
     Position(unsigned rank);
 
+#if defined SIMULATE_PSETS
+    static const unsigned	psetSize = 16;
+#else
+    static const unsigned	psetSize = 1;
+#endif
+
     unsigned		rank() const;
     unsigned		psetNumber() const;
     Position		psetBase() const;
@@ -118,15 +133,21 @@ Position::Position(unsigned x, unsigned y, unsigned z, unsigned t)
 
 Position::Position(unsigned rank)
 {
+#if defined SIMULATE_PSETS // NOTE(review): rts_coordinatesForRank() is presumably declared in <rts.h>, which is included only under HAVE_BGL, while SIMULATE_PSETS is defined unconditionally -- confirm non-BG/L builds
   if (rts_coordinatesForRank(rank, &x, &y, &z, &t) != 0) {
     cerr << "error calling rts_coordinatesForRank" << endl;
     exit(1);
   }
+#else // no pset simulation: ranks are laid out along x only
+  x = rank;
+  y = z = t = 0; // the remaining coordinates are fixed at zero in this layout
+#endif // SIMULATE_PSETS
 }
 
 
 unsigned Position::rank() const
 {
+#if defined SIMULATE_PSETS
   unsigned rank, numProcs;
 
   if (rts_rankForCoordinates(x, y, z, t, &rank, &numProcs) != 0) {
@@ -135,40 +156,63 @@ unsigned Position::rank() const
   }
 
   return rank;
+#else
+  return x;
+#endif
 }
 
 
 unsigned Position::psetNumber() const
 {
+#if defined SIMULATE_PSETS // psets are 2x2x2 blocks in x, y, z, numbered x-fastest, then y, then z
   return (x / 2) + (xSize / 2) * ((y / 2) + (ySize / 2) * (z / 2));
+#else // psetSize is 1 here, so the pset number is simply the x coordinate
+  return x;
+#endif // SIMULATE_PSETS
 }
 
 
 Position Position::psetBase() const
 {
+#if defined SIMULATE_PSETS // clear the low bit of x, y and z and zero t: the even-aligned corner of this core's block
   return Position(x & ~1, y & ~1, z & ~1, 0);
+#else // every position is its own pset base
+  return *this;
+#endif // SIMULATE_PSETS
 }
 
 
 Position Position::psetBase(unsigned psetNumber)
 {
+#if defined SIMULATE_PSETS // inverse of psetNumber(): unpack the x-fastest block index and double each coordinate
   return Position(2 * (psetNumber % (xSize / 2)), 
 		  2 * (psetNumber / (xSize / 2) % (ySize / 2)),
 		  2 * (psetNumber / (xSize / 2) / (ySize / 2)),
 		  0);
+#else // pset number and rank coincide in this mode, so build the position from the rank
+  return Position(psetNumber);
+#endif // SIMULATE_PSETS
 }
 
 
 Position Position::positionInPset(unsigned index) const
 {
+#if defined SIMULATE_PSETS // bits 0..3 of index select the x, y, z and t offset from the pset base
   Position base = psetBase();
   return Position(base.x + index % 2, base.y + index / 2 % 2, base.z + index / 4 % 2, base.t + index / 8 % 2);
+#else // index is ignored in this mode
+  return *this;
+#endif // SIMULATE_PSETS
 }
 
 
 unsigned Position::indexInPset() const
 {
+#if defined SIMULATE_PSETS // pack the low bit of x, y, z and t into a 0..15 index (inverse of positionInPset)
   return (x % 2) + 2 * (y % 2) + 4 * (z % 2) + 8 * (t % 2);
+#else // psetSize is 1 here, so the index is always 0
+  return 0;
+#endif // SIMULATE_PSETS
 }
 
 
@@ -177,47 +221,68 @@ unsigned Position::indexInPset() const
 class WH_Transpose : public WorkHolder
 {
   public:
-    WH_Transpose(const string &name, const ACC::APS::ParameterSet &ps, unsigned rank);
+    typedef i4complex SampleType; // element type of the transposed sample buffers
+
+    WH_Transpose(const string &name, const ACC::APS::ParameterSet &ps, unsigned rank, MPI_Comm comm);
 
     virtual void preprocess();
     virtual void process();
+    virtual void postprocess(); // frees the buffers allocated in preprocess()
 
   private:
     virtual WorkHolder		 *make(const string &name);
+    bool			 isInput() const, isOutput() const; // does this core send / receive in the exchange?
 
-    void			 sendToAll();
-    void			 receiveFromAll();
+    void			 allToAll(); // one MPI_Alltoallv() over itsMPIcomm
 
     const ACC::APS::ParameterSet &itsParamSet;
-    unsigned			 itsNrStations;
     unsigned			 itsCoreNumber, itsPsetNumber, itsPsetIndex;
+    unsigned			 itsNrStations, itsNrCorrelatorPsets, itsNrPsets; // itsNrPsets = max(itsNrStations, itsNrCorrelatorPsets)
     unsigned			 itsPhase;
+    unsigned			 itsNrSamplesPerIntegration;
+    // itsInData is [correlator pset][sample][polarization], itsOutData is [station][sample][polarization]; owned, allocated in preprocess()
+    boost::multi_array<SampleType, 3> *itsInData, *itsOutData;
+    // communicator passed to MPI_Alltoallv()
+    MPI_Comm			 itsMPIcomm;
 };
 
 
-WH_Transpose::WH_Transpose(const string &name, const ACC::APS::ParameterSet &ps, unsigned rank)
+WH_Transpose::WH_Transpose(const string &name, const ACC::APS::ParameterSet &ps, unsigned rank, MPI_Comm communicator)
 :
-  WorkHolder(ps.getUint32("Observation.NStations"),
-	     ps.getUint32("Observation.NStations"),
-	     name, string("WH_Transpose")),
+  WorkHolder(0, 0, name, string("WH_Transpose")), // presumably zero in/out data holders: the data now travels through MPI_Alltoallv()
   itsParamSet(ps),
-  itsNrStations(ps.getUint32("Observation.NStations")),
   itsCoreNumber(rank),
   itsPsetNumber(Position(rank).psetNumber()),
   itsPsetIndex(Position(rank).indexInPset()),
-  itsPhase(itsPsetIndex)
+  itsNrStations(ps.getUint32("Observation.NStations")),
+  itsNrCorrelatorPsets(ps.getUint32("Observation.NSubbands") / ps.getUint32("General.SubbandsPerPset")), // integer division
+  itsNrPsets(std::max(itsNrStations, itsNrCorrelatorPsets)), // relies on the two members above being declared (and so initialized) first
+  itsPhase(itsPsetIndex),
+  itsNrSamplesPerIntegration(ps.getUint32("Observation.NSubbandSamples")), itsInData(0), itsOutData(0), // buffers stay null until preprocess() allocates them
+  itsMPIcomm(communicator)
 {
-  TinyDataManager &dm = getDataManager();
+}
 
-  for (unsigned i = 0; i < itsNrStations; i ++) {
-    dm.addInDataHolder(i, new DH_Station("input", ps));
-    dm.addOutDataHolder(i, new DH_Station("input", ps));
-  }
+// True when this core's pset number is below the station count; only such cores send data in allToAll().
+inline bool WH_Transpose::isInput() const
+{
+  return itsPsetNumber < itsNrStations;
+}
+
+// True when this core's pset number is below the correlator-pset count; only such cores receive data in allToAll().
+inline bool WH_Transpose::isOutput() const
+{
+  return itsPsetNumber < itsNrCorrelatorPsets;
 }
 
 
 void WH_Transpose::preprocess()
 {
+  if (isInput()) // send buffer: one [sample][polarization] slab per correlator pset
+    itsInData  = new boost::multi_array<SampleType, 3>(boost::extents[itsNrCorrelatorPsets][itsNrSamplesPerIntegration][NR_POLARIZATIONS]);
+  // receive buffer: one [sample][polarization] slab per station
+  if (isOutput())
+    itsOutData = new boost::multi_array<SampleType, 3>(boost::extents[itsNrStations][itsNrSamplesPerIntegration][NR_POLARIZATIONS]);
 }
 
 
@@ -230,54 +295,64 @@ void WH_Transpose::process()
   totalTimer.start();
   transposeTimer.start();
 
-  if (itsPhase == 2)
-    receiveFromAll();
-
-  if (itsPhase == 1)
-    sendToAll();
+  if (itsPhase == 0)
+    allToAll();
 
   TH_MPI::synchroniseAllProcesses();
   transposeTimer.stop();
   totalTimer.stop();
 
-  ++ itsPhase, itsPhase %= 16;
+  ++ itsPhase, itsPhase %= Position::psetSize;
 }
 
 
-void WH_Transpose::sendToAll()
+void WH_Transpose::allToAll() // one collective exchange over itsMPIcomm; all counts and displacements are in bytes (MPI_BYTE)
 {
-  unsigned station = itsPsetNumber, i = 0;
+  std::vector<int> sendCounts(itsNrPsets), sendDisplacements(itsNrPsets);
+  std::vector<int> receiveCounts(itsNrPsets), receiveDisplacements(itsNrPsets);
+  // origin() differences count SampleType elements; multiply by sizeof(SampleType) to get the byte offsets MPI_BYTE requires
+  for (unsigned pset = 0; pset < itsNrPsets; pset ++) {
+    if (isInput() && pset < itsNrCorrelatorPsets) {
+      sendCounts[pset] = (*itsInData)[pset].num_elements() * sizeof(SampleType);
+      sendDisplacements[pset] = ((*itsInData)[pset].origin() - itsInData->origin()) * sizeof(SampleType);
+    } else {
+      sendCounts[pset] = 0;
+      sendDisplacements[pset] = 0;
+    }
+
+    if (isOutput() && pset < itsNrStations) {
+      receiveCounts[pset] = (*itsOutData)[pset].num_elements() * sizeof(SampleType);
+      receiveDisplacements[pset] = ((*itsOutData)[pset].origin() - itsOutData->origin()) * sizeof(SampleType);
+    } else {
+      receiveCounts[pset] = 0;
+      receiveDisplacements[pset] = 0;
+    }
+  }
 
-  do {
-    unsigned psetIndex = (itsPsetIndex + 1) % 16;
-    unsigned dest      = Position::psetBase(station).positionInPset(psetIndex).rank();
-    getDataManager().getOutHolder(i);
-    ((unsigned *) getDataManager().getOutHolder(i)->getDataPtr())[100] = itsCoreNumber;
-    //clog << itsCoreNumber << " sends message to " << dest << ", size = " << getDataManager().getOutHolder(i)->getDataSize() << endl;
-    //NSTimer timer("send timer", true);
-    //timer.start();
-    getDataManager().readyWithOutHolder(i);
-    //timer.stop();
-  } while (++ i, ++ station, station %= itsNrStations, station != itsPsetNumber);
+  if (MPI_Alltoallv(isInput() ? itsInData->origin() : 0,
+	&sendCounts[0], &sendDisplacements[0], MPI_BYTE,
+	isOutput() ? itsOutData->origin() : 0,
+	&receiveCounts[0], &receiveDisplacements[0], MPI_BYTE,
+	itsMPIcomm) != MPI_SUCCESS)
+  {
+    std::cerr << "MPI_Alltoallv() failed" << std::endl;
+    exit(1);
+  }
 }
 
-void WH_Transpose::receiveFromAll()
+// Frees the buffers allocated in preprocess() and clears the pointers, so they never dangle and a repeated call is harmless.
+void WH_Transpose::postprocess()
 {
-  unsigned station = itsPsetNumber, i = 0;
+  if (isInput())
+    { delete itsInData; itsInData = 0; }
 
-  do {
-    unsigned psetIndex = (itsPsetIndex - 1) % 16;
-    unsigned source    = Position::psetBase(station).positionInPset(psetIndex).rank();
-    getDataManager().getInHolder(i);
-    getDataManager().readyWithInHolder(i);
-    //clog << itsCoreNumber << " received message from " << source << ", value = " << ((unsigned *) getDataManager().getInHolder(i)->getDataPtr())[100] << endl;
-  } while (++ i, ++ station, station %= itsNrStations, station != itsPsetNumber);
+  if (isOutput())
+    { delete itsOutData; itsOutData = 0; }
 }
 
-
 WorkHolder *WH_Transpose::make(const string &name)
 {
-  return new WH_Transpose(name, itsParamSet, itsCoreNumber);
+  return new WH_Transpose(name, itsParamSet, itsCoreNumber, itsMPIcomm); // the new instance reuses this one's parameter set, rank and communicator handle
 }
 
 
@@ -330,6 +405,7 @@ class AH_Transpose : public TinyApplicationHolder
 
 void AH_Transpose::define(const KeyValueMap &)
 {
+#if defined HAVE_BGL
   struct BGLPersonality personality;
 
   if (rts_get_personality(&personality, sizeof personality) != 0) {
@@ -342,45 +418,62 @@ void AH_Transpose::define(const KeyValueMap &)
   Position::zSize = personality.getZsize();
 
   //clog << itsCoreNumber << " at (" << personality.getXcoord() << ',' << personality.getYcoord() << ',' << personality.getZcoord() << "), phase = " << itsPhase << endl;
+#else
+  Position::xSize = TH_MPI::getNumberOfNodes();
+#endif
 
   unsigned nrStations = itsParamSet.getUint32("Observation.NStations");
   unsigned nrNodes    = TH_MPI::getNumberOfNodes();
+  unsigned nrCorrelatorPsets = itsParamSet.getUint32("Observation.NSubbands") / itsParamSet.getUint32("General.SubbandsPerPset");
 
-  if (16 * nrStations > nrNodes) {
+  if (Position::psetSize * nrStations > nrNodes) {
     if (TH_MPI::getCurrentRank() == 0)
       cerr << "Too many stations for number of nodes" << endl;
 
     exit(1);
   }
 
-  for (int rank = 0; rank < nrNodes; rank ++) {
-    WorkHolder *wh = Position(rank).psetNumber() < nrStations ?
-      (WorkHolder *) new WH_Transpose("WH_Transpose", itsParamSet, rank) :
-      (WorkHolder *) new WH_Idle("WH_Idle");
-    wh->runOnNode(rank);
-    itsWHs.push_back(wh);
+  if (Position::psetSize * nrCorrelatorPsets > nrNodes) {
+    if (TH_MPI::getCurrentRank() == 0)
+      cerr << "Too many subbands divided over too few psets" << endl;
+
+    exit(1);
+  }
+
+  unsigned nrPsetsNeeded = std::max(nrStations, nrCorrelatorPsets);
+
+  MPI_Group all, group;
+  MPI_Comm  comms[Position::psetSize];
+
+  if (MPI_Comm_group(MPI_COMM_WORLD, &all) != MPI_SUCCESS) {
+    std::cerr << "MPI_Comm_group() failed" << std::endl;
+    exit(1);
   }
 
-  for (unsigned sourcePset = 0; sourcePset < nrStations; sourcePset ++) {
-    for (unsigned sourcePsetIndex = 0; sourcePsetIndex < 16; sourcePsetIndex ++) {
-      for (unsigned destPset = 0; destPset < nrStations; destPset ++) {
-	unsigned	destPsetIndex = (sourcePsetIndex + 1) % 16;
-	unsigned	source	      = Position::psetBase(sourcePset).positionInPset(sourcePsetIndex).rank();
-	unsigned	dest	      = Position::psetBase(destPset).positionInPset(destPsetIndex).rank();
-	unsigned	channel	      = (destPset - sourcePset + nrStations) % nrStations; // unsigned modulo
-
-	TH_MPI		*th	      = new TH_MPI(source, dest);
-	TinyDataManager &sourceDM     = itsWHs[source]->getDataManager();
-	TinyDataManager &destDM       = itsWHs[dest]->getDataManager();
-	BGLConnection	*connection   = new BGLConnection("mpi", sourceDM.getGeneralOutHolder(channel), destDM.getGeneralInHolder(channel), th);
-
-	sourceDM.setOutConnection(channel, connection);
-	sourceDM.setAutoTriggerOut(channel, false);
-	destDM.setInConnection(channel, connection);
-	destDM.setAutoTriggerIn(channel, false);
-      }
+  for (unsigned psetIndex = 0; psetIndex < Position::psetSize; psetIndex ++) {
+    int ranks[nrPsetsNeeded];
+
+    for (unsigned pset = 0; pset < nrPsetsNeeded; pset ++)
+      ranks[pset] = Position::psetBase(pset).positionInPset(psetIndex).rank();
+
+    if (MPI_Group_incl(all, nrPsetsNeeded, ranks, &group) != MPI_SUCCESS) {
+      std::cerr << "MPI_Group_incl() failed" << std::endl;
+      exit(1);
+    }
+
+    if (MPI_Comm_create(MPI_COMM_WORLD, group, &comms[psetIndex]) != MPI_SUCCESS) {
+      std::cerr << "MPI_Comm_create() failed" << std::endl;
+      exit(1);
     }
   }
+
+  for (unsigned rank = 0; rank < nrNodes; rank ++) {
+    WorkHolder *wh = Position(rank).psetNumber() < nrPsetsNeeded ?
+      (WorkHolder *) new WH_Transpose("WH_Transpose", itsParamSet, rank, comms[Position(rank).indexInPset()]) :
+      (WorkHolder *) new WH_Idle("WH_Idle");
+    wh->runOnNode(rank);
+    itsWHs.push_back(wh);
+  }
 }
 
 
-- 
GitLab