Skip to content
Snippets Groups Projects
Commit 796067d4 authored by John Romein
Browse files

BugID: 225

Use MPI_Alltoallv instead of CEPframe's point-to-point communication.
parent 3ebfcf79
No related branches found
No related tags found
No related merge requests found
......@@ -31,14 +31,23 @@
#include <Transport/DataHolder.h>
#include <Transport/TH_MPI.h>
#if defined HAVE_BGL
#include <bglpersonality.h>
#include <rts.h>
#endif
#include <exception>
#include <iostream>
#include <string>
#include <vector>
#include <boost/multi_array.hpp>
#define SIMULATE_PSETS
#define NR_POLARIZATIONS 2
namespace LOFAR {
namespace CS1 {
......@@ -94,6 +103,12 @@ class Position
Position(unsigned x, unsigned y, unsigned z, unsigned t);
Position(unsigned rank);
#if defined SIMULATE_PSETS
static const unsigned psetSize = 16;
#else
static const unsigned psetSize = 1;
#endif
unsigned rank() const;
unsigned psetNumber() const;
Position psetBase() const;
......@@ -118,15 +133,21 @@ Position::Position(unsigned x, unsigned y, unsigned z, unsigned t)
// Builds the position that corresponds to the given rank.
Position::Position(unsigned rank)
{
#if !defined SIMULATE_PSETS
  // Flat layout: the rank is stored in x, the remaining coordinates are zero.
  t = 0;
  z = 0;
  y = 0;
  x = rank;
#else
  // Let rts_coordinatesForRank() fill in x, y, z and t; a non-zero result
  // is treated as fatal.
  const bool lookupFailed = rts_coordinatesForRank(rank, &x, &y, &z, &t) != 0;

  if (lookupFailed) {
    cerr << "error calling rts_coordinatesForRank" << endl;
    exit(1);
  }
#endif
}
unsigned Position::rank() const
{
#if defined SIMULATE_PSETS
unsigned rank, numProcs;
if (rts_rankForCoordinates(x, y, z, t, &rank, &numProcs) != 0) {
......@@ -135,40 +156,63 @@ unsigned Position::rank() const
}
return rank;
#else
return x;
#endif
}
// Returns the number of the pset that contains this position.
unsigned Position::psetNumber() const
{
#if !defined SIMULATE_PSETS
  return x;
#else
  // Halve the coordinates and the grid dimensions, then linearise the
  // result with x running fastest, followed by y and z.
  const unsigned column = x / 2, row = y / 2, plane = z / 2;
  const unsigned columns = xSize / 2, rows = ySize / 2;

  return column + columns * (row + rows * plane);
#endif
}
// Returns the base position of the pset that contains this position.
Position Position::psetBase() const
{
#if !defined SIMULATE_PSETS
  return *this;
#else
  // Clearing the lowest bit rounds x, y and z down to an even value;
  // t is always 0 for the base position.
  const unsigned baseX = x & ~1;
  const unsigned baseY = y & ~1;
  const unsigned baseZ = z & ~1;

  return Position(baseX, baseY, baseZ, 0);
#endif
}
// Returns the base position of the pset with the given number; this is the
// inverse of the linearisation performed by psetNumber().
Position Position::psetBase(unsigned psetNumber)
{
#if !defined SIMULATE_PSETS
  return Position(psetNumber);
#else
  const unsigned columns = xSize / 2, rows = ySize / 2;

  // Peel off the x part first, then the y part; what is left is the z part.
  unsigned remaining = psetNumber;
  const unsigned column = remaining % columns;
  remaining /= columns;
  const unsigned row = remaining % rows;
  remaining /= rows;

  return Position(2 * column, 2 * row, 2 * remaining, 0);
#endif
}
// Returns the position with the given index inside this position's pset.
Position Position::positionInPset(unsigned index) const
{
#if !defined SIMULATE_PSETS
  return *this;
#else
  const Position origin = psetBase();

  // Bits 0, 1, 2 and 3 of index select the offset (0 or 1) along x, y, z and t.
  const unsigned dx = index & 1;
  const unsigned dy = (index >> 1) & 1;
  const unsigned dz = (index >> 2) & 1;
  const unsigned dt = (index >> 3) & 1;

  return Position(origin.x + dx, origin.y + dy, origin.z + dz, origin.t + dt);
#endif
}
// Returns the index of this position inside its pset.
unsigned Position::indexInPset() const
{
#if !defined SIMULATE_PSETS
  return 0;
#else
  // The parities of x, y, z and t are weighted with 1, 2, 4 and 8.
  const unsigned parityX = x % 2, parityY = y % 2;
  const unsigned parityZ = z % 2, parityT = t % 2;

  return parityX + 2 * (parityY + 2 * (parityZ + 2 * parityT));
#endif
}
......@@ -177,47 +221,68 @@ unsigned Position::indexInPset() const
// Work holder that exchanges sample data between psets with MPI_Alltoallv()
// on the communicator passed to the constructor.
class WH_Transpose : public WorkHolder
{
  public:
    typedef i4complex SampleType;

