Skip to content
Snippets Groups Projects
Commit 14a0a16c authored by Alexander van Amesfoort's avatar Alexander van Amesfoort
Browse files

Task #5830: write CobaltOutput data points using the MACIO RTmetadata...

Task #5830: write CobaltOutput data points using the MACIO RTmetadata interface. Perhaps, the 'dropping' data point is written too late. Also adapt up 2 test cases + createHeaders.cc with RTmetadata dummies that won't try to log/send anything.
parent 389bbe2e
No related branches found
No related tags found
No related merge requests found
......@@ -25,7 +25,7 @@ lofar_add_library(outputproc
install(PROGRAMS
gnuplotMS.sh
DESTINATION bin)
DESTINATION sbin)
install(FILES
outputProc.log_prop
......@@ -37,5 +37,4 @@ lofar_add_bin_program(versionoutputproc versionoutputproc.cc)
lofar_add_sbin_program(createHeaders createHeaders.cc)
lofar_add_sbin_program(plotMS plotMS.cc)
lofar_add_sbin_program(write_pvss_dp write_pvss_dp.cc)
......@@ -23,8 +23,10 @@
#include "GPUProcIO.h"
#include <cstring>
#include <vector>
#include <omp.h>
#include <boost/lexical_cast.hpp>
#include <boost/format.hpp>
#include <Common/LofarLogger.h>
......@@ -46,29 +48,51 @@ using namespace LOFAR;
using namespace LOFAR::Cobalt;
using namespace std;
using boost::format;
using boost::lexical_cast;
namespace LOFAR {
namespace Cobalt {
namespace Cobalt {
static string formatDataPointLocusName(const string& hostname)
{
// For node name, strip a "locus" prefix and '0's if any (avoid remote octal interp).
string nodeValue(hostname);
const char* nvstr = nodeValue.c_str();
if (strncmp(nvstr, "locus", sizeof("locus") - 1) == 0) {
unsigned stripLen = sizeof("locus") - 1;
while (nvstr[stripLen] == '0')
stripLen += 1;
nodeValue.erase(0, stripLen);
}
return nodeValue;
}
bool process(Stream &controlStream, unsigned myRank)
{
bool success(true);
Parset parset(&controlStream);
const vector<string> &hostnames = parset.settings.outputProcHosts;
ASSERT(myRank < hostnames.size());
string myHostName = hostnames[myRank];
// Send identification string to the MAC Log Processor
string fmtStr(createPropertySetName(PSN_COBALT_OUTPUT_PROC, "",
parset.getString("_DPname")));
format prFmt;
prFmt.exceptions(boost::io::no_error_bits); // avoid throw
prFmt.parse(fmtStr);
LOG_INFO_STR("MACProcessScope: " << str(prFmt % myRank));
string mdKeyPrefix = str(prFmt % myRank);
LOG_INFO_STR("MACProcessScope: " << mdKeyPrefix);
mdKeyPrefix.push_back('.');
const vector<string> &hostnames = parset.settings.outputProcHosts;
ASSERT(myRank < hostnames.size());
string myHostName = hostnames[myRank];
const string mdRegisterName = PST_COBALT_OUTPUT_PROC;
const string mdHostName = parset.getString("Cobalt.PVSSGateway.host", "");
MACIO::RTmetadata mdLogger(parset.observationID(), mdRegisterName, mdHostName);
mdLogger.start();
{
// make sure "parset" stays in scope for the lifetime of the SubbandWriters
// make sure "parset" and "mdLogger" stay in scope for the lifetime of the SubbandWriters
vector<SmartPtr<SubbandWriter> > subbandWriters;
vector<SmartPtr<TABOutputThread> > tabWriters;
......@@ -84,10 +108,13 @@ bool process(Stream &controlStream, unsigned myRank)
if (parset.settings.correlator.files[fileIdx].location.host != myHostName)
continue;
mdLogger.log(mdKeyPrefix + PN_COP_LOCUS_NODE + '[' + lexical_cast<string>(fileIdx) + ']',
formatDataPointLocusName(myHostName));
string logPrefix = str(format("[obs %u correlated stream %3u] ")
% parset.observationID() % fileIdx);
SubbandWriter *writer = new SubbandWriter(parset, fileIdx, logPrefix);
SubbandWriter *writer = new SubbandWriter(parset, fileIdx, mdLogger, mdKeyPrefix, logPrefix);
subbandWriters.push_back(writer);
}
}
......@@ -104,6 +131,10 @@ bool process(Stream &controlStream, unsigned myRank)
if (file.location.host != myHostName)
continue;
const unsigned allFileIdx = fileIdx + parset.settings.correlator.files.size();
mdLogger.log(mdKeyPrefix + PN_COP_LOCUS_NODE + '[' + lexical_cast<string>(allFileIdx) + ']',
formatDataPointLocusName(myHostName));
struct ObservationSettings::BeamFormer::StokesSettings &stokes =
file.coherent ? parset.settings.beamFormer.coherentSettings
: parset.settings.beamFormer.incoherentSettings;
......@@ -129,7 +160,7 @@ bool process(Stream &controlStream, unsigned myRank)
string logPrefix = str(format("[obs %u beamformed stream %3u] ")
% parset.observationID() % fileIdx);
TABOutputThread *writer = new TABOutputThread(parset, fileIdx, *outputPools[fileIdx], logPrefix);
TABOutputThread *writer = new TABOutputThread(parset, fileIdx, *outputPools[fileIdx], mdLogger, mdKeyPrefix, logPrefix);
tabWriters.push_back(writer);
}
}
......@@ -228,6 +259,6 @@ bool process(Stream &controlStream, unsigned myRank)
}
}
}
}
} // namespace Cobalt
} // namespace LOFAR
......@@ -29,6 +29,7 @@
#include <sys/stat.h>
#include <unistd.h>
#include <iomanip>
#include <boost/lexical_cast.hpp>
#include <boost/format.hpp>
#include <boost/algorithm/string.hpp>
......@@ -36,6 +37,7 @@
#include <Common/SystemCallException.h>
#include <Common/Thread/Mutex.h>
#include <Common/Thread/Cancellation.h>
#include <ApplCommon/PVSSDatapointDefs.h>
#include <CoInterface/OutputTypes.h>
#include <CoInterface/Exceptions.h>
......@@ -58,6 +60,7 @@ namespace LOFAR
static Mutex casacoreMutex;
using namespace std;
using boost::lexical_cast;
static void makeDir(const string &dirname, const string &logPrefix)
{
......@@ -100,10 +103,16 @@ namespace LOFAR
}
template<typename T> OutputThread<T>::OutputThread(const Parset &parset, unsigned streamNr, Pool<T> &outputPool, const std::string &logPrefix, const std::string &targetDirectory, const std::string &LTAfeedbackPrefix)
template<typename T> OutputThread<T>::OutputThread(const Parset &parset,
unsigned streamNr, Pool<T> &outputPool,
RTmetadata &mdLogger, const std::string &mdKeyPrefix,
const std::string &logPrefix, const std::string &targetDirectory,
const std::string &LTAfeedbackPrefix)
:
itsParset(parset),
itsStreamNr(streamNr),
itsMdLogger(mdLogger),
itsMdKeyPrefix(mdKeyPrefix),
itsLogPrefix(logPrefix),
itsTargetDirectory(targetDirectory),
itsLTAfeedbackPrefix(LTAfeedbackPrefix),
......@@ -137,6 +146,13 @@ namespace LOFAR
itsNextSequenceNumber = data->sequenceNumber() + 1;
itsBlocksWritten++;
itsMdLogger.log(itsMdKeyPrefix + PN_COP_DROPPING + '[' + lexical_cast<string>(itsStreamNr) + ']',
droppedBlocks > 0 ? "1" : "0"); // logged too late?
itsMdLogger.log(itsMdKeyPrefix + PN_COP_WRITTEN + '[' + lexical_cast<string>(itsStreamNr) + ']',
itsBlocksWritten * itsParset.settings.blockDuration());
itsMdLogger.log(itsMdKeyPrefix + PN_COP_DROPPED + '[' + lexical_cast<string>(itsStreamNr) + ']',
itsBlocksDropped * itsParset.settings.blockDuration());
}
......@@ -205,14 +221,17 @@ namespace LOFAR
template class OutputThread<TABTranspose::BeamformedData>;
SubbandOutputThread::SubbandOutputThread(const Parset &parset, unsigned streamNr,
Pool<StreamableData> &outputPool, const std::string &logPrefix,
const std::string &targetDirectory)
SubbandOutputThread::SubbandOutputThread(const Parset &parset,
unsigned streamNr, Pool<StreamableData> &outputPool,
RTmetadata &mdLogger, const std::string &mdKeyPrefix,
const std::string &logPrefix, const std::string &targetDirectory)
:
OutputThread<StreamableData>(
parset,
streamNr,
outputPool,
mdLogger,
mdKeyPrefix,
logPrefix + "[SubbandOutputThread] ",
targetDirectory,
formatString("Observation.DataProducts.Output_Correlated_[%u].", streamNr))
......@@ -235,11 +254,16 @@ namespace LOFAR
try
{
recursiveMakeDir(directoryName, itsLogPrefix);
LOG_INFO_STR(itsLogPrefix << "Writing to " << path);
recursiveMakeDir(directoryName, itsLogPrefix);
LOG_INFO_STR(itsLogPrefix << "Writing to " << path);
itsWriter = new MSWriterCorrelated(itsLogPrefix, path, itsParset, itsStreamNr);
// Write data points wrt correlated output file for monitoring (PVSS)
// once we know the file could at least be created.
itsMdLogger.log(itsMdKeyPrefix + PN_COP_DATA_PRODUCT_TYPE + '[' + lexical_cast<string>(itsStreamNr) + ']', "Correlated");
itsMdLogger.log(itsMdKeyPrefix + PN_COP_FILE_NAME + '[' + lexical_cast<string>(itsStreamNr) + ']', fileName);
itsMdLogger.log(itsMdKeyPrefix + PN_COP_DIRECTORY + '[' + lexical_cast<string>(itsStreamNr) + ']', directoryName);
}
catch (Exception &ex)
{
......@@ -265,14 +289,18 @@ namespace LOFAR
}
TABOutputThread::TABOutputThread(const Parset &parset, unsigned streamNr,
Pool<TABTranspose::BeamformedData> &outputPool, const std::string &logPrefix,
const std::string &targetDirectory)
TABOutputThread::TABOutputThread(const Parset &parset,
unsigned streamNr, Pool<TABTranspose::BeamformedData> &outputPool,
RTmetadata &mdLogger, const std::string &mdKeyPrefix,
const std::string &logPrefix,
const std::string &targetDirectory)
:
OutputThread<TABTranspose::BeamformedData>(
parset,
streamNr,
outputPool,
mdLogger,
mdKeyPrefix,
logPrefix + "[TABOutputThread] ",
targetDirectory,
formatString("Observation.DataProducts.Output_Beamformed_[%u].", streamNr)
......@@ -305,6 +333,12 @@ namespace LOFAR
#else
itsWriter = new MSWriterFile(path);
#endif
// Write data points for beamformed output file for monitoring (PVSS)
// once we know the file could at least be created.
itsMdLogger.log(itsMdKeyPrefix + PN_COP_DATA_PRODUCT_TYPE + '[' + lexical_cast<string>(itsStreamNr) + ']', "Beamformed");
itsMdLogger.log(itsMdKeyPrefix + PN_COP_FILE_NAME + '[' + lexical_cast<string>(itsStreamNr) + ']', fileName);
itsMdLogger.log(itsMdKeyPrefix + PN_COP_DIRECTORY + '[' + lexical_cast<string>(itsStreamNr) + ']', directoryName);
}
catch (Exception &ex)
{
......
......@@ -27,6 +27,7 @@
#include <vector>
#include <Stream/FileStream.h>
#include <MACIO/RTmetadata.h>
#include <CoInterface/SmartPtr.h>
#include <CoInterface/StreamableData.h>
#include <CoInterface/TABTranspose.h>
......@@ -39,6 +40,8 @@ namespace LOFAR
{
namespace Cobalt
{
using MACIO::RTmetadata;
/*
* OutputThread<T> manages the writing of data blocks to disk. It is
* responsible for:
......@@ -51,7 +54,10 @@ namespace LOFAR
template<typename T> class OutputThread
{
public:
OutputThread(const Parset &, unsigned streamNr, Pool<T> &outputPool, const std::string &logPrefix, const std::string &targetDirectory, const std::string &LTAfeedbackPrefix);
OutputThread(const Parset &parset, unsigned streamNr, Pool<T> &outputPool,
RTmetadata &mdLogger, const std::string &mdKeyPrefix,
const std::string &logPrefix, const std::string &targetDirectory,
const std::string &LTAfeedbackPrefix);
virtual ~OutputThread();
......@@ -76,6 +82,8 @@ namespace LOFAR
const Parset &itsParset;
const unsigned itsStreamNr;
RTmetadata &itsMdLogger; // non-const to be able to use its log()
const std::string itsMdKeyPrefix;
const std::string itsLogPrefix;
const std::string itsTargetDirectory;
const std::string itsLTAfeedbackPrefix;
......@@ -91,12 +99,17 @@ namespace LOFAR
/*
* SubbandOutputThread specialises in creating LOFAR MeasurementSets (MS).
* SubbandOutputThread specialises in creating LOFAR MeasurementSet (MS)
* files for correlated (uv) data.
*/
class SubbandOutputThread: public OutputThread<StreamableData>
{
public:
SubbandOutputThread(const Parset &, unsigned streamNr, Pool<StreamableData> &outputPool, const std::string &logPrefix, const std::string &targetDirectory = "");
SubbandOutputThread(const Parset &parset, unsigned streamNr,
Pool<StreamableData> &outputPool,
RTmetadata &mdLogger, const std::string &mdKeyPrefix,
const std::string &logPrefix,
const std::string &targetDirectory = "");
void createMS();
};
......@@ -104,13 +117,17 @@ namespace LOFAR
/*
* TABOutputThread specialises in creating LOFAR HDF5 files corresponding
* to ICD003.
* TABOutputThread specialises in creating LOFAR HDF5
* files for beamformed data (according to the ICD003 doc).
*/
class TABOutputThread: public OutputThread<TABTranspose::BeamformedData>
{
public:
TABOutputThread(const Parset &, unsigned streamNr, Pool<TABTranspose::BeamformedData> &outputPool, const std::string &logPrefix, const std::string &targetDirectory = "");
TABOutputThread(const Parset &parset, unsigned streamNr,
Pool<TABTranspose::BeamformedData> &outputPool,
RTmetadata &mdLogger, const std::string &mdKeyPrefix,
const std::string &logPrefix,
const std::string &targetDirectory = "");
void createMS();
};
......
......@@ -31,11 +31,13 @@ namespace LOFAR
{
namespace Cobalt
{
SubbandWriter::SubbandWriter(const Parset &parset, unsigned streamNr, const std::string &logPrefix)
SubbandWriter::SubbandWriter(const Parset &parset, unsigned streamNr,
RTmetadata &mdLogger, const std::string &mdKeyPrefix,
const std::string &logPrefix)
:
itsOutputPool(str(format("SubbandWriter::itsOutputPool [stream %u]") % streamNr)),
itsInputThread(parset, streamNr, itsOutputPool, logPrefix),
itsOutputThread(parset, streamNr, itsOutputPool, logPrefix)
itsOutputThread(parset, streamNr, itsOutputPool, mdLogger, mdKeyPrefix, logPrefix)
{
for (unsigned i = 0; i < maxReceiveQueueSize; i++)
itsOutputPool.free.append(new CorrelatedData(parset.nrMergedStations(), parset.nrChannelsPerSubband(), parset.integrationSteps(), heapAllocator, 512));
......
......@@ -46,8 +46,10 @@ namespace LOFAR
class SubbandWriter
{
public:
SubbandWriter(const Parset &,
SubbandWriter(const Parset &parset,
unsigned streamNr,
RTmetadata &mdLogger,
const std::string &mdKeyPrefix,
const std::string &logPrefix);
void process();
......
......@@ -51,6 +51,7 @@ int main(int argc, char *argv[])
}
Parset parset(argv[1]);
MACIO::RTmetadata rtmd(parset.observationID(), "", ""); // dummy
Parset feedbackLTA;
......@@ -62,7 +63,7 @@ int main(int argc, char *argv[])
Pool<StreamableData> outputPool(logPrefix);
SubbandOutputThread writer(parset, fileIdx, outputPool, logPrefix, ".");
SubbandOutputThread writer(parset, fileIdx, outputPool, rtmd, "rtmd key prefix", logPrefix, ".");
writer.createMS();
feedbackLTA.adoptCollection(writer.feedbackLTA());
}
......@@ -76,7 +77,7 @@ int main(int argc, char *argv[])
Pool<TABTranspose::BeamformedData> outputPool(logPrefix);
TABOutputThread writer(parset, fileIdx, outputPool, logPrefix, ".");
TABOutputThread writer(parset, fileIdx, outputPool, rtmd, "rtmd key prefix", logPrefix, ".");
writer.createMS();
feedbackLTA.adoptCollection(writer.feedbackLTA());
}
......
......@@ -81,11 +81,11 @@ int main(int argc, char *argv[])
switch (opt) {
case 'h':
usage(argv[0]);
exit(EXIT_SUCCESS);
return EXIT_SUCCESS;
default: /* '?' */
usage(argv[0]);
exit(EXIT_FAILURE);
return EXIT_FAILURE;
}
}
......
//# write_pvss_dp.cc: write PVSS datapoint(s) util
//# Copyright (C) 2014 ASTRON (Netherlands Institute for Radio Astronomy)
//# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
//#
//# This file is part of the LOFAR software suite.
//# The LOFAR software suite is free software: you can redistribute it and/or
//# modify it under the terms of the GNU General Public License as published
//# by the Free Software Foundation, either version 3 of the License, or
//# (at your option) any later version.
//#
//# The LOFAR software suite is distributed in the hope that it will be useful,
//# but WITHOUT ANY WARRANTY; without even the implied warranty of
//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//# GNU General Public License for more details.
//#
//# You should have received a copy of the GNU General Public License along
//# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
//#
//# $Id$
#include <lofar_config.h>
#include <string>
#include <vector>
#include <iostream>
#include <Common/LofarLogger.h>
#include <MACIO/RTmetadata.h>
using namespace std;
static void usage(const char *argv0) {
cerr << "Usage: " << argv0 << " [PVSS gateway host|IP] key=value ..." << endl;
}
int main(int argc, char *argv[]) {
INIT_LOGGER(argv[0]);
if (argc < 3)
{
usage(argv[0]);
exit(1);
}
#if 0 // TODO: port to new interface once purpose of this is clear
unsigned obsId = 12345;
string registerName("OnlineControl"); // see table in LOFAR/MAC/APL/APLCommon/src/ControllerDefines.cc
string defaultHostname("ccu099.control.lofar");
LOFAR::MACIO::RTmetadata rtmd(obsId, registerName, defaultHostname);
bool rv;
vector<string> keys;
vector<string> vals;
vector<double> times(4, 0.0); // irrel
/*
keys.push_back("LOFAR_ObsSW_TempObs0001_CobaltOutputProc.written[0]");
vals.push_back("42");
keys.push_back("LOFAR_ObsSW_TempObs0001_CobaltOutputProc.written[1]");
vals.push_back("43");
keys.push_back("LOFAR_ObsSW_TempObs0001_CobaltOutputProc.dropped[0]");
vals.push_back("21");
keys.push_back("LOFAR_ObsSW_TempObs0001_CobaltOutputProc.dropped[1]");
vals.push_back("22");
*/
keys.push_back("LOFAR_TEST.DynArrDouble[0]");
vals.push_back("0.12345");
keys.push_back("LOFAR_TEST.DynArrDouble[7]");
vals.push_back("4.97");
keys.push_back("LOFAR_TEST.Double");
vals.push_back("31.2565478");
keys.push_back("LOFAR_TEST.DynArrDouble[5]");
vals.push_back("2.55");
rv = rtmd.log(keys, vals, times);
if (!rv)
{
LOG_WARN("Failed to write data points");
}
/*
string key0("nonexisting_test-by-alexander");
string val0("3.3");
*/
/*
string key0("LOFAR_TEST.DynArrDouble[98765]"); // overflow?
string val0("1.01");
rv = rtmd.log(key0, val0);
if (!rv)
{
LOG_WARN("Failed to write data point");
}
*/
#endif
return 0;
}
......@@ -29,7 +29,8 @@
#include <UnitTest++.h>
using namespace std;
using namespace LOFAR::Cobalt;
using namespace LOFAR;
using namespace Cobalt;
TEST(testCorrelatorOutputThreadThrowsStorageException)
......@@ -59,27 +60,29 @@ TEST(testCorrelatorOutputThreadThrowsStorageException)
par.add("Observation.DataProducts.Output_Correlated.enabled", "true");
par.add("Observation.DataProducts.Output_Correlated.filenames", "[2*L173014_SAP000_SB000_uv.MS]");
par.add("Observation.DataProducts.Output_Correlated.locations", "[.:./, .:./]");
// MIssing key: This will lead to an exception triggering the new code
// Missing key: This will lead to an exception triggering the new code
//par.add("PIC.Core.CS002HBA0.phaseCenter", "0.0")
par.updateSettings();
Pool<StreamableData> outputPool("testCorrelatorOutputThreadThrowsStorageException::outputPool");
MACIO::RTmetadata rtmd(12345, "", "");
try
{
SubbandOutputThread SubbandOutputThread(par,
(unsigned) 0, outputPool, "Log prefix", "./");
(unsigned) 0, outputPool, rtmd, "rtmd key prefix", "Log prefix", "./");
SubbandOutputThread.createMS();
}
catch (StorageException &ex) // Catch the correct exception
{
cout << "Got the correct exeption. " << endl;
cout << "Got the correct exception. " << endl;
cout << ex.what() << endl;
}
//else Let the exception fall tru
cout << "Succes" << endl;
cout << "Success" << endl;
}
......@@ -90,15 +93,18 @@ TEST(testCorrelatorOutputThreadThrowsStorageException)
class OutputThreadWrapper: public SubbandOutputThread
{
public:
OutputThreadWrapper(const Parset &parset, unsigned streamNr, Pool<StreamableData> &outputPool, const std::string &logPrefix, const std::string &targetDirectory = "")
OutputThreadWrapper(const Parset &parset, unsigned streamNr,
Pool<StreamableData> &outputPool,
MACIO::RTmetadata& rtmd, const std::string &rtmdKeyPrefix,
const std::string &logPrefix, const std::string &targetDirectory = "")
:
SubbandOutputThread(parset, streamNr,
outputPool, logPrefix,
targetDirectory)
SubbandOutputThread(parset, streamNr, outputPool,
rtmd, rtmdKeyPrefix,
logPrefix, targetDirectory)
{
}
MSWriter* getMSWriter()
MSWriter* getMSWriter()
{
return itsWriter.get();
}
......@@ -135,20 +141,21 @@ TEST(testCorrelatorOutputThreadRealtimeThrowsNoException)
// MIssing key: This will lead to an exception triggering the new code
//par.add("PIC.Core.CS002HBA0.phaseCenter", "0.0")
par.add("Cobalt.realTime", "true");
par.updateSettings();
Pool<StreamableData> outputPool("testCorrelatorOutputThreadRealtimeThrowsNoException::outputPool");
par.updateSettings();
Pool<StreamableData> outputPool("testCorrelatorOutputThreadRealtimeThrowsNoException::outputPool");
MACIO::RTmetadata rtmd(12345, "", "");
// We have a realtime system. We should not throw execptions
// We have a realtime system. We should not throw exceptions
OutputThreadWrapper SubbandOutputThread(par,
(unsigned)0, outputPool, "Log prefix", "./");
(unsigned)0, outputPool, rtmd, "rtmd key prefix", "Log prefix", "./");
SubbandOutputThread.createMS();
// asure null writer
// ensure null writer
ASSERT( (dynamic_cast<MSWriterNull*> (SubbandOutputThread.getMSWriter()) != 0));
cout << "Succes" << endl;
cout << "Success" << endl;
}
/***************************************************
......@@ -184,25 +191,26 @@ TEST(testBeamformerOutputThreadThrowsStorageException)
par.add("Observation.DataProducts.Output_Correlated.locations", "[.:./, .:./]");
// MIssing key: This will lead to an exception triggering the new code
//par.add("PIC.Core.CS002HBA0.phaseCenter", "0.0")
par.updateSettings();
Pool<StreamableData> outputPool("testBeamformerOutputThreadThrowsStorageException::outputPool");
MACIO::RTmetadata rtmd(12345, "", "");
try
{
SubbandOutputThread SubbandOutputThread(par,
(unsigned)0, outputPool, "Log prefix", "./");
(unsigned)0, outputPool, rtmd, "rtmd key prefix", "Log prefix", "./");
SubbandOutputThread.createMS();
}
catch (StorageException &ex) // Catch the correct exception
{
cout << "Got the correct exeption. " << endl;
cout << "Got the correct exception. " << endl;
cout << ex.what() << endl;
}
//else Let the exception fall tru
cout << "Succes" << endl;
cout << "Success" << endl;
}
......@@ -214,16 +222,17 @@ class TABOutputThreadWrapper : public TABOutputThread
{
public:
TABOutputThreadWrapper(const Parset &parset, unsigned streamNr,
Pool<TABTranspose::BeamformedData> &outputPool, const std::string &logPrefix,
const std::string &targetDirectory)
:
TABOutputThread(parset, streamNr,
outputPool, logPrefix,
targetDirectory)
Pool<TABTranspose::BeamformedData> &outputPool,
MACIO::RTmetadata& rtmd, const std::string &rtmdKeyPrefix,
const std::string &logPrefix, const std::string &targetDirectory)
:
TABOutputThread(parset, streamNr, outputPool,
rtmd, rtmdKeyPrefix,
logPrefix, targetDirectory)
{
}
MSWriter* getMSWriter()
MSWriter* getMSWriter()
{
return itsWriter.get();
}
......@@ -261,30 +270,29 @@ TEST(testBeamformerOutputThreadRealtimeThrowsNoException)
par.add("Observation.DataProducts.Output_CoherentStokes.enabled", "true");
par.add("Observation.DataProducts.Output_CoherentStokes.filenames", "[tab1.raw]");
par.add("Observation.DataProducts.Output_CoherentStokes.locations", "[1 * :/NonExisting]");
// MIssing key: This will lead to an exception triggering the new code
// Missing key: This will lead to an exception triggering the new code
//par.add("PIC.Core.CS002HBA0.phaseCenter", "0.0")
par.add("Cobalt.realTime", "true");
par.updateSettings();
Pool<TABTranspose::BeamformedData> outputPool("testBeamformerOutputThreadRealtimeThrowsNoException");
Pool<TABTranspose::BeamformedData> outputPool("testBeamformerOutputThreadRealtimeThrowsNoException");
MACIO::RTmetadata rtmd(12345, "", "");
// We have a realtime system. We should not throw execptions
// We have a realtime system. We should not throw exceptions
TABOutputThreadWrapper BeamOutputThread(par,
(unsigned)0, outputPool, "Log prefix", "/NonExisting");
(unsigned)0, outputPool, rtmd, "rtmd key prefix", "Log prefix", "/NonExisting");
BeamOutputThread.createMS();
//// asure null writer
//// ensure null writer
ASSERT((dynamic_cast<MSWriterNull*> (BeamOutputThread.getMSWriter()) != 0));
cout << "Succes" << endl;
cout << "Success" << endl;
}
int main()
{
return UnitTest::RunAllTests() > 0;
return 0;
}
//# tSubbandWriter.cc
//# Copyright (C) 2012-2013 ASTRON (Netherlands Institute for Radio Astronomy)
//# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
......@@ -47,7 +46,8 @@ using namespace Cobalt;
using namespace casa;
using boost::format;
unsigned obsId = 0;
unsigned obsId = 54321;
MACIO::RTmetadata rtmd(54321, "", "");
SUITE(SubbandWriter)
{
......@@ -83,12 +83,12 @@ SUITE(SubbandWriter)
TEST_FIXTURE(OneBeam, Construction)
{
SubbandWriter w(ps, 0, "");
SubbandWriter w(ps, 0, rtmd, "", "");
}
TEST_FIXTURE(OneBeam, IO)
{
SubbandWriter w(ps, 0, "");
SubbandWriter w(ps, 0, rtmd, "", "");
// process, and provide input
# pragma omp parallel sections
......@@ -130,7 +130,7 @@ SUITE(SubbandWriter)
TEST_FIXTURE(OneBeam, FinalMetaData)
{
SubbandWriter w(ps, 0, "");
SubbandWriter w(ps, 0, rtmd, "", "");
// process
# pragma omp parallel sections
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment