diff --git a/RTCP/Cobalt/InputProc/src/CMakeLists.txt b/RTCP/Cobalt/InputProc/src/CMakeLists.txt
index 2439d27c7a2ff238e355ac6605182ecd8e29e519..ab2fab414628d4f5bc0f403dcb9daa882ef6e72f 100644
--- a/RTCP/Cobalt/InputProc/src/CMakeLists.txt
+++ b/RTCP/Cobalt/InputProc/src/CMakeLists.txt
@@ -27,6 +27,7 @@ if(MPI_FOUND)
     Transpose/MPISendStation.cc
     Transpose/MPIReceiveStations.cc
     Transpose/MPIUtil.cc
+    Transpose/MPIUtil2.cc
   )
 endif(MPI_FOUND)
diff --git a/RTCP/Cobalt/InputProc/src/Transpose/MPIUtil2.cc b/RTCP/Cobalt/InputProc/src/Transpose/MPIUtil2.cc
new file mode 100644
index 0000000000000000000000000000000000000000..6bbee3038891d577de4a9de29f22be9a408dcb39
--- /dev/null
+++ b/RTCP/Cobalt/InputProc/src/Transpose/MPIUtil2.cc
@@ -0,0 +1,289 @@
+#include <lofar_config.h>
+
+#include "MPIUtil2.h"
+#include "MPIUtil.h"
+
+#include <Common/LofarLogger.h>
+#include <Common/Singleton.h>
+#include <Common/Thread/Thread.h>
+#include <Common/Thread/Condition.h>
+#include <Common/Thread/Mutex.h>
+#include <CoInterface/SmartPtr.h>
+
+#include <sys/time.h>
+#include <time.h>
+#include <algorithm>
+
+//#define DEBUG_MPI
+
+#ifdef DEBUG_MPI
+#define DEBUG(str)  LOG_DEBUG_STR(str)
+#else
+#define DEBUG(str)
+#endif
+
+using namespace std;
+
+namespace LOFAR {
+
+  namespace Cobalt {
+
+    MPIPoll::MPIPoll()
+    :
+      started(false),
+      done(false)
+    {
+    }
+
+
+    void MPIPoll::add( RequestSet *set ) {
+      ScopedLock sl(mutex);
+
+      requests.push_back(set);
+      newRequest.signal();
+    }
+
+
+    void MPIPoll::remove( RequestSet *set ) {
+      ScopedLock sl(mutex);
+
+      _remove(set);
+    }
+
+
+    // Caller must hold `mutex'.
+    void MPIPoll::_remove( RequestSet *set ) {
+      const std::vector<RequestSet*>::iterator i = std::find(requests.begin(), requests.end(), set);
+
+      if (i == requests.end())
+        return;
+
+      requests.erase(i);
+    }
+
+
+    bool MPIPoll::have( RequestSet *set ) {
+      ScopedLock sl(mutex);
+
+      return std::find(requests.begin(), requests.end(), set) != requests.end();
+    }
+
+
+    void MPIPoll::start() {
+      started = true;
+
+      thread = new Thread(this, &MPIPoll::pollThread, "MPIPoll::pollThread");
+    }
+
+
+    void MPIPoll::stop() {
+      {
+        // Update `done' and signal under the lock, so the poll
+        // thread cannot decide to wait in between the two.
+        ScopedLock sl(mutex);
+
+        done = true;
+
+        // Unlock thread if it is waiting for a new request
+        newRequest.signal();
+      }
+
+      // Wait for thread to finish
+      thread = 0;
+    }
+
+    std::vector<int> MPIPoll::testSome( std::vector<handle_t> &handles ) const {
+      vector<int> doneset;
+
+      if (handles.empty())
+        return doneset;
+
+      doneset.resize(handles.size());
+
+      int outcount;
+
+      {
+        ScopedLock sl(MPIMutex);
+
+        // MPI_Testsome will put the indices of finished requests in doneset,
+        // and set the respective handle to MPI_REQUEST_NULL.
+        //
+        // Note that handles that are MPI_REQUEST_NULL on input are ignored.
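+        //
+        // MPI_Testsome returns immediately: `outcount' is set to the
+        // number of indices written into `doneset'.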
+        MPI_Testsome(handles.size(), &handles[0], &outcount, &doneset[0], MPI_STATUSES_IGNORE);
+      }
+
+      // Cut off doneset at the actual number of returned indices
+      doneset.resize(outcount);
+
+      return doneset;
+    }
+
+    namespace {
+      struct handle_ref {
+        RequestSet *set;
+        size_t index;
+
+        handle_ref(RequestSet *set, size_t index): set(set), index(index) {}
+      };
+    }
+
+    // Caller must hold `mutex'.
+    void MPIPoll::handleRequests()
+    {
+      // Collect all ACTIVE requests, and keep track of their index
+      vector<handle_t> handles;
+      vector<handle_ref> references;
+
+      for (size_t i = 0; i < requests.size(); ++i) {
+        ScopedLock sl(requests[i]->mutex);
+
+        for (size_t j = 0; j < requests[i]->handles.size(); ++j) {
+          if (requests[i]->states[j] != RequestSet::ACTIVE)
+            continue;
+
+          handles.push_back(requests[i]->handles[j]);
+          references.push_back(handle_ref(requests[i], j));
+        }
+      }
+
+      // Ask MPI which requests have finished
+      //
+      // NOTE: Finished requests are set to MPI_REQUEST_NULL in `handles'.
+      const vector<int> finishedIndices = testSome(handles);
+
+      // Process finished requests
+      for (size_t i = 0; i < finishedIndices.size(); ++i) {
+        struct handle_ref &ref = references[finishedIndices[i]];
+        RequestSet &set = *(ref.set);
+
+        ScopedLock sl(set.mutex);
+
+        // Mark as FINISHED
+        set.states[ref.index] = RequestSet::FINISHED;
+        set.nrFinished++;
+
+        // Inform waitAny/waitSome threads
+        set.oneFinished.signal();
+
+        if (set.nrFinished == set.handles.size()) {
+          // Inform waitAll threads
+          set.allFinished.signal();
+
+          // Remove this set from the requests to watch.
+          //
+          // Use _remove, not remove: we already hold `mutex',
+          // so remove() would deadlock on a non-recursive mutex.
+          _remove(&set);
+        }
+      }
+    }
+
+    void MPIPoll::pollThread() {
+      ScopedLock sl(mutex);
+
+      while(!done) {
+        if (requests.empty()) {
+          // wait for request, with lock released
+          newRequest.wait(mutex);
+        } else {
+          // poll all handles
+          handleRequests();
+
+          // if there are still pending requests, release
+          // the lock and just wait with a timeout
+          if (!requests.empty()) {
+            struct timeval now;
+            gettimeofday(&now, NULL);
+
+            // wake up at most 1 ms from now
+            struct timespec deadline;
+            deadline.tv_sec  = now.tv_sec;
+            deadline.tv_nsec = now.tv_usec * 1000 + 1000000;
+
+            // normalise: tv_nsec must stay below one second
+            if (deadline.tv_nsec >= 1000000000L) {
+              deadline.tv_sec++;
+              deadline.tv_nsec -= 1000000000L;
+            }
+
+            newRequest.wait(mutex, deadline);
+          }
+        }
+      }
+    }
+
+
+    RequestSet::RequestSet(const std::vector<handle_t> &handles)
+    :
+      handles(handles),
+      states(handles.size(), ACTIVE),
+      nrFinished(0)
+    {
+      // Requests shouldn't be MPI_REQUEST_NULL,
+      // because those will never be reported as FINISHED
+      for (size_t i = 0; i < states.size(); ++i) {
+        ASSERT(handles[i] != MPI_REQUEST_NULL);
+      }
+
+      // register ourselves
+      MPIPoll::instance().add(this);
+    }
+
+    RequestSet::~RequestSet()
+    {
+      // all requests should be finished and reported by now
+      {
+        ScopedLock sl(mutex);
+
+        ASSERT(nrFinished == handles.size());
+
+        for (size_t i = 0; i < states.size(); ++i) {
+          ASSERT(states[i] == REPORTED);
+        }
+      }
+
+      // We should have been unregistered once our last request was
+      // FINISHED. Don't hold our own mutex here: have() takes MPIPoll's
+      // mutex, and MPIPoll locks in the opposite order.
+      ASSERT(!MPIPoll::instance().have(this));
+    }
+
+    size_t RequestSet::waitAny() {
+      ScopedLock sl(mutex);
+
+      // There has to be something to wait for
+      ASSERT(nrFinished < handles.size());
+
+      for(;;) {
+        // Look for a finished request that hasn't been
+        // reported yet.
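+        //
+        // Marking it REPORTED guarantees that each index is
+        // returned to exactly one caller, exactly once.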
+        for (size_t i = 0; i < states.size(); ++i) {
+          if (states[i] == FINISHED) {
+            states[i] = REPORTED;
+            return i;
+          }
+        }
+
+        // Wait for another request to finish
+        oneFinished.wait(mutex);
+      }
+    }
+
+    vector<size_t> RequestSet::waitSome() {
+      ScopedLock sl(mutex);
+
+      // There has to be something to wait for
+      ASSERT(nrFinished < handles.size());
+
+      vector<size_t> finished;
+
+      do {
+        // Collect all finished requests that haven't been
+        // reported yet.
+        for (size_t i = 0; i < states.size(); ++i) {
+          if (states[i] == FINISHED) {
+            states[i] = REPORTED;
+
+            finished.push_back(i);
+          }
+        }
+
+        if (finished.empty()) {
+          // Nothing finished yet: wait for a request to finish
+          oneFinished.wait(mutex);
+        }
+      } while (finished.empty());
+
+      return finished;
+    }
+
+    void RequestSet::waitAll() {
+      ScopedLock sl(mutex);
+
+      while (nrFinished < handles.size()) {
+        // Wait for all requests to finish
+        allFinished.wait(mutex);
+      }
+
+      // Mark all requests as reported, to satisfy
+      // the consistency checks in the destructor
+      for (size_t i = 0; i < states.size(); ++i)
+        states[i] = REPORTED;
+    }
+
+  }
+}
+
diff --git a/RTCP/Cobalt/InputProc/src/Transpose/MPIUtil2.h b/RTCP/Cobalt/InputProc/src/Transpose/MPIUtil2.h
new file mode 100644
index 0000000000000000000000000000000000000000..c16825537bff128be5a350dbc64ea5819f66f33a
--- /dev/null
+++ b/RTCP/Cobalt/InputProc/src/Transpose/MPIUtil2.h
@@ -0,0 +1,134 @@
+#ifndef LOFAR_INPUTPROC_MPIUTIL2_H
+#define LOFAR_INPUTPROC_MPIUTIL2_H
+
+#include <mpi.h>
+
+#include <vector>
+
+#include <Common/Singleton.h>
+#include <Common/Thread/Mutex.h>
+#include <Common/Thread/Thread.h>
+#include <Common/Thread/Condition.h>
+#include <CoInterface/Allocator.h>
+#include <CoInterface/SmartPtr.h>
+
+namespace LOFAR {
+
+  namespace Cobalt {
+
+    class RequestSet;
+
+    typedef MPI_Request handle_t;
+
+    class MPIPoll {
+    public:
+      static MPIPoll &instance() { return Singleton<MPIPoll>::instance(); }
+
+      // Register a set of MPI requests.
+      //
+      // MPIPoll does NOT take ownership, and
+      // will unregister the set once all requests
+      // have been completed.
+      //
+      // It is the programmer's responsibility to
+      // make sure that `set' is destructed only
+      // after all its requests are finished.
+      void add( RequestSet *set );
+
+      // Unregister a set of MPI requests.
+      void remove( RequestSet *set );
+
+      // Whether a certain set is registered.
+      bool have( RequestSet *set );
+
+      // Start watching the registered MPI requests.
+      void start();
+
+      // Stop watching the registered MPI requests.
+      void stop();
+
+    private:
+      MPIPoll();
+      friend class Singleton<MPIPoll>;
+
+      bool started;
+      bool done;
+      std::vector<RequestSet *> requests;
+      Mutex mutex;
+      Condition newRequest;
+
+      // The thread that periodically polls MPI
+      SmartPtr<Thread> thread;
+
+      // Unregister a set of MPI requests; caller must hold `mutex'.
+      void _remove( RequestSet *set );
+
+      // Call MPI to test which requests have finished.
+      //
+      // A vector of indices is returned of the requests that
+      // have finished. Also, all finished handles are set
+      // to MPI_REQUEST_NULL as a side-effect of the MPI call used.
+      //
+      // Indices are of type `int' instead of `size_t' because that is
+      // what MPI_Testsome returns.
+      std::vector<int> testSome( std::vector<handle_t> &handles ) const;
+
+      // Test all registered requests, and for finished ones:
+      //
+      //   1. Update their status
+      //   2. Inform their owner
+      //
+      // If all requests from a RequestSet are completed, the set
+      // will be unregistered. Caller must hold `mutex'.
+      void handleRequests();
+
+      // Keep polling for new requests, and handle the registered ones
+      // periodically.
+      void pollThread();
+    };
+
+
+    class RequestSet {
+    public:
+      // Register a set of handles to watch
+      // for completion.
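+      //
+      // The constructor registers this set with MPIPoll;
+      // MPIPoll unregisters it once all requests have finished.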
+      RequestSet(const std::vector<handle_t> &handles);
+
+      ~RequestSet();
+
+      // Wait for one request to finish, and
+      // return its index.
+      size_t waitAny();
+
+      // Wait for one or more requests to finish,
+      // and return a vector of their indices.
+      std::vector<size_t> waitSome();
+
+      // Wait for all requests to finish.
+      void waitAll();
+
+    private:
+      friend class MPIPoll;
+
+      // MPI handles to watch
+      const std::vector<handle_t> handles;
+
+      // ACTIVE:   Transfer is still in operation.
+      // FINISHED: MPI reported the transfer as finished.
+      // REPORTED: This transfer has been reported as finished
+      //           to our callers (using waitAny/waitSome/waitAll).
+      enum State { ACTIVE, FINISHED, REPORTED };
+
+      // State of each handle (State, not bool: a handle
+      // can be in one of three states)
+      std::vector<State> states;
+
+      // How many requests have been completed
+      size_t nrFinished;
+
+      Mutex mutex;
+      Condition oneFinished;
+      Condition allFinished;
+    };
+
+  }
+
+}
+
+#endif
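Usage note: MPIPoll lets a single background thread drive MPI completion
testing, so any number of worker threads can block on their own RequestSet.
A minimal sketch of the intended call pattern follows. It is illustrative
only: `receiveBlock', the buffer, rank, and tag are hypothetical, and it
assumes that MPIPoll::instance().start() has been called once after
MPI_Init, and that nonblocking calls are posted under MPIMutex, as
MPIPoll::testSome() does.

    #include <mpi.h>
    #include <vector>

    #include "MPIUtil.h"   // MPIMutex
    #include "MPIUtil2.h"  // MPIPoll, RequestSet

    using namespace LOFAR;
    using namespace LOFAR::Cobalt;

    // Receive `count' chars from `sourceRank' and block until done.
    void receiveBlock(char *buf, int count, int sourceRank, int tag)
    {
      std::vector<handle_t> handles(1);

      {
        // Serialise access to MPI, as MPIPoll::testSome() does
        ScopedLock sl(MPIMutex);

        MPI_Irecv(buf, count, MPI_CHAR, sourceRank, tag,
                  MPI_COMM_WORLD, &handles[0]);
      }

      // Registers itself with MPIPoll; the poll thread will mark the
      // request FINISHED and wake us up.
      RequestSet requests(handles);

      // Block until the receive has finished (waitAll also marks the
      // request REPORTED, which the destructor checks).
      requests.waitAll();
    }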