Skip to content
Snippets Groups Projects
Commit 3668511b authored by Jan David Mol's avatar Jan David Mol
Browse files

Task #4315: Streamlined beamlet configuration for BlockReader and MPI classes

parent 7bf58105
Branches
Tags
No related merge requests found
......@@ -41,6 +41,12 @@ namespace LOFAR
// Block will span [from,to)
int64 from, to;
// Number of beamlets that will be sent
size_t nrBeamlets;
// The set of beamlets that will be sent
unsigned beamlets[1024]; // [beamlet]
// At which offset the data will be wrapped. If:
//
// =0: the data will be sent in 1 transfer:
......@@ -50,9 +56,6 @@ namespace LOFAR
// 2. a block of `(to - from) - wrapOffsets[x]' samples
size_t wrapOffsets[1024]; // [beamlet]
// Number of beamlets that will be sent
size_t nrBeamlets;
// Size of the marshalled flags
size_t metaDataSize;
};
......
......@@ -133,6 +133,8 @@ namespace LOFAR {
const size_t beamlet = beamlets[beamletIdx];
const size_t wrapOffset = header.wrapOffsets[beamletIdx];
ASSERTSTR(header.beamlets[beamletIdx] == beamlet, "Got beamlet " << header.beamlets[beamletIdx] << ", but expected beamlet " << beamlet);
/*
* RECEIVE BEAMLET (ASYNC)
*/
......
......@@ -37,16 +37,35 @@ namespace LOFAR {
namespace Cobalt {
MPISendStation::MPISendStation( const struct BufferSettings &settings, size_t stationIdx, const std::map<size_t, int> &beamletDistribution )
MPISendStation::MPISendStation( const struct BufferSettings &settings, size_t stationIdx, const std::map<int, std::vector<size_t> > &beamletDistribution )
:
logPrefix(str(boost::format("[station %s] [MPISendStation] ") % settings.station.stationName)),
settings(settings),
stationIdx(stationIdx),
beamletDistribution(beamletDistribution),
targetRanks(values(beamletDistribution)),
beamletsOfTarget(inverse(beamletDistribution))
targetRanks(keys(beamletDistribution)),
beamletTargets(inverse(beamletDistribution))
{
LOG_INFO_STR(logPrefix << "Initialised");
// Set static header info
for(std::vector<int>::const_iterator rank = targetRanks.begin(); rank != targetRanks.end(); ++rank) {
headers[*rank].station = this->settings.station;
headers[*rank].metaDataSize = this->metaDataSize();
}
// Set beamlet info
for(std::map<int, std::vector<size_t> >::const_iterator dest = beamletDistribution.begin(); dest != beamletDistribution.end(); ++dest) {
const int rank = dest->first;
const std::vector<size_t> &beamlets = dest->second;
Header &header = headers[rank];
header.nrBeamlets = beamlets.size();
ASSERT(header.nrBeamlets < sizeof header.beamlets / sizeof header.beamlets[0]);
std::copy(beamlets.begin(), beamlets.end(), &header.beamlets[0]);
}
}
template<typename T>
......@@ -54,22 +73,18 @@ namespace LOFAR {
{
LOG_DEBUG_STR(logPrefix << "Sending header to rank " << rank);
const std::vector<size_t> &beamlets = beamletsOfTarget.at(rank);
// Copy static blockrmation
header.station = this->settings.station;
// Copy dynamic header info
header.from = block.from;
header.to = block.to;
header.nrBeamlets = beamlets.size();
header.metaDataSize = this->metaDataSize();
// Copy the wrapOffsets
ASSERT(beamlets.size() <= sizeof header.wrapOffsets / sizeof header.wrapOffsets[0]);
// Copy the beam-specific data
ASSERT(header.nrBeamlets <= sizeof header.wrapOffsets / sizeof header.wrapOffsets[0]);
for(unsigned beamletIdx = 0; beamletIdx < beamlets.size(); ++beamletIdx) {
const struct BlockReader<T>::Block::Beamlet &ib = block.beamlets[beamlets[beamletIdx]];
for(size_t i = 0; i < header.nrBeamlets; ++i) {
size_t beamletIdx = header.beamlets[i];
const struct BlockReader<T>::Block::Beamlet &ib = block.beamlets[beamletIdx];
header.wrapOffsets[beamletIdx] = ib.nrRanges == 1 ? 0 : ib.ranges[0].to - ib.ranges[0].from;
header.wrapOffsets[i] = ib.nrRanges == 1 ? 0 : ib.ranges[0].to - ib.ranges[0].from;
}
// Send the actual header
......@@ -129,31 +144,28 @@ namespace LOFAR {
void MPISendStation::sendBlock( const struct BlockReader<T>::Block &block )
{
/*
* SEND HEADERS (ASYNC)
* SEND HEADERS
*/
std::map<int, Header> headers;
std::vector<MPI_Request> headerRequests;
for(std::set<int>::const_iterator i = targetRanks.begin(); i != targetRanks.end(); ++i) {
int rank = *i;
headerRequests.push_back(sendHeader<T>(rank, headers[rank], block));
for(std::vector<int>::const_iterator rank = targetRanks.begin(); rank != targetRanks.end(); ++rank) {
headerRequests.push_back(sendHeader<T>(*rank, headers[*rank], block));
}
/*
* SEND PAYLOADS
* SEND BEAMLETS
*/
std::vector<MPI_Request> beamletRequests(block.beamlets.size() * 2, MPI_REQUEST_NULL); // [beamlet][transfer]
size_t nrBeamletRequests = 0;
for(std::map<size_t, int>::const_iterator dest = beamletTargets.begin(); dest != beamletTargets.end(); ++dest) {
const size_t beamletIdx = dest->first;
const int rank = dest->second;
for(size_t beamletIdx = 0; beamletIdx < block.beamlets.size(); ++beamletIdx) {
const struct BlockReader<T>::Block::Beamlet &ib = block.beamlets[beamletIdx];
const int rank = beamletDistribution.at(beamletIdx);
ASSERTSTR(beamletIdx < block.beamlets.size(), "Want to send beamlet #" << beamletIdx << " but block only contains " << block.beamlets.size() << " beamlets");
/*
* SEND BEAMLETS
*/
// Send beamlet
const struct BlockReader<T>::Block::Beamlet &ib = block.beamlets[beamletIdx];
nrBeamletRequests += sendData<T>(rank, beamletIdx, ib, &beamletRequests[beamletIdx * 2]);
}
......@@ -166,18 +178,18 @@ namespace LOFAR {
for(size_t b = 0; b < nrBeamletRequests; ++b) {
const size_t sendIdx = waitAny(beamletRequests);
const size_t beamletIdx = sendIdx / 2;
const size_t globalBeamletIdx = sendIdx / 2;
const size_t transfer = sendIdx % 2;
const struct BlockReader<T>::Block::Beamlet &ib = block.beamlets[beamletIdx];
const struct BlockReader<T>::Block::Beamlet &ib = block.beamlets[globalBeamletIdx];
// waitAny sets finished requests to MPI_REQUEST_NULL in our array.
if (ib.nrRanges == 1 || beamletRequests[beamletIdx * 2 + (1 - transfer)] == MPI_REQUEST_NULL) {
if (ib.nrRanges == 1 || beamletRequests[globalBeamletIdx * 2 + (1 - transfer)] == MPI_REQUEST_NULL) {
/*
* SEND FLAGS FOR BEAMLET
*/
const int rank = beamletDistribution.at(beamletIdx);
const int rank = beamletTargets.at(globalBeamletIdx);
/*
* OBTAIN FLAGS AFTER DATA IS SENT
......@@ -185,12 +197,12 @@ namespace LOFAR {
// The only valid samples are those that existed both
// before and after the transfer.
SparseSet<int64> finalFlags = ib.flagsAtBegin & block.flags(beamletIdx);
SparseSet<int64> finalFlags = ib.flagsAtBegin & block.flags(globalBeamletIdx);
/*
* SEND FLAGS
*/
flagRequests.push_back(sendFlags(rank, beamletIdx, finalFlags));
flagRequests.push_back(sendFlags(rank, globalBeamletIdx, finalFlags));
}
}
......
......@@ -62,9 +62,9 @@ namespace LOFAR
// The station index within this observation.
// beamletDistribution
// The distribution of beamlets:
// key = beamletIdx
// value = receiver MPI rank
MPISendStation( const struct BufferSettings &settings, size_t stationIdx, const std::map<size_t, int> &beamletDistribution );
// key = receiver MPI rank
// value = beamlets to send
MPISendStation( const struct BufferSettings &settings, size_t stationIdx, const std::map<int, std::vector<size_t> > &beamletDistribution );
// Send one block. The caller is responsible for matching the number of
// posted receiveBlocks.
......@@ -84,16 +84,18 @@ namespace LOFAR
// Station number in observation [0..nrStations)
const size_t stationIdx;
// To which rank to send each beamlet:
// beamletDistribution[beamlet] = rank
const std::map<size_t, int> beamletDistribution;
// Which beamlets to send to which rank:
// beamletDistribution[rank] = beamlets
const std::map<int, std::vector<size_t> > beamletDistribution;
// The ranks to which to send beamlets.
const std::set<int> targetRanks;
// Ranks to send data to
const std::vector<int> targetRanks;
// The beamlets to send to each rank:
// beamletsOfTarget[rank] = [beamlet, beamlet, ...]
const std::map<int, std::vector<size_t> > beamletsOfTarget;
// The rank to which to send each beamlet.
const std::map<size_t, int> beamletTargets;
// Cache for the headers to send
std::map<int, MPIProtocol::Header> headers;
// Construct and send a header to the given rank (async).
template<typename T>
......
#ifndef LOFAR_INPUTPROC_MAPUTIL_H
#define LOFAR_INPUTPROC_MAPUTIL_H
#include <Common/LofarLogger.h>
#include <vector>
#include <map>
......@@ -37,6 +39,20 @@ namespace LOFAR {
}
// Returns the set of values of an std::map, if the values are vectors.
template<typename K, typename V>
std::vector<V> values( const std::map<K, std::vector<V> > &m )
{
std::vector<V> values;
for (typename std::map<K, std::vector<V> >::const_iterator i = m.begin(); i != m.end(); ++i) {
values.insert(values.end(), i->second.begin(), i->second.end());
}
return values;
}
// Returns the inverse of an std::map.
template<typename K, typename V>
std::map<V, std::vector<K> > inverse( const std::map<K, V> &m )
......@@ -50,6 +66,24 @@ namespace LOFAR {
return inverse;
}
// Returns the inverse of an std::map, if the values are vectors.
template<typename K, typename V>
std::map<V, K> inverse( const std::map<K, std::vector<V> > &m )
{
std::map<V, K> inverse;
for (typename std::map<K, std::vector<V> >::const_iterator i = m.begin(); i != m.end(); ++i) {
for (typename std::vector<V>::const_iterator j = i->second.begin(); j != i->second.end(); ++j) {
ASSERTSTR(inverse.find(*j) == inverse.end(), "Inverting map impossible due to duplicate values");
inverse[*j] = i->first;
}
}
return inverse;
}
}
}
......
......@@ -68,8 +68,7 @@ typedef SampleType<i16complex> SampleT;
const TimeStamp from(time(0L) + 5, 0, clockHz);
const TimeStamp to(time(0L) + 5 + DURATION, 0, clockHz);
const size_t blockSize = BLOCKSIZE * clockHz / 1024;
map<unsigned, vector<size_t> > beamlets;
map<size_t, int> beamletDistribution;
map<int, std::vector<size_t> > beamletDistribution;
// Rank in MPI set of hosts
int rank;
......@@ -121,7 +120,7 @@ void sender()
LOG_INFO_STR("Detected " << s);
LOG_INFO_STR("Connecting to receivers to send " << from << " to " << to);
BlockReader<SampleT> reader(s, keys(beamletDistribution), 0.1);
BlockReader<SampleT> reader(s, values(beamletDistribution), 0.1);
MPISendStation sender(s, rank, beamletDistribution);
LOG_INFO_STR("Sending to receivers");
......@@ -139,24 +138,23 @@ void sender()
void receiver()
{
int receiverNr = rank - nrStations;
LOG_INFO_STR("Receiver node " << rank << " starts, handling " << beamlets[receiverNr].size() << " subbands from " << nrStations << " stations." );
LOG_INFO_STR("Connecting to senders to receive " << from << " to " << to);
const std::vector<size_t> &beamlets = beamletDistribution[rank];
const size_t nrBeamlets = beamlets.size();
std::vector<int> stationRanks(nrStations);
for (size_t i = 0; i < stationRanks.size(); i++)
stationRanks[i] = i;
{
MPIReceiveStations receiver(stationRanks, beamlets[receiverNr], blockSize);
const size_t nrStations = stationRanks.size();
const size_t nrBeamlets = beamlets[receiverNr].size();
LOG_INFO_STR("Receiver node " << rank << " starts, handling " << beamlets.size() << " subbands from " << nrStations << " stations." );
LOG_INFO_STR("Connecting to senders to receive " << from << " to " << to);
MPIReceiveStations receiver(stationRanks, beamlets, blockSize);
MultiDimArray<struct MPIReceiveStations::Beamlet<SampleT>, 2> block(boost::extents[nrStations][nrBeamlets]);
for (size_t s = 0; s < nrStations; ++s) {
for (int s = 0; s < nrStations; ++s) {
for (size_t b = 0; b < nrBeamlets; ++b) {
block[s][b].samples.resize(blockSize);
}
......@@ -167,13 +165,11 @@ void receiver()
for(TimeStamp current = from; current + blockSize < to; current += blockSize) {
receiver.receiveBlock<SampleT>(block);
// data is now in receiver.lastBlock
// calculate flagging average
const size_t nrSamples = nrStations * nrBeamlets * blockSize;
size_t nrFlaggedSamples = 0;
for (size_t s = 0; s < nrStations; ++s) {
for (int s = 0; s < nrStations; ++s) {
for (size_t b = 0; b < nrBeamlets; ++b) {
nrFlaggedSamples = block[s][b].flags.count();
}
......@@ -184,7 +180,6 @@ void receiver()
LOG_INFO_STR("Receiver " << rank << " received block " << blockIdx << " flags: " << flagPerc << "%" );
++blockIdx;
}
}
LOG_INFO_STR("Receiver " << rank << " done");
}
......@@ -212,9 +207,13 @@ int main( int argc, char **argv )
nrReceivers = nrHosts - nrStations;
// Divide the subbands over the receivers
// Make sure not to use the identity list to detect
// mixups of stationBeamlet and beamletIdx.
for (unsigned i = 0; i < NRBEAMLETS; ++i) {
beamlets[i % nrReceivers].push_back(i);
beamletDistribution[i] = nrStations + i % nrReceivers;
unsigned stationBeamlet = NRBEAMLETS - i - 1;
int receiverNr = i % nrReceivers;
beamletDistribution[nrStations + receiverNr].push_back(stationBeamlet);
}
if (rank < nrStations) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment