Commit 708eea08 authored by Bram Veenboer's avatar Bram Veenboer

Update distributed example

parent ee871913
......@@ -19,11 +19,6 @@
using namespace std;
// Option to let the master distribute input data (uvw coordinates
// and visibilities) to all workers. If set 0, the workers will
// initialize their own data, taking their baseline offset into account.
#define DISTRIBUTE_INPUT 0
std::tuple<int, int, int, int, int, int, int, int> read_parameters() {
const unsigned int DEFAULT_NR_STATIONS = 52; // all LOFAR LBA stations
const unsigned int DEFAULT_NR_CHANNELS = 16 * 4; // 16 channels, 4 subbands
......@@ -287,94 +282,21 @@ void reduce_grids(
}
}
void distribute_grid(
std::shared_ptr<idg::Grid> grid,
unsigned int world_size)
{
unsigned int nr_w_layers = grid->get_w_dim();
unsigned int grid_size = grid->get_y_dim();
MPIRequestList requests;
for (unsigned int y = 0; y < grid_size; y++)
{
for (unsigned int w = 0; w < nr_w_layers; w++)
{
for (unsigned int pol = 0; pol < NR_POLARIZATIONS; pol++)
{
std::complex<float> *row_ptr = grid->data(w, pol, y, 0);
size_t sizeof_row = grid_size * sizeof(std::complex<float>);
for (unsigned int dest = 1; dest < world_size; dest++)
{
requests.create()->send(row_ptr, sizeof_row, dest);
}
}
}
}
requests.wait();
}
void receive_grid(
void broadcast_grid(
std::shared_ptr<idg::Grid> grid,
unsigned int source)
int root)
{
unsigned int nr_w_layers = grid->get_w_dim();
unsigned int grid_size = grid->get_y_dim();
MPIRequestList requests;
unsigned int w = 0; // W-stacking is handled by the workers
for (unsigned int y = 0; y < grid_size; y++)
{
for (unsigned int w = 0; w < nr_w_layers; w++)
for (unsigned int pol = 0; pol < NR_POLARIZATIONS; pol++)
{
for (unsigned int pol = 0; pol < NR_POLARIZATIONS; pol++)
{
std::complex<float> *row_ptr = grid->data(w, pol, y, 0);
size_t sizeof_row = grid_size * sizeof(std::complex<float>);
requests.create()->receive(row_ptr, sizeof_row, source);
}
std::complex<float> *row_ptr = grid->data(w, pol, y, 0);
size_t sizeof_row = grid_size * sizeof(std::complex<float>);
MPI_Bcast(row_ptr, sizeof_row, MPI_BYTE, root, MPI_COMM_WORLD);
}
}
requests.wait();
}
void receive_visibilities(
idg::Array3D<idg::Visibility<std::complex<float>>>& visibilities,
unsigned int nr_baselines_per_worker,
unsigned int world_size)
{
unsigned int nr_baselines = nr_baselines_per_worker;
unsigned int nr_timesteps = visibilities.get_y_dim();
unsigned int nr_channels = visibilities.get_x_dim();
MPIRequestList requests;
for (unsigned int source = 1; source < world_size; source++)
{
send_int(source, 0);
for (unsigned int bl = 0; bl < nr_baselines; bl++)
{
void *visibilities_ptr = (void *) visibilities.data(bl, 0, 0);
size_t sizeof_visibilities = nr_timesteps * nr_channels *
sizeof(idg::Visibility<std::complex<float>>);
requests.create()->receive(visibilities_ptr, sizeof_visibilities, source);
}
}
requests.wait();
}
void send_visibilities(
idg::Array3D<idg::Visibility<std::complex<float>>>& visibilities,
int rank)
{
receive_int();
unsigned int nr_baselines = visibilities.get_z_dim();
unsigned int nr_timesteps = visibilities.get_y_dim();
unsigned int nr_channels = visibilities.get_x_dim();
MPIRequestList requests;
for (unsigned int bl = 0; bl < nr_baselines; bl++)
{
void *visibilities_ptr = (void *) visibilities.data(bl, 0, 0);
size_t sizeof_visibilities = nr_timesteps * nr_channels *
sizeof(idg::Visibility<std::complex<float>>);
requests.create()->send(visibilities_ptr, sizeof_visibilities, 0);
}
requests.wait();
}
void run_master() {
......@@ -420,16 +342,13 @@ void run_master() {
int world_size;
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
// Determine number of baselines per worker
unsigned int nr_baselines_per_worker = nr_baselines / world_size;
unsigned int nr_baselines_all_workers = (world_size - 1) * nr_baselines_per_worker;
unsigned int nr_baselines_master = nr_baselines - nr_baselines_all_workers;
// Distribute the work over frequency
nr_channels /= world_size;
// Distribute parameters
for (int dst = 0; dst < world_size; dst++) {
send_int(dst, nr_stations);
send_int(dst, nr_baselines);
send_int(dst, nr_baselines_per_worker);
send_int(dst, nr_timesteps);
send_int(dst, nr_timeslots);
send_float(dst, integration_time);
......@@ -444,21 +363,25 @@ void run_master() {
send_int(dst, nr_cycles);
}
// Initialize frequency data
// Initialize frequency data for master
idg::Array1D<float> frequencies(nr_channels);
data.get_frequencies(frequencies, image_size);
// Distribute frequencies
// Distribute frequencies to workers
// Every worker processes a different subband, thus
// take a channel offset into account when initializing
// frequencies.
for (int dst = 1; dst < world_size; dst++) {
send_array(dst, frequencies);
int channel_offset = (dst - 1) * nr_channels;
idg::Array1D<float> frequencies_(nr_channels);
data.get_frequencies(frequencies_, image_size, channel_offset);
send_array(dst, frequencies_);
}
// Distribute data
#if !DISTRIBUTE_INPUT
for (int dst = 1; dst < world_size; dst++) {
send_bytes(dst, &data, sizeof(data));
}
#endif
// Initialize proxy
ProxyType proxy;
......@@ -474,74 +397,34 @@ void run_master() {
auto grid =
proxy.allocate_grid(nr_w_layers, nr_correlations, grid_size, grid_size);
idg::Array1D<float> shift = idg::get_zero_shift();
idg::Array1D<std::pair<unsigned int, unsigned int>> baselines_all =
idg::Array1D<std::pair<unsigned int, unsigned int>> baselines =
idg::get_example_baselines(proxy, nr_stations, nr_baselines);
// Plan options
idg::Plan::Options options = get_plan_options();
omp_set_nested(true);
// Input buffers for all workers
#if DISTRIBUTE_INPUT
idg::Array2D<idg::UVW<float>> uvw_all(nr_baselines, nr_timesteps);
idg::Array3D<idg::Visibility<std::complex<float>>> visibilities_all =
idg::get_dummy_visibilities(nr_baselines, nr_timesteps, nr_channels);
#else
idg::Array2D<idg::UVW<float>> uvw(nr_baselines_master, nr_timesteps);
// Input buffers
idg::Array2D<idg::UVW<float>> uvw(nr_baselines, nr_timesteps);
idg::Array3D<idg::Visibility<std::complex<float>>> visibilities =
idg::get_dummy_visibilities(nr_baselines_master, nr_timesteps, nr_channels);
#endif
idg::get_dummy_visibilities(nr_baselines, nr_timesteps, nr_channels);
int time_offset = 0;
int bl_offset = 0;
// Set grid
proxy.set_grid(grid);
// Performance measurement
double runtime_send_input = 0;
double runtime_gridding = 0;
double runtime_degridding = 0;
double runtime_reduction = 0;
double runtime_fft = 0;
double runtime_send_grid = 0;
double runtime_receive_output = 0;
std::vector<double> runtimes_gridding(nr_cycles);
std::vector<double> runtimes_degridding(nr_cycles);
std::vector<double> runtimes_grid_reduce(nr_cycles);
std::vector<double> runtimes_grid_fft(nr_cycles);
std::vector<double> runtimes_grid_broadcast(nr_cycles);
// Iterate all cycles
for (unsigned cycle = 0; cycle < nr_cycles; cycle++) {
#if DISTRIBUTE_INPUT
// Get UVW coordinates for current cycle
data.get_uvw(uvw_all, 0, time_offset, integration_time);
// Distribute input data
MPIRequestList requests;
runtime_send_input -= omp_get_wtime();
for (unsigned int bl = 0; bl < nr_baselines_all_workers; bl++) {
unsigned int dest = 1 + (bl / nr_baselines_per_worker);
// Send visibilities
void *visibilities_ptr = (void *) visibilities_all.data(bl, 0, 0);
size_t sizeof_visibilities =
nr_timesteps * nr_channels *
sizeof(idg::Visibility<std::complex<float>>);
requests.create()->send(visibilities_ptr, sizeof_visibilities, dest);
// Send uvw coordinates
void *uvw_ptr = (void *) uvw_all.data(bl, 0);
size_t sizeof_uvw = nr_timesteps * sizeof(idg::UVW<float>);
requests.create()->send(uvw_ptr, sizeof_uvw, dest);
}
requests.wait();
runtime_send_input += omp_get_wtime();
// Get master buffers
idg::Array2D<idg::UVW<float>> uvw(uvw_all.data(nr_baselines_all_workers, 0), nr_baselines_master, nr_timesteps);
idg::Array3D<idg::Visibility<std::complex<float>>> visibilities(visibilities_all.data(nr_baselines_all_workers, 0, 0), nr_baselines_master, nr_timesteps, nr_channels);
#else
// Get UVW coordinates for current cycle
data.get_uvw(uvw, nr_baselines_all_workers, time_offset, integration_time);
#endif
idg::Array1D<std::pair<unsigned int, unsigned int>> baselines(baselines_all.data(nr_baselines_all_workers), nr_baselines_master);
data.get_uvw(uvw, bl_offset, time_offset, integration_time);
// Create plan
auto plan = std::unique_ptr<idg::Plan>(new idg::Plan(
......@@ -549,53 +432,55 @@ void run_master() {
baselines, aterms_offsets, options));
// Run gridding
runtime_gridding -= omp_get_wtime();
runtimes_gridding[cycle] = -omp_get_wtime();
proxy.gridding(*plan, w_offset, shift, cell_size, kernel_size,
subgrid_size, frequencies, visibilities, uvw,
baselines, aterms, aterms_offsets, spheroidal);
synchronize();
runtime_gridding += omp_get_wtime();
runtimes_gridding[cycle] += omp_get_wtime();
if (cycle == (nr_cycles - 1))
{
// Get grid
grid = proxy.get_grid();
// Get grid
grid = proxy.get_grid();
// Run FFT
runtimes_grid_fft[cycle] = -omp_get_wtime();
proxy.transform(idg::FourierDomainToImageDomain, *grid);
runtimes_grid_fft[cycle] += omp_get_wtime();
// Reduce grids
runtime_reduction -= omp_get_wtime();
if (world_size > 1)
// Reduce grids
if (world_size > 1)
{
runtimes_grid_reduce[cycle] = -omp_get_wtime();
reduce_grids(grid, 0, world_size);
runtime_reduction += omp_get_wtime();
// Run fft
runtime_fft -= omp_get_wtime();
proxy.transform(idg::FourierDomainToImageDomain, *grid);
proxy.transform(idg::ImageDomainToFourierDomain, *grid);
runtime_fft += omp_get_wtime();
// Distribute grid
runtime_send_grid -= omp_get_wtime();
if (world_size > 1)
distribute_grid(grid, world_size);
runtime_send_grid += omp_get_wtime();
// Set grid
proxy.set_grid(grid);
runtimes_grid_reduce[cycle] += omp_get_wtime();
}
// Deconvolution
// not implemented
// Broadcast model image to workers
if (world_size > 1)
{
runtimes_grid_broadcast[cycle] = -omp_get_wtime();
broadcast_grid(grid, 0);
runtimes_grid_broadcast[cycle] += omp_get_wtime();
}
// Set grid
proxy.set_grid(grid);
// Run FFT
runtimes_grid_fft[cycle] -= omp_get_wtime();
proxy.transform(idg::ImageDomainToFourierDomain, *grid);
runtimes_grid_fft[cycle] += omp_get_wtime();
// Run degridding
runtime_degridding -= omp_get_wtime();
runtimes_degridding[cycle] = -omp_get_wtime();
proxy.degridding(*plan, w_offset, shift, cell_size, kernel_size,
subgrid_size, frequencies, visibilities, uvw,
baselines, aterms, aterms_offsets, spheroidal);
synchronize();
runtime_degridding += omp_get_wtime();
// Receive visibilities
runtime_receive_output -= omp_get_wtime();
if (world_size > 1)
receive_visibilities(visibilities, nr_baselines_per_worker, world_size);
runtime_receive_output += omp_get_wtime();
runtimes_degridding[cycle] += omp_get_wtime();
// Go the the next batch of timesteps
time_offset += nr_timesteps;
......@@ -603,21 +488,24 @@ void run_master() {
// Report timings
std::clog << std::endl;
idg::report("send input", runtime_send_input);
double runtime_gridding = std::accumulate(runtimes_gridding.begin(), runtimes_gridding.end(), 0.0);
double runtime_degridding = std::accumulate(runtimes_degridding.begin(), runtimes_degridding.end(), 0.0);
double runtime_grid_fft = std::accumulate(runtimes_grid_fft.begin(), runtimes_grid_fft.end(), 0.0);
double runtime_grid_reduce = std::accumulate(runtimes_grid_reduce.begin(), runtimes_grid_reduce.end(), 0.0);
double runtime_grid_broadcast = std::accumulate(runtimes_grid_broadcast.begin(), runtimes_grid_broadcast.end(), 0.0);
idg::report("gridding", runtime_gridding);
idg::report("grid fft", runtime_fft);
idg::report("grid fft", runtime_grid_fft);
idg::report("degridding", runtime_degridding);
idg::report("reduction", runtime_reduction);
idg::report("send grid", runtime_send_grid);
idg::report("receive output", runtime_receive_output);
idg::report("grid reduce", runtime_grid_reduce);
idg::report("grid broadcast", runtime_grid_broadcast);
double runtime_imaging =
runtime_send_input + runtime_gridding + runtime_degridding +
runtime_reduction + runtime_send_grid + runtime_receive_output;
runtime_gridding + runtime_degridding + runtime_grid_fft +
runtime_grid_reduce + runtime_grid_broadcast;
idg::report("runtime imaging", runtime_imaging);
std::clog << std::endl;
// Report throughput
uint64_t nr_visibilities = 1ULL * nr_baselines * nr_timesteps * nr_channels;
uint64_t nr_visibilities = 1ULL * nr_baselines * nr_timesteps * nr_channels * world_size;
idg::report_visibilities("gridding", runtime_gridding, nr_visibilities);
idg::report_visibilities("degridding", runtime_degridding, nr_visibilities);
idg::report_visibilities("imaging", runtime_imaging, nr_visibilities);
......@@ -634,7 +522,6 @@ void run_worker() {
// Receive parameters
unsigned int nr_stations = receive_int();
unsigned int total_nr_baselines = receive_int();
unsigned int nr_baselines = receive_int();
unsigned int nr_timesteps = receive_int();
unsigned int nr_timeslots = receive_int();
......@@ -672,7 +559,7 @@ void run_worker() {
// Receive data
idg::Data data =
idg::get_example_data(total_nr_baselines, grid_size, integration_time);
idg::get_example_data(nr_baselines, grid_size, integration_time);
// Plan options
idg::Plan::Options options = get_plan_options();
......@@ -680,15 +567,10 @@ void run_worker() {
// Buffers for input data
idg::Array2D<idg::UVW<float>> uvw(nr_baselines, nr_timesteps);
#if DISTRIBUTE_INPUT
idg::Array3D<idg::Visibility<std::complex<float>>> visibilities(
nr_baselines, nr_timesteps, nr_channels);
#else
idg::Array3D<idg::Visibility<std::complex<float>>> visibilities =
idg::get_dummy_visibilities(nr_baselines, nr_timesteps, nr_channels);
int time_offset = 0;
int bl_offset = (rank - 1) * nr_baselines;
#endif
int bl_offset = 0;
// Set grid
proxy.set_grid(grid);
......@@ -696,26 +578,8 @@ void run_worker() {
// Iterate all cycles
for (unsigned cycle = 0; cycle < nr_cycles; cycle++) {
#if DISTRIBUTE_INPUT
// Receive input data
MPIRequestList requests;
for (unsigned bl = 0; bl < nr_baselines; bl++) {
// Receive visibilities
void *visibilities_ptr = (void *) visibilities.data(bl, 0, 0);
size_t sizeof_visibilities = nr_timesteps * nr_channels *
sizeof(idg::Visibility<std::complex<float>>);
requests.create()->receive(visibilities_ptr, sizeof_visibilities, 0);
// Receive uvw coordinates
void *uvw_ptr = (void *) uvw.data(bl, 0);
size_t sizeof_uvw = nr_timesteps * sizeof(idg::UVW<float>);
requests.create()->receive(uvw_ptr, sizeof_uvw, 0);
}
requests.wait();
#else
// Get UVW coordinates for current cycle
data.get_uvw(uvw, bl_offset, time_offset, integration_time);
#endif
// Create plan
auto plan = std::unique_ptr<idg::Plan>(new idg::Plan(
......@@ -726,26 +590,32 @@ void run_worker() {
proxy.gridding(*plan, w_offset, shift, cell_size, kernel_size,
subgrid_size, frequencies, visibilities, uvw,
baselines, aterms, aterms_offsets, spheroidal);
synchronize();
if (cycle == (nr_cycles - 1))
{
// Get grid
grid = proxy.get_grid();
// Get grid
grid = proxy.get_grid();
// Reduce grids
reduce_grids(grid, rank, world_size);
// Run FFT
proxy.transform(idg::FourierDomainToImageDomain, *grid);
// Master performs FFT
synchronize();
// Reduce grids
reduce_grids(grid, rank, world_size);
// Receive grid
receive_grid(grid, 0);
// Master performs deconvolution and constructs model image
// Set grid
proxy.set_grid(grid);
} else {
synchronize();
}
// Receive model image from master
broadcast_grid(grid, 0);
// Set grid
proxy.set_grid(grid);
// Run gridding #2 (create model image)
proxy.gridding(*plan, w_offset, shift, cell_size, kernel_size,
subgrid_size, frequencies, visibilities, uvw,
baselines, aterms, aterms_offsets, spheroidal);
// Run FFT
proxy.transform(idg::ImageDomainToFourierDomain, *grid);
// Run degridding
proxy.degridding(*plan, w_offset, shift, cell_size, kernel_size,
......@@ -753,8 +623,11 @@ void run_worker() {
baselines, aterms, aterms_offsets, spheroidal);
synchronize();
// Send visibilities
send_visibilities(visibilities, rank);
// Subtract model visibilities
// not implemented
// Go the the next batch of timesteps
time_offset += nr_timesteps;
}
} // end run_worker
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment