Commit 4d90e7a2 authored by Bram Veenboer's avatar Bram Veenboer

Add remaning pats of imaging cycle

parent 61937a7a
......@@ -249,8 +249,8 @@ idg::Plan::Options get_plan_options() {
void reduce_grids(
std::shared_ptr<idg::Grid> grid,
int rank,
int world_size)
unsigned int rank,
unsigned int world_size)
{
unsigned int nr_w_layers = grid->get_w_dim();
unsigned int grid_size = grid->get_y_dim();
......@@ -264,7 +264,7 @@ void reduce_grids(
{
for (unsigned int pol = 0; pol < NR_POLARIZATIONS; pol++)
{
for (unsigned int i = (world_size/2); i > 0; i /= 2)
for (unsigned int i = (world_size+1)/2; i > 0; i /= 2)
{
if ((unsigned int) rank < i)
{
......@@ -279,7 +279,7 @@ void reduce_grids(
auto& grid_ = *grid;
row(x) += grid_(w, pol, y, x);
}
} else {
} else if (rank < (2 * i)) {
#pragma omp critical
{
MPIRequest request(true);
......@@ -292,6 +292,93 @@ void reduce_grids(
}
}
void distribute_grid(
std::shared_ptr<idg::Grid> grid,
unsigned int world_size)
{
unsigned int nr_w_layers = grid->get_w_dim();
unsigned int grid_size = grid->get_y_dim();
MPIRequestList requests;
for (unsigned int y = 0; y < grid_size; y++)
{
for (unsigned int w = 0; w < nr_w_layers; w++)
{
for (unsigned int pol = 0; pol < NR_POLARIZATIONS; pol++)
{
std::complex<float> *row_ptr = grid->data(w, pol, y, 0);
size_t sizeof_row = grid_size * sizeof(std::complex<float>);
for (unsigned int dest = 1; dest < world_size; dest++)
{
requests.create()->send(row_ptr, sizeof_row, dest);
}
}
}
}
requests.wait();
}
void receive_grid(
std::shared_ptr<idg::Grid> grid,
unsigned int source)
{
unsigned int nr_w_layers = grid->get_w_dim();
unsigned int grid_size = grid->get_y_dim();
MPIRequestList requests;
for (unsigned int y = 0; y < grid_size; y++)
{
for (unsigned int w = 0; w < nr_w_layers; w++)
{
for (unsigned int pol = 0; pol < NR_POLARIZATIONS; pol++)
{
std::complex<float> *row_ptr = grid->data(w, pol, y, 0);
size_t sizeof_row = grid_size * sizeof(std::complex<float>);
requests.create()->receive(row_ptr, sizeof_row, source);
}
}
}
requests.wait();
}
void receive_visibilities(
idg::Array3D<idg::Visibility<std::complex<float>>>& visibilities,
unsigned int nr_baselines_per_worker,
unsigned int world_size)
{
unsigned int nr_baselines = nr_baselines_per_worker;
unsigned int nr_timesteps = visibilities.get_y_dim();
unsigned int nr_channels = visibilities.get_x_dim();
MPIRequestList requests;
for (unsigned int bl = 0; bl < nr_baselines; bl++)
{
for (unsigned int source = 1; source < world_size; source++)
{
void *visibilities_ptr = (void *) visibilities.data(bl, 0, 0);
size_t sizeof_visibilities = nr_timesteps * nr_channels *
sizeof(idg::Visibility<std::complex<float>>);
requests.create()->receive(visibilities_ptr, sizeof_visibilities, source);
}
}
requests.wait();
}
void send_visibilities(
idg::Array3D<idg::Visibility<std::complex<float>>>& visibilities,
int dest)
{
unsigned int nr_baselines = visibilities.get_z_dim();
unsigned int nr_timesteps = visibilities.get_y_dim();
unsigned int nr_channels = visibilities.get_x_dim();
MPIRequestList requests;
for (unsigned int bl = 0; bl < nr_baselines; bl++)
{
void *visibilities_ptr = (void *) visibilities.data(bl, 0, 0);
size_t sizeof_visibilities = nr_timesteps * nr_channels *
sizeof(idg::Visibility<std::complex<float>>);
requests.create()->send(visibilities_ptr, sizeof_visibilities, dest);
}
requests.wait();
}
void run_master(int argc, char *argv[]) {
idg::auxiliary::print_version();
......@@ -412,9 +499,13 @@ void run_master(int argc, char *argv[]) {
proxy.set_grid(grid);
// Performance measurement
double runtime_input = 0;
double runtime_send_input = 0;
double runtime_gridding = 0;
double bytes_input = 0;
double runtime_degridding = 0;
double runtime_reduction = 0;
double runtime_fft = 0;
double runtime_send_grid = 0;
double runtime_receive_output = 0;
// Iterate all cycles
for (unsigned cycle = 0; cycle < nr_cycles; cycle++) {
......@@ -439,10 +530,6 @@ void run_master(int argc, char *argv[]) {
void *uvw_ptr = (void *) uvw_all.data(bl, 0);
size_t sizeof_uvw = nr_timesteps * sizeof(idg::UVW<float>);
requests.create()->send(uvw_ptr, sizeof_uvw, dest);
// Update bytes_input
bytes_input += sizeof_visibilities;
bytes_input += sizeof_uvw;
}
requests.wait();
......@@ -468,52 +555,64 @@ void run_master(int argc, char *argv[]) {
proxy.gridding(*plan, w_offset, shift, cell_size, kernel_size,
subgrid_size, frequencies, visibilities, uvw,
baselines, aterms, aterms_offsets, spheroidal);
synchronize();
runtime_gridding += omp_get_wtime();
// Go the the next batch of timesteps
time_offset += nr_timesteps;
}
// Get grid
grid = proxy.get_grid();
// Reduce grids
double runtime_add_grid = -omp_get_wtime();
reduce_grids(grid, 0, world_size);
runtime_add_grid += omp_get_wtime();
// Get grid
grid = proxy.get_grid();
synchronize();
// Reduce grids
runtime_reduction -= omp_get_wtime();
reduce_grids(grid, 0, world_size);
runtime_reduction += omp_get_wtime();
// Run fft
double runtime_fft = -omp_get_wtime();
proxy.transform(idg::FourierDomainToImageDomain, *grid);
runtime_fft += omp_get_wtime();
// Run fft
runtime_fft -= omp_get_wtime();
proxy.transform(idg::FourierDomainToImageDomain, *grid);
runtime_fft += omp_get_wtime();
std::cout << std::scientific;
// Distribute grid
runtime_send_grid -= omp_get_wtime();
distribute_grid(grid, world_size);
runtime_send_grid += omp_get_wtime();
// Report input
double input_bw = bytes_input ? bytes_input / runtime_input * 1e-9 : 0;
std::stringstream report_input;
report_input << "input: " << runtime_input
<< " s , " << input_bw << " GB/s";
print(0, report_input.str());
// Set grid
proxy.set_grid(grid);
// Report gridding
std::stringstream report_gridding;
report_gridding << "gridding: " << runtime_gridding << "s";
print(0, report_gridding.str());
// Run degridding
runtime_degridding -= omp_get_wtime();
proxy.degridding(*plan, w_offset, shift, cell_size, kernel_size,
subgrid_size, frequencies, visibilities, uvw,
baselines, aterms, aterms_offsets, spheroidal);
synchronize();
runtime_degridding += omp_get_wtime();
std::stringstream report_add_grid;
report_add_grid << "reduction: " << runtime_add_grid << " s";
print(0, report_add_grid.str());
// Receive visibilities
runtime_receive_output -= omp_get_wtime();
receive_visibilities(visibilities, nr_baselines_per_worker, world_size);
runtime_receive_output += omp_get_wtime();
// Report fft runtime
std::stringstream report_fft;
report_fft << "fft: " << runtime_fft << " s";
print(0, report_fft.str());
// Go the the next batch of timesteps
time_offset += nr_timesteps;
}
// Report timings
idg::report("send input", runtime_send_input);
idg::report("gridding", runtime_gridding);
idg::report("degridding", runtime_degridding);
idg::report("reduction", runtime_reduction);
idg::report("send grid", runtime_send_grid);
idg::report("receive output", runtime_receive_output);
double runtime_imaging =
runtime_send_input + runtime_gridding + runtime_degridding +
runtime_reduction + runtime_send_grid + runtime_receive_output;
idg::report("runtime imaging", runtime_imaging);
// Report throughput
uint64_t nr_visibilities = 1ULL * nr_baselines * nr_timesteps * nr_channels;
idg::report_visibilities("gridding", runtime_gridding, nr_visibilities);
idg::report_visibilities("degridding", runtime_degridding, nr_visibilities);
idg::report_visibilities("imaging", runtime_imaging, nr_visibilities);
} // end run_master
void run_worker() {
......@@ -543,7 +642,6 @@ void run_worker() {
unsigned int nr_cycles = receive_int();
// Initialize proxy
print(rank, "initializing proxy");
ProxyType proxy;
// Allocate and initialize static data structures
......@@ -621,16 +719,30 @@ void run_worker() {
subgrid_size, frequencies, visibilities, uvw,
baselines, aterms, aterms_offsets, spheroidal);
// Get grid
grid = proxy.get_grid();
// Reduce grids
reduce_grids(grid, rank, world_size);
// Master performs FFT
synchronize();
}
// Get grid
grid = proxy.get_grid();
// Receive grid
receive_grid(grid, 0);
// Set grid
proxy.set_grid(grid);
// Reduce grids
reduce_grids(grid, rank, world_size);
// Run degridding
proxy.degridding(*plan, w_offset, shift, cell_size, kernel_size,
subgrid_size, frequencies, visibilities, uvw,
baselines, aterms, aterms_offsets, spheroidal);
synchronize();
synchronize();
// Send visibilities
send_visibilities(visibilities, 0);
}
} // end run_worker
int main(int argc, char *argv[]) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment