diff --git a/RTCP/Cobalt/CoInterface/src/TABTranspose.cc b/RTCP/Cobalt/CoInterface/src/TABTranspose.cc index 4d062e03277c33d64b8f82e30016b08b6800dcdb..680fc0c669695f7ffca700b8bc39b335726fa454 100644 --- a/RTCP/Cobalt/CoInterface/src/TABTranspose.cc +++ b/RTCP/Cobalt/CoInterface/src/TABTranspose.cc @@ -62,7 +62,7 @@ void Subband::write(Stream &stream) const { stream.write(&dim2, sizeof dim2); stream.write(data.origin(), data.num_elements() * sizeof *data.origin()); - LOG_DEBUG_STR("Written block " << id); + LOG_DEBUG_STR("Written block " << id); } @@ -84,7 +84,7 @@ void Subband::read(Stream &stream) { std::ostream &operator<<(std::ostream &str, const Subband::BlockID &id) { - return str << "file " << id.fileIdx << " block " << id.block << " subband " << id.subband; + return str << "stream " << id.fileIdx << " block " << id.block << " subband " << id.subband; } @@ -213,12 +213,10 @@ void Block::write( BeamformedData &output ) { } // Report summary - size_t nrLost = std::count(subbandCache.begin(), subbandCache.end(), (Subband*)NULL); - - if (nrLost == 0) - LOG_DEBUG_STR("Block: written " << (nrSubbands - nrLost) << " subbands, lost " << nrLost << " subbands."); + if (complete()) + LOG_DEBUG_STR("[block " << blockIdx << " stream " << fileIdx << "] [Block] Written " << (nrSubbands - nrSubbandsLeft) << " subbands, lost " << nrSubbandsLeft << " subbands."); else - LOG_INFO_STR("Block: written " << (nrSubbands - nrLost) << " subbands, lost " << nrLost << " subbands."); + LOG_INFO_STR("[block " << blockIdx << " stream " << fileIdx << "] [Block] Written " << (nrSubbands - nrSubbandsLeft) << " subbands, lost " << nrSubbandsLeft << " subbands."); } @@ -231,9 +229,16 @@ bool Block::complete() const { // More precisely, we have one BlockCollector per file (i.e. part). 
BlockCollector::BlockCollector( Pool<BeamformedData> &outputPool, size_t fileIdx, size_t nrSubbands, size_t nrChannels, size_t nrSamples, size_t nrBlocks, size_t maxBlocksInFlight ) : + logPrefix(str(format("[stream %u] [BlockCollector] ") % fileIdx)), + + nrFullBlocksLost(0), + nrBlockSubbandsLost(0), + nrBlockSubbandsReceived(0), + nrBlockSubbandsTooLate(0), + // drop = false: we drop at the output, not at the input, but we do want to protect against unbounded growth - inputQueue(str(format("BlockCollector::inputQueue [file %u]") % fileIdx), (1 + maxBlocksInFlight) * nrSubbands, false), - outputQueue(str(format("BlockCollector::outputQueue [file %u]") % fileIdx), 3, false), + inputQueue(str(format("BlockCollector::inputQueue [stream %u]") % fileIdx), (1 + maxBlocksInFlight) * nrSubbands, false), + outputQueue(str(format("BlockCollector::outputQueue [stream %u]") % fileIdx), 3, false), outputPool(outputPool), @@ -302,7 +307,9 @@ void BlockCollector::outputLoop() { // subsequent Blocks are missing something, send it (or them) off into the // outputQueue for write-back to storage. void BlockCollector::processSubband( SmartPtr<Subband> &subband ) { - LOG_DEBUG_STR("BlockCollector: Add " << subband->id); + ++nrBlockSubbandsReceived; + + LOG_DEBUG_STR(logPrefix << "Add " << subband->id); const size_t &blockIdx = subband->id.block; @@ -311,10 +318,11 @@ void BlockCollector::processSubband( SmartPtr<Subband> &subband ) { if (!have(blockIdx)) { if (!fetch(blockIdx)) { // too late -- discard packet - LOG_DEBUG_STR("BlockCollector: Dropped subband " << subband->id.subband << " of file " << subband->id.fileIdx); + ++nrBlockSubbandsTooLate; + LOG_DEBUG_STR(logPrefix << "Dropped subband " << subband->id.subband); // if we can't drop, we shouldn't even be here. 
- ASSERTSTR(canDrop, "Received block " << blockIdx << ", but already emitted up to " << lastEmitted << " for file " << subband->id.fileIdx << " subband " << subband->id.subband); + ASSERTSTR(canDrop, "Received block " << blockIdx << ", but already emitted up to " << lastEmitted << " for stream " << subband->id.fileIdx << " subband " << subband->id.subband); return; } @@ -336,7 +344,7 @@ void BlockCollector::processSubband( SmartPtr<Subband> &subband ) { if (nrBlocks > 0 && blockIdx == nrBlocks - 1) { // Received last block -- wrap up - LOG_INFO_STR("BlockCollector: Received last block of file " << fileIdx); + LOG_INFO_STR(logPrefix << "Received last block"); ASSERT(blocks.empty()); @@ -357,6 +365,17 @@ void BlockCollector::finish() { emitUpTo(maxBlock()); } + // Update loss statistics. Only possible if the total number of blocks is known (nrBlocks > 0): blocks lastEmitted+1 .. nrBlocks-1 were never emitted. + if (nrBlocks > 0) { + nrFullBlocksLost += nrBlocks - (lastEmitted + 1); + if (nrBlockSubbandsReceived < nrSubbands * nrBlocks) + LOG_ERROR_STR(logPrefix << "Did not receive " << (100.0 - 100.0*nrBlockSubbandsReceived/nrSubbands/nrBlocks) << "% of the data"); + if (nrFullBlocksLost > 0 || nrBlockSubbandsLost > 0) + LOG_ERROR_STR(logPrefix << "Did not send " << (100.0*nrFullBlocksLost + 100.0*nrBlockSubbandsLost/nrSubbands)/nrBlocks << "% of the data"); + if (nrBlockSubbandsTooLate > 0) + LOG_ERROR_STR(logPrefix << "Received " << (100.0*nrBlockSubbandsTooLate/nrSubbands/nrBlocks) << "% of the data too late. 
Consider increasing maxBlocksInFlight."); + } + if (!canDrop) { // Should have received everything ASSERT(nrBlocks == 0 || (ssize_t)nrBlocks == lastEmitted + 1); @@ -389,18 +408,23 @@ void BlockCollector::emit(size_t blockIdx) { } else { ASSERT((ssize_t)blockIdx > lastEmitted); } - lastEmitted = blockIdx; - // clear data we didn't receive + // fetch data SmartPtr<Block> &block = blocks.at(blockIdx); + LOG_DEBUG_STR(logPrefix << "Emitting block " << blockIdx << " (last emit: " << lastEmitted << ")"); - LOG_DEBUG_STR("BlockCollector: emitting block " << blockIdx << " of file " << fileIdx); + // update loss statistics -- the blocks strictly between the previous emit and this one were skipped entirely + nrFullBlocksLost += blockIdx - (lastEmitted + 1); + nrBlockSubbandsLost += block->nrSubbandsLeft; // emit to outputPool.filled() outputQueue.append(block); // remove from our administration blocks.erase(blockIdx); + + // remember where we left off + lastEmitted = blockIdx; } @@ -487,7 +511,7 @@ void Receiver::receiveLoop() const size_t fileIdx = subband->id.fileIdx; - ASSERTSTR(collectors.find(fileIdx) != collectors.end(), "Received a piece of file " << fileIdx << ", which is unknown to me"); + ASSERTSTR(collectors.find(fileIdx) != collectors.end(), "Received a piece of stream " << fileIdx << ", which is unknown to me"); //LOG_DEBUG_STR("File " << fileIdx << ": Adding subband " << subband.id); @@ -618,7 +642,7 @@ MultiSender::~MultiSender() { LOG_INFO_STR("MultiSender: realTime = " << itsParset.settings.realTime << ", maxRetentionTime = " << maxRetentionTime); for (HostMap::const_iterator i = hostMap.begin(); i != hostMap.end(); ++i) { - LOG_INFO_STR("MultiSender: [file " << i->first << " to " << i->second.hostName << "] Dropped " << drop_rates.at(i->first).mean() << "% of the data"); + LOG_INFO_STR("MultiSender: [stream " << i->first << " to " << i->second.hostName << "] Dropped " << drop_rates.at(i->first).mean() << "% of the data"); } } diff --git a/RTCP/Cobalt/CoInterface/src/TABTranspose.h b/RTCP/Cobalt/CoInterface/src/TABTranspose.h index 
d302d6726cb83d75f94849c680fb54eec4bc0989..ef33a0d4d21a3ee85fe570694f589d115794d699 100644 --- a/RTCP/Cobalt/CoInterface/src/TABTranspose.h +++ b/RTCP/Cobalt/CoInterface/src/TABTranspose.h @@ -119,6 +119,7 @@ namespace LOFAR // Cache of subband data for this block std::vector< SmartPtr<Subband> > subbandCache; + public: // The number of subbands left to receive. size_t nrSubbandsLeft; }; @@ -171,6 +172,14 @@ namespace LOFAR void finish(); private: + const std::string logPrefix; + + // statistics + size_t nrFullBlocksLost; // could not emit anything of these blocks + size_t nrBlockSubbandsLost; // could not receive this number of subbands of any block + size_t nrBlockSubbandsReceived; // received this number of subbands of any block + size_t nrBlockSubbandsTooLate; // received this number of subbands of any block too late + /* * Elements travel along the following path *