From 3668511b4f2f9960ad28286a65cfca534aa00f3c Mon Sep 17 00:00:00 2001
From: Jan David Mol <mol@astron.nl>
Date: Tue, 26 Mar 2013 13:14:24 +0000
Subject: [PATCH] Task #4315: Streamlined beamlet configuration for BlockReader
 and MPI classes

---
 .../InputProc/src/Transpose/MPIProtocol.h     |  9 +-
 .../src/Transpose/MPIReceiveStations.cc       |  2 +
 .../InputProc/src/Transpose/MPISendStation.cc | 84 +++++++++++--------
 .../InputProc/src/Transpose/MPISendStation.h  | 24 +++---
 RTCP/Cobalt/InputProc/src/Transpose/MapUtil.h | 34 ++++++++
 RTCP/Cobalt/InputProc/test/tMPITransfer.cc    | 67 ++++++++-------
 6 files changed, 136 insertions(+), 84 deletions(-)

diff --git a/RTCP/Cobalt/InputProc/src/Transpose/MPIProtocol.h b/RTCP/Cobalt/InputProc/src/Transpose/MPIProtocol.h
index 2c7d9bb2ad5..e4c25bcfcc2 100644
--- a/RTCP/Cobalt/InputProc/src/Transpose/MPIProtocol.h
+++ b/RTCP/Cobalt/InputProc/src/Transpose/MPIProtocol.h
@@ -41,6 +41,12 @@ namespace LOFAR
         // Block will span [from,to)
         int64 from, to;
 
+        // Number of beamlets that will be sent
+        size_t nrBeamlets;
+
+        // The set of beamlets that will be sent
+        unsigned beamlets[1024]; // [beamlet]
+
         // At which offset the data will be wrapped. If:
         //
         //   =0: the data will be sent in 1 transfer:
@@ -50,9 +56,6 @@ namespace LOFAR
         //          2. a block of `(to - from) - wrapOffsets[x]' samples
         size_t wrapOffsets[1024]; // [beamlet]
 
-        // Number of beamlets that will be sent
-        size_t nrBeamlets;
-
         // Size of the marshalled flags
         size_t metaDataSize;
       };