    // name: work holder name; ps: parameter set that supplies the
    // observation settings; rank: rank of the core this work holder runs on;
    // comm: communicator used by allToAll().
    WH_Transpose(const string &name, const ACC::APS::ParameterSet &ps, unsigned rank, MPI_Comm comm);

    virtual void preprocess();
    virtual void process();
    virtual void postprocess();

  private:
    virtual WorkHolder *make(const string &name);

    // isInput(): this core's pset number is below the number of stations.
    // isOutput(): this core's pset number is below the number of correlator psets.
    bool isInput() const;
    bool isOutput() const;

    void allToAll();

    const ACC::APS::ParameterSet &itsParamSet;
    unsigned itsCoreNumber, itsPsetNumber, itsPsetIndex;
    unsigned itsNrStations, itsNrCorrelatorPsets, itsNrPsets;
    unsigned itsPhase;
    unsigned itsNrSamplesPerIntegration;

    // Allocated in preprocess() and released in postprocess().
    // itsInData:  [correlator pset][sample][polarization], only when isInput().
    // itsOutData: [station][sample][polarization], only when isOutput().
    boost::multi_array<SampleType, 3> *itsInData, *itsOutData;

    MPI_Comm itsMPIcomm;
};
// Constructs the transpose work holder for the core with the given rank.
//
// name:         work holder name, forwarded to WorkHolder.
// ps:           parameter set; Observation.NStations, Observation.NSubbands,
//               General.SubbandsPerPset and Observation.NSubbandSamples are read.
// rank:         rank of the core; its pset number and index within the pset
//               are derived through Position.
// communicator: communicator stored for allToAll().
WH_Transpose::WH_Transpose(const string &name, const ACC::APS::ParameterSet &ps, unsigned rank, MPI_Comm communicator)
:
  // Presumably the two zeros are the number of input and output data holders;
  // none are registered because the data travels through MPI_Alltoallv().
  WorkHolder(0, 0, name, string("WH_Transpose")),
  itsParamSet(ps),
  itsCoreNumber(rank),
  itsPsetNumber(Position(rank).psetNumber()),
  itsPsetIndex(Position(rank).indexInPset()),
  itsNrStations(ps.getUint32("Observation.NStations")),
  itsNrCorrelatorPsets(ps.getUint32("Observation.NSubbands") / ps.getUint32("General.SubbandsPerPset")),
  itsNrPsets(std::max(itsNrStations, itsNrCorrelatorPsets)),
  itsPhase(itsPsetIndex),
  itsNrSamplesPerIntegration(ps.getUint32("Observation.NSubbandSamples")),
  // The buffers are only allocated in preprocess(); start from a defined value.
  itsInData(0),
  itsOutData(0),
  itsMPIcomm(communicator)
{
}
// True when this core's pset number is below the number of stations.
inline bool WH_Transpose::isInput() const
{
  return itsNrStations > itsPsetNumber;
}
// True when this core's pset number is below the number of correlator psets.
inline bool WH_Transpose::isOutput() const
{
  return itsNrCorrelatorPsets > itsPsetNumber;
}
void WH_Transpose::preprocess()
{
if (isInput())
itsInData = new boost::multi_array<SampleType, 3>(boost::extents[itsNrCorrelatorPsets][itsNrSamplesPerIntegration][NR_POLARIZATIONS]);
if (isOutput())
itsOutData = new boost::multi_array<SampleType, 3>(boost::extents[itsNrStations][itsNrSamplesPerIntegration][NR_POLARIZATIONS]);
}
......@@ -230,54 +295,64 @@ void WH_Transpose::process()
totalTimer.start();
transposeTimer.start();
if (itsPhase == 2)
receiveFromAll();
if (itsPhase == 1)
sendToAll();
if (itsPhase == 0)
allToAll();
TH_MPI::synchroniseAllProcesses();
transposeTimer.stop();
totalTimer.stop();
++ itsPhase, itsPhase %= 16;
++ itsPhase, itsPhase %= Position::psetSize;
}
void WH_Transpose::sendToAll()
void WH_Transpose::allToAll()
{
unsigned station = itsPsetNumber, i = 0;
int sendCounts[itsNrPsets], sendDisplacements[itsNrPsets];
int receiveCounts[itsNrPsets], receiveDisplacements[itsNrPsets];
for (unsigned pset = 0; pset < itsNrPsets; pset ++) {
if (isInput() && pset < itsNrCorrelatorPsets) {
sendCounts[pset] = (*itsInData)[pset].num_elements() * sizeof(SampleType);
sendDisplacements[pset] = ((*itsInData)[pset].origin() - itsInData->origin()) / sizeof(SampleType);
} else {
sendCounts[pset] = 0;
sendDisplacements[pset] = 0;
}
if (isOutput() && pset < itsNrStations) {
receiveCounts[pset] = (*itsOutData)[pset].num_elements() * sizeof(SampleType);
receiveDisplacements[pset] = ((*itsOutData)[pset].origin() - itsOutData->origin()) / sizeof(SampleType);
} else {
receiveCounts[pset] = 0;
receiveDisplacements[pset] = 0;
}
}
do {
unsigned psetIndex = (itsPsetIndex + 1) % 16;
unsigned dest = Position::psetBase(station).positionInPset(psetIndex).rank();
getDataManager().getOutHolder(i);
((unsigned *) getDataManager().getOutHolder(i)->getDataPtr())[100] = itsCoreNumber;
//clog << itsCoreNumber << " sends message to " << dest << ", size = " << getDataManager().getOutHolder(i)->getDataSize() << endl;
//NSTimer timer("send timer", true);
//timer.start();
getDataManager().readyWithOutHolder(i);
//timer.stop();
} while (++ i, ++ station, station %= itsNrStations, station != itsPsetNumber);
if (MPI_Alltoallv(isInput() ? itsInData->origin() : 0,
sendCounts, sendDisplacements, MPI_BYTE,
isOutput() ? itsOutData->origin() : 0,
receiveCounts, receiveDisplacements, MPI_BYTE,
itsMPIcomm) != MPI_SUCCESS)
{
std::cerr << "MPI_Alltoallv() failed" << std::endl;
exit(1);
}
}
void WH_Transpose::receiveFromAll()
void WH_Transpose::postprocess()
{
unsigned station = itsPsetNumber, i = 0;
if (isInput())
delete itsInData;
do {
unsigned psetIndex = (itsPsetIndex - 1) % 16;
unsigned source = Position::psetBase(station).positionInPset(psetIndex).rank();
getDataManager().getInHolder(i);
getDataManager().readyWithInHolder(i);
//clog << itsCoreNumber << " received message from " << source << ", value = " << ((unsigned *) getDataManager().getInHolder(i)->getDataPtr())[100] << endl;
} while (++ i, ++ station, station %= itsNrStations, station != itsPsetNumber);
if (isOutput())
delete itsOutData;
}
// Creates a new WH_Transpose with the given name that shares this work
// holder's parameter set, core number and communicator.
WorkHolder *WH_Transpose::make(const string &name)
{
  return new WH_Transpose(name, itsParamSet, itsCoreNumber, itsMPIcomm);
}
......@@ -330,6 +405,7 @@ class AH_Transpose : public TinyApplicationHolder
void AH_Transpose::define(const KeyValueMap &)
{
#if defined HAVE_BGL
struct BGLPersonality personality;
if (rts_get_personality(&personality, sizeof personality) != 0) {
......@@ -342,45 +418,62 @@ void AH_Transpose::define(const KeyValueMap &)
Position::zSize = personality.getZsize();
//clog << itsCoreNumber << " at (" << personality.getXcoord() << ',' << personality.getYcoord() << ',' << personality.getZcoord() << "), phase = " << itsPhase << endl;
#else
Position::xSize = TH_MPI::getNumberOfNodes();
#endif
unsigned nrStations = itsParamSet.getUint32("Observation.NStations");
unsigned nrNodes = TH_MPI::getNumberOfNodes();
unsigned nrCorrelatorPsets = itsParamSet.getUint32("Observation.NSubbands") / itsParamSet.getUint32("General.SubbandsPerPset");
if (16 * nrStations > nrNodes) {
if (Position::psetSize * nrStations > nrNodes) {
if (TH_MPI::getCurrentRank() == 0)
cerr << "Too many stations for number of nodes" << endl;
exit(1);
}
for (int rank = 0; rank < nrNodes; rank ++) {
WorkHolder *wh = Position(rank).psetNumber() < nrStations ?
(WorkHolder *) new WH_Transpose("WH_Transpose", itsParamSet, rank) :
(WorkHolder *) new WH_Idle("WH_Idle");
wh->runOnNode(rank);
itsWHs.push_back(wh);
if (Position::psetSize * nrCorrelatorPsets > nrNodes) {
if (TH_MPI::getCurrentRank() == 0)
cerr << "Too many subbands divided over too few psets" << endl;
exit(1);
}
unsigned nrPsetsNeeded = std::max(nrStations, nrCorrelatorPsets);
MPI_Group all, group;
MPI_Comm comms[Position::psetSize];
if (MPI_Comm_group(MPI_COMM_WORLD, &all) != MPI_SUCCESS) {
std::cerr << "MPI_Comm_group() failed" << std::endl;
exit(1);
}
for (unsigned sourcePset = 0; sourcePset < nrStations; sourcePset ++) {
for (unsigned sourcePsetIndex = 0; sourcePsetIndex < 16; sourcePsetIndex ++) {
for (unsigned destPset = 0; destPset < nrStations; destPset ++) {
unsigned destPsetIndex = (sourcePsetIndex + 1) % 16;
unsigned source = Position::psetBase(sourcePset).positionInPset(sourcePsetIndex).rank();
unsigned dest = Position::psetBase(destPset).positionInPset(destPsetIndex).rank();
unsigned channel = (destPset - sourcePset + nrStations) % nrStations; // unsigned modulo
TH_MPI *th = new TH_MPI(source, dest);
TinyDataManager &sourceDM = itsWHs[source]->getDataManager();
TinyDataManager &destDM = itsWHs[dest]->getDataManager();
BGLConnection *connection = new BGLConnection("mpi", sourceDM.getGeneralOutHolder(channel), destDM.getGeneralInHolder(channel), th);
sourceDM.setOutConnection(channel, connection);
sourceDM.setAutoTriggerOut(channel, false);
destDM.setInConnection(channel, connection);
destDM.setAutoTriggerIn(channel, false);
}
for (unsigned psetIndex = 0; psetIndex < Position::psetSize; psetIndex ++) {
int ranks[nrPsetsNeeded];
for (unsigned pset = 0; pset < nrPsetsNeeded; pset ++)
ranks[pset] = Position::psetBase(pset).positionInPset(psetIndex).rank();
if (MPI_Group_incl(all, nrPsetsNeeded, ranks, &group) != MPI_SUCCESS) {
std::cerr << "MPI_Group_incl() failed" << std::endl;
exit(1);
}
if (MPI_Comm_create(MPI_COMM_WORLD, group, &comms[psetIndex]) != MPI_SUCCESS) {
std::cerr << "MPI_Comm_create() failed" << std::endl;
exit(1);
}
}
for (unsigned rank = 0; rank < nrNodes; rank ++) {
WorkHolder *wh = Position(rank).psetNumber() < nrPsetsNeeded ?
(WorkHolder *) new WH_Transpose("WH_Transpose", itsParamSet, rank, comms[Position(rank).indexInPset()]) :
(WorkHolder *) new WH_Idle("WH_Idle");
wh->runOnNode(rank);
itsWHs.push_back(wh);
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment