Commit ed0c40c6 authored by Bram Veenboer

Add MPIRequest and MPIRequestList classes

Replace the free-standing send_async/receive_async helpers with an
MPIRequest class that wraps a single non-blocking transfer, and an
MPIRequestList that owns a batch of requests and waits on them
collectively (also from its destructor).

parent 1645cca7
@@ -138,53 +138,81 @@ void receive_array(int src, T &array) {
               MPI_STATUS_IGNORE);
 }
 
 void wait(MPI_Request &request) { MPI_Wait(&request, MPI_STATUS_IGNORE); }
 
-MPI_Request send_async(int dst, void *ptr, size_t bytes,
-                       bool blocking = false) {
-  MPI_Request request;
-  MPI_Isend(ptr, bytes, MPI_BYTE, dst, 0, MPI_COMM_WORLD, &request);
-  if (blocking) {
-    //std::cout << "send_async wait" << std::endl;
-    wait(request);
-    //std::cout << "send_async finished" << std::endl;
-  }
-  return request;
-}
-
-MPI_Request receive_async(int src, void *ptr, size_t bytes,
-                          bool blocking = false) {
-  MPI_Request request;
-  MPI_Irecv(ptr, bytes, MPI_BYTE, src, 0, MPI_COMM_WORLD, &request);
-  if (blocking) {
-    //std::cout << "receive_async wait" << std::endl;
-    wait(request);
-    //std::cout << "receive_async finished" << std::endl;
-  }
-  return request;
-}
-
-template <typename T>
-MPI_Request send_array_async(int dst, T &array, bool blocking = false) {
-  MPI_Request request;
-  MPI_Isend(array.data(), array.bytes(), MPI_BYTE, dst, 0, MPI_COMM_WORLD,
-            &request);
-  if (blocking) {
-    wait(request);
-  }
-  return request;
-}
-
-template <typename T>
-MPI_Request receive_array_async(int src, T &array, bool blocking = false) {
-  MPI_Request request;
-  MPI_Irecv(array.data(), array.bytes(), MPI_BYTE, src, 0, MPI_COMM_WORLD,
-            &request);
-  if (blocking) {
-    wait(request);
-  }
-  return request;
-}
+class MPIRequest
+{
+  public:
+    MPIRequest(bool blocking = false) :
+      m_blocking(blocking)
+    {
+    }
+
+    void send(
+      const void *buf,
+      int bytes,
+      int dest,
+      int tag = 0)
+    {
+      MPI_Isend(buf, bytes, MPI_BYTE, dest, tag, MPI_COMM_WORLD, &m_request);
+      if (m_blocking)
+      {
+        wait();
+      }
+    }
+
+    void receive(
+      void *buf,
+      int bytes,
+      int source,
+      int tag = 0)
+    {
+      MPI_Irecv(buf, bytes, MPI_BYTE, source, tag, MPI_COMM_WORLD, &m_request);
+      if (m_blocking)
+      {
+        wait();
+      }
+    }
+
+    void wait()
+    {
+      MPI_Wait(&m_request, MPI_STATUS_IGNORE);
+    }
+
+  private:
+    MPI_Request m_request;
+    bool m_blocking;
+};
+
+class MPIRequestList
+{
+  public:
+    MPIRequestList() :
+      m_requests(0)
+    {
+    }
+
+    ~MPIRequestList()
+    {
+      wait();
+    }
+
+    std::shared_ptr<MPIRequest> create(bool blocking = false)
+    {
+      m_requests.emplace_back(new MPIRequest(blocking));
+      return m_requests.back();
+    }
+
+    void wait()
+    {
+      for (auto &request : m_requests)
+      {
+        request->wait();
+      }
+    }
+
+  private:
+    std::vector<std::shared_ptr<MPIRequest>> m_requests;
+};
 
 void synchronize() {
   MPI_Barrier(MPI_COMM_WORLD);
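
Note: a minimal usage sketch of the two new classes (not part of the commit; hypothetical rank and buffer names, MPI already initialized). A blocking MPIRequest completes the transfer inside send()/receive(), while an MPIRequestList batches non-blocking transfers and waits on all of them at once, including from its destructor:

    // Sketch only. Assumes MPI_Init has been called and 'rank' holds this
    // process's rank in MPI_COMM_WORLD.
    std::vector<float> buffer(1024, 1.0f);
    if (rank == 0) {
      MPIRequestList requests;
      // Non-blocking send; the shared_ptr keeps the request alive in the list.
      requests.create()->send(buffer.data(), buffer.size() * sizeof(float), 1);
      requests.wait();  // also runs implicitly when 'requests' leaves scope
    } else if (rank == 1) {
      MPIRequest request(true);  // blocking: receive() calls wait() internally
      request.receive(buffer.data(), buffer.size() * sizeof(float), 0);
    }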
@@ -325,35 +353,34 @@ void run_master(int argc, char *argv[]) {
   double bytes_input = 0;
 
   // Iterate all cycles
   runtime_gridding = -omp_get_wtime();
   for (unsigned cycle = 0; cycle < nr_cycles; cycle++) {
     // Get UVW coordinates for current cycle
     data.get_uvw(uvw_all, 0, time_offset, integration_time);
 
     // Distribute input data
+    MPIRequestList requests;
     runtime_input -= omp_get_wtime();
     for (unsigned int bl = 0; bl < nr_baselines_all_workers; bl++) {
-      unsigned int dst = 1 + (bl / nr_baselines_per_worker);
-      bool blocking = bl == (nr_baselines_all_workers - 1);
+      unsigned int dest = 1 + (bl / nr_baselines_per_worker);
 
       // Send visibilities
       void *visibilities_ptr = (void *) visibilities_all.data(bl, 0, 0);
       size_t sizeof_visibilities =
          nr_timesteps * nr_channels *
          sizeof(idg::Visibility<std::complex<float>>);
-      send_async(dst, visibilities_ptr, sizeof_visibilities, blocking);
+      requests.create()->send(visibilities_ptr, sizeof_visibilities, dest);
 
       // Send uvw coordinates
       void *uvw_ptr = (void *) uvw_all.data(bl, 0);
       size_t sizeof_uvw = nr_timesteps * sizeof(idg::UVW<float>);
-      send_async(dst, (void *) uvw_ptr, sizeof_uvw, blocking);
+      requests.create()->send(uvw_ptr, sizeof_uvw, dest);
 
       // Update bytes_input
       bytes_input += sizeof_visibilities;
       bytes_input += sizeof_uvw;
     }
-    synchronize();
+    requests.wait();
     runtime_input += omp_get_wtime();
 
     // Get master buffers
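
Note: as in any MPI_Isend-based code, the buffers handed to requests.create()->send(...) must stay valid and unmodified until requests.wait() returns, because MPI_Isend does not copy the data; here visibilities_all and uvw_all outlive the per-cycle request list, so the pattern is safe. A minimal sketch of the constraint (hypothetical buffer):

    {
      MPIRequestList requests;
      std::vector<float> chunk(4096);  // must outlive the send below
      requests.create()->send(chunk.data(), chunk.size() * sizeof(float), 1);
      requests.wait();                 // only now may 'chunk' be modified or freed
    }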
@@ -367,17 +394,17 @@ void run_master(int argc, char *argv[]) {
         baselines, aterms_offsets, options));
 
     // Run gridding
-    runtime_gridding = -omp_get_wtime();
     proxy.gridding(*plan, w_offset, shift, cell_size, kernel_size,
                    subgrid_size, frequencies, visibilities, uvw,
                    baselines, aterms, aterms_offsets, spheroidal);
-    time_offset += nr_timesteps;
-    synchronize();
-  }
-  runtime_gridding += omp_get_wtime();
+    synchronize();
+    runtime_gridding += omp_get_wtime();
+
+    // Go to the next batch of timesteps
+    time_offset += nr_timesteps;
+  }
 
   // Get grid
   grid = proxy.get_grid();
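
Note on the timing idiom used here and elsewhere in this file: the accumulator is seeded with the negative start time and the end time is added afterwards, leaving the elapsed interval in the variable:

    double runtime = -omp_get_wtime();  // runtime == -t_start
    do_work();                          // hypothetical workload
    runtime += omp_get_wtime();         // runtime == t_end - t_start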
@@ -399,7 +426,8 @@ void run_master(int argc, char *argv[]) {
 #pragma omp critical
       {
         runtime_output -= omp_get_wtime();
-        receive_async(src, row.data(), row.bytes(), true);
+        MPIRequest request(true);
+        request.receive(row.data(), row.bytes(), src);
         runtime_output += omp_get_wtime();
       }
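
Note: constructed with blocking = true, MPIRequest behaves like a plain MPI_Recv, so receive() returns only once the row has arrived and the runtime_output bracket measures the full transfer. Equivalent sketch:

    MPIRequest request(true);   // blocking: receive() calls wait() before returning
    request.receive(row.data(), row.bytes(), src);
    // row is fully populated at this point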
@@ -512,23 +540,21 @@ void run_worker() {
   // Iterate all cycles
   for (unsigned cycle = 0; cycle < nr_cycles; cycle++) {
     // Receive input data
+    MPIRequestList requests;
     for (unsigned bl = 0; bl < nr_baselines; bl++) {
-      bool blocking = bl == (nr_baselines - 1);
-      //blocking = true;
-
       // Receive visibilities
       void *visibilities_ptr = (void *) visibilities.data(bl, 0, 0);
       size_t sizeof_visibilities = nr_timesteps * nr_channels *
          sizeof(idg::Visibility<std::complex<float>>);
-      receive_async(0, visibilities_ptr, sizeof_visibilities, blocking);
+      requests.create()->receive(visibilities_ptr, sizeof_visibilities, 0);
 
       // Receive uvw coordinates
       void *uvw_ptr = (void *) uvw.data(bl, 0);
       size_t sizeof_uvw = nr_timesteps * sizeof(idg::UVW<float>);
-      receive_async(0, uvw_ptr, sizeof_uvw, blocking);
+      requests.create()->receive(uvw_ptr, sizeof_uvw, 0);
     }
-    synchronize();
+    requests.wait();
 
     // Create plan
     auto plan = std::unique_ptr<idg::Plan>(new idg::Plan(
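
Note: the receives are matched to the master's sends purely by posting order; MPI's non-overtaking rule guarantees that, between one sender/receiver pair, messages with the same tag (here always 0) are matched in the order they were posted. An illustrative variant (not in this commit) would encode the baseline in the tag, with the master using the same scheme on its sends:

    for (unsigned bl = 0; bl < nr_baselines; bl++) {
      requests.create()->receive(visibilities.data(bl, 0, 0),
                                 sizeof_visibilities, 0, /*tag*/ 2 * bl);
      requests.create()->receive(uvw.data(bl, 0),
                                 sizeof_uvw, 0, /*tag*/ 2 * bl + 1);
    }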
@@ -541,30 +567,26 @@ void run_worker() {
                    baselines, aterms, aterms_offsets, spheroidal);
     synchronize();
-    //print(rank, "worker finished cycle");
   }
   synchronize();
 
   // Get grid
   grid = proxy.get_grid();
 
   // Send grid to master
+  MPIRequestList requests;
   for (unsigned int y = 0; y < grid_size; y++)
   {
     for (unsigned int w = 0; w < nr_w_layers; w++)
     {
       for (unsigned int pol = 0; pol < NR_POLARIZATIONS; pol++)
       {
-        //std::complex<float> *row_ptr = grid->data(w, pol, y, 0);
-        std::complex<float> *row_ptr = grid->data(0, 0, 0 ,0);
+        std::complex<float> *row_ptr = grid->data(w, pol, y, 0);
         size_t sizeof_row = grid_size * sizeof(std::complex<float>);
-        bool blocking = y == (grid_size - 1);
-        blocking = true;
-        send_async(0, row_ptr, sizeof_row, blocking);
+        requests.create()->send(row_ptr, sizeof_row, 0);
       }
     }
   }
+  requests.wait();
   synchronize();
 } // end run_worker
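
Note on the transfer size: the loop above sends one row per (w-layer, polarization, y) triple, i.e. nr_w_layers * NR_POLARIZATIONS * grid_size messages of grid_size complex floats each. With illustrative sizes (not taken from this code):

    size_t nr_w_layers = 1, nr_polarizations = 4, grid_size = 8192;          // example values
    size_t sizeof_row = grid_size * sizeof(std::complex<float>);             // 64 KiB per row
    size_t total = nr_w_layers * nr_polarizations * grid_size * sizeof_row;  // 2 GiB in total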