diff --git a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc index ad70ed747797450a6e200a0b045c1e86b63e3a49..4bcdc423041a438c0eb4cb19d34a35e6fbb96076 100644 --- a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc +++ b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc @@ -738,9 +738,10 @@ namespace LOFAR /* * Forward the output to writeCorrelatedOutput (if visibilities are produced). * - * Note that loss of throughput must be detected here, since writeCorrelatedOutput() - * can freeze in its write() routine. Detecting correlator loss here also allows - * us to derive a global loss figure for this subband. + * Note that live reporting loss of throughput must be done here, since writeCorrelatedOutput() + * can freeze in its write() routine. + * + * Note also that we do not catch loss after we put it in outputQueue. */ if (ps.settings.correlator.enabled) { const double maxRetentionTime = 3.0 + ps.settings.blockDuration(); @@ -787,17 +788,6 @@ namespace LOFAR else LOG_DEBUG_STR("[" << id << "] Done"); } - // report statistics - if (ps.settings.correlator.enabled) { - const float didNotReceivePerc = 0.0; // no output loss possible so far - const float didNotSendPerc = 100.0 * correlatorLoss.blocksDropped / ps.settings.correlator.nrIntegrations; - const float lostPerc = didNotSendPerc - didNotReceivePerc; - - if (lostPerc > 0) - LOG_ERROR_STR("[Correlator] I lost " << lostPerc << "% of the data for subband " << globalSubbandIdx << " to " << ps.settings.correlator.files[globalSubbandIdx].location.host); - if (didNotSendPerc > 0) - LOG_WARN_STR("[Correlator] Did not send " << didNotSendPerc << "% of the data for subband " << globalSubbandIdx << " to " << ps.settings.correlator.files[globalSubbandIdx].location.host); - } // Per-subband loss of the BeamFormer is not really relevant due to the 2nd transpose to OutputProc. // Instead, we report loss in the TABTranspose::MultiSender. @@ -814,6 +804,8 @@ namespace LOFAR ps.settings.blockDuration(), true, true); + size_t blocksWritten = 0; + // Register our thread to be killable at exit OMPThreadSet::ScopedRun sr(outputThreads); @@ -848,8 +840,10 @@ namespace LOFAR // Write block to outputProc writeTimer.start(); try { - for (size_t i = 0; i < data->correlatedData.subblocks.size(); ++i) + for (size_t i = 0; i < data->correlatedData.subblocks.size(); ++i) { data->correlatedData.subblocks[i]->write(outputStream.get(), true); + blocksWritten++; + } } catch (Exception &ex) { // No reconnect, as outputProc doesn't yet re-listen when the conn drops. LOG_ERROR_STR("Error writing subband " << id.globalSubbandIdx << ", dropping all subsequent blocks: " << ex.what()); @@ -866,6 +860,18 @@ namespace LOFAR outputQueue.append(data); ASSERT(!data); } + + if (ps.settings.correlator.enabled) { + // Report final loss figures + const float didNotReceivePerc = 0.0; // no output loss possible so far, we account for blocks lost in writeBeamformedOutput as well + const float didNotSendPerc = 100.0 * (ps.settings.correlator.nrIntegrations - blocksWritten) / ps.settings.correlator.nrIntegrations; + const float lostPerc = didNotSendPerc - didNotReceivePerc; + + if (lostPerc > 0) + LOG_ERROR_STR("[Correlator] I lost " << lostPerc << "% of the data for subband " << globalSubbandIdx << " to " << ps.settings.correlator.files[globalSubbandIdx].location.host); + if (didNotSendPerc > 0) + LOG_WARN_STR("[Correlator] Did not send " << didNotSendPerc << "% of the data for subband " << globalSubbandIdx << " to " << ps.settings.correlator.files[globalSubbandIdx].location.host); + } } } }