Commit c1021c3a authored by Bram Veenboer

Update distributed example

parent 710e8a02
......@@ -19,13 +19,15 @@
using namespace std;
std::tuple<int, int, int, int, int, int, int, int> read_parameters() {
std::tuple<int, int, int, int, int, int, int, int, int> read_parameters() {
const unsigned int DEFAULT_NR_STATIONS = 52; // all LOFAR LBA stations
const unsigned int DEFAULT_NR_CHANNELS = 16 * 4; // 16 channels, 4 subbands
const unsigned int DEFAULT_NR_TIMESTEPS =
(3600 * 4); // 4 hours of observation
(3600 * 1); // 1 hour of observation
const unsigned int DEFAULT_NR_TIMESLOTS =
DEFAULT_NR_TIMESTEPS / (60 * 30); // update every 30 minutes
const unsigned int DEFAULT_TOTAL_NR_TIMESTEPS =
(3600 * 8) / (DEFAULT_NR_TIMESTEPS / 3600); // 8 hours of observation
const unsigned int DEFAULT_GRIDSIZE = 4096;
const unsigned int DEFAULT_SUBGRIDSIZE = 32;
const unsigned int DEFAULT_NR_CYCLES = 1;
......@@ -46,6 +48,10 @@ std::tuple<int, int, int, int, int, int, int, int> read_parameters() {
auto nr_timeslots =
cstr_nr_timeslots ? atoi(cstr_nr_timeslots) : DEFAULT_NR_TIMESLOTS;
char *cstr_total_nr_timesteps = getenv("TOTAL_NR_TIMESTEPS");
auto total_nr_timesteps =
cstr_total_nr_timesteps ? atoi(cstr_total_nr_timesteps) : DEFAULT_TOTAL_NR_TIMESTEPS;
char *cstr_grid_size = getenv("GRIDSIZE");
auto grid_size = cstr_grid_size ? atoi(cstr_grid_size) : DEFAULT_GRIDSIZE;
......@@ -61,13 +67,15 @@ std::tuple<int, int, int, int, int, int, int, int> read_parameters() {
auto nr_cycles = cstr_nr_cycles ? atoi(cstr_nr_cycles) : DEFAULT_NR_CYCLES;
return std::make_tuple(nr_stations, nr_channels, nr_timesteps, nr_timeslots,
grid_size, subgrid_size, kernel_size, nr_cycles);
total_nr_timesteps, grid_size, subgrid_size, kernel_size,
nr_cycles);
}
void print_parameters(unsigned int nr_stations, unsigned int nr_channels,
unsigned int nr_timesteps, unsigned int nr_timeslots,
float image_size, unsigned int grid_size,
unsigned int subgrid_size, unsigned int kernel_size) {
unsigned int total_nr_timesteps, float image_size,
unsigned int grid_size, unsigned int subgrid_size,
unsigned int kernel_size) {
const int fw1 = 30;
const int fw2 = 10;
ostream &os = clog;
......@@ -87,6 +95,9 @@ void print_parameters(unsigned int nr_stations, unsigned int nr_channels,
os << setw(fw1) << left << "Number of timeslots"
<< "== " << setw(fw2) << right << nr_timeslots << endl;
os << setw(fw1) << left << "Total number of timesteps"
<< "== " << setw(fw2) << right << total_nr_timesteps << endl;
os << setw(fw1) << left << "Imagesize"
<< "== " << setw(fw2) << right << image_size << endl;
......@@ -316,6 +327,7 @@ void run_master() {
unsigned int nr_channels;
unsigned int nr_timesteps;
unsigned int nr_timeslots;
unsigned int total_nr_timesteps;
float integration_time = 1.0;
unsigned int grid_size;
unsigned int subgrid_size;
......@@ -323,9 +335,9 @@ void run_master() {
unsigned int nr_cycles;
// Read parameters from environment
std::tie(nr_stations, nr_channels, nr_timesteps, nr_timeslots, grid_size,
subgrid_size, kernel_size, nr_cycles) =
read_parameters();
std::tie(nr_stations, nr_channels, nr_timesteps, nr_timeslots,
total_nr_timesteps, grid_size, subgrid_size,
kernel_size, nr_cycles) = read_parameters();
unsigned int nr_baselines = (nr_stations * (nr_stations - 1)) / 2;
// Initialize Data object
......@@ -342,7 +354,7 @@ void run_master() {
// Print parameters
print_parameters(nr_stations, nr_channels, nr_timesteps, nr_timeslots,
image_size, grid_size, subgrid_size, kernel_size);
total_nr_timesteps, image_size, grid_size, subgrid_size, kernel_size);
// Get the number of processes
int world_size;
......@@ -357,6 +369,7 @@ void run_master() {
send_int(dst, nr_baselines);
send_int(dst, nr_timesteps);
send_int(dst, nr_timeslots);
send_int(dst, total_nr_timesteps);
send_float(dst, integration_time);
send_int(dst, nr_channels);
send_int(dst, nr_correlations);
......@@ -411,16 +424,18 @@ void run_master() {
omp_set_nested(true);
// Input buffers
idg::Array2D<idg::UVW<float>> uvw(nr_baselines, nr_timesteps);
idg::Array2D<idg::UVW<float>> uvw =
proxy.allocate_array2d<idg::UVW<float>>(nr_baselines, nr_timesteps);
idg::Array3D<idg::Visibility<std::complex<float>>> visibilities =
idg::get_dummy_visibilities(nr_baselines, nr_timesteps, nr_channels);
int time_offset = 0;
int bl_offset = 0;
idg::get_dummy_visibilities(proxy, nr_baselines, nr_timesteps, nr_channels);
unsigned int bl_offset = 0;
// Set grid
proxy.set_grid(grid);
// Performance measurement
std::vector<double> runtimes_init(nr_cycles);
std::vector<double> runtimes_plan(nr_cycles);
std::vector<double> runtimes_gridding(nr_cycles);
std::vector<double> runtimes_degridding(nr_cycles);
std::vector<double> runtimes_grid_reduce(nr_cycles);
......@@ -429,21 +444,39 @@ void run_master() {
// Iterate all cycles
for (unsigned cycle = 0; cycle < nr_cycles; cycle++) {
// Get UVW coordinates for current cycle
data.get_uvw(uvw, bl_offset, time_offset, integration_time);
// Create plan
auto plan = std::unique_ptr<idg::Plan>(new idg::Plan(
kernel_size, subgrid_size, grid_size, cell_size, frequencies, uvw,
baselines, aterms_offsets, options));
// Run gridding
runtimes_gridding[cycle] = -omp_get_wtime();
proxy.gridding(*plan, w_offset, shift, cell_size, kernel_size,
subgrid_size, frequencies, visibilities, uvw,
baselines, aterms, aterms_offsets, spheroidal);
synchronize();
runtimes_gridding[cycle] += omp_get_wtime();
// Run gridding and degridding for all blocks of time
for (unsigned int time_offset = 0; time_offset < total_nr_timesteps; time_offset += nr_timesteps)
{
  // This loop runs once per block of nr_timesteps, so every timer must
  // accumulate (-= before, += after). Assigning with '=' would discard the
  // measurements of all earlier blocks and report only the last block's time
  // for this cycle. The runtimes_* vectors are value-initialized to 0.0.

  // Get UVW coordinates for the current block of timesteps
  runtimes_init[cycle] -= omp_get_wtime();
  data.get_uvw(uvw, bl_offset, time_offset, integration_time);
  runtimes_init[cycle] += omp_get_wtime();

  // Create plan
  runtimes_plan[cycle] -= omp_get_wtime();
  idg::Plan plan(kernel_size, subgrid_size, grid_size,
                 cell_size, frequencies, uvw, baselines,
                 aterms_offsets, options);
  runtimes_plan[cycle] += omp_get_wtime();
  // Deliberately outside the plan timer, as in the original code
  synchronize();

  // Run gridding (timed region includes the trailing synchronize())
  runtimes_gridding[cycle] -= omp_get_wtime();
  proxy.gridding(plan, w_offset, shift, cell_size, kernel_size,
                 subgrid_size, frequencies, visibilities, uvw,
                 baselines, aterms, aterms_offsets, spheroidal);
  synchronize();
  runtimes_gridding[cycle] += omp_get_wtime();

  // Run degridding (timed region includes the trailing synchronize())
  runtimes_degridding[cycle] -= omp_get_wtime();
  proxy.degridding(plan, w_offset, shift, cell_size, kernel_size,
                   subgrid_size, frequencies, visibilities, uvw,
                   baselines, aterms, aterms_offsets, spheroidal);
  synchronize();
  runtimes_degridding[cycle] += omp_get_wtime();
}
// Get grid
grid = proxy.get_grid();
......@@ -479,26 +512,19 @@ void run_master() {
runtimes_grid_fft[cycle] -= omp_get_wtime();
proxy.transform(idg::ImageDomainToFourierDomain, *grid);
runtimes_grid_fft[cycle] += omp_get_wtime();
// Run degridding
runtimes_degridding[cycle] = -omp_get_wtime();
proxy.degridding(*plan, w_offset, shift, cell_size, kernel_size,
subgrid_size, frequencies, visibilities, uvw,
baselines, aterms, aterms_offsets, spheroidal);
synchronize();
runtimes_degridding[cycle] += omp_get_wtime();
// Go to the next batch of timesteps
time_offset += nr_timesteps;
}
// Report timings
std::clog << std::endl;
double runtime_init = std::accumulate(runtimes_init.begin(), runtimes_init.end(), 0.0);
double runtime_plan = std::accumulate(runtimes_plan.begin(), runtimes_plan.end(), 0.0);
double runtime_gridding = std::accumulate(runtimes_gridding.begin(), runtimes_gridding.end(), 0.0);
double runtime_degridding = std::accumulate(runtimes_degridding.begin(), runtimes_degridding.end(), 0.0);
double runtime_grid_fft = std::accumulate(runtimes_grid_fft.begin(), runtimes_grid_fft.end(), 0.0);
double runtime_grid_reduce = std::accumulate(runtimes_grid_reduce.begin(), runtimes_grid_reduce.end(), 0.0);
double runtime_grid_broadcast = std::accumulate(runtimes_grid_broadcast.begin(), runtimes_grid_broadcast.end(), 0.0);
idg::report("initialize", runtime_init);
idg::report("plan", runtime_plan);
idg::report("gridding", runtime_gridding);
idg::report("grid fft", runtime_grid_fft);
idg::report("degridding", runtime_degridding);
......@@ -531,6 +557,7 @@ void run_worker() {
unsigned int nr_baselines = receive_int();
unsigned int nr_timesteps = receive_int();
unsigned int nr_timeslots = receive_int();
unsigned int total_nr_timesteps = receive_int();
float integration_time = receive_float();
unsigned int nr_channels = receive_int();
unsigned int nr_correlations = receive_int();
......@@ -572,31 +599,41 @@ void run_worker() {
omp_set_nested(true);
// Buffers for input data
idg::Array2D<idg::UVW<float>> uvw(nr_baselines, nr_timesteps);
idg::Array2D<idg::UVW<float>> uvw =
proxy.allocate_array2d<idg::UVW<float>>(nr_baselines, nr_timesteps);
idg::Array3D<idg::Visibility<std::complex<float>>> visibilities =
idg::get_dummy_visibilities(nr_baselines, nr_timesteps, nr_channels);
int time_offset = 0;
int bl_offset = 0;
idg::get_dummy_visibilities(proxy, nr_baselines, nr_timesteps, nr_channels);
unsigned int bl_offset = 0;
// Set grid
proxy.set_grid(grid);
// Iterate all cycles
for (unsigned cycle = 0; cycle < nr_cycles; cycle++) {
// Get UVW coordinates for current cycle
data.get_uvw(uvw, bl_offset, time_offset, integration_time);
// Create plan
auto plan = std::unique_ptr<idg::Plan>(new idg::Plan(
kernel_size, subgrid_size, grid_size, cell_size, frequencies, uvw,
baselines, aterms_offsets, options));
// Run gridding
proxy.gridding(*plan, w_offset, shift, cell_size, kernel_size,
subgrid_size, frequencies, visibilities, uvw,
baselines, aterms, aterms_offsets, spheroidal);
synchronize();
// Run gridding and degridding for all blocks of time
for (unsigned int time_offset = 0; time_offset < total_nr_timesteps; time_offset += nr_timesteps)
{
// Get UVW coordinates for current cycle
data.get_uvw(uvw, bl_offset, time_offset, integration_time);
// Create plan
idg::Plan plan(kernel_size, subgrid_size, grid_size,
cell_size, frequencies, uvw, baselines,
aterms_offsets, options);
synchronize();
// Run gridding
proxy.gridding(plan, w_offset, shift, cell_size, kernel_size,
subgrid_size, frequencies, visibilities, uvw,
baselines, aterms, aterms_offsets, spheroidal);
synchronize();
// Run degridding
proxy.degridding(plan, w_offset, shift, cell_size, kernel_size,
subgrid_size, frequencies, visibilities, uvw,
baselines, aterms, aterms_offsets, spheroidal);
synchronize();
}
// Get grid
grid = proxy.get_grid();
......@@ -615,25 +652,11 @@ void run_worker() {
// Set grid
proxy.set_grid(grid);
// Run gridding #2 (create model image)
proxy.gridding(*plan, w_offset, shift, cell_size, kernel_size,
subgrid_size, frequencies, visibilities, uvw,
baselines, aterms, aterms_offsets, spheroidal);
// Run FFT
proxy.transform(idg::ImageDomainToFourierDomain, *grid);
// Run degridding
proxy.degridding(*plan, w_offset, shift, cell_size, kernel_size,
subgrid_size, frequencies, visibilities, uvw,
baselines, aterms, aterms_offsets, spheroidal);
synchronize();
// Subtract model visibilities
// not implemented
// Go to the next batch of timesteps
time_offset += nr_timesteps;
}
} // end run_worker
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment