Skip to content
Snippets Groups Projects
Commit bcc8a12f authored by Jan David Mol's avatar Jan David Mol
Browse files

bug 1362: * new storage that uses cn_processingplan

parent 21a8c99b
Branches
Tags
No related merge requests found
...@@ -26,6 +26,8 @@ ...@@ -26,6 +26,8 @@
//# Never #include <config.h> or #include <lofar_config.h> in a header file! //# Never #include <config.h> or #include <lofar_config.h> in a header file!
#include <Interface/StreamableData.h> #include <Interface/StreamableData.h>
#include <Interface/MultiDimArray.h>
#include <Interface/CN_ProcessingPlan.h>
#include <Interface/Queue.h> #include <Interface/Queue.h>
#include <Stream/Stream.h> #include <Stream/Stream.h>
...@@ -45,7 +47,7 @@ class InputThread ...@@ -45,7 +47,7 @@ class InputThread
struct SingleInput { struct SingleInput {
Queue<StreamableData *> freeQueue, receiveQueue; Queue<StreamableData *> freeQueue, receiveQueue;
}; };
struct SingleInput *itsInputs; // [itsNrInputs] Vector<struct SingleInput> itsInputs; // [itsNrInputs]
unsigned itsNrInputs; unsigned itsNrInputs;
Queue<unsigned> itsReceiveQueueActivity; Queue<unsigned> itsReceiveQueueActivity;
...@@ -58,6 +60,8 @@ class InputThread ...@@ -58,6 +60,8 @@ class InputThread
const Parset *itsPS; const Parset *itsPS;
Stream *itsStreamFromION; Stream *itsStreamFromION;
pthread_t thread; pthread_t thread;
Vector<CN_ProcessingPlan<> *> itsPlans; // [maxReceiveQueueSize]
unsigned itsSB; unsigned itsSB;
}; };
......
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
// (#samples correlated) // (#samples correlated)
#include <Interface/Parset.h> #include <Interface/Parset.h>
#include <Interface/PipelineOutput.h>
#include <casa/aips.h> #include <casa/aips.h>
#include <tables/Tables/Table.h> #include <tables/Tables/Table.h>
...@@ -49,9 +48,7 @@ class MeasurementSetFormat : public Format ...@@ -49,9 +48,7 @@ class MeasurementSetFormat : public Format
private: private:
const Parset *itsPS; const Parset *itsPS;
const PipelineOutputSet itsPipelineOutputSet;
unsigned itsNrOutputs;
unsigned itsNrAnt; unsigned itsNrAnt;
uint32 itsNrTimes; uint32 itsNrTimes;
......
...@@ -16,7 +16,8 @@ ...@@ -16,7 +16,8 @@
#include <Interface/Parset.h> #include <Interface/Parset.h>
#include <Interface/Exceptions.h> #include <Interface/Exceptions.h>
#include <Interface/PipelineOutput.h> #include <Interface/CN_Configuration.h>
#include <Interface/CN_ProcessingPlan.h>
#include <Interface/MultiDimArray.h> #include <Interface/MultiDimArray.h>
#include <Interface/RSPTimeStamp.h> #include <Interface/RSPTimeStamp.h>
...@@ -41,7 +42,8 @@ namespace LOFAR { ...@@ -41,7 +42,8 @@ namespace LOFAR {
unsigned itsRank; unsigned itsRank;
unsigned itsSize; unsigned itsSize;
PipelineOutputSet itsPipelineOutputSet; CN_Configuration itsConfiguration;
CN_ProcessingPlan<> itsPlan;
unsigned itsNrOutputs; unsigned itsNrOutputs;
unsigned itsNrSubbands; unsigned itsNrSubbands;
......
...@@ -35,7 +35,8 @@ ...@@ -35,7 +35,8 @@
#include <Common/Timer.h> #include <Common/Timer.h>
#include <Common/lofar_vector.h> #include <Common/lofar_vector.h>
#include <Interface/Parset.h> #include <Interface/Parset.h>
#include <Interface/PipelineOutput.h> #include <Interface/CN_Configuration.h>
#include <Interface/CN_ProcessingPlan.h>
#include <Interface/StreamableData.h> #include <Interface/StreamableData.h>
#include <Interface/Queue.h> #include <Interface/Queue.h>
#include <Interface/RSPTimeStamp.h> #include <Interface/RSPTimeStamp.h>
...@@ -68,7 +69,8 @@ class SubbandWriter ...@@ -68,7 +69,8 @@ class SubbandWriter
const Parset *itsPS; const Parset *itsPS;
unsigned itsRank; unsigned itsRank;
unsigned itsSize; unsigned itsSize;
PipelineOutputSet itsPipelineOutputSet; CN_Configuration itsConfiguration;
CN_ProcessingPlan<> itsPlan;
unsigned itsNrOutputs; unsigned itsNrOutputs;
std::vector<Stream *> itsInputStreams; std::vector<Stream *> itsInputStreams;
......
...@@ -24,7 +24,6 @@ ...@@ -24,7 +24,6 @@
#include <lofar_config.h> #include <lofar_config.h>
#include <Storage/InputThread.h> #include <Storage/InputThread.h>
#include <Interface/PipelineOutput.h>
#include <Interface/StreamableData.h> #include <Interface/StreamableData.h>
#include <Stream/NullStream.h> #include <Stream/NullStream.h>
#include <Common/DataConvert.h> #include <Common/DataConvert.h>
...@@ -40,20 +39,25 @@ namespace RTCP { ...@@ -40,20 +39,25 @@ namespace RTCP {
itsNrInputs(0), itsNrInputs(0),
itsPS(ps), itsPS(ps),
itsStreamFromION(streamFromION), itsStreamFromION(streamFromION),
itsPlans(maxReceiveQueueSize),
itsSB(sb) itsSB(sb)
{ {
// transpose output stream holders // transpose output stream holders
for (unsigned i = 0; i < maxReceiveQueueSize; i ++) { CN_Configuration configuration(*ps);
PipelineOutputSet pipeline( *ps );
// only the first call will resize the array for (unsigned i = 0; i < maxReceiveQueueSize; i ++) {
if( !itsInputs ) { itsPlans[i] = new CN_ProcessingPlan<>( configuration, false, true );
itsNrInputs = pipeline.size(); itsPlans[i]->removeNonOutputs();
itsInputs = new struct InputThread::SingleInput[itsNrInputs]; itsPlans[i]->allocateOutputs( heapAllocator );
if( itsNrInputs == 0 ) {
// do this only the first time
itsNrInputs = itsPlans[i]->nrOutputs();
itsInputs.resize( itsNrInputs );
} }
for (unsigned o = 0; o < itsNrInputs; o ++ ) { for (unsigned o = 0; o < itsNrInputs; o ++ ) {
itsInputs[o].freeQueue.append( pipeline[o].extractData() ); itsInputs[o].freeQueue.append( itsPlans[i]->plan[o].source );
} }
} }
...@@ -70,15 +74,6 @@ InputThread::~InputThread() ...@@ -70,15 +74,6 @@ InputThread::~InputThread()
LOG_ERROR("could not join input thread"); LOG_ERROR("could not join input thread");
exit(1); exit(1);
} }
for (unsigned o = 0; o < itsNrInputs; o++ ) {
struct InputThread::SingleInput &input = itsInputs[o];
for (unsigned i = 0; i < maxReceiveQueueSize; i ++)
delete input.freeQueue.remove();
}
delete [] itsInputs;
} }
......
...@@ -23,7 +23,6 @@ RTStorage_SOURCES = RTStorage_main.cc ...@@ -23,7 +23,6 @@ RTStorage_SOURCES = RTStorage_main.cc
RTStorage_LDADD = librtstorage.la RTStorage_LDADD = librtstorage.la
RTStorage_DEPENDENCIES = librtstorage.la $(LOFAR_DEPEND) RTStorage_DEPENDENCIES = librtstorage.la $(LOFAR_DEPEND)
configfilesdir = $(bindir) configfilesdir = $(bindir)
configfiles_DATA = Storage.machinefile \ configfiles_DATA = Storage.machinefile \
Storage.log_prop \ Storage.log_prop \
......
...@@ -56,8 +56,6 @@ namespace RTCP { ...@@ -56,8 +56,6 @@ namespace RTCP {
MeasurementSetFormat::MeasurementSetFormat(const Parset *ps, const unsigned alignment) MeasurementSetFormat::MeasurementSetFormat(const Parset *ps, const unsigned alignment)
: itsPS(ps), : itsPS(ps),
itsPipelineOutputSet(*ps),
itsNrOutputs(itsPipelineOutputSet.size()),
itsMS(0), itsMS(0),
itsAlignment(alignment) itsAlignment(alignment)
{ {
......
...@@ -37,8 +37,9 @@ RTStorage::RTStorage(const Parset *ps, unsigned rank, unsigned size) ...@@ -37,8 +37,9 @@ RTStorage::RTStorage(const Parset *ps, unsigned rank, unsigned size)
itsPS(ps), itsPS(ps),
itsRank(rank), itsRank(rank),
itsSize(size), itsSize(size),
itsPipelineOutputSet(*ps), itsConfiguration(*ps),
itsNrOutputs(itsPipelineOutputSet.size()), itsPlan(itsConfiguration,false,true),
itsNrOutputs(itsPlan.nrOutputs()),
itsAlignment(512), itsAlignment(512),
itsWriteTimer("WriteTimer", false), itsWriteTimer("WriteTimer", false),
bytesWritten(0) bytesWritten(0)
......
# Configure the loggers # Configure the loggers
log4cplus.rootLogger=INFO, STDOUT, FILE log4cplus.rootLogger=INFO, STDOUT
#log4cplus.logger.TRC=INFO #log4cplus.logger.TRC=INFO
log4cplus.logger.TRC=INFO log4cplus.logger.TRC=INFO
log4cplus.logger.LCS.Common=FATAL, STDOUT, FILE log4cplus.logger.LCS.Common=FATAL, STDOUT
# Define the appenders # Define the appenders
log4cplus.appender.STDOUT=log4cplus::ConsoleAppender log4cplus.appender.STDOUT=log4cplus::ConsoleAppender
...@@ -14,12 +14,3 @@ log4cplus.appender.STDERR=log4cplus::ConsoleAppender ...@@ -14,12 +14,3 @@ log4cplus.appender.STDERR=log4cplus::ConsoleAppender
log4cplus.appender.STDERR.layout=log4cplus::PatternLayout log4cplus.appender.STDERR.layout=log4cplus::PatternLayout
log4cplus.appender.STDERR.layout.ConversionPattern=%D{%d-%m %H:%M:%S.%q} %-5p %c{3} - %m [%.25l]%n log4cplus.appender.STDERR.layout.ConversionPattern=%D{%d-%m %H:%M:%S.%q} %-5p %c{3} - %m [%.25l]%n
log4cplus.appender.STDERR.logToStdErr=true log4cplus.appender.STDERR.logToStdErr=true
log4cplus.appender.FILE=log4cplus::RollingFileAppender
log4cplus.appender.FILE.File=../log/%filename.log
log4cplus.appender.FILE.MaxFileSize=10MB
log4cplus.appender.FILE.MaxBackupIndex=2
log4cplus.appender.FILE.layout=log4cplus::PatternLayout
log4cplus.appender.FILE.layout.ConversionPattern=%x %D{%d-%m %H:%M:%S.%q} %-5p %c{3} - %m [%.25l]%n
log4cplus.appender.DUMP=log4cplus::NullAppender
...@@ -34,6 +34,7 @@ ...@@ -34,6 +34,7 @@
#include <Stream/NullStream.h> #include <Stream/NullStream.h>
#include <Stream/SocketStream.h> #include <Stream/SocketStream.h>
#include <Interface/Exceptions.h> #include <Interface/Exceptions.h>
#include <Interface/CorrelatedData.h>
#ifdef USE_MAC_PI #ifdef USE_MAC_PI
#include <GCF/GCF_PVDouble.h> #include <GCF/GCF_PVDouble.h>
...@@ -52,8 +53,12 @@ SubbandWriter::SubbandWriter(const Parset *ps, unsigned rank, unsigned size) ...@@ -52,8 +53,12 @@ SubbandWriter::SubbandWriter(const Parset *ps, unsigned rank, unsigned size)
itsPS(ps), itsPS(ps),
itsRank(rank), itsRank(rank),
itsSize(size), itsSize(size),
itsPipelineOutputSet(*ps), itsConfiguration(*ps),
itsNrOutputs(itsPipelineOutputSet.size()), itsPlan(itsConfiguration,false,true),
itsNrOutputs(itsPlan.nrOutputs()),
itsNBaselines(ps->nrBaselines()),
itsNChannels(ps->nrChannelsPerSubband()),
itsNBeams(ps->nrBeams()),
itsTimeCounter(0), itsTimeCounter(0),
itsVisibilities(0), itsVisibilities(0),
itsWriteTimer ("writing-MS", false, true) itsWriteTimer ("writing-MS", false, true)
...@@ -61,6 +66,8 @@ SubbandWriter::SubbandWriter(const Parset *ps, unsigned rank, unsigned size) ...@@ -61,6 +66,8 @@ SubbandWriter::SubbandWriter(const Parset *ps, unsigned rank, unsigned size)
,itsPropertySet(0) ,itsPropertySet(0)
#endif #endif
{ {
itsPlan.removeNonOutputs();
#ifdef USE_MAC_PI #ifdef USE_MAC_PI
itsWriteToMAC = itsPS.getBool("Storage.WriteToMAC"); itsWriteToMAC = itsPS.getBool("Storage.WriteToMAC");
#endif #endif
...@@ -70,9 +77,6 @@ SubbandWriter::SubbandWriter(const Parset *ps, unsigned rank, unsigned size) ...@@ -70,9 +77,6 @@ SubbandWriter::SubbandWriter(const Parset *ps, unsigned rank, unsigned size)
itsNStations = itsPS->nrStations(); itsNStations = itsPS->nrStations();
} }
itsNBaselines = itsPS->nrBaselines();
itsNChannels = itsPS->nrChannelsPerSubband();
itsNBeams = itsPS->nrBeams();
unsigned pols = itsPS->getUint32("Observation.nrPolarisations"); unsigned pols = itsPS->getUint32("Observation.nrPolarisations");
itsNPolSquared = pols*pols; itsNPolSquared = pols*pols;
...@@ -164,7 +168,6 @@ void SubbandWriter::preprocess() ...@@ -164,7 +168,6 @@ void SubbandWriter::preprocess()
itsNrSubbandsPerStorage = (itsNrSubbands / itsSize) + 1; itsNrSubbandsPerStorage = (itsNrSubbands / itsSize) + 1;
} }
const double sampleFreq = itsPS->sampleRate(); const double sampleFreq = itsPS->sampleRate();
const unsigned seconds = static_cast<unsigned>(floor(startTime)); const unsigned seconds = static_cast<unsigned>(floor(startTime));
const unsigned samples = static_cast<unsigned>((startTime - floor(startTime)) * sampleFreq); const unsigned samples = static_cast<unsigned>((startTime - floor(startTime)) * sampleFreq);
...@@ -200,39 +203,34 @@ void SubbandWriter::preprocess() ...@@ -200,39 +203,34 @@ void SubbandWriter::preprocess()
for (unsigned output = 0; output < itsNrOutputs; output++ ) { for (unsigned output = 0; output < itsNrOutputs; output++ ) {
string filename = itsPS->getMSname(currentSubband) + itsPipelineOutputSet[output].filenameSuffix(); string filename = itsPS->getMSname(currentSubband) + itsPlan.plan[output].filenameSuffix;
switch( itsPipelineOutputSet[output].writerType() ) {
case PipelineOutput::CASAWRITER:
if( dynamic_cast<CorrelatedData*>( itsPlan.plan[output].source ) ) {
// correlated data -> mswriter
itsWriters[i][output] = new MSWriterCasa( itsWriters[i][output] = new MSWriterCasa(
filename.c_str(), filename.c_str(),
startTime, itsPS->IONintegrationTime(), itsNChannels, startTime, itsPS->IONintegrationTime(), itsNChannels,
itsNPolSquared, itsNStations, antPos, itsNPolSquared, itsNStations, antPos,
stationNames, itsWeightFactor); stationNames, itsWeightFactor);
} else {
break; // raw writer
case PipelineOutput::RAWWRITER:
itsWriters[i][output] = new MSWriterFile( itsWriters[i][output] = new MSWriterFile(
filename.c_str(), filename.c_str(),
startTime, itsPS->IONintegrationTime(), itsNChannels, startTime, itsPS->IONintegrationTime(), itsNChannels,
itsNPolSquared, itsNStations, antPos, itsNPolSquared, itsNStations, antPos,
stationNames, itsWeightFactor); stationNames, itsWeightFactor);
break; }
case PipelineOutput::NULLWRITER: #if 0
{
// null writer
itsWriters[i][output] = new MSWriterNull( itsWriters[i][output] = new MSWriterNull(
filename.c_str(), filename.c_str(),
startTime, itsPS->IONintegrationTime(), itsNChannels, startTime, itsPS->IONintegrationTime(), itsNChannels,
itsNPolSquared, itsNStations, antPos, itsNPolSquared, itsNStations, antPos,
stationNames, itsWeightFactor); stationNames, itsWeightFactor);
break;
default:
LOG_WARN_STR("unknown writer type!");
break;
} }
#endif
unsigned beam = subbandToBeamMapping[currentSubband]; unsigned beam = subbandToBeamMapping[currentSubband];
vector<double> beamDir = itsPS->getBeamDirection(beam); vector<double> beamDir = itsPS->getBeamDirection(beam);
...@@ -273,7 +271,7 @@ void SubbandWriter::preprocess() ...@@ -273,7 +271,7 @@ void SubbandWriter::preprocess()
createInputStreams(); createInputStreams();
for (unsigned sb = 0; sb < itsMyNrSubbands; sb ++) { for (unsigned sb = 0; sb < itsMyNrSubbands; sb ++) {
itsInputThreads.push_back(new InputThread(itsInputStreams[sb], itsPS)); itsInputThreads.push_back(new InputThread(itsInputStreams[sb], itsPS, sb));
} }
} }
...@@ -322,7 +320,9 @@ void SubbandWriter::checkForDroppedData(StreamableData *data, unsigned sb, unsig ...@@ -322,7 +320,9 @@ void SubbandWriter::checkForDroppedData(StreamableData *data, unsigned sb, unsig
bool SubbandWriter::processSubband(unsigned sb) bool SubbandWriter::processSubband(unsigned sb)
{ {
do { // process a single output, since we get a trigger for each one
// from the InputThread
unsigned o = itsInputThreads[sb]->itsReceiveQueueActivity.remove(); unsigned o = itsInputThreads[sb]->itsReceiveQueueActivity.remove();
struct InputThread::SingleInput &input = itsInputThreads[sb]->itsInputs[o]; struct InputThread::SingleInput &input = itsInputThreads[sb]->itsInputs[o];
...@@ -340,7 +340,6 @@ bool SubbandWriter::processSubband(unsigned sb) ...@@ -340,7 +340,6 @@ bool SubbandWriter::processSubband(unsigned sb)
#endif #endif
input.freeQueue.append(data); input.freeQueue.append(data);
} while( !itsInputThreads[sb]->itsReceiveQueueActivity.empty() );
return true; return true;
} }
...@@ -354,18 +353,20 @@ void SubbandWriter::process() ...@@ -354,18 +353,20 @@ void SubbandWriter::process()
while (finishedSubbandsCount < itsMyNrSubbands) { while (finishedSubbandsCount < itsMyNrSubbands) {
writeLogMessage(); writeLogMessage();
for (unsigned sb = 0; sb < itsMyNrSubbands; ++ sb) unsigned sb = itsInputThreads[0]->itsRcvdQueue.remove();
if (!finishedSubbands[sb]) if (sb == 0) writeLogMessage();
if (!finishedSubbands[sb]) {
if (!processSubband(sb)) { if (!processSubband(sb)) {
finishedSubbands[sb] = true; finishedSubbands[sb] = true;
++ finishedSubbandsCount; ++ finishedSubbandsCount;
} }
}
++ itsTimeCounter; ++ itsTimeCounter;
} }
} }
void SubbandWriter::postprocess() void SubbandWriter::postprocess()
{ {
for (unsigned sb = 0; sb < itsMyNrSubbands; sb ++) { for (unsigned sb = 0; sb < itsMyNrSubbands; sb ++) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment