diff --git a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/CorrelatorPipeline.cc b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/CorrelatorPipeline.cc index 64e498f06d6507c2b2b4260243c32c5e34cc483e..9bcccd49095de1afa2089061f064f72e1b8a1398 100644 --- a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/CorrelatorPipeline.cc +++ b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/CorrelatorPipeline.cc @@ -55,7 +55,8 @@ namespace LOFAR Pipeline(ps, subbandIndices, devices, pool, mdLogger, mdKeyPrefix), factories(ps, nrSubbandsPerSubbandProc), itsBlocksWritten(0), - itsBlocksDropped(0) + itsBlocksDropped(0), + itsNextSequenceNumber(0) { // Write data point(s) for monitoring (PVSS). itsMdLogger.log(itsMdKeyPrefix + PN_CGP_DATA_PRODUCT_TYPE, "Correlated"); @@ -133,9 +134,6 @@ namespace LOFAR SmartPtr<Stream> outputStream = connectToOutput(globalSubbandIdx); - bool dropping = false; - size_t written = 0, dropped = 0; - SmartPtr<SubbandProcOutputData> outputData; // Process pool elements until end-of-output @@ -147,21 +145,22 @@ namespace LOFAR LOG_DEBUG_STR("[" << id << "] Writing start"); - // Write block to disk + size_t droppedBlocks = correlatedData.sequenceNumber() - itsNextSequenceNumber; + itsNextSequenceNumber = correlatedData.sequenceNumber() + 1; + + // Write block to outputProc try { correlatedData.write(outputStream.get(), true); - if (!dropping) - written += 1; - else - dropped += 1; + itsBlocksWritten += 1; } catch (Exception &ex) { + // No reconnect, as outputProc doesn't yet re-listen when the conn drops. LOG_ERROR_STR("Error writing subband " << id.globalSubbandIdx << ", dropping all subsequent blocks: " << ex.what()); outputStream = new NullStream; - dropping = true; + droppedBlocks += 1; } SubbandProc &workQueue = *workQueues[id.localSubbandIdx % workQueues.size()]; @@ -173,16 +172,15 @@ namespace LOFAR LOG_INFO_STR("[" << id << "] Done"); else LOG_DEBUG_STR("[" << id << "] Done"); - } - itsMdLogger.log(itsMdKeyPrefix + PN_CGP_DROPPING + '[' + lexical_cast<string>(globalSubbandIdx) + ']', - dropping ? "1" : "0"); - itsBlocksWritten += written; - itsMdLogger.log(itsMdKeyPrefix + PN_CGP_WRITTEN + '[' + lexical_cast<string>(globalSubbandIdx) + ']', - itsBlocksWritten * static_cast<float>(ps.settings.blockDuration())); - itsBlocksDropped += dropped; - itsMdLogger.log(itsMdKeyPrefix + PN_CGP_DROPPED + '[' + lexical_cast<string>(globalSubbandIdx) + ']', - itsBlocksDropped * static_cast<float>(ps.settings.blockDuration())); + itsMdLogger.log(itsMdKeyPrefix + PN_CGP_DROPPING + '[' + lexical_cast<string>(globalSubbandIdx) + ']', + droppedBlocks > 0 ? "1" : "0"); + itsBlocksDropped += droppedBlocks; + itsMdLogger.log(itsMdKeyPrefix + PN_CGP_WRITTEN + '[' + lexical_cast<string>(globalSubbandIdx) + ']', + itsBlocksWritten * static_cast<float>(ps.settings.blockDuration())); + itsMdLogger.log(itsMdKeyPrefix + PN_CGP_DROPPED + '[' + lexical_cast<string>(globalSubbandIdx) + ']', + itsBlocksDropped * static_cast<float>(ps.settings.blockDuration())); + } } diff --git a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/CorrelatorPipeline.h b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/CorrelatorPipeline.h index 518ed17a08dccee7d36881435ddf5b710bdb91ae..546a04710d4fe27fa778a14fe48d49e2806dd0ea 100644 --- a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/CorrelatorPipeline.h +++ b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/CorrelatorPipeline.h @@ -68,6 +68,8 @@ namespace LOFAR // For monitoring (PVSS). The beamformer has these in TABTranspose. size_t itsBlocksWritten, itsBlocksDropped; + + size_t itsNextSequenceNumber; }; } } diff --git a/RTCP/Cobalt/GPUProc/src/rtcp.cc b/RTCP/Cobalt/GPUProc/src/rtcp.cc index 1aad42e16760eb6bb4fec854620c177c45ccaf2a..15bb7cf5d2387bfdde94952904e3c9677fe50d7a 100644 --- a/RTCP/Cobalt/GPUProc/src/rtcp.cc +++ b/RTCP/Cobalt/GPUProc/src/rtcp.cc @@ -193,6 +193,8 @@ int main(int argc, char **argv) #endif LOG_INFO_STR("GPUProc version " << GPUProcVersion::getVersion() << " r" << GPUProcVersion::getRevision()); + LOG_INFO("===== INIT ====="); + // Create a parameters set object based on the inputs LOG_INFO("----- Reading Parset"); Parset ps(argv[optind]); @@ -219,8 +221,6 @@ int main(int argc, char **argv) const string mdHostName = ps.getString("Cobalt.PVSSGateway.host", ""); MACIO::RTmetadata mdLogger(ps.observationID(), mdRegisterName, mdHostName); - LOG_INFO("===== INIT ====="); - #ifdef HAVE_MPI LOG_INFO_STR("MPI rank " << rank << " out of " << nrHosts << " hosts"); #else diff --git a/RTCP/Cobalt/OutputProc/src/OutputThread.cc b/RTCP/Cobalt/OutputProc/src/OutputThread.cc index cd9fc03871e6505ea9cd1b521d991bc107d878e5..68b54b54d494bae35e0cfa939c86ea83b9528f6b 100644 --- a/RTCP/Cobalt/OutputProc/src/OutputThread.cc +++ b/RTCP/Cobalt/OutputProc/src/OutputThread.cc @@ -134,7 +134,7 @@ namespace LOFAR { // TODO: check for dropped data at end of observation - unsigned droppedBlocks = data->sequenceNumber() - itsNextSequenceNumber; + size_t droppedBlocks = data->sequenceNumber() - itsNextSequenceNumber; ASSERTSTR(data->sequenceNumber() >= itsNextSequenceNumber, "Received block nr " << data->sequenceNumber() << " out of order! I expected nothing before " << itsNextSequenceNumber); @@ -151,7 +151,7 @@ namespace LOFAR itsBlocksWritten++; itsMdLogger.log(itsMdKeyPrefix + PN_COP_DROPPING + '[' + lexical_cast<string>(itsStreamNr) + ']', - droppedBlocks > 0 ? "1" : "0"); // logged too late? + droppedBlocks > 0 ? "1" : "0"); // logged too late if dropping: not anymore... itsMdLogger.log(itsMdKeyPrefix + PN_COP_WRITTEN + '[' + lexical_cast<string>(itsStreamNr) + ']', itsBlocksWritten * static_cast<float>(itsParset.settings.blockDuration())); }