diff --git a/RTCP/Cobalt/InputProc/src/Transpose/MPIReceiveStations.cc b/RTCP/Cobalt/InputProc/src/Transpose/MPIReceiveStations.cc
index e3fc8d1cc5f..ed091b1aa15 100644
--- a/RTCP/Cobalt/InputProc/src/Transpose/MPIReceiveStations.cc
+++ b/RTCP/Cobalt/InputProc/src/Transpose/MPIReceiveStations.cc
@@ -133,6 +133,8 @@ namespace LOFAR {
           const size_t beamlet = beamlets[beamletIdx];
           const size_t wrapOffset = header.wrapOffsets[beamletIdx];
 
+          ASSERTSTR(header.beamlets[beamletIdx] == beamlet, "Got beamlet " << header.beamlets[beamletIdx] << ", but expected beamlet " << beamlet);
+
           /*
            * RECEIVE BEAMLET (ASYNC)
            */
diff --git a/RTCP/Cobalt/InputProc/src/Transpose/MPISendStation.cc b/RTCP/Cobalt/InputProc/src/Transpose/MPISendStation.cc
index 1f8f100895d..4d53fa93f8a 100644
--- a/RTCP/Cobalt/InputProc/src/Transpose/MPISendStation.cc
+++ b/RTCP/Cobalt/InputProc/src/Transpose/MPISendStation.cc
@@ -37,16 +37,35 @@ namespace LOFAR {
 
   namespace Cobalt {
 
-    MPISendStation::MPISendStation( const struct BufferSettings &settings, size_t stationIdx, const std::map<size_t, int> &beamletDistribution )
+    MPISendStation::MPISendStation( const struct BufferSettings &settings, size_t stationIdx, const std::map<int, std::vector<size_t> > &beamletDistribution )
     :
       logPrefix(str(boost::format("[station %s] [MPISendStation] ") % settings.station.stationName)),
       settings(settings),
       stationIdx(stationIdx),
       beamletDistribution(beamletDistribution),
-      targetRanks(values(beamletDistribution)),
-      beamletsOfTarget(inverse(beamletDistribution))
+      targetRanks(keys(beamletDistribution)),
+      beamletTargets(inverse(beamletDistribution))
     {
       LOG_INFO_STR(logPrefix << "Initialised");
+
+      // Set static header info once per target rank; it is cached in `headers' and reused for every block
+      for(std::vector<int>::const_iterator rank = targetRanks.begin(); rank != targetRanks.end(); ++rank) {
+        headers[*rank].station      = this->settings.station;
+        headers[*rank].metaDataSize = this->metaDataSize();
+      }
+
+      // Set beamlet info; a rank may get exactly as many beamlets as header.beamlets can hold, hence `<='
+      for(std::map<int, std::vector<size_t> >::const_iterator dest = beamletDistribution.begin(); dest != beamletDistribution.end(); ++dest) {
+        const int rank = dest->first;
+        const std::vector<size_t> &beamlets = dest->second;
+
+        Header &header = headers[rank];
+
+        header.nrBeamlets = beamlets.size();
+        ASSERT(header.nrBeamlets <= sizeof header.beamlets / sizeof header.beamlets[0]);
+
+        std::copy(beamlets.begin(), beamlets.end(), &header.beamlets[0]);
+      }
     }
 
     template<typename T>
@@ -54,22 +73,18 @@ namespace LOFAR {
     {
       LOG_DEBUG_STR(logPrefix << "Sending header to rank " << rank);
 
-      const std::vector<size_t> &beamlets = beamletsOfTarget.at(rank);
-
-      // Copy static blockrmation
-      header.station      = this->settings.station;
+      // Copy dynamic header info
       header.from         = block.from;
       header.to           = block.to;
-      header.nrBeamlets   = beamlets.size();
-      header.metaDataSize = this->metaDataSize();
 
-      // Copy the wrapOffsets
-      ASSERT(beamlets.size() <= sizeof header.wrapOffsets / sizeof header.wrapOffsets[0]);
+      // Copy the beam-specific data
+      ASSERT(header.nrBeamlets <= sizeof header.wrapOffsets / sizeof header.wrapOffsets[0]);
 
-      for(unsigned beamletIdx = 0; beamletIdx < beamlets.size(); ++beamletIdx) {
-        const struct BlockReader<T>::Block::Beamlet &ib = block.beamlets[beamlets[beamletIdx]];
+      for(size_t i = 0; i < header.nrBeamlets; ++i) {
+        size_t beamletIdx = header.beamlets[i];
+        const struct BlockReader<T>::Block::Beamlet &ib = block.beamlets[beamletIdx];
 
-        header.wrapOffsets[beamletIdx] = ib.nrRanges == 1 ? 0 : ib.ranges[0].to - ib.ranges[0].from;
+        header.wrapOffsets[i] = ib.nrRanges == 1 ? 0 : ib.ranges[0].to - ib.ranges[0].from;
       }
 
       // Send the actual header
@@ -129,31 +144,28 @@ namespace LOFAR {
     void MPISendStation::sendBlock( const struct BlockReader<T>::Block &block )
     {
       /*
-       * SEND HEADERS (ASYNC)
+       * SEND HEADERS
        */
 
-      std::map<int, Header> headers;
       std::vector<MPI_Request> headerRequests;
 
-      for(std::set<int>::const_iterator i = targetRanks.begin(); i != targetRanks.end(); ++i) {
-        int rank = *i;
-
-        headerRequests.push_back(sendHeader<T>(rank, headers[rank], block));
+      for(std::vector<int>::const_iterator rank = targetRanks.begin(); rank != targetRanks.end(); ++rank) {
+        headerRequests.push_back(sendHeader<T>(*rank, headers[*rank], block));
       }
-      
+
       /*
-       * SEND PAYLOADS
+       * SEND BEAMLETS
        */
       std::vector<MPI_Request> beamletRequests(block.beamlets.size() * 2, MPI_REQUEST_NULL); // [beamlet][transfer]
       size_t nrBeamletRequests = 0;
+      for(std::map<size_t, int>::const_iterator dest = beamletTargets.begin(); dest != beamletTargets.end(); ++dest) {
+        const size_t beamletIdx = dest->first;
+        const int rank = dest->second;
 
-      for(size_t beamletIdx = 0; beamletIdx < block.beamlets.size(); ++beamletIdx) {
-        const struct BlockReader<T>::Block::Beamlet &ib = block.beamlets[beamletIdx];
-        const int rank = beamletDistribution.at(beamletIdx);
+        ASSERTSTR(beamletIdx < block.beamlets.size(), "Want to send beamlet #" << beamletIdx << " but block only contains " << block.beamlets.size() << " beamlets");
 
-        /*
-         * SEND BEAMLETS
-         */
+        // Send beamlet
+        const struct BlockReader<T>::Block::Beamlet &ib = block.beamlets[beamletIdx];
 
         nrBeamletRequests += sendData<T>(rank, beamletIdx, ib, &beamletRequests[beamletIdx * 2]);
       }
@@ -165,19 +177,19 @@ namespace LOFAR {
       std::vector<MPI_Request> flagRequests;
 
       for(size_t b = 0; b < nrBeamletRequests; ++b) {
-        const size_t sendIdx = waitAny(beamletRequests);
-        const size_t beamletIdx  = sendIdx / 2;
-        const size_t transfer    = sendIdx % 2;
+        const size_t sendIdx           = waitAny(beamletRequests);
+        const size_t globalBeamletIdx  = sendIdx / 2;
+        const size_t transfer          = sendIdx % 2;
 
-        const struct BlockReader<T>::Block::Beamlet &ib = block.beamlets[beamletIdx];
+        const struct BlockReader<T>::Block::Beamlet &ib = block.beamlets[globalBeamletIdx];
 
         // waitAny sets finished requests to MPI_REQUEST_NULL in our array.
-        if (ib.nrRanges == 1 || beamletRequests[beamletIdx * 2 + (1 - transfer)] == MPI_REQUEST_NULL) {
+        if (ib.nrRanges == 1 || beamletRequests[globalBeamletIdx * 2 + (1 - transfer)] == MPI_REQUEST_NULL) {
           /*
            * SEND FLAGS FOR BEAMLET
            */
 
-          const int rank = beamletDistribution.at(beamletIdx);
+          const int rank = beamletTargets.at(globalBeamletIdx);
 
           /*
            * OBTAIN FLAGS AFTER DATA IS SENT
@@ -185,12 +197,12 @@ namespace LOFAR {
 
           // The only valid samples are those that existed both
           // before and after the transfer.
-          SparseSet<int64> finalFlags = ib.flagsAtBegin & block.flags(beamletIdx);
+          SparseSet<int64> finalFlags = ib.flagsAtBegin & block.flags(globalBeamletIdx);
 
           /*
            * SEND FLAGS
            */
-          flagRequests.push_back(sendFlags(rank, beamletIdx, finalFlags));
+          flagRequests.push_back(sendFlags(rank, globalBeamletIdx, finalFlags));
         }
       }
 
diff --git a/RTCP/Cobalt/InputProc/src/Transpose/MPISendStation.h b/RTCP/Cobalt/InputProc/src/Transpose/MPISendStation.h
index 9d726e4a3f4..8a11444a4fb 100644
--- a/RTCP/Cobalt/InputProc/src/Transpose/MPISendStation.h
+++ b/RTCP/Cobalt/InputProc/src/Transpose/MPISendStation.h
@@ -62,9 +62,9 @@ namespace LOFAR
       //   The station index within this observation.
       // beamletDistribution
       //   The distribution of beamlets:
-      //     key   = beamletIdx
-      //     value = receiver MPI rank
-      MPISendStation( const struct BufferSettings &settings, size_t stationIdx, const std::map<size_t, int> &beamletDistribution );
+      //     key   = receiver MPI rank
+      //     value = beamlets to send
+      MPISendStation( const struct BufferSettings &settings, size_t stationIdx, const std::map<int, std::vector<size_t> > &beamletDistribution );
 
       // Send one block. The caller is responsible for matching the number of
       // posted receiveBlocks.
@@ -84,16 +84,18 @@ namespace LOFAR
       // Station number in observation [0..nrStations)
       const size_t stationIdx;
 
-      // To which rank to send each beamlet:
-      //   beamletDistribution[beamlet] = rank
-      const std::map<size_t, int> beamletDistribution;
+      // Which beamlets to send to which rank:
+      //   beamletDistribution[rank] = beamlets
+      const std::map<int, std::vector<size_t> > beamletDistribution;
 
-      // The ranks to which to send beamlets.
-      const std::set<int> targetRanks;
+      // Ranks to send data to
+      const std::vector<int> targetRanks;
 
-      // The beamlets to send to each rank:
-      //   beamletsOfTarget[rank] = [beamlet, beamlet, ...]
-      const std::map<int, std::vector<size_t> > beamletsOfTarget;
+      // The rank to which to send each beamlet.
+      const std::map<size_t, int> beamletTargets;
+
+      // Cache for the headers to send
+      std::map<int, MPIProtocol::Header> headers;
 
       // Construct and send a header to the given rank (async).
       template<typename T>
diff --git a/RTCP/Cobalt/InputProc/src/Transpose/MapUtil.h b/RTCP/Cobalt/InputProc/src/Transpose/MapUtil.h
index ee36bbb9aba..a4cba51eebc 100644
--- a/RTCP/Cobalt/InputProc/src/Transpose/MapUtil.h
+++ b/RTCP/Cobalt/InputProc/src/Transpose/MapUtil.h
@@ -1,6 +1,8 @@
 #ifndef LOFAR_INPUTPROC_MAPUTIL_H
 #define LOFAR_INPUTPROC_MAPUTIL_H
 
+#include <Common/LofarLogger.h>
+
 #include <vector>
 #include <map>
 
@@ -37,6 +39,20 @@ namespace LOFAR {
     }
 
 
+    // Returns all elements of the vector values of an std::map, concatenated in key order (duplicates are kept).
+    template<typename K, typename V>
+    std::vector<V> values( const std::map<K, std::vector<V> > &m )
+    {
+      std::vector<V> values;
+
+      for (typename std::map<K, std::vector<V> >::const_iterator i = m.begin(); i != m.end(); ++i) {
+        values.insert(values.end(), i->second.begin(), i->second.end());
+      }
+
+      return values;
+    }
+
+
     // Returns the inverse of an std::map.
     template<typename K, typename V>
     std::map<V, std::vector<K> > inverse( const std::map<K, V> &m )
@@ -50,6 +66,24 @@ namespace LOFAR {
       return inverse;
     }
 
+
+    // Returns the inverse (element -> key) of an std::map whose values are vectors; asserts that each element occurs only once.
+    template<typename K, typename V>
+    std::map<V, K> inverse( const std::map<K, std::vector<V> > &m )
+    {
+      std::map<V, K> inverse;
+
+      for (typename std::map<K, std::vector<V> >::const_iterator i = m.begin(); i != m.end(); ++i) {
+        for (typename std::vector<V>::const_iterator j = i->second.begin(); j != i->second.end(); ++j) {
+          ASSERTSTR(inverse.find(*j) == inverse.end(), "Inverting map impossible due to duplicate values");
+
+          inverse[*j] = i->first;
+        }
+      }
+
+      return inverse;
+    }
+
   }
 }
 
diff --git a/RTCP/Cobalt/InputProc/test/tMPITransfer.cc b/RTCP/Cobalt/InputProc/test/tMPITransfer.cc
index 18dc741b375..206f3bdac5b 100644
--- a/RTCP/Cobalt/InputProc/test/tMPITransfer.cc
+++ b/RTCP/Cobalt/InputProc/test/tMPITransfer.cc
@@ -68,8 +68,7 @@ typedef SampleType<i16complex> SampleT;
 const TimeStamp from(time(0L) + 5, 0, clockHz);
 const TimeStamp to(time(0L) + 5 + DURATION, 0, clockHz);
 const size_t blockSize = BLOCKSIZE * clockHz / 1024;
-map<unsigned, vector<size_t> > beamlets;
-map<size_t, int> beamletDistribution;
+map<int, std::vector<size_t> > beamletDistribution;
 
 // Rank in MPI set of hosts
 int rank;
@@ -121,7 +120,7 @@ void sender()
 
       LOG_INFO_STR("Detected " << s);
       LOG_INFO_STR("Connecting to receivers to send " << from << " to " << to);
-      BlockReader<SampleT> reader(s, keys(beamletDistribution), 0.1);
+      BlockReader<SampleT> reader(s, values(beamletDistribution), 0.1);
       MPISendStation sender(s, rank, beamletDistribution);
 
       LOG_INFO_STR("Sending to receivers");
@@ -139,51 +138,47 @@ void sender()
 
 void receiver()
 {
-  int receiverNr = rank - nrStations;
-
-  LOG_INFO_STR("Receiver node " << rank << " starts, handling " << beamlets[receiverNr].size() << " subbands from " << nrStations << " stations." );
-  LOG_INFO_STR("Connecting to senders to receive " << from << " to " << to);
+  const std::vector<size_t> &beamlets = beamletDistribution[rank];
+  const size_t nrBeamlets = beamlets.size();
 
   std::vector<int> stationRanks(nrStations);
 
   for (size_t i = 0; i < stationRanks.size(); i++)
     stationRanks[i] = i;
 
-  {
-    MPIReceiveStations receiver(stationRanks, beamlets[receiverNr], blockSize);
-    const size_t nrStations = stationRanks.size();
-    const size_t nrBeamlets = beamlets[receiverNr].size();
+  LOG_INFO_STR("Receiver node " << rank << " starts, handling " << beamlets.size() << " subbands from " << nrStations << " stations." );
+  LOG_INFO_STR("Connecting to senders to receive " << from << " to " << to);
 
-    MultiDimArray<struct MPIReceiveStations::Beamlet<SampleT>, 2> block(boost::extents[nrStations][nrBeamlets]);
 
-    for (size_t s = 0; s < nrStations; ++s) {
-      for (size_t b = 0; b < nrBeamlets; ++b) {
-        block[s][b].samples.resize(blockSize);
-      }
-    }
+  MPIReceiveStations receiver(stationRanks, beamlets, blockSize);
 
-    size_t blockIdx = 0;
+  MultiDimArray<struct MPIReceiveStations::Beamlet<SampleT>, 2> block(boost::extents[nrStations][nrBeamlets]);
 
-    for(TimeStamp current = from; current + blockSize < to; current += blockSize) {
-      receiver.receiveBlock<SampleT>(block);
+  for (int s = 0; s < nrStations; ++s) {
+    for (size_t b = 0; b < nrBeamlets; ++b) {
+      block[s][b].samples.resize(blockSize);
+    }
+  }
 
-      // data is now in receiver.lastBlock
+  size_t blockIdx = 0;
 
-      // calculate flagging average
-      const size_t nrSamples = nrStations * nrBeamlets * blockSize;
-      size_t nrFlaggedSamples = 0;
+  for(TimeStamp current = from; current + blockSize < to; current += blockSize) {
+    receiver.receiveBlock<SampleT>(block);
 
-      for (size_t s = 0; s < nrStations; ++s) {
-        for (size_t b = 0; b < nrBeamlets; ++b) {
-          nrFlaggedSamples = block[s][b].flags.count();
-        }
+    // calculate flagging average: accumulate the flag counts of every station and beamlet in this block
+    const size_t nrSamples = nrStations * nrBeamlets * blockSize;
+    size_t nrFlaggedSamples = 0;
+
+    for (int s = 0; s < nrStations; ++s) {
+      for (size_t b = 0; b < nrBeamlets; ++b) {
+        nrFlaggedSamples += block[s][b].flags.count();
       }
+    }
 
-      float flagPerc = 100.0f * nrFlaggedSamples / nrSamples;
+    float flagPerc = 100.0f * nrFlaggedSamples / nrSamples;
 
-      LOG_INFO_STR("Receiver " << rank << " received block " << blockIdx << " flags: " << flagPerc << "%" );
-      ++blockIdx;
-    }
+    LOG_INFO_STR("Receiver " << rank << " received block " << blockIdx << " flags: " << flagPerc << "%" );
+    ++blockIdx;
   }
 
   LOG_INFO_STR("Receiver " << rank << " done");
@@ -212,9 +207,13 @@ int main( int argc, char **argv )
   nrReceivers = nrHosts - nrStations;
 
   // Divide the subbands over the receivers
+  // Make sure not to use the identity list to detect
+  // mixups of stationBeamlet and beamletIdx.
   for (unsigned i = 0; i < NRBEAMLETS; ++i) {
-    beamlets[i % nrReceivers].push_back(i);
-    beamletDistribution[i] = nrStations + i % nrReceivers;
+    unsigned stationBeamlet = NRBEAMLETS - i - 1;
+    int receiverNr = i % nrReceivers;
+
+    beamletDistribution[nrStations + receiverNr].push_back(stationBeamlet);
   }
 
   if (rank < nrStations) {
-- 
GitLab