Skip to content
Snippets Groups Projects
Commit 1385e53e authored by Jan David Mol's avatar Jan David Mol
Browse files

Task #10118: Added timers for TABTranspose, and use 1 thread for data transpose in outputproc

parent a34dc9fa
No related branches found
No related tags found
No related merge requests found
...@@ -41,7 +41,7 @@ namespace LOFAR { ...@@ -41,7 +41,7 @@ namespace LOFAR {
BudgetTimer::~BudgetTimer() BudgetTimer::~BudgetTimer()
{ {
if (print_on_destruction) { if (print_on_destruction && budget > 0.0) {
const double realTimePerc = 100.0 * getAverage() / budget; const double realTimePerc = 100.0 * getAverage() / budget;
if (log_on_destruction) { if (log_on_destruction) {
......
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
#include <Common/Timer.h> #include <Common/Timer.h>
#include <ApplCommon/PVSSDatapointDefs.h> #include <ApplCommon/PVSSDatapointDefs.h>
#include <CoInterface/TimeFuncs.h> #include <CoInterface/TimeFuncs.h>
#include <CoInterface/BudgetTimer.h>
using namespace std; using namespace std;
using boost::format; using boost::format;
...@@ -156,7 +157,14 @@ void Block::write( BeamformedData &output ) { ...@@ -156,7 +157,14 @@ void Block::write( BeamformedData &output ) {
* The implemented strategy coalesces the writes in batches. Large batches * The implemented strategy coalesces the writes in batches. Large batches
* will cause TLB misses due to the larger number of arrays we need to index. * will cause TLB misses due to the larger number of arrays we need to index.
*/ */
const size_t MAXBATCHSIZE = 4;
// On CEP2, optimal is MAXBATCHSIZE=4 and using 4 threads (in the for loop)
// On CEP4, optimal is MAXBATCHSIZE=488 and using 1 thread.
//
// We want real-time performance, (see tTABTranspose | grep write, should produce
// numbers <1s), but if we do not need multiple threads, doing the work in one
// is more efficient.
const size_t MAXBATCHSIZE = 488;
// Stride between samples in output // Stride between samples in output
const ptrdiff_t dst_sample_stride = output.samples.strides()[0]; const ptrdiff_t dst_sample_stride = output.samples.strides()[0];
...@@ -165,8 +173,6 @@ void Block::write( BeamformedData &output ) { ...@@ -165,8 +173,6 @@ void Block::write( BeamformedData &output ) {
const vector<float> zeroes(nrChannels * nrSamples, 0.0f); const vector<float> zeroes(nrChannels * nrSamples, 0.0f);
// Four threads gives the best performance on CEP2, see figures given by 'tTABTranspose | grep write'
# pragma omp parallel for num_threads(4)
for (size_t subbandBase = 0; subbandBase < subbandCache.size(); subbandBase += MAXBATCHSIZE) { for (size_t subbandBase = 0; subbandBase < subbandCache.size(); subbandBase += MAXBATCHSIZE) {
// Determine actual batch size // Determine actual batch size
const size_t BATCHSIZE = std::min(MAXBATCHSIZE, subbandCache.size() - subbandBase); const size_t BATCHSIZE = std::min(MAXBATCHSIZE, subbandCache.size() - subbandBase);
...@@ -235,7 +241,7 @@ bool Block::complete() const { ...@@ -235,7 +241,7 @@ bool Block::complete() const {
// The BlockCollector collects blocks from different rtcp processes for a TAB. // The BlockCollector collects blocks from different rtcp processes for a TAB.
// More precisely, we have one BlockCollector per file (i.e. part). // More precisely, we have one BlockCollector per file (i.e. part).
BlockCollector::BlockCollector( Pool<BeamformedData> &outputPool, size_t fileIdx, size_t nrSubbands, size_t nrChannels, size_t nrSamples, size_t nrBlocks, size_t maxBlocksInFlight, const std::string &logPrefix ) BlockCollector::BlockCollector( Pool<BeamformedData> &outputPool, size_t fileIdx, size_t nrSubbands, size_t nrChannels, size_t nrSamples, size_t nrBlocks, size_t maxBlocksInFlight, const std::string &logPrefix, double blockDuration )
: :
logPrefix(logPrefix + "[BlockCollector] "), logPrefix(logPrefix + "[BlockCollector] "),
...@@ -256,6 +262,8 @@ BlockCollector::BlockCollector( Pool<BeamformedData> &outputPool, size_t fileIdx ...@@ -256,6 +262,8 @@ BlockCollector::BlockCollector( Pool<BeamformedData> &outputPool, size_t fileIdx
nrChannels(nrChannels), nrChannels(nrChannels),
nrSamples(nrSamples), nrSamples(nrSamples),
blockDuration(blockDuration),
maxBlocksInFlight(maxBlocksInFlight), maxBlocksInFlight(maxBlocksInFlight),
canDrop(maxBlocksInFlight > 0), canDrop(maxBlocksInFlight > 0),
lastEmitted(-1), lastEmitted(-1),
...@@ -285,7 +293,14 @@ void BlockCollector::addSubband( SmartPtr<Subband> &subband ) { ...@@ -285,7 +293,14 @@ void BlockCollector::addSubband( SmartPtr<Subband> &subband ) {
void BlockCollector::inputLoop() { void BlockCollector::inputLoop() {
SmartPtr<Subband> subband; SmartPtr<Subband> subband;
BudgetTimer processTimer(
"BlockCollector::inputLoop: weaving subbands",
blockDuration / nrSubbands,
true, true);
while ((subband = inputQueue.remove()) != NULL) { while ((subband = inputQueue.remove()) != NULL) {
BudgetTimer::StartStop ss(processTimer);
processSubband(subband); processSubband(subband);
} }
} }
...@@ -294,14 +309,18 @@ void BlockCollector::inputLoop() { ...@@ -294,14 +309,18 @@ void BlockCollector::inputLoop() {
void BlockCollector::outputLoop() { void BlockCollector::outputLoop() {
SmartPtr<Block> block; SmartPtr<Block> block;
NSTimer writeTimer("Block: data transpose/zeroing", true, true); BudgetTimer writeTimer(
"BlockCollector::outputLoop: data transpose/zeroing",
blockDuration,
true, true);
while ((block = outputQueue.remove()) != NULL) { while ((block = outputQueue.remove()) != NULL) {
SmartPtr<BeamformedData> output = outputPool.free.remove(); SmartPtr<BeamformedData> output = outputPool.free.remove();
writeTimer.start(); {
block->write(*output); BudgetTimer::StartStop ss(writeTimer);
writeTimer.stop(); block->write(*output);
}
outputPool.filled.append(output); outputPool.filled.append(output);
} }
......
...@@ -182,7 +182,7 @@ namespace LOFAR ...@@ -182,7 +182,7 @@ namespace LOFAR
* maxBlocksInFlight: the maximum number of blocks to process in * maxBlocksInFlight: the maximum number of blocks to process in
* parallel (or 0 for no limit). * parallel (or 0 for no limit).
*/ */
BlockCollector( Pool<BeamformedData> &outputPool, size_t fileIdx, size_t nrSubbands, size_t nrChannels, size_t nrSamples, size_t nrBlocks = 0, size_t maxBlocksInFlight = 0, const std::string &logPrefix = "" ); BlockCollector( Pool<BeamformedData> &outputPool, size_t fileIdx, size_t nrSubbands, size_t nrChannels, size_t nrSamples, size_t nrBlocks = 0, size_t maxBlocksInFlight = 0, const std::string &logPrefix = "", double blockDuration = 0.0 );
~BlockCollector(); ~BlockCollector();
...@@ -227,6 +227,8 @@ namespace LOFAR ...@@ -227,6 +227,8 @@ namespace LOFAR
const size_t nrChannels; const size_t nrChannels;
const size_t nrSamples; const size_t nrSamples;
const double blockDuration;
// upper limit for blocks.size(), or 0 if unlimited // upper limit for blocks.size(), or 0 if unlimited
const size_t maxBlocksInFlight; const size_t maxBlocksInFlight;
......
...@@ -233,7 +233,7 @@ bool process(Stream &controlStream) ...@@ -233,7 +233,7 @@ bool process(Stream &controlStream)
// #blocks here is the number of blocks that can be collected in parallel on // #blocks here is the number of blocks that can be collected in parallel on
// the input side (what the correlator sends) // the input side (what the correlator sends)
collectors[fileIdx] = new TABTranspose::BlockCollector( collectors[fileIdx] = new TABTranspose::BlockCollector(
*outputPools[fileIdx], fileIdx, nrSubbands, nrChannels, nrSamples, parset.settings.nrBlocks(), parset.settings.realTime ? 5 : 0, logPrefix); *outputPools[fileIdx], fileIdx, nrSubbands, nrChannels, nrSamples, parset.settings.nrBlocks(), parset.settings.realTime ? 5 : 0, logPrefix, parset.settings.blockDuration());
LOG_DEBUG_STR("Setting up writer for " << file.location.filename); LOG_DEBUG_STR("Setting up writer for " << file.location.filename);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment