diff --git a/RTCP/Cobalt/CoInterface/src/BudgetTimer.cc b/RTCP/Cobalt/CoInterface/src/BudgetTimer.cc index 4d57b3333831afbc59dd81b3f24d971afd5e712b..4011c61bef825f701016890cda6cfc228dbfe93c 100644 --- a/RTCP/Cobalt/CoInterface/src/BudgetTimer.cc +++ b/RTCP/Cobalt/CoInterface/src/BudgetTimer.cc @@ -68,7 +68,7 @@ namespace LOFAR { const double realTimePerc = 100.0 * elapsed / budget; if (elapsed > 2 * budget) - LOG_ERROR_STR("Run-time budget exceeded: " << itsName << " ran at " << realTimePerc << "% (took " << elapsed << " s, budget is " << budget << " s)"); + LOG_WARN_STR("Run-time budget exceeded: " << itsName << " ran at " << realTimePerc << "% (took " << elapsed << " s, budget is " << budget << " s)"); else LOG_DEBUG_STR("Run-time budget exceeded: " << itsName << " ran at " << realTimePerc << "% (took " << elapsed << " s, budget is " << budget << " s)"); } diff --git a/RTCP/Cobalt/CoInterface/src/StreamableData.h b/RTCP/Cobalt/CoInterface/src/StreamableData.h index 5fe7ec3e223452ab75a27588744030f78770be4b..08c416dfbcddd86bea4a32ac332b34eeeb837c2c 100644 --- a/RTCP/Cobalt/CoInterface/src/StreamableData.h +++ b/RTCP/Cobalt/CoInterface/src/StreamableData.h @@ -90,6 +90,12 @@ namespace LOFAR { } + // Fraction of data that was lost due to output bottlenecks + virtual double outputLossFraction() const + { + return 0.0; + } + uint32_t peerMagicNumber; /// magic number received from peer protected: diff --git a/RTCP/Cobalt/CoInterface/src/TABTranspose.cc b/RTCP/Cobalt/CoInterface/src/TABTranspose.cc index a8bec0461f793c7ef9863845c9b10c69ed1494fd..3b6fbfea096983c1fd9edbda0e1f400a98fe3a35 100644 --- a/RTCP/Cobalt/CoInterface/src/TABTranspose.cc +++ b/RTCP/Cobalt/CoInterface/src/TABTranspose.cc @@ -212,6 +212,14 @@ void Block::write( BeamformedData &output ) { } } + // Update flags + for (size_t sb = 0; sb < subbandCache.size(); sb++) { + output.flags[sb].reset(); + + if (!subbandCache[sb]) + output.flags[sb].include(0, nrSamples); + } + // Report summary if (complete()) LOG_DEBUG_STR("[block " << blockIdx << " stream " << fileIdx << "] [Block] Written " << (nrSubbands - nrSubbandsLeft) << " subbands, lost " << nrSubbandsLeft << " subbands."); @@ -227,9 +235,9 @@ bool Block::complete() const { // The BlockCollector collects blocks from different rtcp processes for a TAB. // More precisely, we have one BlockCollector per file (i.e. part). -BlockCollector::BlockCollector( Pool<BeamformedData> &outputPool, size_t fileIdx, size_t nrSubbands, size_t nrChannels, size_t nrSamples, size_t nrBlocks, size_t maxBlocksInFlight ) +BlockCollector::BlockCollector( Pool<BeamformedData> &outputPool, size_t fileIdx, size_t nrSubbands, size_t nrChannels, size_t nrSamples, size_t nrBlocks, size_t maxBlocksInFlight, const std::string &logPrefix ) : - logPrefix(str(format("[stream %u] [BlockCollector] ") % fileIdx)), + logPrefix(logPrefix + "[BlockCollector] "), nrFullBlocksLost(0), nrBlockSubbandsLost(0), @@ -374,11 +382,11 @@ void BlockCollector::finish() { const float tooLatePerc = 100.0 * nrBlockSubbandsTooLate / nrSubbands / nrBlocks; if (didNotReceivePerc > 0) - LOG_ERROR_STR(logPrefix << "Did not receive " << didNotReceivePerc << "% of the data"); + LOG_WARN_STR(logPrefix << "Did not receive " << didNotReceivePerc << "% of the data"); if (lostPerc > 0) LOG_ERROR_STR(logPrefix << "I lost " << lostPerc << "% of the data"); if (didNotSendPerc > 0) - LOG_ERROR_STR(logPrefix << "Did not send " << didNotSendPerc << "% of the data"); + LOG_WARN_STR(logPrefix << "Did not send " << didNotSendPerc << "% of the data"); if (tooLatePerc > 0) LOG_ERROR_STR(logPrefix << "Received " << tooLatePerc << "% of the data too late. Consider increasing maxBlocksInFlight."); } @@ -635,8 +643,8 @@ MultiSender::MultiSender( const HostMap &hostMap, const Parset &parset, if(find(hosts.begin(), hosts.end(), i->second) == hosts.end()) hosts.push_back(i->second); - // each file gets a drop_rate counter - drop_rates[i->first] = RunningStatistics("%"); + // each host gets a drop_rate counter + drop_rates[i->second] = RunningStatistics("%"); } for (vector<struct Host>::const_iterator i = hosts.begin(); i != hosts.end(); ++i) { @@ -647,17 +655,32 @@ MultiSender::MultiSender( const HostMap &hostMap, const Parset &parset, MultiSender::~MultiSender() { - LOG_DEBUG_STR("MultiSender: realTime = " << itsParset.settings.realTime << ", maxRetentionTime = " << maxRetentionTime); - for (HostMap::const_iterator i = hostMap.begin(); i != hostMap.end(); ++i) { - const float didNotReceivePerc = 0.0; // we receive everythnig - const float didNotSendPerc = drop_rates.at(i->first).mean(); - const float lostPerc = didNotSendPerc - didNotReceivePerc; + // at this point, process() has finished, either gracefully or with an exception + + // drain queues in case process() did not + for (std::map<Host, QueuePtr>::iterator i = queues.begin(); i != queues.end(); ++i) { + QueuePtr &queue = i->second; + + while (!queue->empty()) { + (void)queue->remove(); + + // this is loss, obviously + drop_rates.at(i->first).push(100.0); + } + } + + // report statistics + for (std::map<Host, RunningStatistics>::const_iterator i = drop_rates.begin(); i != drop_rates.end(); ++i) { + const float didNotReceivePerc = 0.0; // no output loss possible so far + const float lostPerc = i->second.mean(); + const float didNotSendPerc = didNotReceivePerc + lostPerc; if (lostPerc > 0) - LOG_ERROR_STR("[stream " << i->first << "] [BeamFormer] I lost " << lostPerc << "% of the data to " << i->second.hostName); + LOG_ERROR_STR("[BeamFormer] I lost " << lostPerc << "% of the data to " << i->first.hostName); if (didNotSendPerc > 0) - LOG_ERROR_STR("[stream " << i->first << "] [BeamFormer] Did not send " << didNotSendPerc << "% of the data to " << i->second.hostName); + LOG_WARN_STR("[BeamFormer] Did not send " << lostPerc << "% of the data to " << i->first.hostName); } + } @@ -685,7 +708,7 @@ void MultiSender::process( OMPThreadSet *threadSet ) LOG_DEBUG_STR(logPrefix << "Connected"); - SmartPtr< Queue< SmartPtr<struct Subband> > > &queue = queues.at(host); + QueuePtr &queue = queues.at(host); SmartPtr<struct Subband> subband; NSTimer sendTimer(str(format("Send Subband to %s") % host.hostName), true, true); @@ -716,13 +739,13 @@ bool MultiSender::append( SmartPtr<struct Subband> &subband ) const size_t fileIdx = subband->id.fileIdx; const struct Host &host = hostMap.at(fileIdx); - SmartPtr< Queue< SmartPtr<struct Subband> > > &queue = queues.at(host); + QueuePtr &queue = queues.at(host); bool dropped = false; // If oldest packet in queue is too old, drop it in lieu of this new one if (itsParset.settings.realTime && TimeSpec::now() - queue->oldest() > maxRetentionTime) { - drop_rates.at(fileIdx).push(100.0); + drop_rates.at(host).push(100.0); // remove oldest item SmartPtr<struct Subband> subband = queue->remove(); @@ -732,7 +755,7 @@ bool MultiSender::append( SmartPtr<struct Subband> &subband ) dropped = true; } else { - drop_rates.at(fileIdx).push(0.0); + drop_rates.at(host).push(0.0); } // Append the data to the respective queue diff --git a/RTCP/Cobalt/CoInterface/src/TABTranspose.h b/RTCP/Cobalt/CoInterface/src/TABTranspose.h index 79b43cad28873101dff567edd0c23673af8ee579..46f9466d34640537ccaebf814abb9f5fb7366b8b 100644 --- a/RTCP/Cobalt/CoInterface/src/TABTranspose.h +++ b/RTCP/Cobalt/CoInterface/src/TABTranspose.h @@ -67,7 +67,33 @@ namespace LOFAR std::ostream &operator<<(std::ostream &str, const Subband::BlockID &id); - typedef SampleData<float, 3> BeamformedData; // data: [nrSubbands][nrChannels][nrSamples] + class BeamformedData: public SampleData<float, 3, 1> { + public: + BeamformedData(size_t nrSubbands, size_t nrChannels, size_t nrSamples, Allocator &allocator = heapAllocator): + // data: [nrSubbands][nrChannels][nrSamples] + // flags: [nrSubbands], and encodes *output data loss*, not flagged input + SampleData<float, 3, 1>(boost::extents[nrSamples][nrSubbands][nrChannels], + boost::extents[nrSubbands], + allocator), + nrSubbands(nrSubbands), + nrChannels(nrChannels), + nrSamples(nrSamples) + { + } + + // fraction of data that is lost due to output bottlenecks + virtual double outputLossFraction() const { + size_t nrFlaggedSamples = 0; + + for (size_t i = 0; i < flags.shape()[0]; i++) + nrFlaggedSamples += flags[i].count(); + + return 1.0 * nrFlaggedSamples / (nrSubbands * nrSamples); + } + + private: + const size_t nrSubbands, nrChannels, nrSamples; + }; /* * A block of data, representing for one time slice all @@ -156,7 +182,7 @@ namespace LOFAR * maxBlocksInFlight: the maximum number of blocks to process in * parallel (or 0 for no limit). */ - BlockCollector( Pool<BeamformedData> &outputPool, size_t fileIdx, size_t nrSubbands, size_t nrChannels, size_t nrSamples, size_t nrBlocks = 0, size_t maxBlocksInFlight = 0 ); + BlockCollector( Pool<BeamformedData> &outputPool, size_t fileIdx, size_t nrSubbands, size_t nrChannels, size_t nrSamples, size_t nrBlocks = 0, size_t maxBlocksInFlight = 0, const std::string &logPrefix = "" ); ~BlockCollector(); @@ -390,7 +416,7 @@ namespace LOFAR const Parset &itsParset; - std::map<size_t, RunningStatistics> drop_rates; // [fileIdx] + std::map<Host, RunningStatistics> drop_rates; // MultiSender has a queue per host it sends to. If it appends an element // to a queue, it will discard the head if it is older than maxRententionTime. @@ -406,7 +432,8 @@ namespace LOFAR std::vector<struct Host> hosts; // A queue for data to be sent to each host - std::map<struct Host, SmartPtr< Queue< SmartPtr<struct Subband> > > > queues; + typedef SmartPtr< Queue< SmartPtr<struct Subband> > > QueuePtr; + std::map<struct Host, QueuePtr> queues; }; } // namespace TABTranspose diff --git a/RTCP/Cobalt/CoInterface/test/tTABTranspose.cc b/RTCP/Cobalt/CoInterface/test/tTABTranspose.cc index d343f370bcbbf5d714f36b2119edff1973ddca19..170110c90c13db8629bb2e35c0cdec50a3e67a77 100644 --- a/RTCP/Cobalt/CoInterface/test/tTABTranspose.cc +++ b/RTCP/Cobalt/CoInterface/test/tTABTranspose.cc @@ -114,9 +114,7 @@ SUITE(Block) { block.addSubband(subband); } - BeamformedData output( - boost::extents[nrSamples][nrSubbands][nrChannels], - boost::extents[nrSubbands][nrChannels]); + BeamformedData output(nrSubbands, nrChannels, nrSamples); NSTimer transposeTimer(str(format("Block::write for %u subbands, %u channels, %u samples") % nrSubbands % nrChannels % nrSamples), true, true); transposeTimer.start(); @@ -143,9 +141,7 @@ struct Fixture { ctr(outputPool, 0, nrSubbands, nrChannels, nrSamples) { for (size_t i = 0; i < nrBlocks; ++i) { - outputPool.free.append(new BeamformedData( - boost::extents[nrSamples][nrSubbands][nrChannels], - boost::extents[nrSubbands][nrChannels]), false); + outputPool.free.append(new BeamformedData(nrSubbands, nrChannels, nrSamples), false); } } }; @@ -405,9 +401,7 @@ SUITE(SendReceive) { for (size_t i = 0; i < nrTABs; ++i) { outputPools[i] = new Pool<BeamformedData>(str(format("OneToOne::outputPool[%u]") % i), true); for (size_t b = 0; b < nrBlocks; ++b) { - outputPools[i]->free.append(new BeamformedData( - boost::extents[nrSamples][nrSubbands][nrChannels], - boost::extents[nrSubbands][nrChannels]), false); + outputPools[i]->free.append(new BeamformedData(nrSubbands, nrChannels, nrSamples), false); } collectors[i] = new BlockCollector(*outputPools[i], i, nrSubbands, nrChannels, nrSamples); @@ -593,9 +587,7 @@ SUITE(MultiReceiver) { outputPools[t] = new Pool<BeamformedData>(str(format("MultiReceiver::Transpose::outputPool[%u]") % t), true); for (size_t i = 0; i < nrBlocks; ++i) { - outputPools[t]->free.append(new BeamformedData( - boost::extents[nrSamples][nrSubbands][nrChannels], - boost::extents[nrSubbands][nrChannels]), false); + outputPools[t]->free.append(new BeamformedData(nrSubbands, nrChannels, nrSamples), false); } collectors[FILEIDX(t)] = new BlockCollector(*outputPools[t], FILEIDX(t), nrSubbands, nrChannels, nrSamples, nrBlocks); } diff --git a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc index 67e24f2d7b520a7a1c15436d2138e7743b5c1664..ff8c7fb34decfdace5f4bdbff3a40f19f8fb3303 100644 --- a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc +++ b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc @@ -794,21 +794,13 @@ namespace LOFAR const float lostPerc = didNotSendPerc - didNotReceivePerc; if (lostPerc > 0) - LOG_ERROR_STR("[Correlator] I lost " << lostPerc << "% of the data for stream " << globalSubbandIdx << " to " << ps.settings.correlator.files[globalSubbandIdx].location.host); + LOG_ERROR_STR("[Correlator] I lost " << lostPerc << "% of the data for subband " << globalSubbandIdx << " to " << ps.settings.correlator.files[globalSubbandIdx].location.host); if (didNotSendPerc > 0) - LOG_ERROR_STR("[Correlator] Did not send " << didNotSendPerc << "% of the data for stream " << globalSubbandIdx << " to " << ps.settings.correlator.files[globalSubbandIdx].location.host); + LOG_WARN_STR("[Correlator] Did not send " << didNotSendPerc << "% of the data for subband " << globalSubbandIdx << " to " << ps.settings.correlator.files[globalSubbandIdx].location.host); } - if (ps.settings.beamFormer.enabled) { - const float didNotReceivePerc = 0.0; // no output loss possible so far - const float didNotSendPerc = 100.0 * beamFormerLoss.blocksDropped / ps.settings.nrBlocks() / multiSender.nrFiles(); - const float lostPerc = didNotSendPerc - didNotReceivePerc; - - if (lostPerc > 0) - LOG_ERROR_STR("[BeamFormer] I lost " << lostPerc << "% of the data of subband " << globalSubbandIdx); - if (didNotSendPerc > 0) - LOG_ERROR_STR("[BeamFormer] Did not send " << didNotSendPerc << "% of the subband " << globalSubbandIdx); - } + // Per-subband loss of the BeamFormer is not really relevant due to the 2nd transpose to OutputProc. + // Instead, we report loss in the TABTranspose::MultiSender. } diff --git a/RTCP/Cobalt/GPUProc/src/scripts/watchlogs-multitail.conf b/RTCP/Cobalt/GPUProc/src/scripts/watchlogs-multitail.conf index bba6f803801da3824a3882854451050b9a72fd74..3aa3a05a1c9b82defce4c781b59e95d82eaeb714 100644 --- a/RTCP/Cobalt/GPUProc/src/scripts/watchlogs-multitail.conf +++ b/RTCP/Cobalt/GPUProc/src/scripts/watchlogs-multitail.conf @@ -17,8 +17,10 @@ rule:ev:INFO RTCP.Cobalt.GPUProc - Pipeline: Data flushed succesfully. rule:ev:INFO RTCP.Cobalt.CoInterface - Block: written .* subbands, lost 0 subbands. rule:ev:\[Timer.h:[0-9]+\] -rule:ev:INFO RTCP.Cobalt.GPUProc - Queue -rule:ev:INFO RTCP.Cobalt.GPUProc - BestEffortQueue +rule:ev:\[BudgetTimer.cc:[0-9]+\] +rule:ev:\[PerformanceCounter.cc:[0-9]+\] +rule:ev:INFO RTCP.Cobalt.(GPUProc|OutputProc|CoInterface) - Queue +rule:ev:INFO RTCP.Cobalt.(GPUProc|OutputProc|CoInterface) - BestEffortQueue # ----- remove superfluous flagging lines (since all nodes receive all RSP boards in practice) rule:ev:^rtcp\:0[1-9]+.*Flagging diff --git a/RTCP/Cobalt/InputProc/src/Station/PacketReader.cc b/RTCP/Cobalt/InputProc/src/Station/PacketReader.cc index 1047c1fa2d41a02dcd1ccff83f3b7a76090ca0b0..2eeb2fa2741547c8a0dfce2b0891b28523cfc75f 100644 --- a/RTCP/Cobalt/InputProc/src/Station/PacketReader.cc +++ b/RTCP/Cobalt/InputProc/src/Station/PacketReader.cc @@ -50,6 +50,11 @@ namespace LOFAR nrBadTime(0), nrBadData(0), nrBadOther(0), + totalNrReceived(0), + totalNrBadMode(0), + totalNrBadTime(0), + totalNrBadData(0), + totalNrBadOther(0), hadSizeError(false), lastLogTime(0) { @@ -66,6 +71,25 @@ namespace LOFAR } } + PacketReader::~PacketReader() + { + const size_t totalNrDiscarded = totalNrBadMode + totalNrBadTime + totalNrBadData + totalNrBadOther; + const float lostPerc = totalNrReceived == 0.0 ? 0.0 : 100.0 * totalNrDiscarded / totalNrReceived; + + if (lostPerc > 0) { + LOG_WARN_STR(logPrefix << "Total discarded packets is " << lostPerc << "%."); + + if (totalNrBadMode > 0) + LOG_WARN_STR(logPrefix << "Total discarded packets due to bad clock/bitmode is " << (100.0 * totalNrBadMode / totalNrReceived) << "%."); + if (totalNrBadTime > 0) + LOG_WARN_STR(logPrefix << "Total discarded packets due to bad timestamps is " << (100.0 * totalNrBadTime / totalNrReceived) << "%."); + if (totalNrBadData > 0) + LOG_WARN_STR(logPrefix << "Total discarded packets due to payload errors is " << (100.0 * totalNrBadData / totalNrReceived) << "%."); + if (totalNrBadOther > 0) + LOG_WARN_STR(logPrefix << "Total discarded packets due to other reasons is " << (100.0 * totalNrBadOther / totalNrReceived) << "%."); + } + } + void PacketReader::readPackets( std::vector<struct RSP> &packets ) { @@ -192,6 +216,13 @@ namespace LOFAR mdLogger.log(mdKeyPrefix + streamStr + ".rejected", (int)round(nrBad / interval)); + // Update totals + totalNrReceived += nrReceived; + totalNrBadTime += nrBadTime; + totalNrBadMode += nrBadMode; + totalNrBadData += nrBadData; + totalNrBadOther += nrBadOther; + // Reset counters nrReceived = 0; nrBadTime = 0; diff --git a/RTCP/Cobalt/InputProc/src/Station/PacketReader.h b/RTCP/Cobalt/InputProc/src/Station/PacketReader.h index 2140a4c27172e4ab66cfc089e19976f81e55239c..1c943a3e7cc96b863d351cff52429e32b0d09cee 100644 --- a/RTCP/Cobalt/InputProc/src/Station/PacketReader.h +++ b/RTCP/Cobalt/InputProc/src/Station/PacketReader.h @@ -48,6 +48,8 @@ namespace LOFAR PacketReader( const std::string &logPrefix, Stream &inputStream, const BoardMode &mode = MODE_ANY ); + ~PacketReader(); + // Reads a set of packets from the input stream. Sets the payloadError // flag for all invalid packets. void readPackets( std::vector<struct RSP> &packets ); @@ -81,6 +83,12 @@ namespace LOFAR size_t nrBadData; // nr. of packets with payload errors size_t nrBadOther; // nr. of packets that are bad in another fashion (illegal header, packet size, etc) + size_t totalNrReceived; + size_t totalNrBadMode; + size_t totalNrBadTime; + size_t totalNrBadData; + size_t totalNrBadOther; + bool hadSizeError; // already reported about wrongly sized packets since last logStatistics() double lastLogTime; // time since last log print, to monitor data rates diff --git a/RTCP/Cobalt/OutputProc/src/GPUProcIO.cc b/RTCP/Cobalt/OutputProc/src/GPUProcIO.cc index 649be8f700208df377b46d165e5f706c70154e9e..09fd05e16db7b5668e87b90da74ae9ddd88d8bfb 100644 --- a/RTCP/Cobalt/OutputProc/src/GPUProcIO.cc +++ b/RTCP/Cobalt/OutputProc/src/GPUProcIO.cc @@ -169,17 +169,17 @@ bool process(Stream &controlStream) continue; } - LOG_INFO_STR("starting with fileIdx " << fileIdx); + LOG_DEBUG_STR("starting with fileIdx " << fileIdx); mdLogger.log(mdKeyPrefix + PN_COP_STORAGE_HOST + '[' + lexical_cast<string>(fileIdx) + ']', myHostName); - string logPrefix = str(format("[obs %u correlated stream %3u] ") - % parset.settings.observationID % fileIdx); + string logPrefix = str(format("[stream %3u file %s] ") + % fileIdx % file.location.filename); SubbandWriter *writer = new SubbandWriter(parset, fileIdx, mdLogger, mdKeyPrefix, logPrefix); subbandWriters.push_back(writer); - LOG_INFO_STR("done with fileIdx " << fileIdx); + LOG_DEBUG_STR("done with fileIdx " << fileIdx); } } @@ -201,7 +201,7 @@ bool process(Stream &controlStream) const unsigned allFileIdx = fileIdx + parset.settings.correlator.files.size(); mdLogger.log(mdKeyPrefix + PN_COP_STORAGE_HOST + '[' + lexical_cast<string>(allFileIdx) + ']', myHostName); - LOG_INFO_STR("Allocating transpose buffers for " << file.location.filename); + LOG_DEBUG_STR("Allocating transpose buffers for " << file.location.filename); struct ObservationSettings::BeamFormer::StokesSettings &stokes = file.coherent ? parset.settings.beamFormer.coherentSettings @@ -217,27 +217,22 @@ bool process(Stream &controlStream) arenas[fileIdx] = new MallocedArena(poolSize * (MultiDimArray<float,3>::nrElements(boost::extents[nrSamples][nrSubbands][nrChannels]) * sizeof(float) + alignment), alignment); allocators[fileIdx] = new SparseSetAllocator(*arenas[fileIdx]); - outputPools[fileIdx] = new Pool<TABTranspose::BeamformedData>(str(format("process::outputPool [file %u]") % fileIdx), parset.settings.realTime); + outputPools[fileIdx] = new Pool<TABTranspose::BeamformedData>(str(format("process::outputPool [stream %u]") % fileIdx), parset.settings.realTime); // Create and fill an outputPool for this fileIdx - for (size_t i = 0; i < poolSize; ++i) { - outputPools[fileIdx]->free.append(new TABTranspose::BeamformedData( - boost::extents[nrSamples][nrSubbands][nrChannels], - boost::extents[nrSubbands][nrChannels], - *allocators[fileIdx] - ), false); - } + for (size_t i = 0; i < poolSize; ++i) + outputPools[fileIdx]->free.append(new TABTranspose::BeamformedData(nrSubbands, nrChannels, nrSamples, *allocators[fileIdx]), false); + + string logPrefix = str(format("[stream %3u file %s] ") + % fileIdx % file.location.filename); // Create a collector for this fileIdx // #blocks here is the number of blocks that can be collected in parallel on // the input side (what the correlator sends) collectors[fileIdx] = new TABTranspose::BlockCollector( - *outputPools[fileIdx], fileIdx, nrSubbands, nrChannels, nrSamples, parset.settings.nrBlocks(), parset.settings.realTime ? 5 : 0); - - string logPrefix = str(format("[obs %u beamformed stream %3u] ") - % parset.settings.observationID % fileIdx); + *outputPools[fileIdx], fileIdx, nrSubbands, nrChannels, nrSamples, parset.settings.nrBlocks(), parset.settings.realTime ? 5 : 0, logPrefix); - LOG_INFO_STR("Setting up writer for " << file.location.filename); + LOG_DEBUG_STR("Setting up writer for " << file.location.filename); TABOutputThread *writer = new TABOutputThread(parset, fileIdx, *outputPools[fileIdx], mdLogger, mdKeyPrefix, logPrefix); tabWriters.push_back(writer); diff --git a/RTCP/Cobalt/OutputProc/src/InputThread.cc b/RTCP/Cobalt/OutputProc/src/InputThread.cc index d53c81bac9d82ca51bc1f01c93e2a459afb31d4c..6adfefd062dfb3bc3562c31a7f639914ef843d3c 100644 --- a/RTCP/Cobalt/OutputProc/src/InputThread.cc +++ b/RTCP/Cobalt/OutputProc/src/InputThread.cc @@ -75,9 +75,9 @@ namespace LOFAR const float didNotSendPerc = didNotReceivePerc; if (didNotReceivePerc > 0) - LOG_ERROR_STR(itsLogPrefix << "Did not receive " << didNotReceivePerc << "% of the data"); + LOG_WARN_STR(itsLogPrefix << "Did not receive " << didNotReceivePerc << "% of the data"); if (didNotSendPerc > 0) - LOG_ERROR_STR(itsLogPrefix << "Did not send " << didNotSendPerc << "% of the data"); + LOG_WARN_STR(itsLogPrefix << "Did not send " << didNotSendPerc << "% of the data"); // Append end-of-stream marker itsOutputPool.filled.append(NULL); diff --git a/RTCP/Cobalt/OutputProc/src/MSWriterDAL.cc b/RTCP/Cobalt/OutputProc/src/MSWriterDAL.cc index 8b4aed8d6a14b233ce9ba91b04bd4436d027a3d9..6bf21ba685d6aa00dc89ed3d6395045429b19c5e 100644 --- a/RTCP/Cobalt/OutputProc/src/MSWriterDAL.cc +++ b/RTCP/Cobalt/OutputProc/src/MSWriterDAL.cc @@ -78,8 +78,8 @@ namespace LOFAR // uses global locks too anyway. static Mutex HDF5Mutex; - template <typename T,unsigned DIM> - MSWriterDAL<T,DIM>::MSWriterDAL (const string &filename, + template <typename T,unsigned DIM, unsigned FLAGS_DIM> + MSWriterDAL<T,DIM,FLAGS_DIM>::MSWriterDAL (const string &filename, const Parset &parset, unsigned fileno) : @@ -113,8 +113,8 @@ namespace LOFAR itsConfigurationPrefix = fb.beamFormedPrefix(itsFileNr); } - template <typename T,unsigned DIM> - void MSWriterDAL<T,DIM>::init() + template <typename T,unsigned DIM, unsigned FLAGS_DIM> + void MSWriterDAL<T,DIM,FLAGS_DIM>::init() { string h5filename = forceextension(itsFilename, ".h5"); string rawfilename = forceextension(itsFilename, ".raw"); @@ -451,15 +451,15 @@ namespace LOFAR stokesDS.nofSamples().value = dims[0]; } - template <typename T,unsigned DIM> - MSWriterDAL<T,DIM>::~MSWriterDAL() + template <typename T,unsigned DIM, unsigned FLAGS_DIM> + MSWriterDAL<T,DIM,FLAGS_DIM>::~MSWriterDAL() { } - template <typename T,unsigned DIM> - void MSWriterDAL<T,DIM>::write(StreamableData *data) + template <typename T,unsigned DIM, unsigned FLAGS_DIM> + void MSWriterDAL<T,DIM,FLAGS_DIM>::write(StreamableData *data) { - SampleData<T,DIM> *sdata = dynamic_cast<SampleData<T,DIM> *>(data); + SampleData<T,DIM,FLAGS_DIM> *sdata = dynamic_cast<SampleData<T,DIM,FLAGS_DIM> *>(data); ASSERT( data ); ASSERT( sdata ); diff --git a/RTCP/Cobalt/OutputProc/src/MSWriterDAL.h b/RTCP/Cobalt/OutputProc/src/MSWriterDAL.h index 8f9c552cbde8a5aa01535de8fdab279e0c7b43ab..ca25c939e1f5bda3f25865a58bbe533d31ac5265 100644 --- a/RTCP/Cobalt/OutputProc/src/MSWriterDAL.h +++ b/RTCP/Cobalt/OutputProc/src/MSWriterDAL.h @@ -38,7 +38,7 @@ namespace LOFAR namespace Cobalt { - template<typename T, unsigned DIM> + template<typename T, unsigned DIM, unsigned FLAGS_DIM=1> class MSWriterDAL : public MSWriterFile { public: diff --git a/RTCP/Cobalt/OutputProc/src/OutputThread.cc b/RTCP/Cobalt/OutputProc/src/OutputThread.cc index 5dc58114d21aa178f98c381acfd5fbe1656be921..4f8c168a92c65a8dc79a0d0d23006d5e38669f61 100644 --- a/RTCP/Cobalt/OutputProc/src/OutputThread.cc +++ b/RTCP/Cobalt/OutputProc/src/OutputThread.cc @@ -70,6 +70,8 @@ namespace LOFAR itsTargetDirectory(targetDirectory), itsBlocksWritten(0), itsBlocksDropped(0), + itsBlocksDroppedByMe(0), + itsFractionalBlocksWritten(0.0), itsNrExpectedBlocks(0), itsNextSequenceNumber(0), itsBlockDuration(parset.settings.blockDuration()), @@ -104,6 +106,7 @@ namespace LOFAR itsNextSequenceNumber = data->sequenceNumber() + 1; itsBlocksWritten++; + itsFractionalBlocksWritten += 1.0 - data->outputLossFraction(); // doubles have enough precision for this to go well itsMdLogger.log(itsMdKeyPrefix + PN_COP_DROPPING + streamNrStr, droppedBlocks > 0); // logged too late if dropping: not anymore... @@ -127,6 +130,7 @@ namespace LOFAR itsWriter->write(data); } catch (SystemCallException &ex) { LOG_WARN_STR(itsLogPrefix << "OutputThread caught non-fatal exception: " << ex.what()); + itsBlocksDroppedByMe++; continue; } } else { // no try/catch: any loss (e.g. disk full) is fatal in non-real-time mode @@ -136,7 +140,7 @@ namespace LOFAR checkForDroppedData(data); // print debug info for the other blocks - LOG_DEBUG_STR(itsLogPrefix << "Written block with seqno = " << data->sequenceNumber() << ", " << itsBlocksWritten << " blocks written (" << itsWriter->percentageWritten() << "%), " << itsBlocksDropped << " blocks dropped"); + LOG_DEBUG_STR(itsLogPrefix << "Written block with seqno = " << data->sequenceNumber() << "(which was " << (100.0 - 100.0 * data->outputLossFraction()) << "% complete), " << itsBlocksWritten << " blocks written, " << itsBlocksDropped << " blocks not received"); } } @@ -164,9 +168,24 @@ namespace LOFAR template<typename T> void OutputThread<T>::cleanUp() const { - float dropPercent = itsBlocksWritten + itsBlocksDropped == 0 ? 0.0 : (100.0 * itsBlocksDropped) / (itsBlocksWritten + itsBlocksDropped); - - LOG_INFO_STR(itsLogPrefix << "Finished writing: " << itsBlocksWritten << " blocks written (" << itsWriter->percentageWritten() << "%), " << itsBlocksDropped << " blocks dropped: " << std::setprecision(3) << dropPercent << "% lost" ); + // report statistics + const float lostPerc = 100.0 * itsBlocksDroppedByMe / itsNrExpectedBlocks; + const float didNotSendPerc = itsNrExpectedBlocks == 0 ? 0.0 : (100.0 - 100.0 * itsFractionalBlocksWritten) / itsNrExpectedBlocks; + const float didNotReceivePerc = didNotSendPerc + lostPerc; + + if (didNotReceivePerc > 0) + LOG_WARN_STR(itsLogPrefix << "Did not receive " << didNotReceivePerc << "% of the data"); + if (lostPerc > 0) + LOG_ERROR_STR(itsLogPrefix << "I lost " << lostPerc << "% of the data"); + if (didNotSendPerc > 0) + LOG_WARN_STR(itsLogPrefix << "Did not send " << didNotSendPerc << "% of the data"); + + LOG_INFO_STR(itsLogPrefix << "Finished writing " << itsBlocksWritten << " blocks, dropped " << itsBlocksDropped << " blocks."); + + if (didNotSendPerc > 0) + LOG_ERROR_STR(itsLogPrefix << "Total output data loss is " << didNotSendPerc << "%."); + else + LOG_INFO_STR(itsLogPrefix << "Total output data loss is " << didNotSendPerc << "%."); } @@ -357,7 +376,7 @@ namespace LOFAR LOG_INFO_STR(itsLogPrefix << "Writing to " << path); #ifdef HAVE_DAL - itsWriter = new MSWriterDAL<float,3>(path, itsParset, itsStreamNr); + itsWriter = new MSWriterDAL<float,3,1>(path, itsParset, itsStreamNr); #else itsWriter = new MSWriterFile(path); #endif diff --git a/RTCP/Cobalt/OutputProc/src/OutputThread.h b/RTCP/Cobalt/OutputProc/src/OutputThread.h index b272680ba0c4bb8cba890402744745e134a87b37..5264e3dd150a37ee7446abca7752e690bda957d1 100644 --- a/RTCP/Cobalt/OutputProc/src/OutputThread.h +++ b/RTCP/Cobalt/OutputProc/src/OutputThread.h @@ -94,7 +94,8 @@ namespace LOFAR const std::string itsLogPrefix; const std::string itsTargetDirectory; - size_t itsBlocksWritten, itsBlocksDropped; + size_t itsBlocksWritten, itsBlocksDropped, itsBlocksDroppedByMe; + double itsFractionalBlocksWritten; size_t itsNrExpectedBlocks; size_t itsNextSequenceNumber; double itsBlockDuration; // seconds diff --git a/RTCP/Cobalt/OutputProc/test/tMSWriterDAL.cc b/RTCP/Cobalt/OutputProc/test/tMSWriterDAL.cc index c15a2b1a03a621a5f56c7ec603a12c49bc60c43d..3dfc126944eafcea2a95e7000ade4a0f1ec9c231 100644 --- a/RTCP/Cobalt/OutputProc/test/tMSWriterDAL.cc +++ b/RTCP/Cobalt/OutputProc/test/tMSWriterDAL.cc @@ -52,9 +52,9 @@ int main() const size_t nrSubbands = parset.settings.SAPs[file.sapNr].subbands.size(); - SampleData<float,3,2> data( + SampleData<float,3,1> data( boost::extents[sset.nrSamples][nrSubbands][sset.nrChannels], - boost::extents[nrSubbands][sset.nrChannels]); + boost::extents[nrSubbands]); memset(data.samples.origin(), 0, data.samples.num_elements() * sizeof *data.samples.origin());