Commit 1ec3ae87 authored by Bram Veenboer

Apply clang formatting

parent e71677c5
Pipeline #8136 passed with stages
in 24 minutes and 19 seconds
......@@ -27,7 +27,7 @@ std::tuple<int, int, int, int, int, int, int, int, int> read_parameters() {
const unsigned int DEFAULT_NR_TIMESLOTS =
DEFAULT_NR_TIMESTEPS / (60 * 30); // update every 30 minutes
const unsigned int DEFAULT_TOTAL_NR_TIMESTEPS =
(3600 * 8) / (DEFAULT_NR_TIMESTEPS / 3600); // 8 hours of observation
(3600 * 8) / (DEFAULT_NR_TIMESTEPS / 3600); // 8 hours of observation
const unsigned int DEFAULT_GRIDSIZE = 4096;
const unsigned int DEFAULT_SUBGRIDSIZE = 32;
const unsigned int DEFAULT_NR_CYCLES = 1;
......@@ -49,8 +49,9 @@ std::tuple<int, int, int, int, int, int, int, int, int> read_parameters() {
cstr_nr_timeslots ? atoi(cstr_nr_timeslots) : DEFAULT_NR_TIMESLOTS;
char *cstr_total_nr_timesteps = getenv("TOTAL_NR_TIMESTEPS");
auto total_nr_timesteps =
cstr_total_nr_timesteps ? atoi(cstr_total_nr_timesteps) : DEFAULT_TOTAL_NR_TIMESTEPS;
auto total_nr_timesteps = cstr_total_nr_timesteps
? atoi(cstr_total_nr_timesteps)
: DEFAULT_TOTAL_NR_TIMESTEPS;
char *cstr_grid_size = getenv("GRIDSIZE");
auto grid_size = cstr_grid_size ? atoi(cstr_grid_size) : DEFAULT_GRIDSIZE;
......@@ -67,8 +68,8 @@ std::tuple<int, int, int, int, int, int, int, int, int> read_parameters() {
auto nr_cycles = cstr_nr_cycles ? atoi(cstr_nr_cycles) : DEFAULT_NR_CYCLES;
return std::make_tuple(nr_stations, nr_channels, nr_timesteps, nr_timeslots,
total_nr_timesteps, grid_size, subgrid_size, kernel_size,
nr_cycles);
total_nr_timesteps, grid_size, subgrid_size,
kernel_size, nr_cycles);
}
void print_parameters(unsigned int nr_stations, unsigned int nr_channels,
......@@ -95,7 +96,7 @@ void print_parameters(unsigned int nr_stations, unsigned int nr_channels,
os << setw(fw1) << left << "Number of timeslots"
<< "== " << setw(fw2) << right << nr_timeslots << endl;
os << setw(fw1) << left << "Total number of timesteps"
os << setw(fw1) << left << "Total number of timesteps"
<< "== " << setw(fw2) << right << total_nr_timesteps << endl;
os << setw(fw1) << left << "Imagesize"
......@@ -152,91 +153,59 @@ void receive_bytes(int src, void *buf, size_t bytes) {
MPI_Recv(buf, bytes, MPI_BYTE, src, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
}
class MPIRequest
{
public:
class MPIRequest {
public:
MPIRequest(bool blocking = false) : m_blocking(blocking) {}
MPIRequest(bool blocking = false) :
m_blocking(blocking)
{
}
void send(
const void *buf,
int bytes,
int dest,
int tag = 0)
{
void send(const void *buf, int bytes, int dest, int tag = 0) {
MPI_Isend(buf, bytes, MPI_BYTE, dest, tag, MPI_COMM_WORLD, &m_request);
if (m_blocking)
{
if (m_blocking) {
wait();
}
}
void receive(
void *buf,
int bytes,
int source,
int tag = 0)
{
void receive(void *buf, int bytes, int source, int tag = 0) {
MPI_Irecv(buf, bytes, MPI_BYTE, source, tag, MPI_COMM_WORLD, &m_request);
if (m_blocking)
{
if (m_blocking) {
wait();
}
}
void wait()
{
MPI_Wait(&m_request, MPI_STATUS_IGNORE);
}
void wait() { MPI_Wait(&m_request, MPI_STATUS_IGNORE); }
private:
private:
MPI_Request m_request;
bool m_blocking;
};
class MPIRequestList
{
public:
MPIRequestList() :
m_requests(0)
{
}
class MPIRequestList {
public:
MPIRequestList() : m_requests(0) {}
~MPIRequestList()
{
wait();
}
~MPIRequestList() { wait(); }
std::shared_ptr<MPIRequest> create(bool blocking = false)
{
std::shared_ptr<MPIRequest> create(bool blocking = false) {
m_requests.emplace_back(new MPIRequest(blocking));
return m_requests.back();
}
void wait()
{
for (auto &request : m_requests)
{
void wait() {
for (auto &request : m_requests) {
request->wait();
}
}
private:
private:
std::vector<std::shared_ptr<MPIRequest>> m_requests;
};
void synchronize() {
MPI_Barrier(MPI_COMM_WORLD);
}
void synchronize() { MPI_Barrier(MPI_COMM_WORLD); }
/**
 * Write a message, prefixed with the rank between square brackets, as one
 * line to std::clog and flush the stream.
 *
 * @param rank    rank shown in the "[rank] " prefix
 * @param message null-terminated text to print
 */
void print(int rank, const char *message) {
  std::ostream &log = std::clog;
  log << "[" << rank << "] ";
  log << message;
  log << std::endl;
}
/**
 * std::string overload: forwards the message's C string to
 * print(int, const char *).
 *
 * @param rank    rank shown in the message prefix
 * @param message text to print
 */
void print(int rank, const std::string &message) {
  const char *text = message.c_str();
  print(rank, text);
}
......@@ -248,67 +217,51 @@ idg::Plan::Options get_plan_options() {
return options;
}
/**
 * Sum the grids of all ranks into the grid of rank 0 using a binary tree
 * of point-to-point transfers.
 *
 * For every polarization the set of participating ranks [0, active) is
 * halved repeatedly. In each round the upper ranks send their grid, row by
 * row, to the rank `half` below them and drop out; the lower ranks receive
 * into a scratch buffer and add it to their own grid. A lower rank whose
 * partner (rank + half) does not exist simply skips the round, so any
 * world size is supported. For power-of-two sizes the message pattern is
 * the classic world_size/2, world_size/4, ... schedule.
 *
 * Only w-layer 0 of the grid is reduced. After the call only rank 0 holds
 * the complete sum; the other ranks hold partial sums.
 *
 * @param grid       local grid; accumulated into on receiving ranks
 * @param rank       rank of the calling process
 * @param world_size total number of ranks taking part in the reduction
 */
void reduce_grids(std::shared_ptr<idg::Grid> grid, unsigned int rank,
                  unsigned int world_size) {
  const unsigned int w = 0;  // W-stacking is handled by the workers
  const unsigned int grid_size = grid->get_y_dim();
  // Scratch buffer for one polarization plane received from the partner.
  idg::Array2D<std::complex<float>> tmp(grid_size, grid_size);
  const size_t sizeof_row = grid_size * sizeof(std::complex<float>);
  auto &grid_ = *grid;

#pragma omp parallel
  {
    for (unsigned int pol = 0; pol < NR_POLARIZATIONS; pol++) {
      // Ranks [0, active) still hold data that has to reach rank 0.
      unsigned int active = world_size;
      while (active > 1) {
        const unsigned int half = (active + 1) / 2;

        if (rank < half) {
          // Receiver side. The condition below is identical for all
          // threads of this process, so either every thread reaches the
          // barrier and the worksharing loop, or none does.
          if (rank + half < active) {
            if (omp_get_thread_num() == 0) {
              MPIRequestList requests;
              for (unsigned int y = 0; y < grid_size; y++) {
                requests.create()->receive(tmp.data(y, 0), sizeof_row,
                                           rank + half);
              }
              requests.wait();
            }

            // All rows must have arrived before any thread reads tmp.
#pragma omp barrier
#pragma omp for
            for (unsigned int y = 0; y < grid_size; y++) {
              for (unsigned int x = 0; x < grid_size; x++) {
                grid_(w, pol, y, x) += *tmp.data(y, x);
              }
            }
          }
        } else if (rank < active && omp_get_thread_num() == 0) {
          // Sender side: forward this rank's accumulated grid (not the
          // scratch buffer) to its partner. The list's destructor waits
          // for the sends to complete.
          MPIRequestList requests;
          for (unsigned int y = 0; y < grid_size; y++) {
            requests.create()->send(grid->data(w, pol, y, 0), sizeof_row,
                                    rank - half);
          }
        }

        active = half;
      }  // end while active
    }    // end for pol
  }      // end pragma parallel
}
void broadcast_grid(
std::shared_ptr<idg::Grid> grid,
int root)
{
void broadcast_grid(std::shared_ptr<idg::Grid> grid, int root) {
unsigned int grid_size = grid->get_y_dim();
unsigned int w = 0; // W-stacking is handled by the workers
for (unsigned int y = 0; y < grid_size; y++)
{
for (unsigned int pol = 0; pol < NR_POLARIZATIONS; pol++)
{
unsigned int w = 0; // W-stacking is handled by the workers
for (unsigned int y = 0; y < grid_size; y++) {
for (unsigned int pol = 0; pol < NR_POLARIZATIONS; pol++) {
std::complex<float> *row_ptr = grid->data(w, pol, y, 0);
size_t sizeof_row = grid_size * sizeof(std::complex<float>);
MPI_Bcast(row_ptr, sizeof_row, MPI_BYTE, root, MPI_COMM_WORLD);
......@@ -335,9 +288,9 @@ void run_master() {
unsigned int nr_cycles;
// Read parameters from environment
std::tie(nr_stations, nr_channels, nr_timesteps, nr_timeslots,
total_nr_timesteps, grid_size, subgrid_size,
kernel_size, nr_cycles) = read_parameters();
std::tie(nr_stations, nr_channels, nr_timesteps, nr_timeslots,
total_nr_timesteps, grid_size, subgrid_size, kernel_size,
nr_cycles) = read_parameters();
unsigned int nr_baselines = (nr_stations * (nr_stations - 1)) / 2;
// Initialize Data object
......@@ -354,7 +307,8 @@ void run_master() {
// Print parameters
print_parameters(nr_stations, nr_channels, nr_timesteps, nr_timeslots,
total_nr_timesteps, image_size, grid_size, subgrid_size, kernel_size);
total_nr_timesteps, image_size, grid_size, subgrid_size,
kernel_size);
// Get the number of processes
int world_size;
......@@ -424,11 +378,13 @@ void run_master() {
omp_set_nested(true);
// Buffers for input data
unsigned int nr_time_blocks = std::ceil((float) total_nr_timesteps / nr_timesteps);
idg::Array3D<idg::UVW<float>> uvws =
proxy.allocate_array3d<idg::UVW<float>>(nr_time_blocks, nr_baselines, nr_timesteps);
unsigned int nr_time_blocks =
std::ceil((float)total_nr_timesteps / nr_timesteps);
idg::Array3D<idg::UVW<float>> uvws = proxy.allocate_array3d<idg::UVW<float>>(
nr_time_blocks, nr_baselines, nr_timesteps);
idg::Array3D<idg::Visibility<std::complex<float>>> visibilities =
idg::get_dummy_visibilities(proxy, nr_baselines, nr_timesteps, nr_channels);
idg::get_dummy_visibilities(proxy, nr_baselines, nr_timesteps,
nr_channels);
unsigned int bl_offset = 0;
// Vector of plans
......@@ -449,48 +405,44 @@ void run_master() {
// Iterate all cycles
runtime_imaging = -omp_get_wtime();
for (unsigned cycle = 0; cycle < nr_cycles; cycle++)
{
for (unsigned cycle = 0; cycle < nr_cycles; cycle++) {
// Run gridding and degridding for all blocks of time
for (unsigned int t = 0; t < nr_time_blocks; t++)
{
for (unsigned int t = 0; t < nr_time_blocks; t++) {
unsigned int time_offset = t * nr_timesteps;
// Get UVW coordinates for current cycle
idg::Array2D<idg::UVW<float>> uvw(uvws.data(t, 0, 0), nr_baselines, nr_timesteps);
if (cycle == 0)
{
idg::Array2D<idg::UVW<float>> uvw(uvws.data(t, 0, 0), nr_baselines,
nr_timesteps);
if (cycle == 0) {
runtimes_init[t] -= omp_get_wtime();
data.get_uvw(uvw, bl_offset, time_offset, integration_time);
runtimes_init[t] += omp_get_wtime();
}
// Create plan
if (cycle == 0)
{
if (cycle == 0) {
runtimes_plan[t] -= omp_get_wtime();
plans.emplace_back(new
idg::Plan(kernel_size, subgrid_size, grid_size,
cell_size, frequencies, uvw, baselines,
aterms_offsets, options));
plans.emplace_back(new idg::Plan(kernel_size, subgrid_size, grid_size,
cell_size, frequencies, uvw, baselines,
aterms_offsets, options));
runtimes_plan[t] += omp_get_wtime();
}
idg::Plan& plan = *plans[t];
idg::Plan &plan = *plans[t];
synchronize();
// Run gridding
runtimes_gridding[cycle] -= omp_get_wtime();
proxy.gridding(plan, w_offset, shift, cell_size, kernel_size,
subgrid_size, frequencies, visibilities, uvw,
baselines, aterms, aterms_offsets, spheroidal);
subgrid_size, frequencies, visibilities, uvw, baselines,
aterms, aterms_offsets, spheroidal);
synchronize();
runtimes_gridding[cycle] += omp_get_wtime();
// Run degridding
runtimes_degridding[cycle] -= omp_get_wtime();
proxy.degridding(plan, w_offset, shift, cell_size, kernel_size,
subgrid_size, frequencies, visibilities, uvw,
baselines, aterms, aterms_offsets, spheroidal);
subgrid_size, frequencies, visibilities, uvw, baselines,
aterms, aterms_offsets, spheroidal);
synchronize();
runtimes_degridding[cycle] += omp_get_wtime();
}
......@@ -501,8 +453,7 @@ void run_master() {
runtimes_grid_fft[cycle] += omp_get_wtime();
// Reduce grids
if (world_size > 1)
{
if (world_size > 1) {
// Get grid
grid = proxy.get_grid();
......@@ -515,8 +466,7 @@ void run_master() {
// not implemented
// Broadcast model image to workers
if (world_size > 1)
{
if (world_size > 1) {
runtimes_grid_broadcast[cycle] = -omp_get_wtime();
broadcast_grid(grid, 0);
runtimes_grid_broadcast[cycle] += omp_get_wtime();
......@@ -534,13 +484,20 @@ void run_master() {
// Report timings
std::clog << std::endl;
double runtime_init = std::accumulate(runtimes_init.begin(), runtimes_init.end(), 0.0);
double runtime_plan = std::accumulate(runtimes_plan.begin(), runtimes_plan.end(), 0.0);
double runtime_gridding = std::accumulate(runtimes_gridding.begin(), runtimes_gridding.end(), 0.0);
double runtime_degridding = std::accumulate(runtimes_degridding.begin(), runtimes_degridding.end(), 0.0);
double runtime_grid_fft = std::accumulate(runtimes_grid_fft.begin(), runtimes_grid_fft.end(), 0.0);
double runtime_grid_reduce = std::accumulate(runtimes_grid_reduce.begin(), runtimes_grid_reduce.end(), 0.0);
double runtime_grid_broadcast = std::accumulate(runtimes_grid_broadcast.begin(), runtimes_grid_broadcast.end(), 0.0);
double runtime_init =
std::accumulate(runtimes_init.begin(), runtimes_init.end(), 0.0);
double runtime_plan =
std::accumulate(runtimes_plan.begin(), runtimes_plan.end(), 0.0);
double runtime_gridding =
std::accumulate(runtimes_gridding.begin(), runtimes_gridding.end(), 0.0);
double runtime_degridding = std::accumulate(runtimes_degridding.begin(),
runtimes_degridding.end(), 0.0);
double runtime_grid_fft =
std::accumulate(runtimes_grid_fft.begin(), runtimes_grid_fft.end(), 0.0);
double runtime_grid_reduce = std::accumulate(runtimes_grid_reduce.begin(),
runtimes_grid_reduce.end(), 0.0);
double runtime_grid_broadcast = std::accumulate(
runtimes_grid_broadcast.begin(), runtimes_grid_broadcast.end(), 0.0);
idg::report("initialize", runtime_init);
idg::report("plan", runtime_plan);
idg::report("gridding", runtime_gridding);
......@@ -552,11 +509,12 @@ void run_master() {
std::clog << std::endl;
// Report throughput
uint64_t nr_visibilities = 1ULL * nr_cycles * nr_baselines * total_nr_timesteps * nr_channels * world_size;
uint64_t nr_visibilities = 1ULL * nr_cycles * nr_baselines *
total_nr_timesteps * nr_channels * world_size;
idg::report_visibilities("gridding", runtime_gridding, nr_visibilities);
idg::report_visibilities("degridding", runtime_degridding, nr_visibilities);
idg::report_visibilities("imaging", runtime_imaging, nr_visibilities);
} // end run_master
} // end run_master
void run_worker() {
// Get the rank of the process
......@@ -614,11 +572,13 @@ void run_worker() {
omp_set_nested(true);
// Buffers for input data
unsigned int nr_time_blocks = std::ceil((float) total_nr_timesteps / nr_timesteps);
idg::Array3D<idg::UVW<float>> uvws =
proxy.allocate_array3d<idg::UVW<float>>(nr_time_blocks, nr_baselines, nr_timesteps);
unsigned int nr_time_blocks =
std::ceil((float)total_nr_timesteps / nr_timesteps);
idg::Array3D<idg::UVW<float>> uvws = proxy.allocate_array3d<idg::UVW<float>>(
nr_time_blocks, nr_baselines, nr_timesteps);
idg::Array3D<idg::Visibility<std::complex<float>>> visibilities =
idg::get_dummy_visibilities(proxy, nr_baselines, nr_timesteps, nr_channels);
idg::get_dummy_visibilities(proxy, nr_baselines, nr_timesteps,
nr_channels);
unsigned int bl_offset = 0;
// Vector of plans
......@@ -630,38 +590,35 @@ void run_worker() {
// Iterate all cycles
for (unsigned cycle = 0; cycle < nr_cycles; cycle++) {
// Run gridding and degridding for all blocks of time
for (unsigned int t = 0; t < nr_time_blocks; t++)
{
for (unsigned int t = 0; t < nr_time_blocks; t++) {
unsigned int time_offset = t * nr_timesteps;
// Get UVW coordinates for current cycle
idg::Array2D<idg::UVW<float>> uvw(uvws.data(t, 0, 0), nr_baselines, nr_timesteps);
if (cycle == 0)
{
idg::Array2D<idg::UVW<float>> uvw(uvws.data(t, 0, 0), nr_baselines,
nr_timesteps);
if (cycle == 0) {
data.get_uvw(uvw, bl_offset, time_offset, integration_time);
}
// Create plan
if (cycle == 0)
{
plans.emplace_back(new
idg::Plan(kernel_size, subgrid_size, grid_size,
cell_size, frequencies, uvw, baselines,
aterms_offsets, options));
if (cycle == 0) {
plans.emplace_back(new idg::Plan(kernel_size, subgrid_size, grid_size,
cell_size, frequencies, uvw, baselines,
aterms_offsets, options));
}
idg::Plan& plan = *plans[t];
idg::Plan &plan = *plans[t];
synchronize();
// Run gridding
proxy.gridding(plan, w_offset, shift, cell_size, kernel_size,
subgrid_size, frequencies, visibilities, uvw,
baselines, aterms, aterms_offsets, spheroidal);
subgrid_size, frequencies, visibilities, uvw, baselines,
aterms, aterms_offsets, spheroidal);
synchronize();
// Run degridding
proxy.degridding(plan, w_offset, shift, cell_size, kernel_size,
subgrid_size, frequencies, visibilities, uvw,
baselines, aterms, aterms_offsets, spheroidal);
subgrid_size, frequencies, visibilities, uvw, baselines,
aterms, aterms_offsets, spheroidal);
synchronize();
}
......@@ -688,8 +645,7 @@ void run_worker() {
// Subtract model visibilities
// not implemented
}
} // end run_worker
} // end run_worker
void run() {
// Initialize the MPI environment
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment