Newer
Older
//# OutputThread.cc:
Alexander van Amesfoort
committed
//# Copyright (C) 2009-2013, 2017
//# ASTRON (Netherlands Institute for Radio Astronomy)
//# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
//#
//# This file is part of the LOFAR software suite.
//# The LOFAR software suite is free software: you can redistribute it and/or
//# modify it under the terms of the GNU General Public License as published
//# by the Free Software Foundation, either version 3 of the License, or
//# (at your option) any later version.
//#
//# The LOFAR software suite is distributed in the hope that it will be useful,
//# but WITHOUT ANY WARRANTY; without even the implied warranty of
//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//# GNU General Public License for more details.
//#
//# You should have received a copy of the GNU General Public License along
//# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
//#
Alexander van Amesfoort
committed
//# $Id$

Jan David Mol
committed
//# Always #include <lofar_config.h> first!
#include <lofar_config.h>
#include "OutputThread.h"

Jan David Mol
committed
#include <unistd.h>
Alexander van Amesfoort
committed
#include <set>
#include <sstream>
#include <iomanip>
Alexander van Amesfoort
committed
#include <boost/lexical_cast.hpp>

Jan David Mol
committed
#include <boost/format.hpp>
#include <Common/SystemCallException.h>
#include <Common/Thread/Mutex.h>
#include <Common/Thread/Cancellation.h>
Alexander van Amesfoort
committed
#include <Common/StreamUtil.h> // LOFAR::print()
Alexander van Amesfoort
committed
#include <ApplCommon/PVSSDatapointDefs.h>

Jan David Mol
committed
#include <CoInterface/OutputTypes.h>
#include <CoInterface/Exceptions.h>

Jan David Mol
committed
#include <CoInterface/LTAFeedback.h>

Jan David Mol
committed
#include <CoInterface/BudgetTimer.h>

Jan David Mol
committed
#if defined HAVE_AIPSPP
#include <casacore/casa/Exceptions/Error.h>

Jan David Mol
committed
#endif
#include "MSWriterFile.h"
#include "MSWriterCorrelated.h"
#include "MSWriterDAL.h"
#include "MSWriterNull.h"

