diff --git a/RTCP/Storage/include/Storage/InputThread.h b/RTCP/Storage/include/Storage/InputThread.h index b1d4e4c8ea5d4eb2e2e520247d21388715099481..164021078014c685aa8062c19ced4eca6222f55f 100644 --- a/RTCP/Storage/include/Storage/InputThread.h +++ b/RTCP/Storage/include/Storage/InputThread.h @@ -26,6 +26,8 @@ //# Never #include <config.h> or #include <lofar_config.h> in a header file! #include <Interface/StreamableData.h> +#include <Interface/MultiDimArray.h> +#include <Interface/CN_ProcessingPlan.h> #include <Interface/Queue.h> #include <Stream/Stream.h> @@ -45,7 +47,7 @@ class InputThread struct SingleInput { Queue<StreamableData *> freeQueue, receiveQueue; }; - struct SingleInput *itsInputs; // [itsNrInputs] + Vector<struct SingleInput> itsInputs; // [itsNrInputs] unsigned itsNrInputs; Queue<unsigned> itsReceiveQueueActivity; @@ -58,6 +60,8 @@ class InputThread const Parset *itsPS; Stream *itsStreamFromION; pthread_t thread; + + Vector<CN_ProcessingPlan<> *> itsPlans; // [maxReceiveQueueSize] unsigned itsSB; }; diff --git a/RTCP/Storage/include/Storage/MeasurementSetFormat.h b/RTCP/Storage/include/Storage/MeasurementSetFormat.h index 6ba25cdd4877b7b73c7a1d1cc533494a6c926e90..eb4a77208b34382b0bf6dc9143ddf3e7ad697dd8 100644 --- a/RTCP/Storage/include/Storage/MeasurementSetFormat.h +++ b/RTCP/Storage/include/Storage/MeasurementSetFormat.h @@ -16,7 +16,6 @@ // (#samples correlated) #include <Interface/Parset.h> -#include <Interface/PipelineOutput.h> #include <casa/aips.h> #include <tables/Tables/Table.h> @@ -49,9 +48,7 @@ class MeasurementSetFormat : public Format private: const Parset *itsPS; - const PipelineOutputSet itsPipelineOutputSet; - unsigned itsNrOutputs; unsigned itsNrAnt; uint32 itsNrTimes; diff --git a/RTCP/Storage/include/Storage/RTStorage.h b/RTCP/Storage/include/Storage/RTStorage.h index 66f1a127aa54250e19c8f5b908fcaaaaddb998d7..ee83aee3f7902c0e1c168a3f96ecce16b4c802cc 100644 --- a/RTCP/Storage/include/Storage/RTStorage.h +++ b/RTCP/Storage/include/Storage/RTStorage.h @@ -16,7 +16,8 @@ #include <Interface/Parset.h> #include <Interface/Exceptions.h> -#include <Interface/PipelineOutput.h> +#include <Interface/CN_Configuration.h> +#include <Interface/CN_ProcessingPlan.h> #include <Interface/MultiDimArray.h> #include <Interface/RSPTimeStamp.h> @@ -41,7 +42,8 @@ namespace LOFAR { unsigned itsRank; unsigned itsSize; - PipelineOutputSet itsPipelineOutputSet; + CN_Configuration itsConfiguration; + CN_ProcessingPlan<> itsPlan; unsigned itsNrOutputs; unsigned itsNrSubbands; diff --git a/RTCP/Storage/include/Storage/SubbandWriter.h b/RTCP/Storage/include/Storage/SubbandWriter.h index 4fc9b4ddba091e023242e58446bdfb1df3126ac1..7e57cf549ea87b50ee27e67ac19ce9c603a74541 100644 --- a/RTCP/Storage/include/Storage/SubbandWriter.h +++ b/RTCP/Storage/include/Storage/SubbandWriter.h @@ -35,7 +35,8 @@ #include <Common/Timer.h> #include <Common/lofar_vector.h> #include <Interface/Parset.h> -#include <Interface/PipelineOutput.h> +#include <Interface/CN_Configuration.h> +#include <Interface/CN_ProcessingPlan.h> #include <Interface/StreamableData.h> #include <Interface/Queue.h> #include <Interface/RSPTimeStamp.h> @@ -68,7 +69,8 @@ class SubbandWriter const Parset *itsPS; unsigned itsRank; unsigned itsSize; - PipelineOutputSet itsPipelineOutputSet; + CN_Configuration itsConfiguration; + CN_ProcessingPlan<> itsPlan; unsigned itsNrOutputs; std::vector<Stream *> itsInputStreams; diff --git a/RTCP/Storage/src/InputThread.cc b/RTCP/Storage/src/InputThread.cc index 36e166df9c5b6fdebb6e751fe94e976a788e12aa..8947047bbeb3f1b921d00fc01d054ba0ce3f82d2 100644 --- a/RTCP/Storage/src/InputThread.cc +++ b/RTCP/Storage/src/InputThread.cc @@ -24,7 +24,6 @@ #include <lofar_config.h> #include <Storage/InputThread.h> -#include <Interface/PipelineOutput.h> #include <Interface/StreamableData.h> #include <Stream/NullStream.h> #include <Common/DataConvert.h> @@ -40,20 +39,25 @@ namespace RTCP { itsNrInputs(0), itsPS(ps), itsStreamFromION(streamFromION), + itsPlans(maxReceiveQueueSize), itsSB(sb) { // transpose output stream holders - for (unsigned i = 0; i < maxReceiveQueueSize; i ++) { - PipelineOutputSet pipeline( *ps ); + CN_Configuration configuration(*ps); - // only the first call will resize the array - if( !itsInputs ) { - itsNrInputs = pipeline.size(); - itsInputs = new struct InputThread::SingleInput[itsNrInputs]; + for (unsigned i = 0; i < maxReceiveQueueSize; i ++) { + itsPlans[i] = new CN_ProcessingPlan<>( configuration, false, true ); + itsPlans[i]->removeNonOutputs(); + itsPlans[i]->allocateOutputs( heapAllocator ); + + if( itsNrInputs == 0 ) { + // do this only the first time + itsNrInputs = itsPlans[i]->nrOutputs(); + itsInputs.resize( itsNrInputs ); } for (unsigned o = 0; o < itsNrInputs; o ++ ) { - itsInputs[o].freeQueue.append( pipeline[o].extractData() ); + itsInputs[o].freeQueue.append( itsPlans[i]->plan[o].source ); } } @@ -70,15 +74,6 @@ InputThread::~InputThread() LOG_ERROR("could not join input thread"); exit(1); } - - for (unsigned o = 0; o < itsNrInputs; o++ ) { - struct InputThread::SingleInput &input = itsInputs[o]; - - for (unsigned i = 0; i < maxReceiveQueueSize; i ++) - delete input.freeQueue.remove(); - } - - delete [] itsInputs; } diff --git a/RTCP/Storage/src/Makefile.am b/RTCP/Storage/src/Makefile.am index 48db6ffc9ab96d436ad4ca946ccf539a9cbcb8a0..95b5b9c0914bffd01acce9becc9923d5d60f9d3a 100644 --- a/RTCP/Storage/src/Makefile.am +++ b/RTCP/Storage/src/Makefile.am @@ -23,7 +23,6 @@ RTStorage_SOURCES = RTStorage_main.cc RTStorage_LDADD = librtstorage.la RTStorage_DEPENDENCIES = librtstorage.la $(LOFAR_DEPEND) - configfilesdir = $(bindir) configfiles_DATA = Storage.machinefile \ Storage.log_prop \ diff --git a/RTCP/Storage/src/MeasurementSetFormat.cc b/RTCP/Storage/src/MeasurementSetFormat.cc index 83ad544fd230d42b4b062bb02d6a2b17e75b7db1..6de83703c7d6d81e2cecc02f3b764b23ae9dc157 100644 --- a/RTCP/Storage/src/MeasurementSetFormat.cc +++ b/RTCP/Storage/src/MeasurementSetFormat.cc @@ -56,8 +56,6 @@ namespace RTCP { MeasurementSetFormat::MeasurementSetFormat(const Parset *ps, const unsigned alignment) : itsPS(ps), - itsPipelineOutputSet(*ps), - itsNrOutputs(itsPipelineOutputSet.size()), itsMS(0), itsAlignment(alignment) { diff --git a/RTCP/Storage/src/RTStorage.cc b/RTCP/Storage/src/RTStorage.cc index 55c68afa58dee4e0169eef8b4bb5e8fe64367935..f0ebbcd43c5f793c74ce993ec029453525000558 100644 --- a/RTCP/Storage/src/RTStorage.cc +++ b/RTCP/Storage/src/RTStorage.cc @@ -37,8 +37,9 @@ RTStorage::RTStorage(const Parset *ps, unsigned rank, unsigned size) itsPS(ps), itsRank(rank), itsSize(size), - itsPipelineOutputSet(*ps), - itsNrOutputs(itsPipelineOutputSet.size()), + itsConfiguration(*ps), + itsPlan(itsConfiguration,false,true), + itsNrOutputs(itsPlan.nrOutputs()), itsAlignment(512), itsWriteTimer("WriteTimer", false), bytesWritten(0) diff --git a/RTCP/Storage/src/Storage.log_prop b/RTCP/Storage/src/Storage.log_prop index f3544f42d999d02dd205459fce0f0786d29b8a66..5cdbc0381748cee48c178f0b0b43ddf89bf8339d 100644 --- a/RTCP/Storage/src/Storage.log_prop +++ b/RTCP/Storage/src/Storage.log_prop @@ -1,9 +1,9 @@ # Configure the loggers -log4cplus.rootLogger=INFO, STDOUT, FILE +log4cplus.rootLogger=INFO, STDOUT #log4cplus.logger.TRC=INFO log4cplus.logger.TRC=INFO -log4cplus.logger.LCS.Common=FATAL, STDOUT, FILE +log4cplus.logger.LCS.Common=FATAL, STDOUT # Define the appenders log4cplus.appender.STDOUT=log4cplus::ConsoleAppender @@ -14,12 +14,3 @@ log4cplus.appender.STDERR=log4cplus::ConsoleAppender log4cplus.appender.STDERR.layout=log4cplus::PatternLayout log4cplus.appender.STDERR.layout.ConversionPattern=%D{%d-%m %H:%M:%S.%q} %-5p %c{3} - %m [%.25l]%n log4cplus.appender.STDERR.logToStdErr=true - -log4cplus.appender.FILE=log4cplus::RollingFileAppender -log4cplus.appender.FILE.File=../log/%filename.log -log4cplus.appender.FILE.MaxFileSize=10MB -log4cplus.appender.FILE.MaxBackupIndex=2 -log4cplus.appender.FILE.layout=log4cplus::PatternLayout -log4cplus.appender.FILE.layout.ConversionPattern=%x %D{%d-%m %H:%M:%S.%q} %-5p %c{3} - %m [%.25l]%n - -log4cplus.appender.DUMP=log4cplus::NullAppender diff --git a/RTCP/Storage/src/SubbandWriter.cc b/RTCP/Storage/src/SubbandWriter.cc index eb3408d1dc7d4045eeb77b085194547efdd0fde6..5c61fd26e142521fba3d796c05a03ef2ffcc5a1a 100644 --- a/RTCP/Storage/src/SubbandWriter.cc +++ b/RTCP/Storage/src/SubbandWriter.cc @@ -34,6 +34,7 @@ #include <Stream/NullStream.h> #include <Stream/SocketStream.h> #include <Interface/Exceptions.h> +#include <Interface/CorrelatedData.h> #ifdef USE_MAC_PI #include <GCF/GCF_PVDouble.h> @@ -52,8 +53,12 @@ SubbandWriter::SubbandWriter(const Parset *ps, unsigned rank, unsigned size) itsPS(ps), itsRank(rank), itsSize(size), - itsPipelineOutputSet(*ps), - itsNrOutputs(itsPipelineOutputSet.size()), + itsConfiguration(*ps), + itsPlan(itsConfiguration,false,true), + itsNrOutputs(itsPlan.nrOutputs()), + itsNBaselines(ps->nrBaselines()), + itsNChannels(ps->nrChannelsPerSubband()), + itsNBeams(ps->nrBeams()), itsTimeCounter(0), itsVisibilities(0), itsWriteTimer ("writing-MS", false, true) @@ -61,6 +66,8 @@ SubbandWriter::SubbandWriter(const Parset *ps, unsigned rank, unsigned size) ,itsPropertySet(0) #endif { + itsPlan.removeNonOutputs(); + #ifdef USE_MAC_PI itsWriteToMAC = itsPS.getBool("Storage.WriteToMAC"); #endif @@ -70,9 +77,6 @@ SubbandWriter::SubbandWriter(const Parset *ps, unsigned rank, unsigned size) itsNStations = itsPS->nrStations(); } - itsNBaselines = itsPS->nrBaselines(); - itsNChannels = itsPS->nrChannelsPerSubband(); - itsNBeams = itsPS->nrBeams(); unsigned pols = itsPS->getUint32("Observation.nrPolarisations"); itsNPolSquared = pols*pols; @@ -164,7 +168,6 @@ void SubbandWriter::preprocess() itsNrSubbandsPerStorage = (itsNrSubbands / itsSize) + 1; } - const double sampleFreq = itsPS->sampleRate(); const unsigned seconds = static_cast<unsigned>(floor(startTime)); const unsigned samples = static_cast<unsigned>((startTime - floor(startTime)) * sampleFreq); @@ -200,39 +203,34 @@ void SubbandWriter::preprocess() for (unsigned output = 0; output < itsNrOutputs; output++ ) { - string filename = itsPS->getMSname(currentSubband) + itsPipelineOutputSet[output].filenameSuffix(); - - switch( itsPipelineOutputSet[output].writerType() ) { - case PipelineOutput::CASAWRITER: - + string filename = itsPS->getMSname(currentSubband) + itsPlan.plan[output].filenameSuffix; + + if( dynamic_cast<CorrelatedData*>( itsPlan.plan[output].source ) ) { + // correlated data -> mswriter itsWriters[i][output] = new MSWriterCasa( filename.c_str(), startTime, itsPS->IONintegrationTime(), itsNChannels, itsNPolSquared, itsNStations, antPos, stationNames, itsWeightFactor); - - break; - - case PipelineOutput::RAWWRITER: + } else { + // raw writer itsWriters[i][output] = new MSWriterFile( filename.c_str(), startTime, itsPS->IONintegrationTime(), itsNChannels, itsNPolSquared, itsNStations, antPos, stationNames, itsWeightFactor); - break; - - case PipelineOutput::NULLWRITER: + } + +#if 0 + { + // null writer itsWriters[i][output] = new MSWriterNull( filename.c_str(), startTime, itsPS->IONintegrationTime(), itsNChannels, itsNPolSquared, itsNStations, antPos, stationNames, itsWeightFactor); - break; - - default: - LOG_WARN_STR("unknown writer type!"); - break; } +#endif unsigned beam = subbandToBeamMapping[currentSubband]; vector<double> beamDir = itsPS->getBeamDirection(beam); @@ -273,7 +271,7 @@ void SubbandWriter::preprocess() createInputStreams(); for (unsigned sb = 0; sb < itsMyNrSubbands; sb ++) { - itsInputThreads.push_back(new InputThread(itsInputStreams[sb], itsPS)); + itsInputThreads.push_back(new InputThread(itsInputStreams[sb], itsPS, sb)); } } @@ -322,25 +320,26 @@ void SubbandWriter::checkForDroppedData(StreamableData *data, unsigned sb, unsig bool SubbandWriter::processSubband(unsigned sb) { - do { - unsigned o = itsInputThreads[sb]->itsReceiveQueueActivity.remove(); - struct InputThread::SingleInput &input = itsInputThreads[sb]->itsInputs[o]; + // process a single output, since we get a trigger for each one + // from the InputThread + + unsigned o = itsInputThreads[sb]->itsReceiveQueueActivity.remove(); + struct InputThread::SingleInput &input = itsInputThreads[sb]->itsInputs[o]; - StreamableData *data = input.receiveQueue.remove(); + StreamableData *data = input.receiveQueue.remove(); - if (data == 0) - return false; + if (data == 0) + return false; - checkForDroppedData(data, sb, o); + checkForDroppedData(data, sb, o); #if defined HAVE_AIPSPP - itsWriteTimer.start(); - itsWriters[sb][o]->write(itsBandIDs[sb][o], 0, itsNChannels, data); - itsWriteTimer.stop(); + itsWriteTimer.start(); + itsWriters[sb][o]->write(itsBandIDs[sb][o], 0, itsNChannels, data); + itsWriteTimer.stop(); #endif - input.freeQueue.append(data); - } while( !itsInputThreads[sb]->itsReceiveQueueActivity.empty() ); + input.freeQueue.append(data); return true; } @@ -354,18 +353,20 @@ void SubbandWriter::process() while (finishedSubbandsCount < itsMyNrSubbands) { writeLogMessage(); - for (unsigned sb = 0; sb < itsMyNrSubbands; ++ sb) - if (!finishedSubbands[sb]) - if (!processSubband(sb)) { - finishedSubbands[sb] = true; - ++ finishedSubbandsCount; - } + unsigned sb = itsInputThreads[0]->itsRcvdQueue.remove(); + if (sb == 0) writeLogMessage(); + + if (!finishedSubbands[sb]) { + if (!processSubband(sb)) { + finishedSubbands[sb] = true; + ++ finishedSubbandsCount; + } + } ++ itsTimeCounter; } } - void SubbandWriter::postprocess() { for (unsigned sb = 0; sb < itsMyNrSubbands; sb ++) {