Skip to content
Snippets Groups Projects
Commit e7b0b324 authored by Jan David Mol
Browse files

Task #11059: Added loss statistics for Beamformed data and streamlined logs

parent 0542b1ab
Branches
Tags
No related merge requests found
...@@ -62,7 +62,7 @@ void Subband::write(Stream &stream) const { ...@@ -62,7 +62,7 @@ void Subband::write(Stream &stream) const {
stream.write(&dim2, sizeof dim2); stream.write(&dim2, sizeof dim2);
stream.write(data.origin(), data.num_elements() * sizeof *data.origin()); stream.write(data.origin(), data.num_elements() * sizeof *data.origin());
LOG_DEBUG_STR("Written block " << id); LOG_DEBUG_STR("Written block " << id);
} }
...@@ -84,7 +84,7 @@ void Subband::read(Stream &stream) { ...@@ -84,7 +84,7 @@ void Subband::read(Stream &stream) {
std::ostream &operator<<(std::ostream &str, const Subband::BlockID &id) std::ostream &operator<<(std::ostream &str, const Subband::BlockID &id)
{ {
return str << "file " << id.fileIdx << " block " << id.block << " subband " << id.subband; return str << "stream " << id.fileIdx << " block " << id.block << " subband " << id.subband;
} }
...@@ -213,12 +213,10 @@ void Block::write( BeamformedData &output ) { ...@@ -213,12 +213,10 @@ void Block::write( BeamformedData &output ) {
} }
// Report summary // Report summary
size_t nrLost = std::count(subbandCache.begin(), subbandCache.end(), (Subband*)NULL); if (complete())
LOG_DEBUG_STR("[block " << blockIdx << " stream " << fileIdx << "] [Block] Written " << (nrSubbands - nrSubbandsLeft) << " subbands, lost " << nrSubbandsLeft << " subbands.");
if (nrLost == 0)
LOG_DEBUG_STR("Block: written " << (nrSubbands - nrLost) << " subbands, lost " << nrLost << " subbands.");
else else
LOG_INFO_STR("Block: written " << (nrSubbands - nrLost) << " subbands, lost " << nrLost << " subbands."); LOG_INFO_STR("[block " << blockIdx << " stream " << fileIdx << "] [Block] Written " << (nrSubbands - nrSubbandsLeft) << " subbands, lost " << nrSubbandsLeft << " subbands.");
} }
...@@ -231,9 +229,16 @@ bool Block::complete() const { ...@@ -231,9 +229,16 @@ bool Block::complete() const {
// More precisely, we have one BlockCollector per file (i.e. part). // More precisely, we have one BlockCollector per file (i.e. part).
BlockCollector::BlockCollector( Pool<BeamformedData> &outputPool, size_t fileIdx, size_t nrSubbands, size_t nrChannels, size_t nrSamples, size_t nrBlocks, size_t maxBlocksInFlight ) BlockCollector::BlockCollector( Pool<BeamformedData> &outputPool, size_t fileIdx, size_t nrSubbands, size_t nrChannels, size_t nrSamples, size_t nrBlocks, size_t maxBlocksInFlight )
: :
logPrefix(str(format("[stream %u] [BlockCollector] ") % fileIdx)),
nrFullBlocksLost(0),
nrBlockSubbandsLost(0),
nrBlockSubbandsReceived(0),
nrBlockSubbandsTooLate(0),
// drop = false: we drop at the output, not at the input, but we do want to protect against unbounded growth // drop = false: we drop at the output, not at the input, but we do want to protect against unbounded growth
inputQueue(str(format("BlockCollector::inputQueue [file %u]") % fileIdx), (1 + maxBlocksInFlight) * nrSubbands, false), inputQueue(str(format("BlockCollector::inputQueue [stream %u]") % fileIdx), (1 + maxBlocksInFlight) * nrSubbands, false),
outputQueue(str(format("BlockCollector::outputQueue [file %u]") % fileIdx), 3, false), outputQueue(str(format("BlockCollector::outputQueue [stream %u]") % fileIdx), 3, false),
outputPool(outputPool), outputPool(outputPool),
...@@ -302,7 +307,9 @@ void BlockCollector::outputLoop() { ...@@ -302,7 +307,9 @@ void BlockCollector::outputLoop() {
// subsequent Blocks are missing something, send it (or them) off into the // subsequent Blocks are missing something, send it (or them) off into the
// outputQueue for write-back to storage. // outputQueue for write-back to storage.
void BlockCollector::processSubband( SmartPtr<Subband> &subband ) { void BlockCollector::processSubband( SmartPtr<Subband> &subband ) {
LOG_DEBUG_STR("BlockCollector: Add " << subband->id); ++nrBlockSubbandsReceived;
LOG_DEBUG_STR(logPrefix << "Add " << subband->id);
const size_t &blockIdx = subband->id.block; const size_t &blockIdx = subband->id.block;
...@@ -311,10 +318,11 @@ void BlockCollector::processSubband( SmartPtr<Subband> &subband ) { ...@@ -311,10 +318,11 @@ void BlockCollector::processSubband( SmartPtr<Subband> &subband ) {
if (!have(blockIdx)) { if (!have(blockIdx)) {
if (!fetch(blockIdx)) { if (!fetch(blockIdx)) {
// too late -- discard packet // too late -- discard packet
LOG_DEBUG_STR("BlockCollector: Dropped subband " << subband->id.subband << " of file " << subband->id.fileIdx); ++nrBlockSubbandsTooLate;
LOG_DEBUG_STR(logPrefix << "Dropped subband " << subband->id.subband);
// if we can't drop, we shouldn't even be here. // if we can't drop, we shouldn't even be here.
ASSERTSTR(canDrop, "Received block " << blockIdx << ", but already emitted up to " << lastEmitted << " for file " << subband->id.fileIdx << " subband " << subband->id.subband); ASSERTSTR(canDrop, "Received block " << blockIdx << ", but already emitted up to " << lastEmitted << " for stream " << subband->id.fileIdx << " subband " << subband->id.subband);
return; return;
} }
...@@ -336,7 +344,7 @@ void BlockCollector::processSubband( SmartPtr<Subband> &subband ) { ...@@ -336,7 +344,7 @@ void BlockCollector::processSubband( SmartPtr<Subband> &subband ) {
if (nrBlocks > 0 && blockIdx == nrBlocks - 1) { if (nrBlocks > 0 && blockIdx == nrBlocks - 1) {
// Received last block -- wrap up // Received last block -- wrap up
LOG_INFO_STR("BlockCollector: Received last block of file " << fileIdx); LOG_INFO_STR(logPrefix << "Received last block");
ASSERT(blocks.empty()); ASSERT(blocks.empty());
...@@ -357,6 +365,17 @@ void BlockCollector::finish() { ...@@ -357,6 +365,17 @@ void BlockCollector::finish() {
emitUpTo(maxBlock()); emitUpTo(maxBlock());
} }
// Update loss statistics
nrFullBlocksLost += nrBlocks - lastEmitted + 1;
if (nrBlocks > 0) {
if (nrBlockSubbandsReceived > 0)
LOG_ERROR_STR(logPrefix << "Did not receive " << (100.0 - 100.0*nrBlockSubbandsReceived/nrSubbands/nrBlocks) << "% of the data");
if (nrFullBlocksLost > 0 || nrBlockSubbandsLost > 0)
LOG_ERROR_STR(logPrefix << "Did not send " << (100.0*nrFullBlocksLost + 100.0*nrBlockSubbandsLost/nrSubbands)/nrBlocks << "% of the data");
if (nrBlockSubbandsTooLate > 0)
LOG_ERROR_STR(logPrefix << "Received " << (100.0*nrBlockSubbandsTooLate/nrSubbands/nrBlocks) << "% of the data too late. Consider increasing maxBlocksInFlight.");
}
if (!canDrop) { if (!canDrop) {
// Should have received everything // Should have received everything
ASSERT(nrBlocks == 0 || (ssize_t)nrBlocks == lastEmitted + 1); ASSERT(nrBlocks == 0 || (ssize_t)nrBlocks == lastEmitted + 1);
...@@ -389,18 +408,23 @@ void BlockCollector::emit(size_t blockIdx) { ...@@ -389,18 +408,23 @@ void BlockCollector::emit(size_t blockIdx) {
} else { } else {
ASSERT((ssize_t)blockIdx > lastEmitted); ASSERT((ssize_t)blockIdx > lastEmitted);
} }
lastEmitted = blockIdx;
// clear data we didn't receive // fetch data
SmartPtr<Block> &block = blocks.at(blockIdx); SmartPtr<Block> &block = blocks.at(blockIdx);
LOG_DEBUG_STR(logPrefix << "Emitting block " << blockIdx << " (last emit: " << lastEmitted << ")");
LOG_DEBUG_STR("BlockCollector: emitting block " << blockIdx << " of file " << fileIdx); // update loss statistics
nrFullBlocksLost += blockIdx - lastEmitted + 1;
nrBlockSubbandsLost += block->nrSubbandsLeft;
// emit to outputPool.filled() // emit to outputPool.filled()
outputQueue.append(block); outputQueue.append(block);
// remove from our administration // remove from our administration
blocks.erase(blockIdx); blocks.erase(blockIdx);
// remember where we left off
lastEmitted = blockIdx;
} }
...@@ -487,7 +511,7 @@ void Receiver::receiveLoop() ...@@ -487,7 +511,7 @@ void Receiver::receiveLoop()
const size_t fileIdx = subband->id.fileIdx; const size_t fileIdx = subband->id.fileIdx;
ASSERTSTR(collectors.find(fileIdx) != collectors.end(), "Received a piece of file " << fileIdx << ", which is unknown to me"); ASSERTSTR(collectors.find(fileIdx) != collectors.end(), "Received a piece of stream " << fileIdx << ", which is unknown to me");
//LOG_DEBUG_STR("File " << fileIdx << ": Adding subband " << subband.id); //LOG_DEBUG_STR("File " << fileIdx << ": Adding subband " << subband.id);
...@@ -618,7 +642,7 @@ MultiSender::~MultiSender() ...@@ -618,7 +642,7 @@ MultiSender::~MultiSender()
{ {
LOG_INFO_STR("MultiSender: realTime = " << itsParset.settings.realTime << ", maxRetentionTime = " << maxRetentionTime); LOG_INFO_STR("MultiSender: realTime = " << itsParset.settings.realTime << ", maxRetentionTime = " << maxRetentionTime);
for (HostMap::const_iterator i = hostMap.begin(); i != hostMap.end(); ++i) { for (HostMap::const_iterator i = hostMap.begin(); i != hostMap.end(); ++i) {
LOG_INFO_STR("MultiSender: [file " << i->first << " to " << i->second.hostName << "] Dropped " << drop_rates.at(i->first).mean() << "% of the data"); LOG_INFO_STR("MultiSender: [stream " << i->first << " to " << i->second.hostName << "] Dropped " << drop_rates.at(i->first).mean() << "% of the data");
} }
} }
......
...@@ -119,6 +119,7 @@ namespace LOFAR ...@@ -119,6 +119,7 @@ namespace LOFAR
// Cache of subband data for this block // Cache of subband data for this block
std::vector< SmartPtr<Subband> > subbandCache; std::vector< SmartPtr<Subband> > subbandCache;
public:
// The number of subbands left to receive. // The number of subbands left to receive.
size_t nrSubbandsLeft; size_t nrSubbandsLeft;
}; };
...@@ -171,6 +172,14 @@ namespace LOFAR ...@@ -171,6 +172,14 @@ namespace LOFAR
void finish(); void finish();
private: private:
const std::string logPrefix;
// statistics
size_t nrFullBlocksLost; // could not emit anything of these blocks
size_t nrBlockSubbandsLost; // could not receive this number of subbands of any block
size_t nrBlockSubbandsReceived; // received this number of subbands of any block
size_t nrBlockSubbandsTooLate; // received this number of subbands of any block too late
/* /*
* Elements travel along the following path * Elements travel along the following path
* *
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment