From 164149e79856ba5a43af1911e97c6844dda7de7e Mon Sep 17 00:00:00 2001 From: Jan David Mol <mol@astron.nl> Date: Fri, 13 Jun 2014 12:16:04 +0000 Subject: [PATCH] Task #5883: Merged MPIUtil2 into MPIUtil --- .../GPUProc/src/Station/StationInput.cc | 2 +- RTCP/Cobalt/GPUProc/src/rtcp.cc | 1 - RTCP/Cobalt/GPUProc/test/tMPIReceive.cc | 1 - RTCP/Cobalt/InputProc/src/CMakeLists.txt | 1 - .../src/Transpose/MPIReceiveStations.cc | 1 - .../InputProc/src/Transpose/MPISendStation.cc | 1 - .../Cobalt/InputProc/src/Transpose/MPIUtil.cc | 357 ++++++++++++++++- RTCP/Cobalt/InputProc/src/Transpose/MPIUtil.h | 128 ++++++ .../InputProc/src/Transpose/MPIUtil2.cc | 366 ------------------ .../Cobalt/InputProc/src/Transpose/MPIUtil2.h | 151 -------- RTCP/Cobalt/InputProc/test/tMPI.cc | 1 - RTCP/Cobalt/InputProc/test/tMPIUtil2.cc | 1 - 12 files changed, 480 insertions(+), 531 deletions(-) delete mode 100644 RTCP/Cobalt/InputProc/src/Transpose/MPIUtil2.cc delete mode 100644 RTCP/Cobalt/InputProc/src/Transpose/MPIUtil2.h diff --git a/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc b/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc index 1a151a8c9b7..6a1f91a328d 100644 --- a/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc +++ b/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc @@ -39,7 +39,7 @@ #include <mpi.h> #include <InputProc/Transpose/MPISendStation.h> #include <InputProc/Transpose/MapUtil.h> -#include <InputProc/Transpose/MPIUtil2.h> +#include <InputProc/Transpose/MPIUtil.h> #endif #include <Common/LofarLogger.h> diff --git a/RTCP/Cobalt/GPUProc/src/rtcp.cc b/RTCP/Cobalt/GPUProc/src/rtcp.cc index 20a93dddfc6..aaed0605fbb 100644 --- a/RTCP/Cobalt/GPUProc/src/rtcp.cc +++ b/RTCP/Cobalt/GPUProc/src/rtcp.cc @@ -44,7 +44,6 @@ #ifdef HAVE_MPI #include <mpi.h> #include <InputProc/Transpose/MPIUtil.h> -#include <InputProc/Transpose/MPIUtil2.h> #endif #include <boost/format.hpp> diff --git a/RTCP/Cobalt/GPUProc/test/tMPIReceive.cc b/RTCP/Cobalt/GPUProc/test/tMPIReceive.cc index 75700027d07..bbc05b7ddea 100644 --- a/RTCP/Cobalt/GPUProc/test/tMPIReceive.cc +++ b/RTCP/Cobalt/GPUProc/test/tMPIReceive.cc @@ -10,7 +10,6 @@ #include <mpi.h> #include <InputProc/Transpose/MPIUtil.h> -#include <InputProc/Transpose/MPIUtil2.h> #include <omp.h> #include <boost/format.hpp> diff --git a/RTCP/Cobalt/InputProc/src/CMakeLists.txt b/RTCP/Cobalt/InputProc/src/CMakeLists.txt index c3c38f15760..e690ae6f86f 100644 --- a/RTCP/Cobalt/InputProc/src/CMakeLists.txt +++ b/RTCP/Cobalt/InputProc/src/CMakeLists.txt @@ -23,7 +23,6 @@ if(MPI_FOUND) Transpose/MPISendStation.cc Transpose/MPIReceiveStations.cc Transpose/MPIUtil.cc - Transpose/MPIUtil2.cc ) endif(MPI_FOUND) diff --git a/RTCP/Cobalt/InputProc/src/Transpose/MPIReceiveStations.cc b/RTCP/Cobalt/InputProc/src/Transpose/MPIReceiveStations.cc index a1c0c88735c..c74990e2937 100644 --- a/RTCP/Cobalt/InputProc/src/Transpose/MPIReceiveStations.cc +++ b/RTCP/Cobalt/InputProc/src/Transpose/MPIReceiveStations.cc @@ -22,7 +22,6 @@ #include <lofar_config.h> #include "MPIReceiveStations.h" #include "MPIUtil.h" -#include "MPIUtil2.h" #include <InputProc/SampleType.h> diff --git a/RTCP/Cobalt/InputProc/src/Transpose/MPISendStation.cc b/RTCP/Cobalt/InputProc/src/Transpose/MPISendStation.cc index 44feda3ee58..a4f7c792d4c 100644 --- a/RTCP/Cobalt/InputProc/src/Transpose/MPISendStation.cc +++ b/RTCP/Cobalt/InputProc/src/Transpose/MPISendStation.cc @@ -23,7 +23,6 @@ #include "MPISendStation.h" #include "MapUtil.h" #include "MPIUtil.h" -#include "MPIUtil2.h" #include <InputProc/SampleType.h> diff --git 
a/RTCP/Cobalt/InputProc/src/Transpose/MPIUtil.cc b/RTCP/Cobalt/InputProc/src/Transpose/MPIUtil.cc index 466db2778e4..fea645509e4 100644 --- a/RTCP/Cobalt/InputProc/src/Transpose/MPIUtil.cc +++ b/RTCP/Cobalt/InputProc/src/Transpose/MPIUtil.cc @@ -1,6 +1,5 @@ #include <lofar_config.h> #include "MPIUtil.h" -#include "MPIUtil2.h" #include <iomanip> @@ -17,15 +16,21 @@ #include <ctime> #include <boost/lexical_cast.hpp> #include <Common/Thread/Mutex.h> +#include <Common/Timer.h> #include <CoInterface/SmartPtr.h> +#include <CoInterface/TimeFuncs.h> using namespace std; namespace LOFAR { namespace Cobalt { - + // 'mpi' uses both MPIMutex and mpiPoller, + // so those need to be initialised first + // to avoid crashes at destruction. Mutex MPIMutex; + static MPIPoll mpiPoller; + MPI mpi; MPI::MPI() : @@ -82,7 +87,7 @@ namespace LOFAR { ASSERT(itsRank == real_rank); ASSERT(itsSize == real_size); - MPIPoll::instance().start(); + mpiPoller.start(); #endif } @@ -94,14 +99,12 @@ namespace LOFAR { #ifdef HAVE_MPI ScopedLock sl(MPIMutex); - MPIPoll::instance().stop(); + mpiPoller.stop(); MPI_Finalize(); #endif } - MPI mpi; - void *MPIAllocator::allocate(size_t size, size_t alignment) { ScopedLock sl(MPIMutex); @@ -185,6 +188,348 @@ namespace LOFAR { return request; } + + MPIPoll::MPIPoll() + : + started(false), + done(false) + { + } + + + MPIPoll::~MPIPoll() + { + stop(); + } + + + void MPIPoll::add( RequestSet *set ) { + ASSERT(set != NULL); + + DEBUG("MPIPoll::add " << set->name); + + ASSERT(mpiPoller.is_started()); + + ScopedLock sl(mutex); + + requests.push_back(set); + newRequest.signal(); + } + + + void MPIPoll::remove( RequestSet *set ) { + ScopedLock sl(mutex); + + _remove(set); + } + + + void MPIPoll::_remove( RequestSet *set ) { + ASSERT(set != NULL); + + DEBUG("MPIPoll::_remove " << set->name); + + const std::vector<RequestSet*>::iterator i = std::find(requests.begin(), requests.end(), set); + + if (i == requests.end()) + return; + + DEBUG("MPIPoll::_remove found and removing " << set->name); + + requests.erase(i); + } + + + bool MPIPoll::have( RequestSet *set ) { + ScopedLock sl(mutex); + + return std::find(requests.begin(), requests.end(), set) != requests.end(); + } + + + void MPIPoll::start() { + DEBUG("MPIPoll::start"); + + started = true; + + thread = new Thread(this, &MPIPoll::pollThread, "MPIPoll::pollThread"); + } + + + void MPIPoll::stop() { + DEBUG("MPIPoll::stop"); + + { + ScopedLock sl(mutex); + + done = true; + + // Unlock thread if it is waiting for a new request + newRequest.signal(); + } + + // Wait for thread to finish + thread = 0; + + DEBUG("MPIPoll::stop stopped"); + } + + // Track the time spent on lock contention + NSTimer MPIMutexTimer("MPIPoll::MPIMutex lock()", true, true); + + // Track the time spent in MPI_Testsome + NSTimer MPITestsomeTimer("MPIPoll::MPI_Testsome", true, true); + + std::vector<int> MPIPoll::testSome( std::vector<handle_t> &handles ) const { + DEBUG("MPIPoll::testSome on " << handles.size() << " handles"); + + vector<int> doneset; + + if (handles.empty()) + return doneset; + + doneset.resize(handles.size()); + + int outcount; + + { + MPIMutexTimer.start(); + ScopedLock sl(MPIMutex); + MPIMutexTimer.stop(); + + // MPI_Testsome will put the indices of finished requests in doneset, + // and set the respective handle to MPI_REQUEST_NULL. + // + // Note that handles that are MPI_REQUEST_NULL on input are ignored. 
+ MPITestsomeTimer.start(); + MPI_Testsome(handles.size(), &handles[0], &outcount, &doneset[0], MPI_STATUSES_IGNORE); + MPITestsomeTimer.stop(); + } + + // Cut off doneset at the actual number of returned indices + doneset.resize(outcount); + + return doneset; + } + + namespace { + struct handle_ref { + RequestSet *set; + size_t index; + + handle_ref(RequestSet *set, size_t index): set(set), index(index) {} + }; + }; + + bool MPIPoll::handleRequests() + { + // Collect all ACTIVE requests, and keep track of their index + vector<handle_t> handles; + + vector<handle_ref> references; + + for (size_t i = 0; i < requests.size(); ++i) { + ScopedLock sl(requests[i]->mutex); + + for (size_t j = 0; j < requests[i]->handles.size(); ++j) { + if (requests[i]->states[j] != RequestSet::ACTIVE) + continue; + + handles.push_back(requests[i]->handles[j]); + references.push_back(handle_ref(requests[i], j)); + } + } + + // Ask MPI which requests have finished + // + // NOTE: Finished requests are set to MPI_REQUEST_NULL in `handles'. + const vector<int> finishedIndices = testSome(handles); + + // Process finished requests + for(size_t i = 0; i < finishedIndices.size(); ++i) { + struct handle_ref &ref = references[finishedIndices[i]]; + RequestSet &set = *(ref.set); + + ScopedLock sl(set.mutex); + + // Mark as FINISHED + DEBUG("MPIPoll::handleRequest: marking " << set.name << "[" << ref.index << "] as FINISHED"); + ASSERT(set.states[ref.index] == RequestSet::ACTIVE); + set.states[ref.index] = RequestSet::FINISHED; + + set.nrFinished++; + + // Inform waitAny/waitSome threads + if (!set.willWaitAll) + set.oneFinished.signal(); + + if (set.nrFinished == set.handles.size()) { + DEBUG("MPIPoll::handleRequest: all requests in " << set.name << " are FINISHED"); + + // Inform waitAll threads + if (set.willWaitAll) + set.allFinished.signal(); + + // Remove this set from the requests to watch + _remove(&set); + } + } + + return !finishedIndices.empty(); + } + + void MPIPoll::pollThread() { + Thread::ScopedPriority sp(SCHED_FIFO, 10); + + ScopedLock sl(mutex); + + while(!done) { + // next poll will be in 0.1 ms + // + // NOTE: MPI is VERY sensitive to this, requiring + // often enough polling to keep transfers + // running smoothly. 
+ + if (requests.empty()) { + // wait for request, with lock released + newRequest.wait(mutex); + } else { + // poll all handles + (void)handleRequests(); + + // if there are still pending requests, release + // the lock and just wait with a timeout + if (!requests.empty()) { + struct timespec deadline = TimeSpec::now(); + TimeSpec::inc(deadline, 0.0001); + + newRequest.wait(mutex, deadline); + } + } + } + } + + + RequestSet::RequestSet(const std::vector<handle_t> &handles, bool willWaitAll, const std::string &name) + : + name(name), + willWaitAll(willWaitAll), + handles(handles), + states(handles.size(), ACTIVE), + nrFinished(0) + { + // Requests shouldn't be MPI_REQUEST_NULL, + // because those will never be reported as FINISHED + for (size_t i = 0; i < states.size(); ++i) { + ASSERT(handles[i] != MPI_REQUEST_NULL); + } + + // register ourselves + mpiPoller.add(this); + } + + RequestSet::~RequestSet() + { + // all requests should be finished and reported by now + { + ScopedLock sl(mutex); + + ASSERT(nrFinished == handles.size()); + + for (size_t i = 0; i < states.size(); ++i) { + ASSERT(states[i] == REPORTED); + } + } + + // we should have been unregistered once our last + // request was FINISHED + { + ScopedLock sl(mutex); + ASSERT(!mpiPoller.have(this)); + } + } + + size_t RequestSet::waitAny() { + ASSERT(!willWaitAll); + + ScopedLock sl(mutex); + + for(;;) { + // Look for a finished request that hasn't been + // reported yet. + for (size_t i = 0; i < states.size(); ++i) { + if (states[i] == FINISHED) { + states[i] = REPORTED; + + DEBUG("RequestSet::waitAny: set " << name << " finished request " << i); + return i; + } + } + + // Wait for another request to finish + DEBUG("RequestSet::waitAny: set " << name << " waits for a request to finish"); + + // There has to be something to wait for + ASSERT(nrFinished < handles.size()); + oneFinished.wait(mutex); + } + } + + vector<size_t> RequestSet::waitSome() { + ASSERT(!willWaitAll); + + ScopedLock sl(mutex); + + vector<size_t> finished; + + do { + // Look for all finished requests that haven't been + // reported yet. 
+ for (size_t i = 0; i < states.size(); ++i) { + if (states[i] == FINISHED) { + states[i] = REPORTED; + + finished.push_back(i); + } + } + + if (finished.empty()) { + // Wait for another request to finish + DEBUG("RequestSet::waitSome: set " << name << " waits for a request to finish"); + + // There has to be something to wait for + ASSERT(nrFinished < handles.size()); + oneFinished.wait(mutex); + } + } while (finished.empty()); + + DEBUG("RequestSet::waitSome: set " << name << " finished " << finished.size() << " requests"); + + return finished; + } + + void RequestSet::waitAll() { + ASSERT(willWaitAll); + + ScopedLock sl(mutex); + + while (nrFinished < handles.size()) { + DEBUG("RequestSet::waitAll: set " << name << " has " << nrFinished << "/" << handles.size() << " requests finished"); + + // Wait for all requests to finish + allFinished.wait(mutex); + } + + DEBUG("RequestSet::waitAll: set " << name << " finished all requests"); + + // Mark all requests as reported + for (size_t i = 0; i < states.size(); ++i) { + ASSERT(states[i] >= FINISHED); + + states[i] = REPORTED; + } + } + } } diff --git a/RTCP/Cobalt/InputProc/src/Transpose/MPIUtil.h b/RTCP/Cobalt/InputProc/src/Transpose/MPIUtil.h index da15791fee7..a1529ca9b07 100644 --- a/RTCP/Cobalt/InputProc/src/Transpose/MPIUtil.h +++ b/RTCP/Cobalt/InputProc/src/Transpose/MPIUtil.h @@ -5,6 +5,8 @@ #include <vector> #include <Common/Thread/Mutex.h> +#include <Common/Thread/Thread.h> +#include <Common/Thread/Condition.h> #include <CoInterface/Allocator.h> #include <CoInterface/SmartPtr.h> @@ -117,6 +119,132 @@ namespace LOFAR { * NOT LOCKED */ MPI_Request Guarded_MPI_Irecv(void *ptr, size_t numBytes, int srcRank, int tag); + + class RequestSet; + + typedef MPI_Request handle_t; + + class MPIPoll { + public: + MPIPoll(); + ~MPIPoll(); + + // Register a set of MPI requests + // + // MPIPoll does NOT take ownership, and + // will unregister the set once all requests + // have been completed. + // + // It is the programmer's responsibility to + // make sure that `set' is destructed only + // after all its requests are finished. + void add( RequestSet *set ); + + // Unregister a set of MPI requests + void remove( RequestSet *set ); + + // Whether a certain set is registered. + bool have( RequestSet *set ); + + // Start watching the registered MPI requests. + void start(); + + // Stop watching the registered MPI requests. + void stop(); + + // Return whether we've been started + bool is_started() const { return started; } + + private: + bool started; + bool done; + std::vector<RequestSet *> requests; + Mutex mutex; + Condition newRequest; + + // The thread that periodically polls MPI + SmartPtr<Thread> thread; + + // Unregister a set of MPI requests (doesn't grab mutex) + void _remove( RequestSet *set ); + + // Call MPI to test which requests have finished. + // + // A vector of indices is returned of the requests that + // have finished. Also, all finished handles are set + // to MPI_REQUEST_NULL as a side-effect from the MPI call used. + // + // Indices are of type `int' instead of `size_t' because that is + // what MPI_TestSome returns. + std::vector<int> testSome( std::vector<handle_t> &handles ) const; + + // Test all registered requests, and for finished ones: + // + // 1. Update their status + // 2. Inform their owner + // + // If all requests from a RequestSet are completed, the set + // will be unregistered. + bool handleRequests(); + + // Keep polling for new requests, and handle the registered ones + // periodically. 
+ void pollThread(); + }; + + + class RequestSet { + public: + // Register a set of handles to watch + // for completion. + RequestSet(const std::vector<handle_t> &handles, bool willWaitAll, const std::string &name = "<anonymous>"); + + ~RequestSet(); + + // Wait for one request to finish, and + // return its index. + size_t waitAny(); + + // Wait for one or more requests to finish, + // and return a vector of their indices. + std::vector<size_t> waitSome(); + + // Wait for all requests to finish. + void waitAll(); + + private: + // non-copyable + RequestSet(const RequestSet &); + + friend class MPIPoll; + + // An identifier for this set, used for debugging purposes + const std::string name; + + // true: caller will use waitAll() + // false: caller will use waitAny()/waitSome() + const bool willWaitAll; + + // MPI handles to watch + const std::vector<handle_t> handles; + + // ACTIVE: Transfer is still in operation. + // FINISHED: MPI reported the transfer as finished. + // REPORTED: This transfer has been reported as finished + // to our callers (using waitAny/waitSome/waitAll). + enum State { ACTIVE, FINISHED, REPORTED }; + + // State of each handle + std::vector<State> states; + + // How many requests have been completed + size_t nrFinished; + + Mutex mutex; + Condition oneFinished; + Condition allFinished; + }; + } } diff --git a/RTCP/Cobalt/InputProc/src/Transpose/MPIUtil2.cc b/RTCP/Cobalt/InputProc/src/Transpose/MPIUtil2.cc deleted file mode 100644 index e0141868f53..00000000000 --- a/RTCP/Cobalt/InputProc/src/Transpose/MPIUtil2.cc +++ /dev/null @@ -1,366 +0,0 @@ -#include <lofar_config.h> -#include "MPIUtil2.h" -#include "MPIUtil.h" - -#include <Common/LofarLogger.h> -#include <Common/Singleton.h> -#include <Common/Timer.h> -#include <Common/Thread/Thread.h> -#include <Common/Thread/Condition.h> -#include <Common/Thread/Mutex.h> -#include <CoInterface/SmartPtr.h> -#include <CoInterface/TimeFuncs.h> -#include <sys/time.h> -#include <algorithm> - -//#define DEBUG_MPI - -#ifdef DEBUG_MPI -#define DEBUG(str) LOG_INFO_STR(str) -#else -#define DEBUG(str) -#endif - -using namespace std; - -namespace LOFAR { - - namespace Cobalt { - - MPIPoll::MPIPoll() - : - started(false), - done(false) - { - } - - - void MPIPoll::add( RequestSet *set ) { - ASSERT(set != NULL); - - DEBUG("MPIPoll::add " << set->name); - - ASSERT(MPIPoll::instance().is_started()); - - ScopedLock sl(mutex); - - requests.push_back(set); - newRequest.signal(); - } - - - void MPIPoll::remove( RequestSet *set ) { - ScopedLock sl(mutex); - - _remove(set); - } - - void MPIPoll::_remove( RequestSet *set ) { - ASSERT(set != NULL); - - DEBUG("MPIPoll::_remove " << set->name); - - const std::vector<RequestSet*>::iterator i = std::find(requests.begin(), requests.end(), set); - - if (i == requests.end()) - return; - - DEBUG("MPIPoll::_remove found and removing " << set->name); - - requests.erase(i); - } - - - bool MPIPoll::have( RequestSet *set ) { - ScopedLock sl(mutex); - - return std::find(requests.begin(), requests.end(), set) != requests.end(); - } - - - void MPIPoll::start() { - DEBUG("MPIPoll::start"); - - started = true; - - thread = new Thread(this, &MPIPoll::pollThread, "MPIPoll::pollThread"); - } - - - void MPIPoll::stop() { - DEBUG("MPIPoll::stop"); - - { - ScopedLock sl(mutex); - - done = true; - - // Unlock thread if it is waiting for a new request - newRequest.signal(); - } - - // Wait for thread to finish - thread = 0; - - DEBUG("MPIPoll::stop stopped"); - } - - // Track the time spent on lock contention - NSTimer 
MPIMutexTimer("MPIPoll::MPIMutex lock()", true, true); - - // Track the time spent in MPI_Testsome - NSTimer MPITestsomeTimer("MPIPoll::MPI_Testsome", true, true); - - std::vector<int> MPIPoll::testSome( std::vector<handle_t> &handles ) const { - DEBUG("MPIPoll::testSome on " << handles.size() << " handles"); - - vector<int> doneset; - - if (handles.empty()) - return doneset; - - doneset.resize(handles.size()); - - int outcount; - - { - MPIMutexTimer.start(); - ScopedLock sl(MPIMutex); - MPIMutexTimer.stop(); - - // MPI_Testsome will put the indices of finished requests in doneset, - // and set the respective handle to MPI_REQUEST_NULL. - // - // Note that handles that are MPI_REQUEST_NULL on input are ignored. - MPITestsomeTimer.start(); - MPI_Testsome(handles.size(), &handles[0], &outcount, &doneset[0], MPI_STATUSES_IGNORE); - MPITestsomeTimer.stop(); - } - - // Cut off doneset at the actual number of returned indices - doneset.resize(outcount); - - return doneset; - } - - namespace { - struct handle_ref { - RequestSet *set; - size_t index; - - handle_ref(RequestSet *set, size_t index): set(set), index(index) {} - }; - }; - - bool MPIPoll::handleRequests() - { - // Collect all ACTIVE requests, and keep track of their index - vector<handle_t> handles; - - vector<handle_ref> references; - - for (size_t i = 0; i < requests.size(); ++i) { - ScopedLock sl(requests[i]->mutex); - - for (size_t j = 0; j < requests[i]->handles.size(); ++j) { - if (requests[i]->states[j] != RequestSet::ACTIVE) - continue; - - handles.push_back(requests[i]->handles[j]); - references.push_back(handle_ref(requests[i], j)); - } - } - - // Ask MPI which requests have finished - // - // NOTE: Finished requests are set to MPI_REQUEST_NULL in `handles'. - const vector<int> finishedIndices = testSome(handles); - - // Process finished requests - for(size_t i = 0; i < finishedIndices.size(); ++i) { - struct handle_ref &ref = references[finishedIndices[i]]; - RequestSet &set = *(ref.set); - - ScopedLock sl(set.mutex); - - // Mark as FINISHED - DEBUG("MPIPoll::handleRequest: marking " << set.name << "[" << ref.index << "] as FINISHED"); - ASSERT(set.states[ref.index] == RequestSet::ACTIVE); - set.states[ref.index] = RequestSet::FINISHED; - - set.nrFinished++; - - // Inform waitAny/waitSome threads - if (!set.willWaitAll) - set.oneFinished.signal(); - - if (set.nrFinished == set.handles.size()) { - DEBUG("MPIPoll::handleRequest: all requests in " << set.name << " are FINISHED"); - - // Inform waitAll threads - if (set.willWaitAll) - set.allFinished.signal(); - - // Remove this set from the requests to watch - _remove(&set); - } - } - - return !finishedIndices.empty(); - } - - void MPIPoll::pollThread() { - Thread::ScopedPriority sp(SCHED_FIFO, 10); - - ScopedLock sl(mutex); - - while(!done) { - // next poll will be in 0.1 ms - // - // NOTE: MPI is VERY sensitive to this, requiring - // often enough polling to keep transfers - // running smoothly. 
- - if (requests.empty()) { - // wait for request, with lock released - newRequest.wait(mutex); - } else { - // poll all handles - (void)handleRequests(); - - // if there are still pending requests, release - // the lock and just wait with a timeout - if (!requests.empty()) { - struct timespec deadline = TimeSpec::now(); - TimeSpec::inc(deadline, 0.0001); - - newRequest.wait(mutex, deadline); - } - } - } - } - - - RequestSet::RequestSet(const std::vector<handle_t> &handles, bool willWaitAll, const std::string &name) - : - name(name), - willWaitAll(willWaitAll), - handles(handles), - states(handles.size(), ACTIVE), - nrFinished(0) - { - // Requests shouldn't be MPI_REQUEST_NULL, - // because those will never be reported as FINISHED - for (size_t i = 0; i < states.size(); ++i) { - ASSERT(handles[i] != MPI_REQUEST_NULL); - } - - // register ourselves - MPIPoll::instance().add(this); - } - - RequestSet::~RequestSet() - { - // all requests should be finished and reported by now - { - ScopedLock sl(mutex); - - ASSERT(nrFinished == handles.size()); - - for (size_t i = 0; i < states.size(); ++i) { - ASSERT(states[i] == REPORTED); - } - } - - // we should have been unregistered once our last - // request was FINISHED - { - ScopedLock sl(mutex); - ASSERT(!MPIPoll::instance().have(this)); - } - } - - size_t RequestSet::waitAny() { - ASSERT(!willWaitAll); - - ScopedLock sl(mutex); - - for(;;) { - // Look for a finished request that hasn't been - // reported yet. - for (size_t i = 0; i < states.size(); ++i) { - if (states[i] == FINISHED) { - states[i] = REPORTED; - - DEBUG("RequestSet::waitAny: set " << name << " finished request " << i); - return i; - } - } - - // Wait for another request to finish - DEBUG("RequestSet::waitAny: set " << name << " waits for a request to finish"); - - // There has to be something to wait for - ASSERT(nrFinished < handles.size()); - oneFinished.wait(mutex); - } - } - - vector<size_t> RequestSet::waitSome() { - ASSERT(!willWaitAll); - - ScopedLock sl(mutex); - - vector<size_t> finished; - - do { - // Look for all finished requests that haven't been - // reported yet. 
- for (size_t i = 0; i < states.size(); ++i) { - if (states[i] == FINISHED) { - states[i] = REPORTED; - - finished.push_back(i); - } - } - - if (finished.empty()) { - // Wait for another request to finish - DEBUG("RequestSet::waitSome: set " << name << " waits for a request to finish"); - - // There has to be something to wait for - ASSERT(nrFinished < handles.size()); - oneFinished.wait(mutex); - } - } while (finished.empty()); - - DEBUG("RequestSet::waitSome: set " << name << " finished " << finished.size() << " requests"); - - return finished; - } - - void RequestSet::waitAll() { - ASSERT(willWaitAll); - - ScopedLock sl(mutex); - - while (nrFinished < handles.size()) { - DEBUG("RequestSet::waitAll: set " << name << " has " << nrFinished << "/" << handles.size() << " requests finished"); - - // Wait for all requests to finish - allFinished.wait(mutex); - } - - DEBUG("RequestSet::waitAll: set " << name << " finished all requests"); - - // Mark all requests as reported - for (size_t i = 0; i < states.size(); ++i) { - ASSERT(states[i] >= FINISHED); - - states[i] = REPORTED; - } - } - - } -} - diff --git a/RTCP/Cobalt/InputProc/src/Transpose/MPIUtil2.h b/RTCP/Cobalt/InputProc/src/Transpose/MPIUtil2.h deleted file mode 100644 index 49375ea002e..00000000000 --- a/RTCP/Cobalt/InputProc/src/Transpose/MPIUtil2.h +++ /dev/null @@ -1,151 +0,0 @@ -#ifndef LOFAR_INPUTPROC_MPIUTIL2_H -#define LOFAR_INPUTPROC_MPIUTIL2_H - -#include <mpi.h> -#include <vector> -#include <string> - -#include <Common/Singleton.h> -#include <Common/Thread/Mutex.h> -#include <Common/Thread/Thread.h> -#include <Common/Thread/Condition.h> -#include <CoInterface/Allocator.h> -#include <CoInterface/SmartPtr.h> - -namespace LOFAR { - - namespace Cobalt { - - class RequestSet; - - typedef MPI_Request handle_t; - - class MPIPoll { - public: - static MPIPoll &instance() { return Singleton<MPIPoll>::instance(); } - - // Register a set of MPI requests - // - // MPIPoll does NOT take ownership, and - // will unregister the set once all requests - // have been completed. - // - // It is the programmer's responsibility to - // make sure that `set' is destructed only - // after all its requests are finished. - void add( RequestSet *set ); - - // Unregister a set of MPI requests - void remove( RequestSet *set ); - - // Whether a certain set is registered. - bool have( RequestSet *set ); - - // Start watching the registered MPI requests. - void start(); - - // Stop watching the registered MPI requests. - void stop(); - - // Return whether we've been started - bool is_started() const { return started; } - - private: - MPIPoll(); - friend class Singleton<MPIPoll>; - - bool started; - bool done; - std::vector<RequestSet *> requests; - Mutex mutex; - Condition newRequest; - - // The thread that periodically polls MPI - SmartPtr<Thread> thread; - - // Unregister a set of MPI requests (doesn't grab mutex) - void _remove( RequestSet *set ); - - // Call MPI to test which requests have finished. - // - // A vector of indices is returned of the requests that - // have finished. Also, all finished handles are set - // to MPI_REQUEST_NULL as a side-effect from the MPI call used. - // - // Indices are of type `int' instead of `size_t' because that is - // what MPI_TestSome returns. - std::vector<int> testSome( std::vector<handle_t> &handles ) const; - - // Test all registered requests, and for finished ones: - // - // 1. Update their status - // 2. 
Inform their owner - // - // If all requests from a RequestSet are completed, the set - // will be unregistered. - bool handleRequests(); - - // Keep polling for new requests, and handle the registered ones - // periodically. - void pollThread(); - }; - - - class RequestSet { - public: - // Register a set of handles to watch - // for completion. - RequestSet(const std::vector<handle_t> &handles, bool willWaitAll, const std::string &name = "<anonymous>"); - - ~RequestSet(); - - // Wait for one request to finish, and - // return its index. - size_t waitAny(); - - // Wait for one or more requests to finish, - // and return a vector of their indices. - std::vector<size_t> waitSome(); - - // Wait for all requests to finish. - void waitAll(); - - private: - // non-copyable - RequestSet(const RequestSet &); - - friend class MPIPoll; - - // An identifier for this set, used for debugging purposes - const std::string name; - - // true: caller will use waitAll() - // false: caller will use waitAny()/waitSome() - const bool willWaitAll; - - // MPI handles to watch - const std::vector<handle_t> handles; - - // ACTIVE: Transfer is still in operation. - // FINISHED: MPI reported the transfer as finished. - // REPORTED: This transfer has been reported as finished - // to our callers (using waitAny/waitSome/waitAll). - enum State { ACTIVE, FINISHED, REPORTED }; - - // State of each handle - std::vector<State> states; - - // How many requests have been completed - size_t nrFinished; - - Mutex mutex; - Condition oneFinished; - Condition allFinished; - }; - - } - -} - -#endif - diff --git a/RTCP/Cobalt/InputProc/test/tMPI.cc b/RTCP/Cobalt/InputProc/test/tMPI.cc index 1a22e2d638c..3349a4ae72d 100644 --- a/RTCP/Cobalt/InputProc/test/tMPI.cc +++ b/RTCP/Cobalt/InputProc/test/tMPI.cc @@ -26,7 +26,6 @@ #include <Common/Timer.h> #include <CoInterface/MultiDimArray.h> #include <InputProc/Transpose/MPIUtil.h> -#include <InputProc/Transpose/MPIUtil2.h> using namespace LOFAR; using namespace LOFAR::Cobalt; diff --git a/RTCP/Cobalt/InputProc/test/tMPIUtil2.cc b/RTCP/Cobalt/InputProc/test/tMPIUtil2.cc index edad7fe91cb..503990ae866 100644 --- a/RTCP/Cobalt/InputProc/test/tMPIUtil2.cc +++ b/RTCP/Cobalt/InputProc/test/tMPIUtil2.cc @@ -21,7 +21,6 @@ #include <lofar_config.h> -#include <InputProc/Transpose/MPIUtil2.h> #include <InputProc/Transpose/MPIUtil.h> #include <vector> -- GitLab
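For readers unfamiliar with the polling machinery this commit merges into MPIUtil, below is a minimal, hypothetical usage sketch of the `RequestSet`/`MPIPoll` API as declared in the merged MPIUtil.h. It is not part of the patch: the buffer names, tags, and the `receiveBlocks` helper are invented for illustration, and it assumes the global `mpi` object has already initialised MPI and started the background poller (as rtcp.cc does), with HAVE_MPI defined.

```cpp
// Hypothetical sketch (not part of this commit): posting two non-blocking
// receives and waiting for both via RequestSet, using only the API shown
// in the merged MPIUtil.h. Names and tag values are assumptions.
#include <vector>
#include <string>

#include <Common/Thread/Mutex.h>
#include <InputProc/Transpose/MPIUtil.h>

using namespace LOFAR;
using namespace LOFAR::Cobalt;

void receiveBlocks(int srcRank, std::vector<char> &bufA, std::vector<char> &bufB)
{
  std::vector<handle_t> handles;

  {
    // Guarded_MPI_Irecv is documented as NOT LOCKED, so serialise the MPI
    // calls against the background MPIPoll thread by taking MPIMutex here.
    ScopedLock sl(MPIMutex);

    handles.push_back(Guarded_MPI_Irecv(&bufA[0], bufA.size(), srcRank, /* tag */ 1));
    handles.push_back(Guarded_MPI_Irecv(&bufB[0], bufB.size(), srcRank, /* tag */ 2));
  }

  // Hand the handles to a RequestSet; the MPIPoll thread tests them
  // periodically (MPI_Testsome) and signals completion back to us.
  RequestSet requests(handles, /* willWaitAll = */ true, "receiveBlocks");

  // Block until every receive has finished. With willWaitAll == false the
  // caller would instead drain the set with waitAny() or waitSome().
  requests.waitAll();
}
```

Note the lifetime rule stated in the header: the caller must keep the `RequestSet` alive until all of its requests have finished and been reported, since `MPIPoll` does not take ownership and the destructor asserts that every handle reached the REPORTED state.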