Jan David Mol
committed
namespace Cobalt
{
// File-local mutex taken (via ScopedLock) by SubbandOutputThread::createMS()
// and TABOutputThread::createMS() while they construct their writers, so that
// those calls into casacore are serialised across output threads.
static Mutex casacoreMutex;
// NOTE(review): file-scope using-directive; kept because the code below relies
// on unqualified string/set/ostringstream.
using namespace std;
Alexander van Amesfoort
committed
using boost::lexical_cast;
Alexander van Amesfoort
committed
// Construct the generic part of an output thread for one output stream.
//
// parset          - observation configuration; also supplies the default
//                   block duration (settings.blockDuration()).
// streamNr        - index of the output stream handled by this thread.
// outputPool      - pool whose 'filled' queue is drained by doWork() and whose
//                   'free' queue receives the blocks back.
// mdLogger        - sink for monitoring data points.
// mdKeyPrefix     - prefix prepended to every monitoring key.
// logPrefix       - prefix prepended to every log line.
// targetDirectory - when non-empty, overrides the directory from the parset
//                   (see the createMS() implementations).
//
// itsWriter and itsNrExpectedBlocks are not set here; the createMS()
// implementations of the derived classes assign them.
template<typename T>
OutputThread<T>::OutputThread(const Parset &parset,
                              unsigned streamNr, Pool<T> &outputPool,
                              RTmetadata &mdLogger, const std::string &mdKeyPrefix,
                              const std::string &logPrefix, const std::string &targetDirectory)
:
  itsParset(parset),
  itsStreamNr(streamNr),
  itsMdLogger(mdLogger),
  itsMdKeyPrefix(mdKeyPrefix),
  itsLogPrefix(logPrefix),
  itsTargetDirectory(targetDirectory),
  itsBlocksWritten(0),
  itsBlocksDropped(0),
  itsBlocksDroppedByMe(0),
  itsFractionalBlocksWritten(0.0),
  itsNextSequenceNumber(0),
  itsBlockDuration(parset.settings.blockDuration()),
  itsOutputPool(outputPool)
{
  // All state is set up by the member initializers above.
}

Jan David Mol
committed

Marcel Loose
committed
// Destructor: intentionally empty, no explicit clean-up is performed here.
template<typename T>
OutputThread<T>::~OutputThread() {}
// Update the drop/write bookkeeping for one block that has just been written
// and publish the corresponding monitoring data points.
//
// data - the block that was written. Its sequence number must not precede the
//        next expected one (asserted).
//
// The number of blocks missing between the next expected sequence number and
// this block is added to itsBlocksDropped. itsBlocksWritten is incremented for
// every call, and itsFractionalBlocksWritten grows by the fraction of the block
// that was not lost.
template<typename T> void OutputThread<T>::checkForDroppedData(StreamableData *data)
{
  // TODO: check for dropped data at end of observation

  ASSERTSTR(data->sequenceNumber() >= itsNextSequenceNumber, "Received block nr " << data->sequenceNumber() << " out of order! I expected nothing before " << itsNextSequenceNumber);

  const size_t droppedBlocks = data->sequenceNumber() - itsNextSequenceNumber;

  // Index suffix appended to every monitoring key of this stream.
  const string streamNrStr = '[' + lexical_cast<string>(itsStreamNr) + ']';

  if (droppedBlocks > 0) {
    itsBlocksDropped += droppedBlocks;

    LOG_WARN_STR(itsLogPrefix << "Did not receive " << droppedBlocks << " blocks");

    // Reported as block count times block duration.
    itsMdLogger.log(itsMdKeyPrefix + PN_COP_DROPPED + streamNrStr,
                    itsBlocksDropped * static_cast<float>(itsBlockDuration));
  }

  if (data->doReadWithSequenceNumber()) {
    itsNextSequenceNumber = data->sequenceNumber() + 1;
  } // else, droppedBlocks is useless, but itsBlocksWritten is valid

  // Count this block: without this the WRITTEN data point below and the
  // totals reported by cleanUp() would stay at zero.
  itsBlocksWritten++;
  itsFractionalBlocksWritten += 1.0 - data->outputLossFraction(); // doubles have enough precision for this to go well

  itsMdLogger.log(itsMdKeyPrefix + PN_COP_DROPPING + streamNrStr,
                  droppedBlocks > 0); // logged too late if dropping: not anymore...
  itsMdLogger.log(itsMdKeyPrefix + PN_COP_WRITTEN + streamNrStr,
                  itsBlocksWritten * static_cast<float>(itsBlockDuration));
}

Jan David Mol
committed
// Main write loop: take filled blocks from the output pool until the queue
// yields a null pointer, write each one through itsWriter, update the
// bookkeeping, and hand the block back to the free queue.
//
// In real-time mode a SystemCallException from the writer is logged and counted
// in itsBlocksDroppedByMe, and the block is skipped. In non-real-time mode
// exceptions are not caught here.
template<typename T> void OutputThread<T>::doWork()
{
  BudgetTimer writeTimer(
    "writeOutput",
    itsBlockDuration,
    true, true);

  // The loop's increment expression returns the block to the free queue; it
  // also runs after 'continue', so skipped blocks are recycled as well.
  for (SmartPtr<T> data; (data = itsOutputPool.filled.remove()) != 0; itsOutputPool.free.append(data)) {
    if (itsParset.settings.realTime) {
      try {
        // Times the write for the lifetime of 'ss'.
        BudgetTimer::StartStop ss(writeTimer);

        if (itsParset.settings.writeToDisk)
          itsWriter->write(data);
      } catch (SystemCallException &ex) {
        LOG_WARN_STR(itsLogPrefix << "OutputThread caught non-fatal exception: " << ex.what());
        itsBlocksDroppedByMe++;
        continue;
      }
    } else { // no try/catch: any loss (e.g. disk full) is fatal in non-real-time mode
      // NOTE(review): unlike the real-time branch, settings.writeToDisk is not
      // consulted here - confirm this is intended.
      itsWriter->write(data);
    }

    checkForDroppedData(data);

    LOG_DEBUG_STR(itsLogPrefix << "Written block with seqno = " << data->sequenceNumber() << "(which was " << (100.0 - 100.0 * data->outputLossFraction()) << "% complete), " << itsBlocksWritten << " blocks written, " << itsBlocksDropped << " blocks not received");
  }
}
Alexander van Amesfoort
committed
// Publish the initial monitoring (PVSS) data points for this stream's output
// file: its data product type, file name and directory, followed by zeroed
// dropping/written/dropped values.
//
// dataProductType - label logged as the data product type.
// fileName        - name of the output file.
// directoryName   - directory that holds the output file.
template<typename T>
void OutputThread<T>::logInitialStreamMetadataEvents(const string& dataProductType,
                                                     const string& fileName,
                                                     const string& directoryName)
{
  // Every key of this stream ends in "[<stream number>]".
  const string indexSuffix = '[' + lexical_cast<string>(itsStreamNr) + ']';

  // Static description of the data product.
  const string typeKey = itsMdKeyPrefix + PN_COP_DATA_PRODUCT_TYPE + indexSuffix;
  const string nameKey = itsMdKeyPrefix + PN_COP_FILE_NAME + indexSuffix;
  const string dirKey  = itsMdKeyPrefix + PN_COP_DIRECTORY + indexSuffix;
  itsMdLogger.log(typeKey, dataProductType);
  itsMdLogger.log(nameKey, fileName);
  itsMdLogger.log(dirKey, directoryName);

  // After obs start these dynarray data points are written conditionally, so init.
  // While we only have to write the last index (PVSSGateway will zero the rest),
  // we'd have to find out who has the last subband. Don't bother, just init all.
  const string droppingKey = itsMdKeyPrefix + PN_COP_DROPPING + indexSuffix;
  const string writtenKey  = itsMdKeyPrefix + PN_COP_WRITTEN + indexSuffix;
  const string droppedKey  = itsMdKeyPrefix + PN_COP_DROPPED + indexSuffix;
  itsMdLogger.log(droppingKey, 0);
  itsMdLogger.log(writtenKey, 0.0f);
  itsMdLogger.log(droppedKey, 0.0f);
}
// Log the end-of-run statistics for this stream: the percentage of data lost
// by this thread, the percentage that was never sent, their sum, and the
// written/dropped block totals.
//
// All percentages are relative to itsNrExpectedBlocks; when that is zero they
// are reported as 0 instead of dividing by zero.
template<typename T> void OutputThread<T>::cleanUp() const
{
  // report statistics
  const float lostPerc = itsNrExpectedBlocks == 0 ? 0.0 : 100.0 * itsBlocksDroppedByMe / itsNrExpectedBlocks;
  const float didNotSendPerc = itsNrExpectedBlocks == 0 ? 0.0 : 100.0 - 100.0 * itsFractionalBlocksWritten / itsNrExpectedBlocks;
  const float didNotReceivePerc = didNotSendPerc + lostPerc;

  if (didNotReceivePerc > 0)
    LOG_WARN_STR(itsLogPrefix << "Did not receive " << didNotReceivePerc << "% of the data");

  if (lostPerc > 0)
    LOG_ERROR_STR(itsLogPrefix << "I lost " << lostPerc << "% of the data");

  if (didNotSendPerc > 0)
    LOG_WARN_STR(itsLogPrefix << "Did not send " << didNotSendPerc << "% of the data");

  LOG_INFO_STR(itsLogPrefix << "Finished writing " << itsBlocksWritten << " blocks, dropped " << itsBlocksDropped << " blocks.");

  // Same message either way; only the severity depends on whether data was lost.
  if (didNotSendPerc > 0)
    LOG_ERROR_STR(itsLogPrefix << "Total output data loss is " << didNotSendPerc << "%.");
  else
    LOG_INFO_STR(itsLogPrefix << "Total output data loss is " << didNotSendPerc << "%.");
}

Jan David Mol
committed
// Ask the writer to perform its initialisation step.
//
// Failures are always logged. They are re-thrown as StorageException only when
// the observation is not real-time; in real-time mode the error is swallowed
// after logging.
template<typename T>
void OutputThread<T>::init()
{
  try {
    // The writer must have been created before init() runs.
    ASSERT(itsWriter.get());

    itsWriter->init();
  }
  catch (Exception &ex) {
    LOG_ERROR_STR(itsLogPrefix << "Could not create meta data: " << ex);

    if (!itsParset.settings.realTime)
      THROW(StorageException, ex);
  }
#if defined HAVE_AIPSPP
  catch (casacore::AipsError &ex) {
    LOG_ERROR_STR(itsLogPrefix << "Could not create meta data (AipsError): " << ex.what());

    if (!itsParset.settings.realTime)
      THROW(StorageException, ex.what());
  }
#endif
}
// Finalise the data product by passing the final meta data to the writer.
//
// finalMetaData - meta data forwarded unchanged to the writer's fini().
//
// Failures are always logged. They are re-thrown as StorageException only when
// the observation is not real-time.
template<typename T>
void OutputThread<T>::fini(const FinalMetaData &finalMetaData)
{
  try {
    // fini the data product
    ASSERT(itsWriter.get());

    itsWriter->fini(finalMetaData);
  }
  catch (Exception &ex) {
    LOG_ERROR_STR(itsLogPrefix << "Could not add final meta data: " << ex);

    if (!itsParset.settings.realTime)
      THROW(StorageException, ex);
  }
#if defined HAVE_AIPSPP
  catch (casacore::AipsError &ex) {
    LOG_ERROR_STR(itsLogPrefix << "Could not add final meta data (AipsError): " << ex.what());

    if (!itsParset.settings.realTime)
      THROW(StorageException, ex.what());
  }
#endif
}
// Collect the writer's configuration as feedback for the LTA.
//
// Returns a ParameterSet holding the writer's configuration. If obtaining it
// raises an Exception, the error is logged and whatever has been collected so
// far (possibly an empty set) is returned.
template<typename T> ParameterSet OutputThread<T>::feedbackLTA() const
{
  ParameterSet result;

  try {
    result.adoptCollection(itsWriter->configuration());
  } catch (Exception &ex) {
    // Best effort: feedback problems are logged, not propagated.
    LOG_ERROR_STR(itsLogPrefix << "Could not obtain feedback for LTA: " << ex);
  }

  // Hand the collected feedback to the caller; flowing off the end of a
  // non-void function would be undefined behaviour.
  return result;
}
// Run the output thread: create the writer, then run the write loop (followed
// by the statistics report) and the writer initialisation as two OpenMP
// sections.
template<typename T>
void OutputThread<T>::process()
{
  LOG_DEBUG_STR(itsLogPrefix << "process() entered");

  // Both sections below use itsWriter, so it is created first.
  createMS();

#pragma omp parallel sections num_threads(2)
  {
    // Section 1: drain the output pool, then report statistics.
#pragma omp section
    {
      doWork();
      cleanUp();
    }

    // Section 2: writer initialisation.
#pragma omp section
    {
      init();
    }
  }

  LOG_INFO_STR(itsLogPrefix << "Finalised data product.");
}
// Make required instantiations: the OutputThread<T> members are defined in
// this file, so the two specialisations used as base classes below
// (StreamableData for the subband and RSP raw threads, BeamformedData for the
// TAB thread) are instantiated explicitly here.
template class OutputThread<StreamableData>;
template class OutputThread<TABTranspose::BeamformedData>;

Jan David Mol
committed
Alexander van Amesfoort
committed
// Output thread for correlated (subband) data.
//
// Forwards all arguments to OutputThread<StreamableData>, tagging the log
// prefix with "[SubbandOutputThread] ", and then replaces the block duration
// set by the base class with the correlator integration time.
SubbandOutputThread::SubbandOutputThread(
    const Parset &parset,
    unsigned streamNr,
    Pool<StreamableData> &outputPool,
    RTmetadata &mdLogger,
    const std::string &mdKeyPrefix,
    const std::string &logPrefix,
    const std::string &targetDirectory)
:
  OutputThread<StreamableData>(parset, streamNr, outputPool,
                               mdLogger, mdKeyPrefix,
                               logPrefix + "[SubbandOutputThread] ",
                               targetDirectory)
{
  // One block of correlated output spans one integration period.
  itsBlockDuration = parset.settings.correlator.integrationTime();
}
void SubbandOutputThread::createMS()

Jan David Mol
committed
{
ScopedLock sl(casacoreMutex);
ScopedDelayCancellation dc; // don't cancel casacore calls
const std::string directoryName =
itsTargetDirectory == ""
? itsParset.getDirectoryName(CORRELATED_DATA, itsStreamNr)
: itsTargetDirectory;
const std::string fileName = itsParset.getFileName(CORRELATED_DATA, itsStreamNr);
const std::string path = directoryName + "/" + fileName;
Alexander van Amesfoort
committed
LOG_INFO_STR(itsLogPrefix << "Writing correlated data to " << path);

Jan David Mol
committed
Alexander van Amesfoort
committed
if (itsParset.settings.realTime) {
try {
itsWriter = new MSWriterCorrelated(itsLogPrefix, path, itsParset, itsStreamNr);
Alexander van Amesfoort
committed
Alexander van Amesfoort
committed
logInitialStreamMetadataEvents("Correlated", fileName, directoryName);
} catch (Exception &ex) {
LOG_ERROR_STR(itsLogPrefix << "Cannot open " << path << ": " << ex.what());
Alexander van Amesfoort
committed
itsWriter = new MSWriterNull(itsParset);

Jan David Mol
committed
#if defined HAVE_AIPSPP
} catch (casacore::AipsError &ex) {
Alexander van Amesfoort
committed
LOG_ERROR_STR(itsLogPrefix << "Caught AipsError: " << ex.what());
itsWriter = new MSWriterNull(itsParset);

Jan David Mol
committed
#endif
Alexander van Amesfoort
committed
}
} else { // don't handle exception in non-RT: it is fatal: avoid rethrow for a clean stracktrace
itsWriter = new MSWriterCorrelated(itsLogPrefix, path, itsParset, itsStreamNr);
logInitialStreamMetadataEvents("Correlated", fileName, directoryName);

Jan David Mol
committed
}
itsNrExpectedBlocks = itsParset.settings.correlator.nrIntegrations;

Jan David Mol
committed
}
Alexander van Amesfoort
committed
// Output thread for beamformed (TAB) data.
//
// Forwards all arguments to OutputThread<TABTranspose::BeamformedData>,
// tagging the log prefix with "[TABOutputThread] ". The block duration set by
// the base class is kept.
TABOutputThread::TABOutputThread(const Parset &parset,
                                 unsigned streamNr, Pool<TABTranspose::BeamformedData> &outputPool,
                                 RTmetadata &mdLogger, const std::string &mdKeyPrefix,
                                 const std::string &logPrefix,
                                 const std::string &targetDirectory)
:
  OutputThread<TABTranspose::BeamformedData>(
    parset,
    streamNr,
    outputPool,
    mdLogger,
    mdKeyPrefix,
    logPrefix + "[TABOutputThread] ",
    targetDirectory)
{
}
void TABOutputThread::createMS()

Jan David Mol
committed
{
// even the HDF5 writer accesses casacore, to perform conversions
ScopedLock sl(casacoreMutex);
ScopedDelayCancellation dc; // don't cancel casacore calls

Jan David Mol
committed
const std::string directoryName =
itsTargetDirectory == ""
? itsParset.getDirectoryName(BEAM_FORMED_DATA, itsStreamNr)
: itsTargetDirectory;
const std::string fileName = itsParset.getFileName(BEAM_FORMED_DATA, itsStreamNr);

Jan David Mol
committed
const std::string path = directoryName + "/" + fileName;
Alexander van Amesfoort
committed
LOG_INFO_STR(itsLogPrefix << "Writing beamformed data to " << path);

Jan David Mol
committed
Alexander van Amesfoort
committed
if (itsParset.settings.realTime) {
try {
#ifdef HAVE_DAL
itsWriter = new MSWriterDAL<float,3>(path, itsParset, itsStreamNr);
#else
itsWriter = new MSWriterFile(path);
#endif
logInitialStreamMetadataEvents("Beamformed", fileName, directoryName);

Jan David Mol
committed
Alexander van Amesfoort
committed
} catch (Exception &ex) {
LOG_ERROR_STR(itsLogPrefix << "Cannot open " << path << ": " << ex.what());
Alexander van Amesfoort
committed
itsWriter = new MSWriterNull(itsParset);
#if defined HAVE_AIPSPP
} catch (casacore::AipsError &ex) {
Alexander van Amesfoort
committed
LOG_ERROR_STR(itsLogPrefix << "Caught AipsError: " << ex.what());
itsWriter = new MSWriterNull(itsParset);
#endif
}
} else { // don't handle exception in non-RT: it is fatal: avoid rethrow for a clean stracktrace
itsWriter = new MSWriterDAL<float,3,1>(path, itsParset, itsStreamNr);
#else
itsWriter = new MSWriterFile(path);
#endif
Alexander van Amesfoort
committed
logInitialStreamMetadataEvents("Beamformed", fileName, directoryName);
Alexander van Amesfoort
committed
itsNrExpectedBlocks = itsParset.settings.nrBlocks();
}
Alexander van Amesfoort
committed
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
// Output thread for RSP raw data.
//
// Forwards all arguments to OutputThread<StreamableData>, tagging the log
// prefix with "[RSPRawOutputThread] ". The block duration set by the base
// class is kept.
RSPRawOutputThread::RSPRawOutputThread(
    const Parset &parset,
    unsigned streamNr,
    Pool<StreamableData> &outputPool,
    RTmetadata &mdLogger,
    const std::string &mdKeyPrefix,
    const std::string &logPrefix,
    const std::string &targetDirectory)
:
  OutputThread<StreamableData>(parset, streamNr, outputPool,
                               mdLogger, mdKeyPrefix,
                               logPrefix + "[RSPRawOutputThread] ",
                               targetDirectory)
{
}
void RSPRawOutputThread::createMS()
{
// Unlike the other output types, there is no need to grab casacoreMutex
// or delay cancellation, because the RSP raw writer does not use casacore or libhdf5.
const std::string directoryName =
itsTargetDirectory == ""
? itsParset.getDirectoryName(RSP_RAW_DATA, itsStreamNr)
: itsTargetDirectory;
Alexander van Amesfoort
committed
const std::string fileName = itsParset.getFileName(RSP_RAW_DATA, itsStreamNr);
Alexander van Amesfoort
committed
const std::string path = directoryName + "/" + fileName;
LOG_INFO_STR(itsLogPrefix << "Writing RSP raw data to " << path);
Alexander van Amesfoort
committed
// Write parset as observation metadata. We end up with many duplicate files,
// but at least we get the parset, even if storage node(s) fail.
Parset rspRawParset = makeRspRawParset();
Alexander van Amesfoort
committed
if (itsParset.settings.realTime) {
try {
itsWriter = new MSWriterFile(path);
Alexander van Amesfoort
committed
rspRawParset.writeFile(path + ".parset"); // relies on (recursive) mkdir by MSWriterFile()
Alexander van Amesfoort
committed
// The rest of the system doesn't know about RSP raw data output, but if monitoring did, enable this:
//logInitialStreamMetadataEvents("RSPRaw", fileName, directoryName);
} catch (Exception& ex) {
LOG_ERROR_STR(itsLogPrefix << "Cannot open " << path << ": " << ex.what());
Alexander van Amesfoort
committed
itsWriter = new MSWriterNull(itsParset);
}
} else { // don't handle exception in non-RT: it is fatal: avoid rethrow for a clean stracktrace
itsWriter = new MSWriterFile(path);
Alexander van Amesfoort
committed
rspRawParset.writeFile(path + ".parset"); // relies on (recursive) mkdir by MSWriterFile()
Alexander van Amesfoort
committed
// The rest of the system doesn't know about RSP raw data output, but if monitoring did, enable this:
//logInitialStreamMetadataEvents("RSPRaw", fileName, directoryName);

Jan David Mol
committed
Alexander van Amesfoort
committed
// NOTE: for RSP raw we need to count bytes instead of blocks, but N/A here.
itsNrExpectedBlocks = itsParset.settings.nrRspRawBlocks();
Alexander van Amesfoort
committed
}
// Build a copy of the observation parset that is patched for offline
// reprocessing of the RSP raw output.
//
// The copy gets the RSP raw start/stop times, Cobalt.realTime and the RSP raw
// output both set to "false", per antenna field RSP sources that point at the
// raw output files together with matching dataslot and RSP board lists, and a
// station list limited to the stations that contributed at least one board.
//
// Returns the patched parset after updateSettings() has been called on it.
Parset RSPRawOutputThread::makeRspRawParset()
{
  LOG_INFO("makeRspRawParset() begin");

  Parset rspRawParset(itsParset);

  // Patch several parset key values for easy setup of (single node) offline reprocessing.
  rspRawParset.replace("Observation.startTime",
                       LOFAR::timeString(rspRawParset.settings.rspRaw.startTime, true, "%F %T"));
  rspRawParset.replace("Observation.stopTime",
                       LOFAR::timeString(rspRawParset.settings.rspRaw.stopTime, true, "%F %T"));
  rspRawParset.replace("Cobalt.realTime", "false");
  rspRawParset.replace("Observation.DataProducts.Output_RSPRaw.enabled", "false");

  const unsigned nrBoards = rspRawParset.settings.rspRaw.nrBeamletsPerBoardList.size();
  set<string> stationNameSet;

  for (unsigned af = 0; af < rspRawParset.settings.rspRaw.antennaFieldNames.size(); ++af)
  {
    const string antFieldName = rspRawParset.settings.rspRaw.antennaFieldNames[af].fullName();

    // Each value is built up as a bracketed, comma-separated list.
    string rspPortsValue(1, '[');
    string dataslotListValue(1, '[');
    string rspBoardListValue(1, '[');

    for (unsigned b = 0; b < nrBoards; ++b)
    {
      const unsigned nrBeamlets = rspRawParset.settings.rspRaw.nrBeamletsPerBoardList[b];
      if (nrBeamlets == 0) {
        // It is valid to completely filter streams, but they must all be at the end in the list.
        // If nrBeamlets half-way is 0 we cannot generate a valid dataslot list, which computes nrBeamlets-1.
        for (++b; b < nrBoards; ++b) {
          // Look at the count of each *remaining* board; testing the current
          // board's (zero) count again could never trigger the warning.
          if (rspRawParset.settings.rspRaw.nrBeamletsPerBoardList[b] != 0) {
            LOG_WARN_STR("makeRspRawParset(): empty nr beamlets per board found for antenna field " <<
                         antFieldName << " followed by non-empty nr beamlets. Observation data output unaffected, " <<
                         "but cannot write valid DataslotList in RSP *output* parset to support offline post-processing.");
            break;
          }
        }
        break;
      }

      if (b > 0) {
        rspPortsValue += ", ";
        dataslotListValue += ", ";
        rspBoardListValue += ", ";
      }

      // Stream numbers run over (antenna field, board) pairs.
      rspPortsValue += "file:" + rspRawParset.getFileName(RSP_RAW_DATA, af * nrBoards + b);
      dataslotListValue += str(boost::format("0..%u") % (nrBeamlets - 1));
      rspBoardListValue += str(boost::format("%u*%u") % nrBeamlets % b);

      stationNameSet.insert(rspRawParset.settings.rspRaw.antennaFieldNames[af].station);
    }

    rspPortsValue.push_back(']');
    rspRawParset.replace("PIC.Core." + antFieldName + ".RSP.sources", rspPortsValue);
    rspRawParset.replace("PIC.Core." + antFieldName + ".RSP.receiver", "localhost");
    dataslotListValue.push_back(']');
    rspRawParset.replace("Observation.Dataslots." + antFieldName + ".DataslotList", dataslotListValue);
    rspBoardListValue.push_back(']');
    rspRawParset.replace("Observation.Dataslots." + antFieldName + ".RSPBoardList", rspBoardListValue);
  }

  ostringstream stationListStr;
  LOFAR::print(stationListStr, stationNameSet.begin(), stationNameSet.end(), ",", "[", "]");
  rspRawParset.replace("Observation.VirtualInstrument.stationList", stationListStr.str());

  rspRawParset.updateSettings(); // not needed and may WARN, but does some checks and to return valid obj

  LOG_INFO("makeRspRawParset() end");
  return rspRawParset;
}
} // namespace Cobalt

Jan David Mol
committed
} // namespace LOFAR