Commit 164149e7 authored by Jan David Mol

Task #5883: Merged MPIUtil2 into MPIUtil

parent 1aedf36b
Showing with 480 additions and 531 deletions
@@ -39,7 +39,7 @@
#include <mpi.h>
#include <InputProc/Transpose/MPISendStation.h>
#include <InputProc/Transpose/MapUtil.h>
-#include <InputProc/Transpose/MPIUtil2.h>
+#include <InputProc/Transpose/MPIUtil.h>
#endif
#include <Common/LofarLogger.h>
@@ -44,7 +44,6 @@
#ifdef HAVE_MPI
#include <mpi.h>
#include <InputProc/Transpose/MPIUtil.h>
-#include <InputProc/Transpose/MPIUtil2.h>
#endif
#include <boost/format.hpp>
@@ -10,7 +10,6 @@
#include <mpi.h>
#include <InputProc/Transpose/MPIUtil.h>
-#include <InputProc/Transpose/MPIUtil2.h>
#include <omp.h>
#include <boost/format.hpp>
@@ -23,7 +23,6 @@ if(MPI_FOUND)
Transpose/MPISendStation.cc
Transpose/MPIReceiveStations.cc
Transpose/MPIUtil.cc
-Transpose/MPIUtil2.cc
)
endif(MPI_FOUND)
@@ -22,7 +22,6 @@
#include <lofar_config.h>
#include "MPIReceiveStations.h"
#include "MPIUtil.h"
#include "MPIUtil2.h"
#include <InputProc/SampleType.h>
@@ -23,7 +23,6 @@
#include "MPISendStation.h"
#include "MapUtil.h"
#include "MPIUtil.h"
#include "MPIUtil2.h"
#include <InputProc/SampleType.h>
Transpose/MPIUtil.cc
#include <lofar_config.h>
#include "MPIUtil.h"
#include "MPIUtil2.h"
#include <iomanip>
@@ -17,15 +16,21 @@
#include <ctime>
#include <boost/lexical_cast.hpp>
#include <Common/Thread/Mutex.h>
+#include <Common/Timer.h>
#include <CoInterface/SmartPtr.h>
+#include <CoInterface/TimeFuncs.h>
using namespace std;
namespace LOFAR {
namespace Cobalt {
+// 'mpi' uses both MPIMutex and mpiPoller,
+// so those need to be initialised first
+// to avoid crashes at destruction.
Mutex MPIMutex;
+static MPIPoll mpiPoller;
+MPI mpi;
MPI::MPI()
:
@@ -82,7 +87,7 @@ namespace LOFAR {
ASSERT(itsRank == real_rank);
ASSERT(itsSize == real_size);
-MPIPoll::instance().start();
+mpiPoller.start();
#endif
}
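The initialisation-order comment in the hunk above is load-bearing: within a single translation unit, C++ constructs objects with static storage duration from top to bottom and destroys them in reverse order, so defining MPIMutex and mpiPoller before mpi guarantees both still exist when ~MPI() runs MPI_Finalize() at program exit. A standalone sketch of that guarantee (the Tracer type is illustrative, not part of the commit):

// Example (not part of this commit): static construction and
// destruction order within one translation unit.
#include <iostream>

struct Tracer {
  const char *name;
  explicit Tracer(const char *n) : name(n) { std::cout << "construct " << name << '\n'; }
  ~Tracer() { std::cout << "destruct " << name << '\n'; }
};

static Tracer a("MPIMutex");  // constructed first, destroyed last
static Tracer b("mpiPoller"); // constructed second
static Tracer c("mpi");       // constructed last, destroyed first

int main() {}
// Prints: construct MPIMutex, construct mpiPoller, construct mpi,
//         then destruct mpi, destruct mpiPoller, destruct MPIMutex.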
@@ -94,14 +99,12 @@ namespace LOFAR {
#ifdef HAVE_MPI
ScopedLock sl(MPIMutex);
-MPIPoll::instance().stop();
+mpiPoller.stop();
MPI_Finalize();
#endif
}
-MPI mpi;
void *MPIAllocator::allocate(size_t size, size_t alignment)
{
ScopedLock sl(MPIMutex);
@@ -185,6 +188,348 @@ namespace LOFAR {
return request;
}
MPIPoll::MPIPoll()
:
started(false),
done(false)
{
}
MPIPoll::~MPIPoll()
{
stop();
}
void MPIPoll::add( RequestSet *set ) {
ASSERT(set != NULL);
DEBUG("MPIPoll::add " << set->name);
ASSERT(mpiPoller.is_started());
ScopedLock sl(mutex);
requests.push_back(set);
newRequest.signal();
}
void MPIPoll::remove( RequestSet *set ) {
ScopedLock sl(mutex);
_remove(set);
}
void MPIPoll::_remove( RequestSet *set ) {
ASSERT(set != NULL);
DEBUG("MPIPoll::_remove " << set->name);
const std::vector<RequestSet*>::iterator i = std::find(requests.begin(), requests.end(), set);
if (i == requests.end())
return;
DEBUG("MPIPoll::_remove found and removing " << set->name);
requests.erase(i);
}
bool MPIPoll::have( RequestSet *set ) {
ScopedLock sl(mutex);
return std::find(requests.begin(), requests.end(), set) != requests.end();
}
void MPIPoll::start() {
DEBUG("MPIPoll::start");
started = true;
thread = new Thread(this, &MPIPoll::pollThread, "MPIPoll::pollThread");
}
void MPIPoll::stop() {
DEBUG("MPIPoll::stop");
{
ScopedLock sl(mutex);
done = true;
// Wake up the thread if it is waiting for a new request
newRequest.signal();
}
// Wait for thread to finish
thread = 0;
DEBUG("MPIPoll::stop stopped");
}
// Track the time spent on lock contention
NSTimer MPIMutexTimer("MPIPoll::MPIMutex lock()", true, true);
// Track the time spent in MPI_Testsome
NSTimer MPITestsomeTimer("MPIPoll::MPI_Testsome", true, true);
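Note what each timer brackets: MPIMutexTimer wraps only the construction of the ScopedLock, so it measures time lost to contention on MPIMutex, while MPITestsomeTimer wraps the MPI call itself. The same bracketing idea expressed with the standard library (a sketch; the merged code uses LOFAR's NSTimer instead):

// Example (not part of this commit): timing lock acquisition separately
// from the work performed under the lock.
#include <chrono>
#include <mutex>

static std::mutex m;
static std::chrono::nanoseconds lockWait(0), workTime(0);

void timedSection() {
  const auto t0 = std::chrono::steady_clock::now();
  std::unique_lock<std::mutex> lock(m); // may block: this is the contention
  const auto t1 = std::chrono::steady_clock::now();
  lockWait += t1 - t0;

  // ... guarded work (MPI_Testsome in MPIPoll::testSome) goes here ...
  workTime += std::chrono::steady_clock::now() - t1;
}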
std::vector<int> MPIPoll::testSome( std::vector<handle_t> &handles ) const {
DEBUG("MPIPoll::testSome on " << handles.size() << " handles");
vector<int> doneset;
if (handles.empty())
return doneset;
doneset.resize(handles.size());
int outcount;
{
MPIMutexTimer.start();
ScopedLock sl(MPIMutex);
MPIMutexTimer.stop();
// MPI_Testsome will put the indices of finished requests in doneset,
// and set the respective handle to MPI_REQUEST_NULL.
//
// Note that handles that are MPI_REQUEST_NULL on input are ignored.
MPITestsomeTimer.start();
MPI_Testsome(handles.size(), &handles[0], &outcount, &doneset[0], MPI_STATUSES_IGNORE);
MPITestsomeTimer.stop();
}
// Cut off doneset at the actual number of returned indices
doneset.resize(outcount);
return doneset;
}
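For readers new to MPI_Testsome: it scans an array of request handles, writes the indices of completed requests to an output array, and overwrites the completed handles with MPI_REQUEST_NULL; handles that are already MPI_REQUEST_NULL on input are skipped. A minimal two-rank sketch of that behaviour (buffer size, tag, and the busy-wait loop are for illustration only; run with e.g. mpirun -np 2):

// Example (not part of this commit): a minimal MPI_Testsome polling loop.
#include <mpi.h>
#include <cstdio>
#include <vector>

int main(int argc, char **argv)
{
  MPI_Init(&argc, &argv);

  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  char buf[16] = "hello";
  std::vector<MPI_Request> handles(1, MPI_REQUEST_NULL);

  if (rank == 0)
    MPI_Isend(buf, sizeof buf, MPI_CHAR, 1, 0, MPI_COMM_WORLD, &handles[0]);
  else if (rank == 1)
    MPI_Irecv(buf, sizeof buf, MPI_CHAR, 0, 0, MPI_COMM_WORLD, &handles[0]);

  if (rank <= 1) {
    std::vector<int> doneset(handles.size());
    int outcount = 0;

    do {
      // Completed requests land in doneset[0..outcount), and the matching
      // entries of `handles' become MPI_REQUEST_NULL -- exactly what the
      // comment in MPIPoll::testSome() above relies on.
      MPI_Testsome(handles.size(), &handles[0], &outcount, &doneset[0],
                   MPI_STATUSES_IGNORE);
    } while (outcount == 0);

    std::printf("rank %d: request %d finished\n", rank, doneset[0]);
  }

  MPI_Finalize();
  return 0;
}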
namespace {
struct handle_ref {
RequestSet *set;
size_t index;
handle_ref(RequestSet *set, size_t index): set(set), index(index) {}
};
};
bool MPIPoll::handleRequests()
{
// Collect all ACTIVE requests, and keep track of their index
vector<handle_t> handles;
vector<handle_ref> references;
for (size_t i = 0; i < requests.size(); ++i) {
ScopedLock sl(requests[i]->mutex);
for (size_t j = 0; j < requests[i]->handles.size(); ++j) {
if (requests[i]->states[j] != RequestSet::ACTIVE)
continue;
handles.push_back(requests[i]->handles[j]);
references.push_back(handle_ref(requests[i], j));
}
}
// Ask MPI which requests have finished
//
// NOTE: Finished requests are set to MPI_REQUEST_NULL in `handles'.
const vector<int> finishedIndices = testSome(handles);
// Process finished requests
for(size_t i = 0; i < finishedIndices.size(); ++i) {
struct handle_ref &ref = references[finishedIndices[i]];
RequestSet &set = *(ref.set);
ScopedLock sl(set.mutex);
// Mark as FINISHED
DEBUG("MPIPoll::handleRequest: marking " << set.name << "[" << ref.index << "] as FINISHED");
ASSERT(set.states[ref.index] == RequestSet::ACTIVE);
set.states[ref.index] = RequestSet::FINISHED;
set.nrFinished++;
// Inform waitAny/waitSome threads
if (!set.willWaitAll)
set.oneFinished.signal();
if (set.nrFinished == set.handles.size()) {
DEBUG("MPIPoll::handleRequest: all requests in " << set.name << " are FINISHED");
// Inform waitAll threads
if (set.willWaitAll)
set.allFinished.signal();
// Remove this set from the requests to watch
_remove(&set);
}
}
return !finishedIndices.empty();
}
void MPIPoll::pollThread() {
Thread::ScopedPriority sp(SCHED_FIFO, 10);
ScopedLock sl(mutex);
while(!done) {
// next poll will be in 0.1 ms
//
// NOTE: MPI is VERY sensitive to this: requests must be
// polled often enough to keep transfers running smoothly.
if (requests.empty()) {
// wait for request, with lock released
newRequest.wait(mutex);
} else {
// poll all handles
(void)handleRequests();
// if there are still pending requests, release
// the lock and just wait with a timeout
if (!requests.empty()) {
struct timespec deadline = TimeSpec::now();
TimeSpec::inc(deadline, 0.0001);
newRequest.wait(mutex, deadline);
}
}
}
}
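pollThread() multiplexes two waits on the newRequest condition: an untimed wait while no requests are registered, and a wait with a 0.1 ms deadline while transfers are pending, so the thread only polls hard when there is work. The same shape in standard C++ (a sketch using std::condition_variable rather than LOFAR's Condition/TimeSpec):

// Example (not part of this commit): the pollThread() wait pattern,
// re-expressed with the standard library.
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <vector>

struct Poller {
  std::mutex mutex;
  std::condition_variable newRequest;
  std::vector<int> requests; // stand-in for the registered RequestSet pointers
  bool done = false;

  void handleRequests() { /* test all handles, drop finished sets */ }

  void pollThread() {
    std::unique_lock<std::mutex> lock(mutex);

    while (!done) {
      if (requests.empty()) {
        // Idle: sleep until add() or stop() signals newRequest.
        newRequest.wait(lock);
      } else {
        handleRequests();

        // Busy: release the lock and poll again in ~0.1 ms, or sooner
        // if a new request arrives in the meantime.
        newRequest.wait_for(lock, std::chrono::microseconds(100));
      }
    }
  }
};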
RequestSet::RequestSet(const std::vector<handle_t> &handles, bool willWaitAll, const std::string &name)
:
name(name),
willWaitAll(willWaitAll),
handles(handles),
states(handles.size(), ACTIVE),
nrFinished(0)
{
// Requests shouldn't be MPI_REQUEST_NULL,
// because those will never be reported as FINISHED
for (size_t i = 0; i < states.size(); ++i) {
ASSERT(handles[i] != MPI_REQUEST_NULL);
}
// register ourselves
mpiPoller.add(this);
}
RequestSet::~RequestSet()
{
// all requests should be finished and reported by now
{
ScopedLock sl(mutex);
ASSERT(nrFinished == handles.size());
for (size_t i = 0; i < states.size(); ++i) {
ASSERT(states[i] == REPORTED);
}
}
// we should have been unregistered once our last
// request was FINISHED
{
ScopedLock sl(mutex);
ASSERT(!mpiPoller.have(this));
}
}
size_t RequestSet::waitAny() {
ASSERT(!willWaitAll);
ScopedLock sl(mutex);
for(;;) {
// Look for a finished request that hasn't been
// reported yet.
for (size_t i = 0; i < states.size(); ++i) {
if (states[i] == FINISHED) {
states[i] = REPORTED;
DEBUG("RequestSet::waitAny: set " << name << " finished request " << i);
return i;
}
}
// Wait for another request to finish
DEBUG("RequestSet::waitAny: set " << name << " waits for a request to finish");
// There has to be something to wait for
ASSERT(nrFinished < handles.size());
oneFinished.wait(mutex);
}
}
vector<size_t> RequestSet::waitSome() {
ASSERT(!willWaitAll);
ScopedLock sl(mutex);
vector<size_t> finished;
do {
// Look for all finished requests that haven't been
// reported yet.
for (size_t i = 0; i < states.size(); ++i) {
if (states[i] == FINISHED) {
states[i] = REPORTED;
finished.push_back(i);
}
}
if (finished.empty()) {
// Wait for another request to finish
DEBUG("RequestSet::waitSome: set " << name << " waits for a request to finish");
// There has to be something to wait for
ASSERT(nrFinished < handles.size());
oneFinished.wait(mutex);
}
} while (finished.empty());
DEBUG("RequestSet::waitSome: set " << name << " finished " << finished.size() << " requests");
return finished;
}
void RequestSet::waitAll() {
ASSERT(willWaitAll);
ScopedLock sl(mutex);
while (nrFinished < handles.size()) {
DEBUG("RequestSet::waitAll: set " << name << " has " << nrFinished << "/" << handles.size() << " requests finished");
// Wait for all requests to finish
allFinished.wait(mutex);
}
DEBUG("RequestSet::waitAll: set " << name << " finished all requests");
// Mark all requests as reported
for (size_t i = 0; i < states.size(); ++i) {
ASSERT(states[i] >= FINISHED);
states[i] = REPORTED;
}
}
}
}
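Taken together, the code merged into MPIUtil.cc gives callers a small blocking façade over nonblocking MPI: post the transfers, hand the handles to a RequestSet, and wait. A hedged usage sketch for a barrier-style caller (buffers, ranks, and tags are made up; the global mpi object is assumed to have initialised MPI and started mpiPoller, as arranged above):

// Example (not part of this commit): intended RequestSet usage with waitAll().
#include <InputProc/Transpose/MPIUtil.h>
#include <vector>

void receiveBoth()
{
  using namespace LOFAR::Cobalt;

  static char bufA[1024], bufB[1024];

  std::vector<handle_t> handles;
  handles.push_back(Guarded_MPI_Irecv(bufA, sizeof bufA, /*srcRank*/ 0, /*tag*/ 1));
  handles.push_back(Guarded_MPI_Irecv(bufB, sizeof bufB, /*srcRank*/ 0, /*tag*/ 2));

  // willWaitAll == true: MPIPoll signals allFinished once both complete.
  RequestSet rs(handles, true, "example receives");
  rs.waitAll(); // blocks until both receives are FINISHED, marks them REPORTED

  // ~RequestSet then asserts that every request was FINISHED and REPORTED.
}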
Transpose/MPIUtil.h
@@ -5,6 +5,8 @@
#include <vector>
#include <Common/Thread/Mutex.h>
+#include <Common/Thread/Thread.h>
+#include <Common/Thread/Condition.h>
#include <CoInterface/Allocator.h>
#include <CoInterface/SmartPtr.h>
@@ -117,6 +119,132 @@ namespace LOFAR {
* NOT LOCKED
*/
MPI_Request Guarded_MPI_Irecv(void *ptr, size_t numBytes, int srcRank, int tag);
class RequestSet;
typedef MPI_Request handle_t;
class MPIPoll {
public:
MPIPoll();
~MPIPoll();
// Register a set of MPI requests
//
// MPIPoll does NOT take ownership, and
// will unregister the set once all requests
// have been completed.
//
// It is the programmer's responsibility to
// make sure that `set' is destructed only
// after all its requests are finished.
void add( RequestSet *set );
// Unregister a set of MPI requests
void remove( RequestSet *set );
// Whether a certain set is registered.
bool have( RequestSet *set );
// Start watching the registered MPI requests.
void start();
// Stop watching the registered MPI requests.
void stop();
// Return whether we've been started
bool is_started() const { return started; }
private:
bool started;
bool done;
std::vector<RequestSet *> requests;
Mutex mutex;
Condition newRequest;
// The thread that periodically polls MPI
SmartPtr<Thread> thread;
// Unregister a set of MPI requests (doesn't grab mutex)
void _remove( RequestSet *set );
// Call MPI to test which requests have finished.
//
// A vector of indices is returned of the requests that
// have finished. Also, all finished handles are set
// to MPI_REQUEST_NULL as a side-effect from the MPI call used.
//
// Indices are of type `int' instead of `size_t' because that is
// what MPI_Testsome returns.
std::vector<int> testSome( std::vector<handle_t> &handles ) const;
// Test all registered requests, and for finished ones:
//
// 1. Update their status
// 2. Inform their owner
//
// If all requests from a RequestSet are completed, the set
// will be unregistered.
bool handleRequests();
// Keep polling for new requests, and handle the registered ones
// periodically.
void pollThread();
};
class RequestSet {
public:
// Register a set of handles to watch
// for completion.
RequestSet(const std::vector<handle_t> &handles, bool willWaitAll, const std::string &name = "<anonymous>");
~RequestSet();
// Wait for one request to finish, and
// return its index.
size_t waitAny();
// Wait for one or more requests to finish,
// and return a vector of their indices.
std::vector<size_t> waitSome();
// Wait for all requests to finish.
void waitAll();
private:
// non-copyable
RequestSet(const RequestSet &);
friend class MPIPoll;
// An identifier for this set, used for debugging purposes
const std::string name;
// true: caller will use waitAll()
// false: caller will use waitAny()/waitSome()
const bool willWaitAll;
// MPI handles to watch
const std::vector<handle_t> handles;
// ACTIVE: Transfer is still in operation.
// FINISHED: MPI reported the transfer as finished.
// REPORTED: This transfer has been reported as finished
// to our callers (using waitAny/waitSome/waitAll).
enum State { ACTIVE, FINISHED, REPORTED };
// State of each handle
std::vector<State> states;
// How many requests have been completed
size_t nrFinished;
Mutex mutex;
Condition oneFinished;
Condition allFinished;
};
}
}
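Where waitAll() fits a barrier-style caller, waitAny()/waitSome() fit a pipeline that handles each transfer as it lands. A sketch of the incremental pattern the header implies (processBuffer is a hypothetical per-transfer handler; handles is assumed to hold active MPI requests):

// Example (not part of this commit): draining a RequestSet with waitSome().
#include <InputProc/Transpose/MPIUtil.h>
#include <vector>

void processBuffer(size_t index); // hypothetical per-transfer handler

void drain(const std::vector<LOFAR::Cobalt::handle_t> &handles)
{
  using namespace LOFAR::Cobalt;

  // willWaitAll == false: MPIPoll signals oneFinished per completed request.
  RequestSet rs(handles, false, "incremental receives");

  size_t nrProcessed = 0;
  while (nrProcessed < handles.size()) {
    // Blocks until at least one unreported request has FINISHED, marks
    // those requests REPORTED, and returns their indices into `handles'.
    const std::vector<size_t> finished = rs.waitSome();

    for (size_t i = 0; i < finished.size(); ++i, ++nrProcessed)
      processBuffer(finished[i]);
  }
}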
Transpose/MPIUtil2.cc (deleted file)
#include <lofar_config.h>
#include "MPIUtil2.h"
#include "MPIUtil.h"
#include <Common/LofarLogger.h>
#include <Common/Singleton.h>
#include <Common/Timer.h>
#include <Common/Thread/Thread.h>
#include <Common/Thread/Condition.h>
#include <Common/Thread/Mutex.h>
#include <CoInterface/SmartPtr.h>
#include <CoInterface/TimeFuncs.h>
#include <sys/time.h>
#include <algorithm>
//#define DEBUG_MPI
#ifdef DEBUG_MPI
#define DEBUG(str) LOG_INFO_STR(str)
#else
#define DEBUG(str)
#endif
using namespace std;
namespace LOFAR {
namespace Cobalt {
MPIPoll::MPIPoll()
:
started(false),
done(false)
{
}
void MPIPoll::add( RequestSet *set ) {
ASSERT(set != NULL);
DEBUG("MPIPoll::add " << set->name);
ASSERT(MPIPoll::instance().is_started());
ScopedLock sl(mutex);
requests.push_back(set);
newRequest.signal();
}
void MPIPoll::remove( RequestSet *set ) {
ScopedLock sl(mutex);
_remove(set);
}
void MPIPoll::_remove( RequestSet *set ) {
ASSERT(set != NULL);
DEBUG("MPIPoll::_remove " << set->name);
const std::vector<RequestSet*>::iterator i = std::find(requests.begin(), requests.end(), set);
if (i == requests.end())
return;
DEBUG("MPIPoll::_remove found and removing " << set->name);
requests.erase(i);
}
bool MPIPoll::have( RequestSet *set ) {
ScopedLock sl(mutex);
return std::find(requests.begin(), requests.end(), set) != requests.end();
}
void MPIPoll::start() {
DEBUG("MPIPoll::start");
started = true;
thread = new Thread(this, &MPIPoll::pollThread, "MPIPoll::pollThread");
}
void MPIPoll::stop() {
DEBUG("MPIPoll::stop");
{
ScopedLock sl(mutex);
done = true;
// Unlock thread if it is waiting for a new request
newRequest.signal();
}
// Wait for thread to finish
thread = 0;
DEBUG("MPIPoll::stop stopped");
}
// Track the time spent on lock contention
NSTimer MPIMutexTimer("MPIPoll::MPIMutex lock()", true, true);
// Track the time spent in MPI_Testsome
NSTimer MPITestsomeTimer("MPIPoll::MPI_Testsome", true, true);
std::vector<int> MPIPoll::testSome( std::vector<handle_t> &handles ) const {
DEBUG("MPIPoll::testSome on " << handles.size() << " handles");
vector<int> doneset;
if (handles.empty())
return doneset;
doneset.resize(handles.size());
int outcount;
{
MPIMutexTimer.start();
ScopedLock sl(MPIMutex);
MPIMutexTimer.stop();
// MPI_Testsome will put the indices of finished requests in doneset,
// and set the respective handle to MPI_REQUEST_NULL.
//
// Note that handles that are MPI_REQUEST_NULL on input are ignored.
MPITestsomeTimer.start();
MPI_Testsome(handles.size(), &handles[0], &outcount, &doneset[0], MPI_STATUSES_IGNORE);
MPITestsomeTimer.stop();
}
// Cut off doneset at the actual number of returned indices
doneset.resize(outcount);
return doneset;
}
namespace {
struct handle_ref {
RequestSet *set;
size_t index;
handle_ref(RequestSet *set, size_t index): set(set), index(index) {}
};
};
bool MPIPoll::handleRequests()
{
// Collect all ACTIVE requests, and keep track of their index
vector<handle_t> handles;
vector<handle_ref> references;
for (size_t i = 0; i < requests.size(); ++i) {
ScopedLock sl(requests[i]->mutex);
for (size_t j = 0; j < requests[i]->handles.size(); ++j) {
if (requests[i]->states[j] != RequestSet::ACTIVE)
continue;
handles.push_back(requests[i]->handles[j]);
references.push_back(handle_ref(requests[i], j));
}
}
// Ask MPI which requests have finished
//
// NOTE: Finished requests are set to MPI_REQUEST_NULL in `handles'.
const vector<int> finishedIndices = testSome(handles);
// Process finished requests
for(size_t i = 0; i < finishedIndices.size(); ++i) {
struct handle_ref &ref = references[finishedIndices[i]];
RequestSet &set = *(ref.set);
ScopedLock sl(set.mutex);
// Mark as FINISHED
DEBUG("MPIPoll::handleRequest: marking " << set.name << "[" << ref.index << "] as FINISHED");
ASSERT(set.states[ref.index] == RequestSet::ACTIVE);
set.states[ref.index] = RequestSet::FINISHED;
set.nrFinished++;
// Inform waitAny/waitSome threads
if (!set.willWaitAll)
set.oneFinished.signal();
if (set.nrFinished == set.handles.size()) {
DEBUG("MPIPoll::handleRequest: all requests in " << set.name << " are FINISHED");
// Inform waitAll threads
if (set.willWaitAll)
set.allFinished.signal();
// Remove this set from the requests to watch
_remove(&set);
}
}
return !finishedIndices.empty();
}
void MPIPoll::pollThread() {
Thread::ScopedPriority sp(SCHED_FIFO, 10);
ScopedLock sl(mutex);
while(!done) {
// next poll will be in 0.1 ms
//
// NOTE: MPI is VERY sensitive to this, requiring
// often enough polling to keep transfers
// running smoothly.
if (requests.empty()) {
// wait for request, with lock released
newRequest.wait(mutex);
} else {
// poll all handles
(void)handleRequests();
// if there are still pending requests, release
// the lock and just wait with a timeout
if (!requests.empty()) {
struct timespec deadline = TimeSpec::now();
TimeSpec::inc(deadline, 0.0001);
newRequest.wait(mutex, deadline);
}
}
}
}
RequestSet::RequestSet(const std::vector<handle_t> &handles, bool willWaitAll, const std::string &name)
:
name(name),
willWaitAll(willWaitAll),
handles(handles),
states(handles.size(), ACTIVE),
nrFinished(0)
{
// Requests shouldn't be MPI_REQUEST_NULL,
// because those will never be reported as FINISHED
for (size_t i = 0; i < states.size(); ++i) {
ASSERT(handles[i] != MPI_REQUEST_NULL);
}
// register ourselves
MPIPoll::instance().add(this);
}
RequestSet::~RequestSet()
{
// all requests should be finished and reported by now
{
ScopedLock sl(mutex);
ASSERT(nrFinished == handles.size());
for (size_t i = 0; i < states.size(); ++i) {
ASSERT(states[i] == REPORTED);
}
}
// we should have been unregistered once our last
// request was FINISHED
{
ScopedLock sl(mutex);
ASSERT(!MPIPoll::instance().have(this));
}
}
size_t RequestSet::waitAny() {
ASSERT(!willWaitAll);
ScopedLock sl(mutex);
for(;;) {
// Look for a finished request that hasn't been
// reported yet.
for (size_t i = 0; i < states.size(); ++i) {
if (states[i] == FINISHED) {
states[i] = REPORTED;
DEBUG("RequestSet::waitAny: set " << name << " finished request " << i);
return i;
}
}
// Wait for another request to finish
DEBUG("RequestSet::waitAny: set " << name << " waits for a request to finish");
// There has to be something to wait for
ASSERT(nrFinished < handles.size());
oneFinished.wait(mutex);
}
}
vector<size_t> RequestSet::waitSome() {
ASSERT(!willWaitAll);
ScopedLock sl(mutex);
vector<size_t> finished;
do {
// Look for all finished requests that haven't been
// reported yet.
for (size_t i = 0; i < states.size(); ++i) {
if (states[i] == FINISHED) {
states[i] = REPORTED;
finished.push_back(i);
}
}
if (finished.empty()) {
// Wait for another request to finish
DEBUG("RequestSet::waitSome: set " << name << " waits for a request to finish");
// There has to be something to wait for
ASSERT(nrFinished < handles.size());
oneFinished.wait(mutex);
}
} while (finished.empty());
DEBUG("RequestSet::waitSome: set " << name << " finished " << finished.size() << " requests");
return finished;
}
void RequestSet::waitAll() {
ASSERT(willWaitAll);
ScopedLock sl(mutex);
while (nrFinished < handles.size()) {
DEBUG("RequestSet::waitAll: set " << name << " has " << nrFinished << "/" << handles.size() << " requests finished");
// Wait for all requests to finish
allFinished.wait(mutex);
}
DEBUG("RequestSet::waitAll: set " << name << " finished all requests");
// Mark all requests as reported
for (size_t i = 0; i < states.size(); ++i) {
ASSERT(states[i] >= FINISHED);
states[i] = REPORTED;
}
}
}
}
Transpose/MPIUtil2.h (deleted file)
#ifndef LOFAR_INPUTPROC_MPIUTIL2_H
#define LOFAR_INPUTPROC_MPIUTIL2_H
#include <mpi.h>
#include <vector>
#include <string>
#include <Common/Singleton.h>
#include <Common/Thread/Mutex.h>
#include <Common/Thread/Thread.h>
#include <Common/Thread/Condition.h>
#include <CoInterface/Allocator.h>
#include <CoInterface/SmartPtr.h>
namespace LOFAR {
namespace Cobalt {
class RequestSet;
typedef MPI_Request handle_t;
class MPIPoll {
public:
static MPIPoll &instance() { return Singleton<MPIPoll>::instance(); }
// Register a set of MPI requests
//
// MPIPoll does NOT take ownership, and
// will unregister the set once all requests
// have been completed.
//
// It is the programmer's responsibility to
// make sure that `set' is destructed only
// after all its requests are finished.
void add( RequestSet *set );
// Unregister a set of MPI requests
void remove( RequestSet *set );
// Whether a certain set is registered.
bool have( RequestSet *set );
// Start watching the registered MPI requests.
void start();
// Stop watching the registered MPI requests.
void stop();
// Return whether we've been started
bool is_started() const { return started; }
private:
MPIPoll();
friend class Singleton<MPIPoll>;
bool started;
bool done;
std::vector<RequestSet *> requests;
Mutex mutex;
Condition newRequest;
// The thread that periodically polls MPI
SmartPtr<Thread> thread;
// Unregister a set of MPI requests (doesn't grab mutex)
void _remove( RequestSet *set );
// Call MPI to test which requests have finished.
//
// A vector of indices is returned of the requests that
// have finished. Also, all finished handles are set
// to MPI_REQUEST_NULL as a side-effect from the MPI call used.
//
// Indices are of type `int' instead of `size_t' because that is
// what MPI_TestSome returns.
std::vector<int> testSome( std::vector<handle_t> &handles ) const;
// Test all registered requests, and for finished ones:
//
// 1. Update their status
// 2. Inform their owner
//
// If all requests from a RequestSet are completed, the set
// will be unregistered.
bool handleRequests();
// Keep polling for new requests, and handle the registered ones
// periodically.
void pollThread();
};
class RequestSet {
public:
// Register a set of handles to watch
// for completion.
RequestSet(const std::vector<handle_t> &handles, bool willWaitAll, const std::string &name = "<anonymous>");
~RequestSet();
// Wait for one request to finish, and
// return its index.
size_t waitAny();
// Wait for one or more requests to finish,
// and return a vector of their indices.
std::vector<size_t> waitSome();
// Wait for all requests to finish.
void waitAll();
private:
// non-copyable
RequestSet(const RequestSet &);
friend class MPIPoll;
// An identifier for this set, used for debugging purposes
const std::string name;
// true: caller will use waitAll()
// false: caller will use waitAny()/waitSome()
const bool willWaitAll;
// MPI handles to watch
const std::vector<handle_t> handles;
// ACTIVE: Transfer is still in operation.
// FINISHED: MPI reported the transfer as finished.
// REPORTED: This transfer has been reported as finished
// to our callers (using waitAny/waitSome/waitAll).
enum State { ACTIVE, FINISHED, REPORTED };
// State of each handle
std::vector<State> states;
// How many requests have been completed
size_t nrFinished;
Mutex mutex;
Condition oneFinished;
Condition allFinished;
};
}
}
#endif
@@ -26,7 +26,6 @@
#include <Common/Timer.h>
#include <CoInterface/MultiDimArray.h>
#include <InputProc/Transpose/MPIUtil.h>
-#include <InputProc/Transpose/MPIUtil2.h>
using namespace LOFAR;
using namespace LOFAR::Cobalt;
@@ -21,7 +21,6 @@
#include <lofar_config.h>
-#include <InputProc/Transpose/MPIUtil2.h>
#include <InputProc/Transpose/MPIUtil.h>
#include <vector>