Skip to content
Snippets Groups Projects
Commit c37568ff authored by Alexander van Amesfoort's avatar Alexander van Amesfoort
Browse files

Task #5830: address review comments. Improve data point logging in...

Task #5830: address review comments. Improve data point logging in CorrelatorPipeline. Move INIT log earlier in rtcp.
parent b8387745
No related branches found
No related tags found
No related merge requests found
...@@ -55,7 +55,8 @@ namespace LOFAR ...@@ -55,7 +55,8 @@ namespace LOFAR
Pipeline(ps, subbandIndices, devices, pool, mdLogger, mdKeyPrefix), Pipeline(ps, subbandIndices, devices, pool, mdLogger, mdKeyPrefix),
factories(ps, nrSubbandsPerSubbandProc), factories(ps, nrSubbandsPerSubbandProc),
itsBlocksWritten(0), itsBlocksWritten(0),
itsBlocksDropped(0) itsBlocksDropped(0),
itsNextSequenceNumber(0)
{ {
// Write data point(s) for monitoring (PVSS). // Write data point(s) for monitoring (PVSS).
itsMdLogger.log(itsMdKeyPrefix + PN_CGP_DATA_PRODUCT_TYPE, "Correlated"); itsMdLogger.log(itsMdKeyPrefix + PN_CGP_DATA_PRODUCT_TYPE, "Correlated");
...@@ -133,9 +134,6 @@ namespace LOFAR ...@@ -133,9 +134,6 @@ namespace LOFAR
SmartPtr<Stream> outputStream = connectToOutput(globalSubbandIdx); SmartPtr<Stream> outputStream = connectToOutput(globalSubbandIdx);
bool dropping = false;
size_t written = 0, dropped = 0;
SmartPtr<SubbandProcOutputData> outputData; SmartPtr<SubbandProcOutputData> outputData;
// Process pool elements until end-of-output // Process pool elements until end-of-output
...@@ -147,21 +145,22 @@ namespace LOFAR ...@@ -147,21 +145,22 @@ namespace LOFAR
LOG_DEBUG_STR("[" << id << "] Writing start"); LOG_DEBUG_STR("[" << id << "] Writing start");
// Write block to disk size_t droppedBlocks = correlatedData.sequenceNumber() - itsNextSequenceNumber;
itsNextSequenceNumber = correlatedData.sequenceNumber() + 1;
// Write block to outputProc
try { try {
correlatedData.write(outputStream.get(), true); correlatedData.write(outputStream.get(), true);
if (!dropping) itsBlocksWritten += 1;
written += 1;
else
dropped += 1;
} catch (Exception &ex) { } catch (Exception &ex) {
// No reconnect, as outputProc doesn't yet re-listen when the conn drops.
LOG_ERROR_STR("Error writing subband " << id.globalSubbandIdx << ", dropping all subsequent blocks: " << ex.what()); LOG_ERROR_STR("Error writing subband " << id.globalSubbandIdx << ", dropping all subsequent blocks: " << ex.what());
outputStream = new NullStream; outputStream = new NullStream;
dropping = true; droppedBlocks += 1;
} }
SubbandProc &workQueue = *workQueues[id.localSubbandIdx % workQueues.size()]; SubbandProc &workQueue = *workQueues[id.localSubbandIdx % workQueues.size()];
...@@ -173,16 +172,15 @@ namespace LOFAR ...@@ -173,16 +172,15 @@ namespace LOFAR
LOG_INFO_STR("[" << id << "] Done"); LOG_INFO_STR("[" << id << "] Done");
else else
LOG_DEBUG_STR("[" << id << "] Done"); LOG_DEBUG_STR("[" << id << "] Done");
}
itsMdLogger.log(itsMdKeyPrefix + PN_CGP_DROPPING + '[' + lexical_cast<string>(globalSubbandIdx) + ']', itsMdLogger.log(itsMdKeyPrefix + PN_CGP_DROPPING + '[' + lexical_cast<string>(globalSubbandIdx) + ']',
dropping ? "1" : "0"); droppedBlocks > 0 ? "1" : "0");
itsBlocksWritten += written; itsBlocksDropped += droppedBlocks;
itsMdLogger.log(itsMdKeyPrefix + PN_CGP_WRITTEN + '[' + lexical_cast<string>(globalSubbandIdx) + ']', itsMdLogger.log(itsMdKeyPrefix + PN_CGP_WRITTEN + '[' + lexical_cast<string>(globalSubbandIdx) + ']',
itsBlocksWritten * static_cast<float>(ps.settings.blockDuration())); itsBlocksWritten * static_cast<float>(ps.settings.blockDuration()));
itsBlocksDropped += dropped; itsMdLogger.log(itsMdKeyPrefix + PN_CGP_DROPPED + '[' + lexical_cast<string>(globalSubbandIdx) + ']',
itsMdLogger.log(itsMdKeyPrefix + PN_CGP_DROPPED + '[' + lexical_cast<string>(globalSubbandIdx) + ']', itsBlocksDropped * static_cast<float>(ps.settings.blockDuration()));
itsBlocksDropped * static_cast<float>(ps.settings.blockDuration())); }
} }
......
...@@ -68,6 +68,8 @@ namespace LOFAR ...@@ -68,6 +68,8 @@ namespace LOFAR
// For monitoring (PVSS). The beamformer has these in TABTranspose. // For monitoring (PVSS). The beamformer has these in TABTranspose.
size_t itsBlocksWritten, itsBlocksDropped; size_t itsBlocksWritten, itsBlocksDropped;
size_t itsNextSequenceNumber;
}; };
} }
} }
......
...@@ -193,6 +193,8 @@ int main(int argc, char **argv) ...@@ -193,6 +193,8 @@ int main(int argc, char **argv)
#endif #endif
LOG_INFO_STR("GPUProc version " << GPUProcVersion::getVersion() << " r" << GPUProcVersion::getRevision()); LOG_INFO_STR("GPUProc version " << GPUProcVersion::getVersion() << " r" << GPUProcVersion::getRevision());
LOG_INFO("===== INIT =====");
// Create a parameters set object based on the inputs // Create a parameters set object based on the inputs
LOG_INFO("----- Reading Parset"); LOG_INFO("----- Reading Parset");
Parset ps(argv[optind]); Parset ps(argv[optind]);
...@@ -219,8 +221,6 @@ int main(int argc, char **argv) ...@@ -219,8 +221,6 @@ int main(int argc, char **argv)
const string mdHostName = ps.getString("Cobalt.PVSSGateway.host", ""); const string mdHostName = ps.getString("Cobalt.PVSSGateway.host", "");
MACIO::RTmetadata mdLogger(ps.observationID(), mdRegisterName, mdHostName); MACIO::RTmetadata mdLogger(ps.observationID(), mdRegisterName, mdHostName);
LOG_INFO("===== INIT =====");
#ifdef HAVE_MPI #ifdef HAVE_MPI
LOG_INFO_STR("MPI rank " << rank << " out of " << nrHosts << " hosts"); LOG_INFO_STR("MPI rank " << rank << " out of " << nrHosts << " hosts");
#else #else
......
...@@ -134,7 +134,7 @@ namespace LOFAR ...@@ -134,7 +134,7 @@ namespace LOFAR
{ {
// TODO: check for dropped data at end of observation // TODO: check for dropped data at end of observation
unsigned droppedBlocks = data->sequenceNumber() - itsNextSequenceNumber; size_t droppedBlocks = data->sequenceNumber() - itsNextSequenceNumber;
ASSERTSTR(data->sequenceNumber() >= itsNextSequenceNumber, "Received block nr " << data->sequenceNumber() << " out of order! I expected nothing before " << itsNextSequenceNumber); ASSERTSTR(data->sequenceNumber() >= itsNextSequenceNumber, "Received block nr " << data->sequenceNumber() << " out of order! I expected nothing before " << itsNextSequenceNumber);
...@@ -151,7 +151,7 @@ namespace LOFAR ...@@ -151,7 +151,7 @@ namespace LOFAR
itsBlocksWritten++; itsBlocksWritten++;
itsMdLogger.log(itsMdKeyPrefix + PN_COP_DROPPING + '[' + lexical_cast<string>(itsStreamNr) + ']', itsMdLogger.log(itsMdKeyPrefix + PN_COP_DROPPING + '[' + lexical_cast<string>(itsStreamNr) + ']',
droppedBlocks > 0 ? "1" : "0"); // logged too late? droppedBlocks > 0 ? "1" : "0"); // logged too late if dropping: not anymore...
itsMdLogger.log(itsMdKeyPrefix + PN_COP_WRITTEN + '[' + lexical_cast<string>(itsStreamNr) + ']', itsMdLogger.log(itsMdKeyPrefix + PN_COP_WRITTEN + '[' + lexical_cast<string>(itsStreamNr) + ']',
itsBlocksWritten * static_cast<float>(itsParset.settings.blockDuration())); itsBlocksWritten * static_cast<float>(itsParset.settings.blockDuration()));
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment