From c37568ff420cd1b312f928bd3485a0987b8632bc Mon Sep 17 00:00:00 2001 From: Alexander van Amesfoort <amesfoort@astron.nl> Date: Wed, 18 Jun 2014 15:05:20 +0000 Subject: [PATCH] Task #5830: address review comments. Improve data point logging in CorrelatorPipeline. Move INIT log earlier in rtcp. --- .../src/cuda/Pipelines/CorrelatorPipeline.cc | 36 +++++++++---------- .../src/cuda/Pipelines/CorrelatorPipeline.h | 2 ++ RTCP/Cobalt/GPUProc/src/rtcp.cc | 4 +-- RTCP/Cobalt/OutputProc/src/OutputThread.cc | 4 +-- 4 files changed, 23 insertions(+), 23 deletions(-) diff --git a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/CorrelatorPipeline.cc b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/CorrelatorPipeline.cc index 64e498f06d6..9bcccd49095 100644 --- a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/CorrelatorPipeline.cc +++ b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/CorrelatorPipeline.cc @@ -55,7 +55,8 @@ namespace LOFAR Pipeline(ps, subbandIndices, devices, pool, mdLogger, mdKeyPrefix), factories(ps, nrSubbandsPerSubbandProc), itsBlocksWritten(0), - itsBlocksDropped(0) + itsBlocksDropped(0), + itsNextSequenceNumber(0) { // Write data point(s) for monitoring (PVSS). itsMdLogger.log(itsMdKeyPrefix + PN_CGP_DATA_PRODUCT_TYPE, "Correlated"); @@ -133,9 +134,6 @@ namespace LOFAR SmartPtr<Stream> outputStream = connectToOutput(globalSubbandIdx); - bool dropping = false; - size_t written = 0, dropped = 0; - SmartPtr<SubbandProcOutputData> outputData; // Process pool elements until end-of-output @@ -147,21 +145,22 @@ namespace LOFAR LOG_DEBUG_STR("[" << id << "] Writing start"); - // Write block to disk + size_t droppedBlocks = correlatedData.sequenceNumber() - itsNextSequenceNumber; + itsNextSequenceNumber = correlatedData.sequenceNumber() + 1; + + // Write block to outputProc try { correlatedData.write(outputStream.get(), true); - if (!dropping) - written += 1; - else - dropped += 1; + itsBlocksWritten += 1; } catch (Exception &ex) { + // No reconnect, as outputProc doesn't yet re-listen when the conn drops. LOG_ERROR_STR("Error writing subband " << id.globalSubbandIdx << ", dropping all subsequent blocks: " << ex.what()); outputStream = new NullStream; - dropping = true; + droppedBlocks += 1; } SubbandProc &workQueue = *workQueues[id.localSubbandIdx % workQueues.size()]; @@ -173,16 +172,15 @@ namespace LOFAR LOG_INFO_STR("[" << id << "] Done"); else LOG_DEBUG_STR("[" << id << "] Done"); - } - itsMdLogger.log(itsMdKeyPrefix + PN_CGP_DROPPING + '[' + lexical_cast<string>(globalSubbandIdx) + ']', - dropping ? "1" : "0"); - itsBlocksWritten += written; - itsMdLogger.log(itsMdKeyPrefix + PN_CGP_WRITTEN + '[' + lexical_cast<string>(globalSubbandIdx) + ']', - itsBlocksWritten * static_cast<float>(ps.settings.blockDuration())); - itsBlocksDropped += dropped; - itsMdLogger.log(itsMdKeyPrefix + PN_CGP_DROPPED + '[' + lexical_cast<string>(globalSubbandIdx) + ']', - itsBlocksDropped * static_cast<float>(ps.settings.blockDuration())); + itsMdLogger.log(itsMdKeyPrefix + PN_CGP_DROPPING + '[' + lexical_cast<string>(globalSubbandIdx) + ']', + droppedBlocks > 0 ? "1" : "0"); + itsBlocksDropped += droppedBlocks; + itsMdLogger.log(itsMdKeyPrefix + PN_CGP_WRITTEN + '[' + lexical_cast<string>(globalSubbandIdx) + ']', + itsBlocksWritten * static_cast<float>(ps.settings.blockDuration())); + itsMdLogger.log(itsMdKeyPrefix + PN_CGP_DROPPED + '[' + lexical_cast<string>(globalSubbandIdx) + ']', + itsBlocksDropped * static_cast<float>(ps.settings.blockDuration())); + } } diff --git a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/CorrelatorPipeline.h b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/CorrelatorPipeline.h index 518ed17a08d..546a04710d4 100644 --- a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/CorrelatorPipeline.h +++ b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/CorrelatorPipeline.h @@ -68,6 +68,8 @@ namespace LOFAR // For monitoring (PVSS). The beamformer has these in TABTranspose. size_t itsBlocksWritten, itsBlocksDropped; + + size_t itsNextSequenceNumber; }; } } diff --git a/RTCP/Cobalt/GPUProc/src/rtcp.cc b/RTCP/Cobalt/GPUProc/src/rtcp.cc index 1aad42e1676..15bb7cf5d23 100644 --- a/RTCP/Cobalt/GPUProc/src/rtcp.cc +++ b/RTCP/Cobalt/GPUProc/src/rtcp.cc @@ -193,6 +193,8 @@ int main(int argc, char **argv) #endif LOG_INFO_STR("GPUProc version " << GPUProcVersion::getVersion() << " r" << GPUProcVersion::getRevision()); + LOG_INFO("===== INIT ====="); + // Create a parameters set object based on the inputs LOG_INFO("----- Reading Parset"); Parset ps(argv[optind]); @@ -219,8 +221,6 @@ int main(int argc, char **argv) const string mdHostName = ps.getString("Cobalt.PVSSGateway.host", ""); MACIO::RTmetadata mdLogger(ps.observationID(), mdRegisterName, mdHostName); - LOG_INFO("===== INIT ====="); - #ifdef HAVE_MPI LOG_INFO_STR("MPI rank " << rank << " out of " << nrHosts << " hosts"); #else diff --git a/RTCP/Cobalt/OutputProc/src/OutputThread.cc b/RTCP/Cobalt/OutputProc/src/OutputThread.cc index cd9fc03871e..68b54b54d49 100644 --- a/RTCP/Cobalt/OutputProc/src/OutputThread.cc +++ b/RTCP/Cobalt/OutputProc/src/OutputThread.cc @@ -134,7 +134,7 @@ namespace LOFAR { // TODO: check for dropped data at end of observation - unsigned droppedBlocks = data->sequenceNumber() - itsNextSequenceNumber; + size_t droppedBlocks = data->sequenceNumber() - itsNextSequenceNumber; ASSERTSTR(data->sequenceNumber() >= itsNextSequenceNumber, "Received block nr " << data->sequenceNumber() << " out of order! I expected nothing before " << itsNextSequenceNumber); @@ -151,7 +151,7 @@ namespace LOFAR itsBlocksWritten++; itsMdLogger.log(itsMdKeyPrefix + PN_COP_DROPPING + '[' + lexical_cast<string>(itsStreamNr) + ']', - droppedBlocks > 0 ? "1" : "0"); // logged too late? + droppedBlocks > 0 ? "1" : "0"); // logged too late if dropping: not anymore... itsMdLogger.log(itsMdKeyPrefix + PN_COP_WRITTEN + '[' + lexical_cast<string>(itsStreamNr) + ']', itsBlocksWritten * static_cast<float>(itsParset.settings.blockDuration())); } -- GitLab