Skip to content
Snippets Groups Projects
Commit 0bccc4c3 authored by Alexander van Amesfoort's avatar Alexander van Amesfoort
Browse files

Task #5844: COBALT PVSS: zero dynarrays pre-obs (while also logging obs ID,...

Task #5844: COBALT PVSS: zero dynarrays pre-obs (while also logging obs ID, data product names, etc). This is nice to have, because some events are written conditionally. If a subband has no dropped data, it's better to init that value to 0.0f. Some minor cleanups related to PVSS event logging is also included.
parent 6e4daf57
No related branches found
No related tags found
No related merge requests found
......@@ -150,31 +150,31 @@ namespace LOFAR
ASSERTSTR(!devices.empty(), "Not bound to any GPU!");
// Write data point(s) for monitoring (PVSS).
itsMdLogger.log(itsMdKeyPrefix + PN_CGP_OBSERVATION_NAME, boost::lexical_cast<string>(ps.settings.observationID));
itsMdLogger.log(itsMdKeyPrefix + PN_CGP_OBSERVATION_NAME,
boost::lexical_cast<string>(ps.settings.observationID));
for (unsigned i = 0; i < subbandIndices.size(); ++i) {
itsMdLogger.log(itsMdKeyPrefix + PN_CGP_SUBBAND + '[' + boost::lexical_cast<string>(i) + ']',
(int)subbandIndices[i]);
}
const string sbStr = '[' + boost::lexical_cast<string>(i) + ']';
string dataProductType;
itsMdLogger.log(itsMdKeyPrefix + PN_CGP_SUBBAND + sbStr, (int)subbandIndices[i]);
switch (1 * (int)ps.settings.beamFormer.enabled
+ 2 * (int)ps.settings.correlator.enabled) {
case 3:
dataProductType = "Correlated + Beamformed";
break;
case 2:
dataProductType = "Correlated";
break;
case 1:
dataProductType = "Beamformed";
break;
case 0:
default:
dataProductType = "None";
break;
// After obs start these dynarray data points are written _conditionally_, so init.
// While we only have to write the last index (PVSSGateway will zero the rest),
// we'd have to find out who has the last subband. Don't bother, just init all.
itsMdLogger.log(itsMdKeyPrefix + PN_CGP_DROPPING + sbStr, 0);
itsMdLogger.log(itsMdKeyPrefix + PN_CGP_WRITTEN + sbStr, 0.0f);
itsMdLogger.log(itsMdKeyPrefix + PN_CGP_DROPPED + sbStr, 0.0f);
}
string dataProductType;
if (ps.settings.correlator.enabled && ps.settings.beamFormer.enabled) {
dataProductType = "Correlated + Beamformed";
} else if (ps.settings.correlator.enabled) {
dataProductType = "Correlated";
} else if (ps.settings.beamFormer.enabled) {
dataProductType = "Beamformed";
} else {
dataProductType = "None";
}
itsMdLogger.log(itsMdKeyPrefix + PN_CGP_DATA_PRODUCT_TYPE, dataProductType);
}
......@@ -764,16 +764,17 @@ namespace LOFAR
*/
const double blockDuration = ps.settings.blockDuration();
// Prevent division by zero for observations without beam former
// Prevent division by zero for observations without beamformer
const size_t nrFiles = std::max(multiSender.nrFiles(), 1UL);
itsMdLogger.log(itsMdKeyPrefix + PN_CGP_DROPPING + '[' + lexical_cast<string>(id.localSubbandIdx) + ']',
const string localSbStr = '[' + lexical_cast<string>(id.localSubbandIdx) + ']';
itsMdLogger.log(itsMdKeyPrefix + PN_CGP_DROPPING + localSbStr,
correlatorLoss.dropping || beamFormerLoss.dropping);
itsMdLogger.log(itsMdKeyPrefix + PN_CGP_WRITTEN + '[' + lexical_cast<string>(id.localSubbandIdx) + ']',
itsMdLogger.log(itsMdKeyPrefix + PN_CGP_WRITTEN + localSbStr,
static_cast<float>(correlatorLoss.blocksWritten * blockDuration) +
static_cast<float>(beamFormerLoss.blocksWritten * blockDuration / nrFiles)
);
itsMdLogger.log(itsMdKeyPrefix + PN_CGP_DROPPED + '[' + lexical_cast<string>(id.localSubbandIdx) + ']',
itsMdLogger.log(itsMdKeyPrefix + PN_CGP_DROPPED + localSbStr,
static_cast<float>(correlatorLoss.blocksDropped * blockDuration) +
static_cast<float>(beamFormerLoss.blocksDropped * blockDuration / nrFiles)
);
......
......@@ -139,21 +139,23 @@ namespace LOFAR
ASSERTSTR(data->sequenceNumber() >= itsNextSequenceNumber, "Received block nr " << data->sequenceNumber() << " out of order! I expected nothing before " << itsNextSequenceNumber);
const string streamNrStr = '[' + lexical_cast<string>(itsStreamNr) + ']';
if (droppedBlocks > 0) {
itsBlocksDropped += droppedBlocks;
LOG_WARN_STR(itsLogPrefix << "Just dropped " << droppedBlocks << " blocks. Dropped " << itsBlocksDropped << " blocks and written " << itsBlocksWritten << " blocks so far.");
itsMdLogger.log(itsMdKeyPrefix + PN_COP_DROPPED + '[' + lexical_cast<string>(itsStreamNr) + ']',
itsMdLogger.log(itsMdKeyPrefix + PN_COP_DROPPED + streamNrStr,
itsBlocksDropped * static_cast<float>(itsParset.settings.blockDuration()));
}
itsNextSequenceNumber = data->sequenceNumber() + 1;
itsBlocksWritten++;
itsMdLogger.log(itsMdKeyPrefix + PN_COP_DROPPING + '[' + lexical_cast<string>(itsStreamNr) + ']',
itsMdLogger.log(itsMdKeyPrefix + PN_COP_DROPPING + streamNrStr,
droppedBlocks > 0); // logged too late if dropping: not anymore...
itsMdLogger.log(itsMdKeyPrefix + PN_COP_WRITTEN + '[' + lexical_cast<string>(itsStreamNr) + ']',
itsMdLogger.log(itsMdKeyPrefix + PN_COP_WRITTEN + streamNrStr,
itsBlocksWritten * static_cast<float>(itsParset.settings.blockDuration()));
}
......@@ -180,6 +182,27 @@ namespace LOFAR
}
template<typename T>
void OutputThread<T>::logInitialStreamMetadataEvents(const string& dataProductType,
const string& fileName,
const string& directoryName)
{
// Write data points wrt @dataProductType output file for monitoring (PVSS).
const string streamNrStr = '[' + lexical_cast<string>(itsStreamNr) + ']';
itsMdLogger.log(itsMdKeyPrefix + PN_COP_DATA_PRODUCT_TYPE + streamNrStr, dataProductType);
itsMdLogger.log(itsMdKeyPrefix + PN_COP_FILE_NAME + streamNrStr, fileName);
itsMdLogger.log(itsMdKeyPrefix + PN_COP_DIRECTORY + streamNrStr, directoryName);
// After obs start these dynarray data points are written conditionally, so init.
// While we only have to write the last index (PVSSGateway will zero the rest),
// we'd have to find out who has the last subband. Don't bother, just init all.
itsMdLogger.log(itsMdKeyPrefix + PN_COP_DROPPING + streamNrStr, 0);
itsMdLogger.log(itsMdKeyPrefix + PN_COP_WRITTEN + streamNrStr, 0.0f);
itsMdLogger.log(itsMdKeyPrefix + PN_COP_DROPPED + streamNrStr, 0.0f);
}
template<typename T> void OutputThread<T>::cleanUp() const
{
float dropPercent = itsBlocksWritten + itsBlocksDropped == 0 ? 0.0 : (100.0 * itsBlocksDropped) / (itsBlocksWritten + itsBlocksDropped);
......@@ -260,18 +283,14 @@ namespace LOFAR
const std::string path = directoryName + "/" + fileName;
try
try
{
recursiveMakeDir(directoryName, itsLogPrefix);
LOG_INFO_STR(itsLogPrefix << "Writing to " << path);
itsWriter = new MSWriterCorrelated(itsLogPrefix, path, itsParset, itsStreamNr);
// Write data points wrt correlated output file for monitoring (PVSS)
// once we know the file could at least be created.
itsMdLogger.log(itsMdKeyPrefix + PN_COP_DATA_PRODUCT_TYPE + '[' + lexical_cast<string>(itsStreamNr) + ']', "Correlated");
itsMdLogger.log(itsMdKeyPrefix + PN_COP_FILE_NAME + '[' + lexical_cast<string>(itsStreamNr) + ']', fileName);
itsMdLogger.log(itsMdKeyPrefix + PN_COP_DIRECTORY + '[' + lexical_cast<string>(itsStreamNr) + ']', directoryName);
logInitialStreamMetadataEvents("Correlated", fileName, directoryName);
}
catch (Exception &ex)
{
......@@ -341,11 +360,7 @@ namespace LOFAR
itsWriter = new MSWriterFile(path);
#endif
// Write data points for beamformed output file for monitoring (PVSS)
// once we know the file could at least be created.
itsMdLogger.log(itsMdKeyPrefix + PN_COP_DATA_PRODUCT_TYPE + '[' + lexical_cast<string>(itsStreamNr) + ']', "Beamformed");
itsMdLogger.log(itsMdKeyPrefix + PN_COP_FILE_NAME + '[' + lexical_cast<string>(itsStreamNr) + ']', fileName);
itsMdLogger.log(itsMdKeyPrefix + PN_COP_DIRECTORY + '[' + lexical_cast<string>(itsStreamNr) + ']', directoryName);
logInitialStreamMetadataEvents("Beamformed", fileName, directoryName);
}
catch (Exception &ex)
{
......
......@@ -81,6 +81,9 @@ namespace LOFAR
protected:
void checkForDroppedData(StreamableData *);
void doWork();
void logInitialStreamMetadataEvents(const std::string& dataProductType,
const std::string& fileName,
const std::string& directoryName);
const Parset &itsParset;
const unsigned itsStreamNr;
......@@ -113,7 +116,7 @@ namespace LOFAR
const std::string &logPrefix,
const std::string &targetDirectory = "");
void createMS();
virtual void createMS();
};
......@@ -131,7 +134,7 @@ namespace LOFAR
const std::string &logPrefix,
const std::string &targetDirectory = "");
void createMS();
virtual void createMS();
};
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment