Commit ace3828e authored by Bram Veenboer, committed by Jan David Mol

CoherentStokes: major refactor with optional reduction

parent 1b2e98ae
@@ -18,6 +18,10 @@
 //#
 //# $Id$
 
+#include <cooperative_groups.h>
+
+namespace cg = cooperative_groups;
+
 #if !(TIME_INTEGRATION_FACTOR >= 1)
 #error Precondition violated: TIME_INTEGRATION_FACTOR >= 1
 #endif
@@ -114,6 +118,46 @@ inline __device__ void write_stokes(
 # endif
 }
 
+inline __device__ void operator+=(float4 &a, const float4 &b) {
+  a.x += b.x;
+  a.y += b.y;
+  a.z += b.z;
+  a.w += b.w;
+}
+
+/*!
+ * Compute and return the sum of n elements of data.
+ * This is implemented in the form of a parallel reduction
+ * which overwrites the input. The result (the sum) is written
+ * to data[0] and returned.
+ */
+template<typename T>
+__device__ T reduce_sum(
+  cg::thread_group group,
+  T *data,
+  size_t n)
+{
+  // Each iteration halves the number of active lanes.
+  // Each active lane adds data[lane + i] into its partial sum data[lane].
+  for (unsigned i = (n + 1) / 2; i > 0; i /= 2)
+  {
+    for (unsigned j = 0; j < i; j += group.size())
+    {
+      unsigned lane = group.thread_rank() + j;
+      group.sync();
+      if (lane < i)
+      {
+        data[lane] += data[lane + i];
+      }
+      group.sync();
+    }
+  }
+  return data[0];
+}
 
 /*!
  * Computes the Stokes I or IQUV, or outputs the 4 complex voltages (XrXiYrYi).
  * http://www.astron.nl/~romein/papers/EuroPar-11/EuroPar-11.pdf
@@ -135,10 +179,7 @@ inline __device__ void write_stokes(
  * \endcode
  *
  * The kernel's first parallel dimension is on the channels; the second
- * dimension is in time; the third on the tabs. The thread block size based on
- * these factors could be larger then the hardmare max. Therefore<tt>
- * NR_CHANNELS * timeParallelFactor * NR_TABS</tt> should not exceed the
- * hardware maximum of threads per block (1024 on an NVIDIA K10).
+ * dimension is in time; the third on the tabs.
 *
 * \param[out] output
 *     4D output array of stokes values. Each sample contains 1 or 4
@@ -176,12 +217,41 @@ inline __device__ void write_stokes(
 *     timeParallelFactor, \c NR_CHANNELS.
 */
 extern "C" __global__ void coherentStokes(OutputDataType output,
-                                          const InputDataType input,
-                                          unsigned timeParallelFactor)
+                                          const InputDataType input)
 {
-  unsigned channel_idx = blockIdx.x * blockDim.x + threadIdx.x;
+  unsigned channel_idx = blockIdx.x;
   unsigned time_idx = blockIdx.y * blockDim.y + threadIdx.y;
-  unsigned tab_idx = blockIdx.z * blockDim.z + threadIdx.z;
+  unsigned tab_idx = blockIdx.z;
+
+  const unsigned timeParallelFactor = gridDim.y * blockDim.y;
+  const unsigned nrTimePerBlock = NR_SAMPLES_PER_CHANNEL / timeParallelFactor;
+
+  // This kernel can operate in two modes:
+  // 1) time-parallel mode, with threads in a block working on different integration periods
+  //    - ignores the x dimension of the block
+  //    - uses the y dimension of the block to distribute the work
+  // 2) reduction mode, with threads in a block working on the same integration period
+  //    - uses the x dimension of the block to distribute the work
+  //    - expects the y dimension of the block to be 1 (for cooperative groups to work correctly)
+
+  // Determine which of the modes outlined above should be used
+  bool use_reduction = blockDim.x > 1;
+
+#if defined(DEBUG)
+  if (use_reduction) {
+    assert(blockDim.y == 1);
+  } else {
+    assert(blockDim.x == 1);
+  }
+#endif
+
+  // Setup thread group (only applies to the reduction mode)
+  // In the time-parallel mode, we explicitly have to set lane and nr_threads
+  cg::thread_block group = cg::this_thread_block();
+  const int lane = use_reduction ? group.thread_rank() : 0;
+  const int nr_threads = use_reduction ? group.size() : 1;
+
+  // Shared memory, only used for the reduction mode
+  __shared__ float4 s_data[64];
+
   // We support all sizes of TABs, channels, and integrations:
   // skip current thread if not needed.
@@ -195,24 +265,23 @@ extern "C" __global__ void coherentStokes(OutputDataType output,
   //# set of Stokes. For parallelism over time, split the sample range in
   //# timeParallelFactor sub-ranges and process these independently.
   //# For complex voltages, we don't compute anything; it's only a transpose.
-  //#
-  //# TODO: This kernel must be rewritten as if it is a transpose to get efficient global mem read and write accesses.
-  //#       This reqs shmem. Note that combining shmem barriers with the conditional returns above is problematic.
-  //# TODO: For very large TIME_INTEGRATION_FACTOR (e.g. 1024), we may need parallel reduction to have enough parallelization. TBD.
-  unsigned read_idx = time_idx * (NR_SAMPLES_PER_CHANNEL / timeParallelFactor);
+  unsigned read_idx = time_idx * nrTimePerBlock;
   unsigned write_idx = read_idx / TIME_INTEGRATION_FACTOR;
-  for ( ; read_idx < (time_idx + 1) * (NR_SAMPLES_PER_CHANNEL / timeParallelFactor) &&
+  for ( ; read_idx < (time_idx + 1) * nrTimePerBlock &&
          read_idx < NR_SAMPLES_PER_CHANNEL; write_idx++)
   {
     //# Integrate all values in the current stride
     float4 stokes = { 0.0f, 0.0f, 0.0f, 0.0f };
 
     //# Do the integration
-    for (unsigned stride_read_idx_end = read_idx + TIME_INTEGRATION_FACTOR;
-         read_idx < stride_read_idx_end; read_idx++)
-    {
-      float2 X = (*input)[tab_idx][0][read_idx][channel_idx];
-      float2 Y = (*input)[tab_idx][1][read_idx][channel_idx];
+    for (unsigned i = lane; i < TIME_INTEGRATION_FACTOR; i += nr_threads) {
+      if (i >= (read_idx + TIME_INTEGRATION_FACTOR)) {
+        break;
+      }
+      float2 X = (*input)[tab_idx][0][read_idx + i][channel_idx];
+      float2 Y = (*input)[tab_idx][1][read_idx + i][channel_idx];
 
       if (COMPLEX_VOLTAGES == 1) {
         stokes.x += X.x;
@@ -231,6 +300,14 @@ extern "C" __global__ void coherentStokes(OutputDataType output,
       }
     }
 
+    //# Store partial result in shared memory and reduce
+    if (use_reduction) {
+      s_data[lane] = stokes;
+      stokes = reduce_sum(group, s_data, group.size());
+    }
+
+    //# Store the final result
+    if (lane == 0) {
     if (COMPLEX_VOLTAGES == 1) {
       write_complex_voltages(output, tab_idx, write_idx, channel_idx, stokes);
     } else {
@@ -243,4 +320,8 @@ extern "C" __global__ void coherentStokes(OutputDataType output,
       }
     }
+    }
+
+    //# Advance to next block to integrate
+    read_idx += TIME_INTEGRATION_FACTOR;
   }
 }
@@ -104,132 +104,42 @@ namespace LOFAR
       setArg(0, buffers.output);
       setArg(1, buffers.input);
 
-      // The following code paragraphs attempt to determine a reasonable
-      // execution configuration for any #channels, timeParallelFactor, #TABs.
-      //
-      // We first produce viable configs combinatorically (starting with the
-      // max and min parallelization dim constants), and
-      // then filter these configs using some heuristics (no compilation or run).
-      // We end up with 1 "best" config that we then apply after some assertions.
-      // The used heuristics are crude. Lots of room for improvement.
+      // The current strategy is to maximize block level parallelism by
+      // setting grid.x to #channels and grid.z to #TABs.
       const gpu::Device device(_context.getDevice());
+      const unsigned warpSize = device.getAttribute(CU_DEVICE_ATTRIBUTE_WARP_SIZE);
 
-      const unsigned minTimeParallelThreads = 1;
       const gpu::Block maxLocalWorkSize = device.getMaxBlockDims();
       const unsigned maxTimeParallelThreads =
         gcd(params.nrSamplesPerChannel / params.timeIntegrationFactor,
             maxLocalWorkSize.y);
-      const unsigned nrChannelsPerBlock = 1;
-      const unsigned nrTABsPerBlock = 1;
-
-      // Generate viable execution configurations.
-      // Step through each of the 2 parallelization dims and verify if supp by hw.
-      // Store candidates along with their timeParallelFactor to be passed as kernel arg.
-      using std::vector;
-      vector<CoherentStokesExecConfig> configs;
-      for (unsigned nrTimeParallelThreadsPerBlock = minTimeParallelThreads;
-           /* exit cond at the end */ ;
-           nrTimeParallelThreadsPerBlock *= smallestFactorOf(maxTimeParallelThreads /
-                                                             nrTimeParallelThreadsPerBlock)) {
-        unsigned nrChannelThreads = align(params.nrChannels,
-                                          nrChannelsPerBlock);
-        unsigned nrTimeParallelThreads = align(maxTimeParallelThreads,
-                                               nrTimeParallelThreadsPerBlock);
-        unsigned nrTABsThreads = align(params.nrTABs, nrTABsPerBlock);
-
-        const int block_x = nrChannelsPerBlock;
-        const int block_y = nrTimeParallelThreadsPerBlock;
-        const int block_z = nrTABsPerBlock;
-        gpu::Grid grid (nrChannelThreads / block_x,
-                        nrTimeParallelThreads / block_y,
-                        nrTABsThreads / block_z);
-        gpu::Block block(block_x, block_y, block_z);
-
-        // Filter configs that fall outside device caps (e.g. max threads per block).
-        string errMsgs;
-        setEnqueueWorkSizes(grid, block, &errMsgs);
-        if (errMsgs.empty()) {
-          CoherentStokesExecConfig ec;
-          ec.grid = grid;
-          ec.block = block;
-          ec.nrTimeParallelThreads = nrTimeParallelThreads;
-          configs.push_back(ec);
-        } else {
-          LOG_DEBUG_STR("Skipping unsupported CoherentStokes exec config: " << errMsgs);
-        }
+      unsigned nrTimeParallelThreadsPerBlock = 1;
+      unsigned nrTimeParallelThreads = maxTimeParallelThreads;
+      unsigned nrTimeReductionThreads = 1;
 
-        // Loop exit check after (not before) considering a config, esp. for
-        // maxTimeParallelThreads == 1 and other cases of smallestFactorOf(1).
-        if (nrTimeParallelThreadsPerBlock == maxTimeParallelThreads)
-          break;
-      }
-      ASSERT(!configs.empty());
-
-      // Select config to use by narrowing down a few times based on some heuristics.
-      // Each time, the surviving exec configs are copied to a new vector.
-      // The first filter prefers >=64 threads per block.
-      // Thereafter, we prefer high occupancy, and max #blks (= smallest blk size).
-      vector<CoherentStokesExecConfig> configsMinBlockSize;
-      unsigned minThreadsPerBlock = 64;
-      const unsigned warpSize = device.getAttribute(CU_DEVICE_ATTRIBUTE_WARP_SIZE);
-      while (minThreadsPerBlock >= warpSize) {
-        for (vector<CoherentStokesExecConfig>::const_iterator it = configs.begin();
-             it != configs.end(); ++it) {
-          if (it->block.x * it->block.y * it->block.z >= minThreadsPerBlock) {
-            configsMinBlockSize.push_back(*it);
-          }
-        }
+      // Try to use at least the warpSize number of threads per block
+      while (nrTimeParallelThreadsPerBlock < warpSize &&
+             nrTimeParallelThreadsPerBlock < maxTimeParallelThreads) {
+        nrTimeParallelThreadsPerBlock *= 2;
+      }
 
-        if (!configsMinBlockSize.empty()) {
-          break;
-        }
-        minThreadsPerBlock /= 2;
-      }
-      if (configsMinBlockSize.empty()) {
-        configsMinBlockSize = configs; // tough luck
-      }
+      // Switch to the reduction mode for sufficiently large time integration factor
+      if (params.timeIntegrationFactor >= nrTimeParallelThreadsPerBlock) {
+        nrTimeParallelThreadsPerBlock = 1;
+        nrTimeReductionThreads = 64; // this value has to match the shared memory in CoherentStokes.cu
+      }
 
-      // High occupancy heuristic.
-      vector<CoherentStokesExecConfig> configsMaxOccupancy;
-      double maxOccupancy = 0.0;
-      for (vector<CoherentStokesExecConfig>::const_iterator it = configsMinBlockSize.begin();
-           it != configsMinBlockSize.end(); ++it) {
-        setEnqueueWorkSizes(it->grid, it->block);
-        double occupancy = predictMultiProcOccupancy();
-        if (fpEquals(occupancy, maxOccupancy, 0.05)) { // 0.05 occ diff is meaningless for sure
-          configsMaxOccupancy.push_back(*it);
-        } else if (occupancy > maxOccupancy) {
-          maxOccupancy = occupancy;
-          configsMaxOccupancy.clear();
-          configsMaxOccupancy.push_back(*it);
-        }
-      }
-
-      // Within earlier constraints, prefer min block size to maximize #blocks.
-      CoherentStokesExecConfig selectedConfig;
-      unsigned minBlockSizeSeen = maxLocalWorkSize.x * maxLocalWorkSize.y * maxLocalWorkSize.z + 1;
-      for (vector<CoherentStokesExecConfig>::const_iterator it = configsMaxOccupancy.begin();
-           it != configsMaxOccupancy.end(); ++it) {
-        unsigned blockSize = it->block.x * it->block.y * it->block.z;
-        if (blockSize < minBlockSizeSeen) {
-          minBlockSizeSeen = blockSize;
-          selectedConfig = *it;
-        }
-        LOG_DEBUG_STR("Coherent Stokes exec config candidate (last round): " << *it);
-      }
-
-      itsTimeParallelFactor = selectedConfig.nrTimeParallelThreads;
-      ASSERTSTR(params.nrSamplesPerChannel % itsTimeParallelFactor == 0,
-                "itsTimeParallelFactor=" << itsTimeParallelFactor);
-      ASSERTSTR(params.nrSamplesPerChannel % (params.timeIntegrationFactor * itsTimeParallelFactor) == 0,
-                "itsTimeParallelFactor=" << itsTimeParallelFactor);
-
-      setEnqueueWorkSizes(selectedConfig.grid, selectedConfig.block);
-      LOG_INFO_STR("Coherent Stokes exec config has a (predicted) occupancy of " << maxOccupancy); // off by max 0.05...
-
-      // The itsTimeParallelFactor immediate kernel arg must outlive kernel runs.
-      setArg(2, itsTimeParallelFactor); // could be a kernel define, but not yet known at kernel compilation
+      gpu::Grid grid (params.nrChannels,
+                      nrTimeParallelThreads / nrTimeParallelThreadsPerBlock,
+                      params.nrTABs);
+      gpu::Block block(nrTimeReductionThreads, nrTimeParallelThreadsPerBlock, 1);
+
+      unsigned timeParallelFactor = grid.y * block.y;
+      ASSERTSTR(params.nrSamplesPerChannel % timeParallelFactor == 0,
+                "timeParallelFactor=" << timeParallelFactor);
+      ASSERTSTR(params.nrSamplesPerChannel % (params.timeIntegrationFactor * timeParallelFactor) == 0,
+                "timeParallelFactor=" << timeParallelFactor);
+
+      setEnqueueWorkSizes(grid, block);
 
       nrOperations = (size_t) params.nrChannels * params.nrSamplesPerChannel * params.nrTABs * (params.nrStokes == 1 ? 8 : 20 + 2.0 / params.timeIntegrationFactor);
       nrBytesRead = (size_t) params.nrChannels * params.nrSamplesPerChannel * params.nrTABs * NR_POLARIZATIONS * sizeof(std::complex<float>);
......
@@ -86,11 +86,6 @@ namespace LOFAR
       };
 
     private:
-      // The timeParallelFactor is not a Parameter passed in, but is a kernel
-      // arg, so it must be a member var to outlive kernel launches.
-      unsigned itsTimeParallelFactor;
-
-      unsigned smallestFactorOf(unsigned n) const;
     };
......