diff --git a/RTCP/Cobalt/CoInterface/src/TABTranspose.cc b/RTCP/Cobalt/CoInterface/src/TABTranspose.cc index 680fc0c669695f7ffca700b8bc39b335726fa454..f1737320fddce62aca6cd1529b7bab4220c21dfc 100644 --- a/RTCP/Cobalt/CoInterface/src/TABTranspose.cc +++ b/RTCP/Cobalt/CoInterface/src/TABTranspose.cc @@ -368,12 +368,19 @@ void BlockCollector::finish() { // Update loss statistics nrFullBlocksLost += nrBlocks - lastEmitted + 1; if (nrBlocks > 0) { - if (nrBlockSubbandsReceived > 0) - LOG_ERROR_STR(logPrefix << "Did not receive " << (100.0 - 100.0*nrBlockSubbandsReceived/nrSubbands/nrBlocks) << "% of the data"); - if (nrFullBlocksLost > 0 || nrBlockSubbandsLost > 0) - LOG_ERROR_STR(logPrefix << "Did not send " << (100.0*nrFullBlocksLost + 100.0*nrBlockSubbandsLost/nrSubbands)/nrBlocks << "% of the data"); - if (nrBlockSubbandsTooLate > 0) - LOG_ERROR_STR(logPrefix << "Received " << (100.0*nrBlockSubbandsTooLate/nrSubbands/nrBlocks) << "% of the data too late. Consider increasing maxBlocksInFlight."); + const float didNotReceivePerc = 100.0 - 100.0*nrBlockSubbandsReceived/nrSubbands/nrBlocks; + const float didNotSendPerc = (100.0*nrFullBlocksLost + 100.0*nrBlockSubbandsLost/nrSubbands) / nrBlocks; + const float lostPerc = didNotSendPerc - didNotReceivePerc; + const float tooLatePerc = 100.0 * nrBlockSubbandsTooLate / nrSubbands / nrBlocks; + + if (didNotReceivePerc > 0) + LOG_ERROR_STR(logPrefix << "Did not receive " << didNotReceivePerc << "% of the data"); + if (lostPerc > 0) + LOG_ERROR_STR(logPrefix << "I lost " << lostPerc << "% of the data"); + if (didNotSendPerc > 0) + LOG_ERROR_STR(logPrefix << "Did not send " << didNotSendPerc << "% of the data"); + if (tooLatePerc > 0) + LOG_ERROR_STR(logPrefix << "Received " << tooLatePerc << "% of the data too late. Consider increasing maxBlocksInFlight."); } if (!canDrop) { diff --git a/RTCP/Cobalt/OutputProc/src/GPUProcIO.cc b/RTCP/Cobalt/OutputProc/src/GPUProcIO.cc index 2de0dd6ea531f242f00aff12eaaaf908df65d480..649be8f700208df377b46d165e5f706c70154e9e 100644 --- a/RTCP/Cobalt/OutputProc/src/GPUProcIO.cc +++ b/RTCP/Cobalt/OutputProc/src/GPUProcIO.cc @@ -229,7 +229,7 @@ bool process(Stream &controlStream) } // Create a collector for this fileIdx - // #blocks here is the number of blocks that can be constructed in parallel on + // #blocks here is the number of blocks that can be collected in parallel on // the input side (what the correlator sends) collectors[fileIdx] = new TABTranspose::BlockCollector( *outputPools[fileIdx], fileIdx, nrSubbands, nrChannels, nrSamples, parset.settings.nrBlocks(), parset.settings.realTime ? 5 : 0); diff --git a/RTCP/Cobalt/OutputProc/src/InputThread.cc b/RTCP/Cobalt/OutputProc/src/InputThread.cc index f520d26ddbeed9c844aa5f8d1354aacb7687bbbb..d53c81bac9d82ca51bc1f01c93e2a459afb31d4c 100644 --- a/RTCP/Cobalt/OutputProc/src/InputThread.cc +++ b/RTCP/Cobalt/OutputProc/src/InputThread.cc @@ -39,10 +39,13 @@ namespace LOFAR const std::string &logPrefix) : itsLogPrefix(logPrefix + "[InputThread] "), + itsNrIntegrationsReceived(0), + itsNrIntegrations(parset.settings.correlator.nrIntegrations), itsInputDescriptor(getStreamDescriptorBetweenIONandStorage(parset, CORRELATED_DATA, streamNr)), itsOutputPool(outputPool), itsDeadline(parset.settings.realTime ? parset.settings.stopTime : 0) { + ASSERT(parset.settings.correlator.enabled); } @@ -55,17 +58,27 @@ namespace LOFAR for(SmartPtr<StreamableData> data; (data = itsOutputPool.free.remove()) != NULL; itsOutputPool.filled.append(data)) { data->read(streamFromION, true, 1); // Cobalt writes with an alignment of 1 + ++itsNrIntegrationsReceived; - LOG_DEBUG_STR(itsLogPrefix << "Read block with seqno = " << data->sequenceNumber()); + LOG_DEBUG_STR(itsLogPrefix << "Received integration " << data->sequenceNumber()); } } catch (TimeOutException &) { LOG_WARN_STR(itsLogPrefix << "Connection from " << itsInputDescriptor << " timed out"); } catch (EndOfStreamException &) { - LOG_INFO_STR(itsLogPrefix << "Connection from " << itsInputDescriptor << " closed"); + LOG_INFO_STR(itsLogPrefix << "Connection from " << itsInputDescriptor << " closed by foreign host"); } catch (SystemCallException &ex) { LOG_WARN_STR(itsLogPrefix << "Connection from " << itsInputDescriptor << " failed: " << ex.text()); } + // report statistics + const float didNotReceivePerc = 100.0 - 100.0 * itsNrIntegrationsReceived / itsNrIntegrations; + const float didNotSendPerc = didNotReceivePerc; + + if (didNotReceivePerc > 0) + LOG_ERROR_STR(itsLogPrefix << "Did not receive " << didNotReceivePerc << "% of the data"); + if (didNotSendPerc > 0) + LOG_ERROR_STR(itsLogPrefix << "Did not send " << didNotSendPerc << "% of the data"); + // Append end-of-stream marker itsOutputPool.filled.append(NULL); } diff --git a/RTCP/Cobalt/OutputProc/src/InputThread.h b/RTCP/Cobalt/OutputProc/src/InputThread.h index 30bf2218c91189cae469caae16a16dbd0ea5ee60..85b561240d468c1c460fc76829c91ae60c78fd7c 100644 --- a/RTCP/Cobalt/OutputProc/src/InputThread.h +++ b/RTCP/Cobalt/OutputProc/src/InputThread.h @@ -55,7 +55,13 @@ namespace LOFAR virtual void process(); private: - const std::string itsLogPrefix, itsInputDescriptor; + const std::string itsLogPrefix; + + // we receive integration "blocks" + size_t itsNrIntegrationsReceived; + const size_t itsNrIntegrations; + + const std::string itsInputDescriptor; Pool<StreamableData> &itsOutputPool; const double itsDeadline; }; diff --git a/RTCP/Cobalt/OutputProc/src/OutputThread.cc b/RTCP/Cobalt/OutputProc/src/OutputThread.cc index 689d8396545b905a1430fda569650cd100236dbc..98735efc431700c35b296d78b650d8581c38adea 100644 --- a/RTCP/Cobalt/OutputProc/src/OutputThread.cc +++ b/RTCP/Cobalt/OutputProc/src/OutputThread.cc @@ -94,7 +94,7 @@ namespace LOFAR if (droppedBlocks > 0) { itsBlocksDropped += droppedBlocks; - LOG_WARN_STR(itsLogPrefix << "Just dropped " << droppedBlocks << " blocks. Dropped " << itsBlocksDropped << " blocks and written " << itsBlocksWritten << " blocks so far."); + LOG_WARN_STR(itsLogPrefix << "Did not receive " << droppedBlocks << " blocks"); itsMdLogger.log(itsMdKeyPrefix + PN_COP_DROPPED + streamNrStr, itsBlocksDropped * static_cast<float>(itsParset.settings.blockDuration()));