diff --git a/.gitattributes b/.gitattributes index 9e79157ec173b1616500e313e2b92278439dabcd..0c99d9fad36bd18bb24554bc4339664f8ee85cf4 100644 --- a/.gitattributes +++ b/.gitattributes @@ -1781,8 +1781,6 @@ RTCP/Interface/include/Interface/InverseFilteredData.h -text RTCP/Interface/include/Interface/ProcessingPlan.h -text RTCP/Interface/include/Interface/StokesData.h -text RTCP/Interface/include/Interface/StreamableData.h -text -RTCP/Interface/include/Interface/TransposedBeamFormedData.h -text -RTCP/Interface/include/Interface/TransposedStokesData.h -text RTCP/Interface/src/BeamCoordinates.cc -text RTCP/Interface/src/CN_ProcessingPlan.cc -text RTCP/LofarStMan/CMakeLists.txt -text diff --git a/RTCP/CNProc/src/AsyncTransposeBeams.cc b/RTCP/CNProc/src/AsyncTransposeBeams.cc index 0d75f22a23155fc99f1444659e79a112f3578e27..94eac7298be5629b3d7d36421b142025e1729c30 100644 --- a/RTCP/CNProc/src/AsyncTransposeBeams.cc +++ b/RTCP/CNProc/src/AsyncTransposeBeams.cc @@ -4,8 +4,6 @@ #include <AsyncTransposeBeams.h> #include <Interface/CN_Mapping.h> -#include <Interface/TransposedBeamFormedData.h> -#include <Interface/StokesData.h> #include <Interface/PrintVector.h> #include <Common/LofarLogger.h> @@ -116,7 +114,7 @@ unsigned AsyncTransposeBeams::waitForAnyReceive() } -template <typename T, unsigned DIM> void AsyncTransposeBeams::asyncSend(unsigned outputPsetIndex, unsigned coreIndex, unsigned subband, unsigned beam, const SampleData<T,DIM> *inputData) +template <typename T, unsigned DIM> void AsyncTransposeBeams::asyncSend(unsigned outputPsetIndex, unsigned coreIndex, unsigned subband, unsigned beam, unsigned subbeam, const SampleData<T,DIM> *inputData) { unsigned pset = itsOutputPsets[outputPsetIndex]; unsigned core = itsUsedCoresInPset[coreIndex]; @@ -127,7 +125,7 @@ template <typename T, unsigned DIM> void AsyncTransposeBeams::asyncSend(unsigned const void *ptr; const size_t size; } toWrite[itsNrCommunications] = { - { inputData->samples[beam].origin(), inputData->samples[beam].num_elements() * sizeof(T) } + { inputData->samples[beam][subbeam].origin(), inputData->samples[beam][subbeam].num_elements() * sizeof(T) } }; // write it @@ -145,11 +143,11 @@ template <typename T, unsigned DIM> void AsyncTransposeBeams::asyncSend(unsigned // specialisation for StokesData template void AsyncTransposeBeams::postReceive(SampleData<float,4> *, unsigned, unsigned, unsigned, unsigned); -template void AsyncTransposeBeams::asyncSend(unsigned, unsigned, unsigned, unsigned, const SampleData<float,4> *); +template void AsyncTransposeBeams::asyncSend(unsigned, unsigned, unsigned, unsigned, unsigned, const SampleData<float,4> *); // specialisation for BeamFormedData -template void AsyncTransposeBeams::postReceive(SampleData<fcomplex,4> *, unsigned, unsigned, unsigned, unsigned); -template void AsyncTransposeBeams::asyncSend(unsigned, unsigned, unsigned, unsigned, const SampleData<fcomplex,4> *); +template void AsyncTransposeBeams::postReceive(SampleData<fcomplex,3> *, unsigned, unsigned, unsigned, unsigned); +template void AsyncTransposeBeams::asyncSend(unsigned, unsigned, unsigned, unsigned, unsigned, const SampleData<fcomplex,4> *); void AsyncTransposeBeams::waitForAllSends() { diff --git a/RTCP/CNProc/src/AsyncTransposeBeams.h b/RTCP/CNProc/src/AsyncTransposeBeams.h index 8109f9a6c1626381910992ae5095d37484046585..04bbcedce2b5945d148b3161fdbbf541fdeb3216 100644 --- a/RTCP/CNProc/src/AsyncTransposeBeams.h +++ b/RTCP/CNProc/src/AsyncTransposeBeams.h @@ -41,7 +41,7 @@ class AsyncTransposeBeams unsigned waitForAnyReceive(); // Asynchronously send a subband. - template <typename T, unsigned DIM> void asyncSend(unsigned outputPsetIndex, unsigned coreIndex, unsigned subband, unsigned beam, const SampleData<T,DIM> *inputData); + template <typename T, unsigned DIM> void asyncSend(unsigned outputPsetIndex, unsigned coreIndex, unsigned subband, unsigned beam, unsigned subbeam, const SampleData<T,DIM> *inputData); // Make sure all async sends have finished. void waitForAllSends(); diff --git a/RTCP/CNProc/src/BeamFormer.cc b/RTCP/CNProc/src/BeamFormer.cc index 3258e43492f6946ccfe614cfc8ecaa100e8c43b3..6be74f0f9df54366fb1a382193257283c9f1711b 100644 --- a/RTCP/CNProc/src/BeamFormer.cc +++ b/RTCP/CNProc/src/BeamFormer.cc @@ -535,6 +535,56 @@ void BeamFormer::formBeams( const SubbandMetaData *metaData, SampleData<> *sampl beamFormTimer.stop(); } +void BeamFormer::preTransposeBeams( const BeamFormedData *in, PreTransposeBeamFormedData *out ) +{ + ASSERT( in->samples.shape()[0] == itsNrPencilBeams ); + ASSERT( in->samples.shape()[1] == itsNrChannels ); + ASSERT( in->samples.shape()[2] >= itsNrSamplesPerIntegration ); + ASSERT( in->samples.shape()[3] == NR_POLARIZATIONS ); + + ASSERT( out->samples.shape()[0] == itsNrPencilBeams ); + ASSERT( out->samples.shape()[1] == NR_POLARIZATIONS ); + ASSERT( out->samples.shape()[2] >= itsNrSamplesPerIntegration ); + ASSERT( out->samples.shape()[3] == itsNrChannels ); + + for (unsigned b = 0; b < itsNrPencilBeams; b++) { + out->flags[b] = in->flags[b]; + } + + for (unsigned b = 0; b < itsNrPencilBeams; b++) { + for (unsigned c = 0; c < itsNrChannels; c++) { + for (unsigned s = 0; s < itsNrSamplesPerIntegration; s++) { + for (unsigned p = 0; p < NR_POLARIZATIONS; p++) { + out->samples[b][p][s][c] = in->samples[b][c][s][p]; + } + } + } + } +} + +void BeamFormer::postTransposeBeams( const TransposedBeamFormedData *in, FinalBeamFormedData *out, unsigned nrSubbands ) +{ + ASSERT( in->samples.shape()[0] == nrSubbands ); + ASSERT( in->samples.shape()[1] >= itsNrSamplesPerIntegration ); + ASSERT( in->samples.shape()[2] == itsNrChannels ); + + ASSERT( out->samples.shape()[0] >= itsNrSamplesPerIntegration ); + ASSERT( out->samples.shape()[1] == nrSubbands ); + ASSERT( out->samples.shape()[2] == itsNrChannels ); + + for (unsigned s = 0; s < nrSubbands; s++) { + out->flags[s] = in->flags[s]; + } + + for (unsigned s = 0; s < nrSubbands; s++) { + for (unsigned t = 0; t < itsNrSamplesPerIntegration; t++) { + for (unsigned c = 0; c < itsNrChannels; c++) { + out->samples[t][s][c] = in->samples[s][t][c]; + } + } + } +} + } // namespace RTCP } // namespace LOFAR diff --git a/RTCP/CNProc/src/BeamFormer.h b/RTCP/CNProc/src/BeamFormer.h index 6b8327f1522b7d7ec8770700060b50563301bbe0..1f2101aa73f86dcc80772c0507c1aa442493be64 100644 --- a/RTCP/CNProc/src/BeamFormer.h +++ b/RTCP/CNProc/src/BeamFormer.h @@ -55,6 +55,12 @@ class BeamFormer // fills beamFormedData with pencil beams void formBeams( const SubbandMetaData *metaData, SampleData<> *sampleData, BeamFormedData *beamFormedData, double centerFrequency ); + // rearrange dimensions in preparation for transpose + void preTransposeBeams( const BeamFormedData *in, PreTransposeBeamFormedData *out ); + + // rearrange dimensions into final order after transpose + void postTransposeBeams( const TransposedBeamFormedData *in, FinalBeamFormedData *out, unsigned nrSubbands ); + // return the station mapping std::vector<unsigned> &getStationMapping(); diff --git a/RTCP/CNProc/src/CN_Processing.cc b/RTCP/CNProc/src/CN_Processing.cc index 21d219f7b48f98e674e5e2f9c263fbb38f8777e9..f0de64076d9bf68efc74f7311e28c9309a023d4a 100644 --- a/RTCP/CNProc/src/CN_Processing.cc +++ b/RTCP/CNProc/src/CN_Processing.cc @@ -172,6 +172,16 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::preprocess(CN_C itsNrBeams = itsFlysEye ? itsNrBeamFormedStations : itsNrPencilBeams; + unsigned multiplier = 0; + + if (configuration.outputBeamFormedData()) { + multiplier = NR_POLARIZATIONS; + } else if (configuration.outputCoherentStokes()) { + multiplier = configuration.nrStokes(); + } + + itsNrSubbeams = multiplier; + unsigned nrChannels = configuration.nrChannelsPerSubband(); unsigned nrSamplesPerIntegration = configuration.nrSamplesPerIntegration(); unsigned nrSamplesPerStokesIntegration = configuration.nrSamplesPerStokesIntegration(); @@ -240,12 +250,15 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::preprocess(CN_C itsPPF = new PPF<SAMPLE_TYPE>(itsNrStations, nrChannels, nrSamplesPerIntegration, configuration.sampleRate() / nrChannels, configuration.delayCompensation(), configuration.correctBandPass(), itsLocationInfo.rank() == 0); - itsCoherentStokes = new Stokes(itsNrStokes, nrChannels, nrSamplesPerIntegration, nrSamplesPerStokesIntegration); itsIncoherentStokes = new Stokes(itsNrStokes, nrChannels, nrSamplesPerIntegration, nrSamplesPerStokesIntegration); itsCorrelator = new Correlator(itsBeamFormer->getStationMapping(), nrChannels, nrSamplesPerIntegration); } + if (itsHasPhaseTwo || itsHasPhaseThree) { + itsCoherentStokes = new Stokes(itsNrStokes, nrChannels, nrSamplesPerIntegration, nrSamplesPerStokesIntegration); + } + // we don't support >1 beam/core (which would require bigger memory structures) assert( itsNrBeamsPerPset <= itsUsedCoresPerPset ); @@ -344,7 +357,7 @@ template <typename SAMPLE_TYPE> bool CN_Processing<SAMPLE_TYPE>::transposeBeams( unsigned firstBeamOfPset = itsNrBeamsPerPset * myPset; unsigned myBeam = firstBeamOfPset + relativeCoreIndex; - if (myBeam < itsNrBeams && relativeCoreIndex < itsNrBeamsPerPset) { + if (myBeam < itsNrBeams * itsNrSubbeams && relativeCoreIndex < itsNrBeamsPerPset) { beamToProcess = true; NSTimer postAsyncReceives("post async beam receives", LOG_CONDITION, true); @@ -373,15 +386,18 @@ template <typename SAMPLE_TYPE> bool CN_Processing<SAMPLE_TYPE>::transposeBeams( asyncSendTimer.start(); for (unsigned i = 0; i < itsNrBeams; i ++) { - // calculate which (pset,core) produces beam i - unsigned pset = i / itsNrBeamsPerPset; - unsigned core = (firstCore + i % itsNrBeamsPerPset) % itsUsedCoresPerPset; - - //LOG_DEBUG_STR(itsLogPrefix << "transpose: send subband " << itsCurrentSubband << " of beam " << i << " to pset " << pset << " core " << core); - if (itsPlan->calculate( itsPlan->itsCoherentStokesData )) { - itsAsyncTransposeBeams->asyncSend(pset, core, itsCurrentSubband, i, itsPlan->itsCoherentStokesData); // Asynchronously send one beam to another pset. - } else { - itsAsyncTransposeBeams->asyncSend(pset, core, itsCurrentSubband, i, itsPlan->itsBeamFormedData); // Asynchronously send one beam to another pset. + for (unsigned j = 0; j < itsNrSubbeams; j++) { + // calculate which (pset,core) produces beam i + unsigned beam = i * itsNrSubbeams + j; + unsigned pset = beam / itsNrBeamsPerPset; + unsigned core = (firstCore + beam % itsNrBeamsPerPset) % itsUsedCoresPerPset; + + //LOG_DEBUG_STR(itsLogPrefix << "transpose: send subband " << itsCurrentSubband << " of beam " << i << " to pset " << pset << " core " << core); + if (itsPlan->calculate( itsPlan->itsCoherentStokesData )) { + itsAsyncTransposeBeams->asyncSend(pset, core, itsCurrentSubband, i, j, itsPlan->itsCoherentStokesData); // Asynchronously send one beam to another pset. + } else { + itsAsyncTransposeBeams->asyncSend(pset, core, itsCurrentSubband, i, j, itsPlan->itsPreTransposeBeamFormedData); // Asynchronously send one beam to another pset. + } } } @@ -444,6 +460,42 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::formBeams() computeTimer.stop(); } +template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::preTransposeBeams() +{ +#if defined HAVE_MPI + if (LOG_CONDITION) + LOG_DEBUG_STR(itsLogPrefix << "Start reordering beams before transpose at " << MPI_Wtime()); +#endif // HAVE_MPI + computeTimer.start(); + itsBeamFormer->preTransposeBeams(itsPlan->itsBeamFormedData, itsPlan->itsPreTransposeBeamFormedData); + computeTimer.stop(); +} + + +template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::postTransposeBeams() +{ +#if defined HAVE_MPI + if (LOG_CONDITION) + LOG_DEBUG_STR(itsLogPrefix << "Start reordering beams after transpose at " << MPI_Wtime()); +#endif // HAVE_MPI + computeTimer.start(); + itsBeamFormer->postTransposeBeams(itsPlan->itsTransposedBeamFormedData, itsPlan->itsFinalBeamFormedData, itsNrSubbands); + computeTimer.stop(); +} + + +template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::postTransposeStokes() +{ +#if defined HAVE_MPI + if (LOG_CONDITION) + LOG_DEBUG_STR(itsLogPrefix << "Start reordering stokes after transpose at " << MPI_Wtime()); +#endif // HAVE_MPI + computeTimer.start(); + itsCoherentStokes->postTransposeStokes(itsPlan->itsTransposedCoherentStokesData, itsPlan->itsFinalCoherentStokesData, itsNrSubbands); + computeTimer.stop(); +} + + template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::calculateIncoherentStokes() { #if defined HAVE_MPI @@ -567,6 +619,10 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::process() calculateCoherentStokes(); } + if( itsPlan->calculate( itsPlan->itsPreTransposeBeamFormedData ) ) { + preTransposeBeams(); + } + #define SEND( data ) do { \ if (itsPlan->output( data )) { \ sendOutput( outputNr++, data ); \ @@ -606,16 +662,24 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::process() for (unsigned i = 0; i < itsNrSubbands; i++) { asyncReceiveTimer.start(); const unsigned subband = itsAsyncTransposeBeams->waitForAnyReceive(); - if (LOG_CONDITION) - LOG_DEBUG_STR( itsLogPrefix << "transpose: received subband " << subband ); + //if (LOG_CONDITION) + // LOG_DEBUG_STR( itsLogPrefix << "transpose: received subband " << subband ); asyncReceiveTimer.stop(); (void)subband; } #endif - SEND( itsPlan->itsTransposedBeamFormedData ); - SEND( itsPlan->itsTransposedCoherentStokesData ); + if( itsPlan->calculate( itsPlan->itsFinalBeamFormedData ) ) { + postTransposeBeams(); + } + + if( itsPlan->calculate( itsPlan->itsFinalCoherentStokesData ) ) { + postTransposeStokes(); + } + + SEND( itsPlan->itsFinalBeamFormedData ); + SEND( itsPlan->itsFinalCoherentStokesData ); } if (itsHasPhaseTwo) { diff --git a/RTCP/CNProc/src/CN_Processing.h b/RTCP/CNProc/src/CN_Processing.h index ef9d161269f8af7c99185575a14e2b0ae33cfa70..c68f3e6dc7cb6b3cf41d621636160283a9d124ef 100644 --- a/RTCP/CNProc/src/CN_Processing.h +++ b/RTCP/CNProc/src/CN_Processing.h @@ -79,6 +79,9 @@ template <typename SAMPLE_TYPE> class CN_Processing : public CN_Processing_Base, void filter(); void mergeStations(); void formBeams(); + void preTransposeBeams(); + void postTransposeBeams(); + void postTransposeStokes(); void calculateCoherentStokes(); void calculateIncoherentStokes(); void correlate(); @@ -104,6 +107,7 @@ template <typename SAMPLE_TYPE> class CN_Processing : public CN_Processing_Base, unsigned itsNrSubbands; unsigned itsNrSubbandsPerPset; unsigned itsNrBeams; + unsigned itsNrSubbeams; // the number of polarizations/stokes that will be split off per beam during the transpose unsigned itsMyNrBeams; unsigned itsNrBeamsPerPset; unsigned itsComputeGroupRank; diff --git a/RTCP/CNProc/src/Stokes.cc b/RTCP/CNProc/src/Stokes.cc index ffece7bb93399b1a27c30322eacc8d0e53a35da5..232e0d197d28ef4b8cc83506d38d131fcd9698e9 100644 --- a/RTCP/CNProc/src/Stokes.cc +++ b/RTCP/CNProc/src/Stokes.cc @@ -79,7 +79,7 @@ void Stokes::calculateCoherent( const SampleData<> *sampleData, StokesData *stok addStokes( stokes, in[sb][ch][inTime+fractime][0], in[sb][ch][inTime+fractime][1], allStokes ); } - #define dest(stokes) out->samples[sb][ch][outTime][stokes] + #define dest(stokes) out->samples[sb][stokes][outTime][ch] dest(0) = stokes.I; if( allStokes ) { dest(1) = stokes.Q; @@ -152,11 +152,11 @@ void Stokes::calculateIncoherent( const SampleData<> *sampleData, StokesData *st } for( unsigned fractime = 0; fractime < integrationSteps; fractime++ ) { - addStokes( stokes, in[ch][srcStat][inTime+fractime][0], in[ch][srcStat][inTime+fractime][1], allStokes ); + addStokes( stokes, in[ch][srcStat][inTime+fractime][0], in[ch][srcStat][inTime+fractime][1], allStokes ); } } - #define dest(stokes) out->samples[0][ch][outTime][stokes] + #define dest(stokes) out->samples[0][stokes][outTime][ch] dest(0) = stokes.I / nrValidStations; if( allStokes ) { dest(1) = stokes.Q / nrValidStations; @@ -170,5 +170,29 @@ void Stokes::calculateIncoherent( const SampleData<> *sampleData, StokesData *st } +void Stokes::postTransposeStokes( const StokesData *in, FinalStokesData *out, unsigned nrSubbands ) +{ + ASSERT( in->samples.shape()[0] == nrSubbands ); + ASSERT( in->samples.shape()[1] == 1 ); + ASSERT( in->samples.shape()[2] >= itsNrSamplesPerIntegration/itsNrSamplesPerStokesIntegration ); + ASSERT( in->samples.shape()[3] == itsNrChannels ); + + ASSERT( out->samples.shape()[0] >= itsNrSamplesPerIntegration/itsNrSamplesPerStokesIntegration ); + ASSERT( out->samples.shape()[1] == nrSubbands ); + ASSERT( out->samples.shape()[2] == itsNrChannels ); + + for (unsigned s = 0; s < nrSubbands; s++) { + out->flags[s] = in->flags[s]; + } + + for (unsigned s = 0; s < nrSubbands; s++) { + for (unsigned t = 0; t < itsNrSamplesPerIntegration/itsNrSamplesPerStokesIntegration; t++) { + for (unsigned c = 0; c < itsNrChannels; c++) { + out->samples[t][s][c] = in->samples[s][0][t][c]; + } + } + } +} + } // namespace RTCP } // namespace LOFAR diff --git a/RTCP/CNProc/src/Stokes.h b/RTCP/CNProc/src/Stokes.h index a2f6c04c17e30a4f13d5faa73a93a17c4dbfe5eb..4c2458e8a9b7cd6541e50e90402bbde48fc727de 100644 --- a/RTCP/CNProc/src/Stokes.h +++ b/RTCP/CNProc/src/Stokes.h @@ -20,6 +20,8 @@ class Stokes void calculateCoherent( const SampleData<> *sampleData, StokesData *stokesData, const unsigned nrSubbands ); void calculateIncoherent( const SampleData<> *sampleData, StokesData *stokesData, const std::vector<unsigned> &stationMapping ); + void postTransposeStokes( const StokesData *in, FinalStokesData *out, unsigned nrSubbands ); + private: const unsigned itsNrChannels; const unsigned itsNrSamplesPerIntegration; diff --git a/RTCP/IONProc/src/ION_main.cc b/RTCP/IONProc/src/ION_main.cc index 400491d280cb06af468dcc729770547e8519fb93..82d277a265c882f6ff0caf6206d2549923df96c7 100644 --- a/RTCP/IONProc/src/ION_main.cc +++ b/RTCP/IONProc/src/ION_main.cc @@ -452,7 +452,7 @@ int main(int argc, char **argv) #if defined HAVE_BGP INIT_LOGGER_WITH_SYSINFO(str(format("IONProc@%02d") % myPsetNumber)); #elif defined HAVE_LOG4CPLUS - lofarLoggerInitNode(); + // do nothing #elif defined HAVE_LOG4CXX Context::initialize(); setLevel("Global", 8); diff --git a/RTCP/IONProc/src/Job.cc b/RTCP/IONProc/src/Job.cc index 3f2635449e93e0898d0610ba7db6d6b30e946e03..64b200eccac0d53e5c8b49d0c82a98b0446e60d3 100644 --- a/RTCP/IONProc/src/Job.cc +++ b/RTCP/IONProc/src/Job.cc @@ -508,7 +508,14 @@ template <typename SAMPLE_TYPE> void Job::doObservation() unsigned phase, psetIndex, maxlistsize; std::vector<unsigned> list; // list of subbands or beams - unsigned nrbeams = itsParset.flysEye() ? itsParset.nrMergedStations() : itsParset.nrPencilBeams(); + unsigned nrsubbeams = 0; + + if (itsParset.outputBeamFormedData()) + nrsubbeams = NR_POLARIZATIONS; + else if (itsParset.outputCoherentStokes()) + nrsubbeams = itsParset.nrStokes(); + + unsigned nrbeams = (itsParset.flysEye() ? itsParset.nrMergedStations() : itsParset.nrPencilBeams()) * nrsubbeams; switch (plan.plan[output].distribution) { case ProcessingPlan::DIST_SUBBAND: diff --git a/RTCP/Interface/include/Interface/BeamFormedData.h b/RTCP/Interface/include/Interface/BeamFormedData.h index 6bc32e9bbd28e3c3b21baf07e40dc2a8fb638e00..339084ae1d92bfce1b8729c8496ebac62ca32f91 100644 --- a/RTCP/Interface/include/Interface/BeamFormedData.h +++ b/RTCP/Interface/include/Interface/BeamFormedData.h @@ -12,6 +12,18 @@ namespace LOFAR { namespace RTCP { +/* + * Data flow: + * + * BeamFormedData -> PreTransposeBeamFormedData -> TransposedBeamFormedData -> FinalBeamFormedData + * + * The separate steps are necessary since the data is required or produced in different orders + * by different processes. The transpose wants to split beams and polarizations and puts subbands + & in the highest dimension in exchange. The final data product however wants time to be the + * highest dimension. + * + */ + class BeamFormedData: public SampleData<fcomplex,4> { public: @@ -22,6 +34,40 @@ class BeamFormedData: public SampleData<fcomplex,4> virtual BeamFormedData *clone() const { return new BeamFormedData(*this); } }; + +class PreTransposeBeamFormedData: public SampleData<fcomplex,4> +{ + public: + typedef SampleData<fcomplex,4> SuperType; + + PreTransposeBeamFormedData(unsigned nrBeams, unsigned nrChannels, unsigned nrSamplesPerIntegration); + + virtual PreTransposeBeamFormedData *clone() const { return new PreTransposeBeamFormedData(*this); } +}; + + +class TransposedBeamFormedData: public SampleData<fcomplex,3> +{ + public: + typedef SampleData<fcomplex,3> SuperType; + + TransposedBeamFormedData(unsigned nrSubbands, unsigned nrChannels, unsigned nrSamplesPerIntegration); + + virtual TransposedBeamFormedData *clone() const { return new TransposedBeamFormedData(*this); } +}; + + +class FinalBeamFormedData: public SampleData<fcomplex,3> +{ + public: + typedef SampleData<fcomplex,3> SuperType; + + FinalBeamFormedData(unsigned nrSubbands, unsigned nrChannels, unsigned nrSamplesPerIntegration); + + virtual FinalBeamFormedData *clone() const { return new FinalBeamFormedData(*this); } +}; + + inline BeamFormedData::BeamFormedData(unsigned nrBeams, unsigned nrChannels, unsigned nrSamplesPerIntegration) // The "| 2" significantly improves transpose speeds for particular // numbers of stations due to cache conflict effects. The extra memory @@ -32,6 +78,26 @@ inline BeamFormedData::BeamFormedData(unsigned nrBeams, unsigned nrChannels, uns } +inline PreTransposeBeamFormedData::PreTransposeBeamFormedData(unsigned nrBeams, unsigned nrChannels, unsigned nrSamplesPerIntegration) +: + SuperType::SampleData(false, boost::extents[nrBeams][NR_POLARIZATIONS][nrSamplesPerIntegration | 2][nrChannels], nrBeams) +{ +} + + +inline TransposedBeamFormedData::TransposedBeamFormedData(unsigned nrSubbands, unsigned nrChannels, unsigned nrSamplesPerIntegration) +: + SuperType(false,boost::extents[nrSubbands][nrChannels][nrSamplesPerIntegration | 2], nrSubbands) +{ +} + + +inline FinalBeamFormedData::FinalBeamFormedData(unsigned nrSubbands, unsigned nrChannels, unsigned nrSamplesPerIntegration) +: + SuperType(false,boost::extents[nrSamplesPerIntegration | 2][nrSubbands][nrChannels], nrSubbands) +{ +} + } // namespace RTCP } // namespace LOFAR diff --git a/RTCP/Interface/include/Interface/CMakeLists.txt b/RTCP/Interface/include/Interface/CMakeLists.txt index c7b4051fbfe843526267fc5418d67dc369928bd9..20a48c9e9032a8c424e2eeab94bbf2240e9e61d7 100644 --- a/RTCP/Interface/include/Interface/CMakeLists.txt +++ b/RTCP/Interface/include/Interface/CMakeLists.txt @@ -24,7 +24,6 @@ set(inst_HEADERS CN_ProcessingPlan.h BFRawFormat.h TransposedData.h - TransposedBeamFormedData.h InverseFilteredData.h InputData.h) diff --git a/RTCP/Interface/include/Interface/CN_ProcessingPlan.h b/RTCP/Interface/include/Interface/CN_ProcessingPlan.h index 9d9d43757f094492f8a80545bbd4e7414d5476fc..f826766329116a2192aefa4b5c892b7e1fc44b5d 100644 --- a/RTCP/Interface/include/Interface/CN_ProcessingPlan.h +++ b/RTCP/Interface/include/Interface/CN_ProcessingPlan.h @@ -28,7 +28,6 @@ #include <Interface/InputData.h> #include <Interface/TransposedData.h> -#include <Interface/TransposedBeamFormedData.h> #include <Interface/FilteredData.h> #include <Interface/BeamFormedData.h> #include <Interface/CorrelatedData.h> @@ -59,10 +58,13 @@ template <typename SAMPLE_TYPE = i8complex> class CN_ProcessingPlan: public Proc FilteredData *itsFilteredData; CorrelatedData *itsCorrelatedData; BeamFormedData *itsBeamFormedData; + PreTransposeBeamFormedData *itsPreTransposeBeamFormedData; TransposedBeamFormedData *itsTransposedBeamFormedData; + FinalBeamFormedData *itsFinalBeamFormedData; StokesData *itsCoherentStokesData; StokesData *itsIncoherentStokesData; StokesData *itsTransposedCoherentStokesData; + FinalStokesData *itsFinalCoherentStokesData; }; } diff --git a/RTCP/Interface/include/Interface/Parset.h b/RTCP/Interface/include/Interface/Parset.h index dd03bde037d5f99be464e1310170e6571f3bfb2d..d43548a230fd165c5043884f4d4a2d1d32c423e6 100644 --- a/RTCP/Interface/include/Interface/Parset.h +++ b/RTCP/Interface/include/Interface/Parset.h @@ -118,8 +118,6 @@ public: int phaseOnePsetIndex(uint32 pset) const; int phaseTwoPsetIndex(uint32 pset) const; int phaseThreePsetIndex(uint32 pset) const; - string getMSname(unsigned sb) const; - string getMSBaseDir() const; string getTransportType(const string& prefix) const; bool outputFilteredData() const; diff --git a/RTCP/Interface/include/Interface/ProcessingPlan.h b/RTCP/Interface/include/Interface/ProcessingPlan.h index a6c720c62ce3bec89f21c974b454a3b39c5b7e1d..aaa66160af46f25fe9adad5cb6a3c86280906ec7 100644 --- a/RTCP/Interface/include/Interface/ProcessingPlan.h +++ b/RTCP/Interface/include/Interface/ProcessingPlan.h @@ -68,10 +68,12 @@ class ProcessingPlan bool calculate; int arena; // -1: not allocated, >= 0: allocated - const char *name; // name of planlet or data set, for logging purposes - const char *filenameSuffix; // for outputs: extension to use for this output + const char *name; // name of planlet or data set, for logging purposes + const char *filename; // for outputs: filename to use for this output distribution_t distribution; + unsigned nrSubbeams; // number of subbeams per beam + bool isOutput() const { return output; } // for filtering }; @@ -83,8 +85,8 @@ class ProcessingPlan // require source for something else void require( StreamableData *source ); - // send set (i.e. as output) to be stored in a file or directory with a certain extension - void send( StreamableData *set, const char *extension, distribution_t distribution ); + // send set (i.e. as output) to be stored in a file or directory with a certain filename + void send( StreamableData *set, const char *filename, distribution_t distribution, unsigned nrSubbeams = 1 ); // ----- Construct the plan: assign an arena to all // products that have to be calculated. @@ -165,14 +167,15 @@ inline void ProcessingPlan::require( StreamableData *source ) { } } -inline void ProcessingPlan::send( StreamableData *set, const char *extension, ProcessingPlan::distribution_t distribution ) { +inline void ProcessingPlan::send( StreamableData *set, const char *filename, ProcessingPlan::distribution_t distribution, unsigned nrSubbeams ) { require( set ); // fake planlet to indicate we need this set // the entry we just created is an output -- configure it as such plan.back().output = true; plan.back().name = find( set )->name; - plan.back().filenameSuffix = extension; + plan.back().filename = filename; plan.back().distribution = distribution; + plan.back().nrSubbeams = nrSubbeams; } inline void ProcessingPlan::assignArenas( bool assignAll ) { diff --git a/RTCP/Interface/include/Interface/StokesData.h b/RTCP/Interface/include/Interface/StokesData.h index 76b13dc88dbce8012680fc1b6990187ea3346af1..95ad90b2f1ff7136d4498607a3fea88e5e14a61b 100644 --- a/RTCP/Interface/include/Interface/StokesData.h +++ b/RTCP/Interface/include/Interface/StokesData.h @@ -23,15 +23,35 @@ class StokesData: public SampleData<float,4> virtual StokesData *clone() const { return new StokesData(*this); } }; + +class FinalStokesData: public SampleData<float,3> +{ + public: + typedef SampleData<float,3> SuperType; + + FinalStokesData(bool coherent, unsigned nrSubbands, unsigned nrChannels, unsigned nrSamplesPerIntegration, unsigned nrSamplesPerStokesIntegration); + + virtual FinalStokesData *clone() const { return new FinalStokesData(*this); } +}; + + inline StokesData::StokesData(bool coherent, unsigned nrStokes, unsigned nrBeams, unsigned nrChannels, unsigned nrSamplesPerIntegration, unsigned nrSamplesPerStokesIntegration) : // The "| 2" significantly improves transpose speeds for particular // numbers of stations due to cache conflict effects. The extra memory // is not used. - SuperType::SampleData(false, boost::extents[coherent ? nrBeams : 1][nrChannels][(nrSamplesPerIntegration/nrSamplesPerStokesIntegration) | 2][nrStokes], coherent ? nrBeams : 1) + SuperType::SampleData(false, boost::extents[coherent ? nrBeams : 1][nrStokes][(nrSamplesPerIntegration/nrSamplesPerStokesIntegration) | 2][nrChannels], coherent ? nrBeams : 1) { } + +inline FinalStokesData::FinalStokesData(bool coherent, unsigned nrSubbands, unsigned nrChannels, unsigned nrSamplesPerIntegration, unsigned nrSamplesPerStokesIntegration) +: + SuperType::SampleData(false, boost::extents[(nrSamplesPerIntegration/nrSamplesPerStokesIntegration) | 2][coherent ? nrSubbands : 1][nrChannels], coherent ? nrSubbands : 1) +{ +} + + } // namespace RTCP } // namespace LOFAR diff --git a/RTCP/Interface/include/Interface/TransposedBeamFormedData.h b/RTCP/Interface/include/Interface/TransposedBeamFormedData.h deleted file mode 100644 index 14c62b64c4a1d5bb7906d72ddf8d3661b0e5e92c..0000000000000000000000000000000000000000 --- a/RTCP/Interface/include/Interface/TransposedBeamFormedData.h +++ /dev/null @@ -1,33 +0,0 @@ -#ifndef LOFAR_CNPROC_TRANSPOSED_BEAMFORMED_DATA_H -#define LOFAR_CNPROC_TRANSPOSED_BEAMFORMED_DATA_H - -#include <Common/lofar_complex.h> -#include <Interface/StreamableData.h> - -#include <vector> - - -namespace LOFAR { -namespace RTCP { - -class TransposedBeamFormedData: public SampleData<fcomplex,4> -{ - public: - typedef SampleData<fcomplex,4> SuperType; - - TransposedBeamFormedData(unsigned nrSubbands, unsigned nrChannels, unsigned nrSamplesPerIntegration); - - virtual TransposedBeamFormedData *clone() const { return new TransposedBeamFormedData(*this); } -}; - - -inline TransposedBeamFormedData::TransposedBeamFormedData(unsigned nrSubbands, unsigned nrChannels, unsigned nrSamplesPerIntegration) -: - SuperType(false,boost::extents[nrSubbands][nrChannels][nrSamplesPerIntegration | 2][NR_POLARIZATIONS], nrSubbands) -{ -} - -} // namespace RTCP -} // namespace LOFAR - -#endif diff --git a/RTCP/Interface/include/Interface/TransposedStokesData.h b/RTCP/Interface/include/Interface/TransposedStokesData.h deleted file mode 100644 index ceefc7149f2213e859016bfbde375f3fd9756a66..0000000000000000000000000000000000000000 --- a/RTCP/Interface/include/Interface/TransposedStokesData.h +++ /dev/null @@ -1,38 +0,0 @@ -#ifndef LOFAR_INTERFACE_TRANSPOSED_STOKES_DATA_H -#define LOFAR_INTERFACE_TRANSPOSED_STOKES_DATA_H - -#include <Common/lofar_complex.h> -#include <Stream/Stream.h> -#include <Interface/Align.h> -#include <Interface/Config.h> -#include <Interface/MultiDimArray.h> -#include <Interface/SparseSet.h> -#include <Interface/StreamableData.h> -#include <Interface/SubbandMetaData.h> - -namespace LOFAR { -namespace RTCP { - -class TransposedStokesData: public SampleData<float,4> -{ - public: - typedef SampleData<float,4> SuperType; - - TransposedStokesData(bool coherent, unsigned nrStokes, unsigned nrSubbands, unsigned nrChannels, unsigned nrSamplesPerIntegration, unsigned nrSamplesPerStokesIntegration); - - virtual TransposedStokesData *clone() const { return new TransposedStokesData(*this); } -}; - -inline TransposedStokesData::TransposedStokesData(bool coherent, unsigned nrStokes, unsigned nrSubbands, unsigned nrChannels, unsigned nrSamplesPerIntegration, unsigned nrSamplesPerStokesIntegration) -: - // The "| 2" significantly improves transpose speeds for particular - // numbers of stations due to cache conflict effects. The extra memory - // is not used. - SuperType::SampleData(false, boost::extents[coherent ? nrSubbands : 1][nrChannels][(nrSamplesPerIntegration/nrSamplesPerStokesIntegration) | 2][nrStokes], coherent ? nrSubbands : 1) -{ -} - -} // namespace RTCP -} // namespace LOFAR - -#endif diff --git a/RTCP/Interface/src/CN_ProcessingPlan.cc b/RTCP/Interface/src/CN_ProcessingPlan.cc index 90cea8a169b07b6abbbb5374efef0b78e396d033..0ee91c3a2a5670eabda8b53e136241e9fd3f03d2 100644 --- a/RTCP/Interface/src/CN_ProcessingPlan.cc +++ b/RTCP/Interface/src/CN_ProcessingPlan.cc @@ -38,10 +38,13 @@ template <typename SAMPLE_TYPE> CN_ProcessingPlan<SAMPLE_TYPE>::CN_ProcessingPla itsFilteredData(0), itsCorrelatedData(0), itsBeamFormedData(0), + itsPreTransposeBeamFormedData(0), itsTransposedBeamFormedData(0), + itsFinalBeamFormedData(0), itsCoherentStokesData(0), itsIncoherentStokesData(0), - itsTransposedCoherentStokesData(0) + itsTransposedCoherentStokesData(0), + itsFinalCoherentStokesData(0) { // in fly's eye mode, every station is a beam const unsigned nrBeams = configuration.flysEye() ? configuration.nrMergedStations() : configuration.nrPencilBeams(); @@ -118,16 +121,17 @@ template <typename SAMPLE_TYPE> CN_ProcessingPlan<SAMPLE_TYPE>::CN_ProcessingPla TRANSFORM( itsFilteredData, itsIncoherentStokesData ); TRANSFORM( itsBeamFormedData, itsCoherentStokesData ); + TRANSFORM( itsBeamFormedData, itsPreTransposeBeamFormedData ); // send all requested outputs if( configuration.outputFilteredData() ) { - send( itsFilteredData, ".filtered", ProcessingPlan::DIST_SUBBAND ); + send( itsFilteredData, "SB${SUBBAND}.filtered", ProcessingPlan::DIST_SUBBAND ); } if( configuration.outputCorrelatedData() ) { - send( itsCorrelatedData, "", ProcessingPlan::DIST_SUBBAND ); + send( itsCorrelatedData, "SB${SUBBAND}.MS", ProcessingPlan::DIST_SUBBAND ); } if( configuration.outputIncoherentStokes() ) { - send( itsIncoherentStokesData, ".incoherentstokes", ProcessingPlan::DIST_SUBBAND ); + send( itsIncoherentStokesData, "SB${SUBBAND}.incoherentstokes", ProcessingPlan::DIST_SUBBAND, 1 ); } // whether there will be a second transpose @@ -135,7 +139,7 @@ template <typename SAMPLE_TYPE> CN_ProcessingPlan<SAMPLE_TYPE>::CN_ProcessingPla if (phaseThreeExists) { if ( configuration.outputBeamFormedData() ) { - require( itsBeamFormedData ); + require( itsPreTransposeBeamFormedData ); } else { require( itsCoherentStokesData ); } @@ -154,9 +158,23 @@ template <typename SAMPLE_TYPE> CN_ProcessingPlan<SAMPLE_TYPE>::CN_ProcessingPla configuration.nrSamplesPerIntegration() ); + itsFinalBeamFormedData = new FinalBeamFormedData( + configuration.nrSubbands(), + configuration.nrChannelsPerSubband(), + configuration.nrSamplesPerIntegration() + ); + itsTransposedCoherentStokesData = new StokesData( true, - configuration.nrStokes(), + 1, + configuration.nrSubbands(), + configuration.nrChannelsPerSubband(), + configuration.nrSamplesPerIntegration(), + configuration.nrSamplesPerStokesIntegration() + ); + + itsFinalCoherentStokesData = new FinalStokesData( + true, configuration.nrSubbands(), configuration.nrChannelsPerSubband(), configuration.nrSamplesPerIntegration(), @@ -164,13 +182,16 @@ template <typename SAMPLE_TYPE> CN_ProcessingPlan<SAMPLE_TYPE>::CN_ProcessingPla ); TRANSFORM( 0, itsTransposedBeamFormedData ); + TRANSFORM( itsTransposedBeamFormedData, itsFinalBeamFormedData ); + TRANSFORM( 0, itsTransposedCoherentStokesData ); + TRANSFORM( itsTransposedCoherentStokesData, itsFinalCoherentStokesData ); if( configuration.outputBeamFormedData() ) { - send( itsTransposedBeamFormedData, ".beams", ProcessingPlan::DIST_BEAM ); + send( itsFinalBeamFormedData, "BEAM${PBEAM}-${POL}.voltages", ProcessingPlan::DIST_BEAM, NR_POLARIZATIONS ); } if( configuration.outputCoherentStokes() ) { - send( itsTransposedCoherentStokesData, ".stokes", ProcessingPlan::DIST_BEAM ); + send( itsFinalCoherentStokesData, "BEAM${PBEAM}-${STOKES}.stokes", ProcessingPlan::DIST_BEAM, configuration.nrStokes() ); } } } @@ -183,9 +204,12 @@ template <typename SAMPLE_TYPE> CN_ProcessingPlan<SAMPLE_TYPE>::~CN_ProcessingPl delete itsTransposedInputData; delete itsFilteredData; delete itsTransposedBeamFormedData; + delete itsFinalBeamFormedData; delete itsTransposedCoherentStokesData; + delete itsFinalCoherentStokesData; delete itsCorrelatedData; delete itsBeamFormedData; + delete itsPreTransposeBeamFormedData; delete itsCoherentStokesData; delete itsIncoherentStokesData; } diff --git a/RTCP/Interface/src/Parset.cc b/RTCP/Interface/src/Parset.cc index f29149a574418dc2487b4554e06fa6710c636e5f..56a57325cfc3cc5b660e3be6136bb98cd649a1b6 100644 --- a/RTCP/Interface/src/Parset.cc +++ b/RTCP/Interface/src/Parset.cc @@ -36,9 +36,7 @@ #include <Stream/NullStream.h> #include <Stream/SocketStream.h> -#include <boost/algorithm/string.hpp> #include <boost/algorithm/string/classification.hpp> -#include <boost/algorithm/string/split.hpp> #include <boost/format.hpp> #include <boost/lexical_cast.hpp> @@ -348,48 +346,6 @@ vector<double> Parset::getPhaseCentresOf(const string& name) const return list; } -string Parset::getMSname(unsigned sb) const -{ - using namespace boost; - - string name = getString("Observation.MSNameMask"); - string startTime = getString("Observation.startTime"); - vector<string> splitStartTime; - split(splitStartTime, startTime, is_any_of("- :")); - - replace_all(name, "${YEAR}", splitStartTime[0]); - replace_all(name, "${MONTH}", splitStartTime[1]); - replace_all(name, "${DAY}", splitStartTime[2]); - replace_all(name, "${HOURS}", splitStartTime[3]); - replace_all(name, "${MINUTES}", splitStartTime[4]); - replace_all(name, "${SECONDS}", splitStartTime[5]); - - replace_all(name, "${MSNUMBER}", str(format("%05u") % getUint32("Observation.ObsID"))); - replace_all(name, "${SUBBAND}", str(format("%03u") % sb)); - replace_all(name, "${BEAM}", str(format("%u") % subbandToBeamMapping()[sb])); - - if (isDefined("OLAP.storageRaidList")) - replace_all(name, "${RAID}", str(format("%s") % getStringVector("OLAP.storageRaidList", true)[sb])); - - return name; -} - -string Parset::getMSBaseDir() const -{ - using namespace boost; - - string name = this->getMSname(0); - string basedir; - vector<string> splitName; - - split(splitName, name, is_any_of("/")); - - for (unsigned i = 0; i < splitName.size()-1 ; i++) { - basedir += splitName[i] + '/'; - } - return basedir; -} - vector<double> Parset::getManualPencilBeam(const unsigned pencil) const { diff --git a/RTCP/Run/src/LOFAR/Parset.py b/RTCP/Run/src/LOFAR/Parset.py index 414067393301c314936e532bc4d8d4b1908aad39..c68442d8ea59b470e30207b2a87185f7b2d1023f 100644 --- a/RTCP/Run/src/LOFAR/Parset.py +++ b/RTCP/Run/src/LOFAR/Parset.py @@ -457,10 +457,17 @@ class Parset(util.Parset.Parset): return max(tabList) + 1 def getNrBeamOutputFiles( self ): - if self["OLAP.PencilInfo.flysEye"]: - return self.getNrMergedStations() + if self.getBool("OLAP.outputBeamFormedData"): + subbeams = 2 + elif self.getBool("OLAP.outputCoherentStokes"): + subbeams = len(self["OLAP.Stokes.which"]) + else: + subbeams = 0 - return self.getNrPencilBeams() + if self.getBool("OLAP.PencilInfo.flysEye"): + return self.getNrMergedStations() * subbeams + + return self.getNrPencilBeams() * subbeams def phaseThreeExists( self ): # NO support for mixing with Observation.mode and Observation.outputIncoherentStokesI diff --git a/RTCP/Storage/include/Storage/Format.h b/RTCP/Storage/include/Storage/Format.h index 26a6a8f67069ad0be2d3dea791b2220589b99b91..30db7257c76b5bd1cf26baf8d7368b37e9d3b059 100644 --- a/RTCP/Storage/include/Storage/Format.h +++ b/RTCP/Storage/include/Storage/Format.h @@ -9,6 +9,8 @@ #ifndef LOFAR_STORAGE_FORMAT_H #define LOFAR_STORAGE_FORMAT_H +#include <string> + namespace LOFAR { namespace RTCP { @@ -17,7 +19,7 @@ class Format public: virtual ~Format(); - virtual void addSubband(unsigned subband, bool isBigEndian) = 0; + virtual void addSubband(const std::string MSname, unsigned subband, bool isBigEndian) = 0; }; diff --git a/RTCP/Storage/include/Storage/MeasurementSetFormat.h b/RTCP/Storage/include/Storage/MeasurementSetFormat.h index 35e70ff5fe0404dd31dc704898c991d464ae8af6..46c114a46e8b6e370c67881809204e94337a5a44 100644 --- a/RTCP/Storage/include/Storage/MeasurementSetFormat.h +++ b/RTCP/Storage/include/Storage/MeasurementSetFormat.h @@ -18,6 +18,8 @@ #include <Storage/Format.h> +#include <string> + //# Forward Declarations namespace casa { @@ -36,7 +38,7 @@ class MeasurementSetFormat : public Format MeasurementSetFormat(const Parset *, uint32 alignment = 1); virtual ~MeasurementSetFormat(); - virtual void addSubband(unsigned subband, bool isBigEndian); + virtual void addSubband(const string MSname, unsigned subband, bool isBigEndian); private: const Parset *itsPS; @@ -56,8 +58,8 @@ class MeasurementSetFormat : public Format static Mutex sharedMutex; - void createMSTables(unsigned subband); - void createMSMetaFile(unsigned subband, bool isBigEndian); + void createMSTables(const string &MSname, unsigned subband); + void createMSMetaFile(const string &MSname, unsigned subband, bool isBigEndian); void fillFeed(); void fillAntenna(const casa::Block<casa::MPosition>& antMPos); diff --git a/RTCP/Storage/include/Storage/OutputThread.h b/RTCP/Storage/include/Storage/OutputThread.h index f3d3b7f7e868df6f846cb6a21bf59df6188a86c3..4bd6e5afc5ee026b70f02762c4a08cef53bf9254 100644 --- a/RTCP/Storage/include/Storage/OutputThread.h +++ b/RTCP/Storage/include/Storage/OutputThread.h @@ -47,13 +47,15 @@ namespace RTCP { class OutputThread { public: - OutputThread(const Parset &, unsigned subbandNumber, unsigned outputNumber, const ProcessingPlan::planlet &outputConfig, Queue<StreamableData *> &freeQueue, Queue<StreamableData *> &receiveQueue); + OutputThread(const Parset &, unsigned subbandNumber, unsigned outputNumber, const ProcessingPlan::planlet &outputConfig, Queue<StreamableData *> &freeQueue, Queue<StreamableData *> &receiveQueue, bool isBigEndian); ~OutputThread(); // report any writes that take longer than this (seconds) static const float reportWriteDelay = 0.05; private: + string getMSname() const; + string getBeamName() const; void writeLogMessage(unsigned sequenceNumber); void flushSequenceNumbers(); void checkForDroppedData(StreamableData *data); @@ -61,6 +63,9 @@ class OutputThread const std::string itsLogPrefix; + const Parset &itsParset; + const ProcessingPlan::planlet &itsOutputConfig; + Thread *itsThread; const unsigned itsSubbandNumber; diff --git a/RTCP/Storage/src/MeasurementSetFormat.cc b/RTCP/Storage/src/MeasurementSetFormat.cc index 3c25f6ffa3ecb96e73a3216626a137cbc89bc203..c3def6428282a827f28f458f5d206b50dfd03cc8 100644 --- a/RTCP/Storage/src/MeasurementSetFormat.cc +++ b/RTCP/Storage/src/MeasurementSetFormat.cc @@ -101,28 +101,25 @@ MeasurementSetFormat::MeasurementSetFormat(const Parset *ps, unsigned alignment) MeasurementSetFormat::~MeasurementSetFormat() {} -void MeasurementSetFormat::addSubband(unsigned subband, bool isBigEndian) +void MeasurementSetFormat::addSubband(const string MSname, unsigned subband, bool isBigEndian) { ScopedLock scopedLock(sharedMutex); /// First create a valid MeasurementSet with all required /// tables. Note that the MS is destroyed immidiately. - createMSTables(subband); + createMSTables(MSname, subband); /// Next make a metafile which describes the raw datafile we're /// going to write - createMSMetaFile(subband, isBigEndian); + createMSMetaFile(MSname, subband, isBigEndian); } -void MeasurementSetFormat::createMSTables(unsigned subband) +void MeasurementSetFormat::createMSTables(const string &MSname, unsigned subband) { try { - TableDesc td = MS::requiredTableDesc(); MS::addColumnToDesc(td, MS::DATA, 2); MS::addColumnToDesc(td, MS::WEIGHT_SPECTRUM, 2); - string MSname = itsPS->getMSname(subband); - SetupNewTable newtab(MSname, td, Table::New); LofarStMan lofarstman; newtab.bindAll(lofarstman); @@ -390,7 +387,7 @@ void MeasurementSetFormat::fillHistory() { cli.put (rownr, clivec); } -void MeasurementSetFormat::createMSMetaFile(unsigned subband, bool isBigEndian) +void MeasurementSetFormat::createMSMetaFile(const string &MSname, unsigned subband, bool isBigEndian) { Block<Int> ant1(itsPS->nrBaselines()); Block<Int> ant2(itsPS->nrBaselines()); @@ -408,7 +405,7 @@ void MeasurementSetFormat::createMSMetaFile(unsigned subband, bool isBigEndian) } } - string filename = itsPS->getMSname(subband) + "/table.f0meta"; + string filename = MSname + "/table.f0meta"; AipsIO aio(filename, ByteIO::New); aio.putstart ("LofarStMan", itsPS->getLofarStManVersion()); diff --git a/RTCP/Storage/src/OutputThread.cc b/RTCP/Storage/src/OutputThread.cc index a02d439d43c329114b785a819c5d2393b07b90ee..6f28faa06b3d6118d3501dce9b8ff2cdb2876568 100644 --- a/RTCP/Storage/src/OutputThread.cc +++ b/RTCP/Storage/src/OutputThread.cc @@ -27,13 +27,19 @@ #include <Storage/OutputThread.h> #include <Storage/MSWriterFile.h> #include <Storage/MSWriterNull.h> +#include <Storage/MeasurementSetFormat.h> #include <Interface/StreamableData.h> #include <Thread/Semaphore.h> #include <Common/DataConvert.h> #include <stdio.h> + #include <boost/format.hpp> +#include <boost/algorithm/string/split.hpp> +#include <boost/algorithm/string.hpp> #include <sys/types.h> +#include <sys/stat.h> +#include <unistd.h> #include <errno.h> #include <fcntl.h> @@ -42,9 +48,51 @@ using boost::format; namespace LOFAR { namespace RTCP { -OutputThread::OutputThread(const Parset &parset, unsigned subbandNumber, unsigned outputNumber, const ProcessingPlan::planlet &outputConfig, Queue<StreamableData *> &freeQueue, Queue<StreamableData *> &receiveQueue) +static string dirName( const string filename ) +{ + using namespace boost; + + string basedir; + vector<string> splitName; + + split(splitName, filename, is_any_of("/")); + + for (unsigned i = 0; i < splitName.size()-1 ; i++) { + basedir += splitName[i] + '/'; + } + return basedir; +} + +static void makeDir( const char *dirname, const string &logPrefix ) +{ + struct stat s; + + if (stat( dirname, &s ) == 0) { + // path already exists + if (s.st_mode & S_IFMT != S_IFDIR) { + LOG_WARN_STR(logPrefix << "Not a directory: " << dirname ); + } + } else if (errno == ENOENT) { + // create directory + LOG_INFO_STR(logPrefix << "Creating directory " << dirname ); + + if (mkdir(dirname, 0777) != 0 && errno != EEXIST) { + unsigned savedErrno = errno; // first argument below clears errno + throw SystemCallException(string("mkdir ") + dirname, savedErrno, THROW_ARGS); + } + } else { + // something else went wrong + unsigned savedErrno = errno; // first argument below clears errno + throw SystemCallException(string("stat ") + dirname, savedErrno, THROW_ARGS); + } +} + + +OutputThread::OutputThread(const Parset &parset, unsigned subbandNumber, unsigned outputNumber, const ProcessingPlan::planlet &outputConfig, Queue<StreamableData *> &freeQueue, Queue<StreamableData *> &receiveQueue, bool isBigEndian) : itsLogPrefix(str(format("[obs %u output %u subband %3u] ") % parset.observationID() % outputNumber % subbandNumber)), + itsParset(parset), + itsOutputConfig(outputConfig), itsSubbandNumber(subbandNumber), itsOutputNumber(outputNumber), itsObservationID(parset.observationID()), @@ -54,17 +102,15 @@ OutputThread::OutputThread(const Parset &parset, unsigned subbandNumber, unsigne itsSequenceNumbersFile(0), itsHaveCaughtException(false) { - std::string filename, seqfilename; + std::string filename; if (dynamic_cast<CorrelatedData *>(outputConfig.source)) { - std::stringstream out; - out << parset.getMSname(subbandNumber) << "/table.f" << outputNumber << "data"; - filename = out.str(); + makeDir( dirName(getMSname()).c_str(), itsLogPrefix ); + + filename = str(format("%s/table.f0data") % getMSname()); if (parset.getLofarStManVersion() == 2) { - std::stringstream seq; - seq << parset.getMSname(subbandNumber) <<"/table.f" << outputNumber << "seqnr"; - seqfilename = seq.str(); + string seqfilename = str(format("%s%s/table.f0seqnr") % getMSname() % outputConfig.filename); try { itsSequenceNumbersFile = new FileStream(seqfilename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); @@ -74,17 +120,27 @@ OutputThread::OutputThread(const Parset &parset, unsigned subbandNumber, unsigne } } +#if defined HAVE_AIPSPP + MeasurementSetFormat myFormat(&parset, 512); + + /// Make MeasurementSet filestructures and required tables + myFormat.addSubband(getMSname(), subbandNumber, isBigEndian); + + LOG_INFO_STR(itsLogPrefix << "MeasurementSet created"); +#endif // defined HAVE_AIPSPP + } else { - // raw writer - std::stringstream out; - out << parset.getMSname(subbandNumber) << outputConfig.filenameSuffix; - filename = out.str(); + // raw writer -- per beam + filename = getBeamName(); + + makeDir( dirName(filename).c_str(), itsLogPrefix ); } LOG_DEBUG_STR(itsLogPrefix << "Writing to " << filename); try { itsWriter = new MSWriterFile(filename.c_str()); + } catch (SystemCallException &ex) { LOG_ERROR_STR(itsLogPrefix << "Cannot open " << filename << ": " << ex); itsWriter = new MSWriterNull(); @@ -108,6 +164,71 @@ OutputThread::~OutputThread() } +string OutputThread::getMSname() const +{ + using namespace boost; + + string name = itsParset.getString("Observation.MSNameMask"); + string startTime = itsParset.getString("Observation.startTime"); + + vector<string> splitStartTime; + split(splitStartTime, startTime, is_any_of("- :")); + + replace_all(name, "${YEAR}", splitStartTime[0]); + replace_all(name, "${MONTH}", splitStartTime[1]); + replace_all(name, "${DAY}", splitStartTime[2]); + replace_all(name, "${HOURS}", splitStartTime[3]); + replace_all(name, "${MINUTES}", splitStartTime[4]); + replace_all(name, "${SECONDS}", splitStartTime[5]); + + replace_all(name, "${MSNUMBER}", str(format("%05u") % itsParset.observationID())); + replace_all(name, "${SUBBAND}", str(format("%03u") % itsSubbandNumber)); + replace_all(name, "${BEAM}", str(format("%u") % itsParset.subbandToBeamMapping()[itsSubbandNumber])); + + if (itsParset.isDefined("OLAP.storageRaidList")) + replace_all(name, "${RAID}", str(format("%s") % itsParset.getStringVector("OLAP.storageRaidList", true)[itsSubbandNumber])); + + return name; +} + + +string OutputThread::getBeamName() const +{ + using namespace boost; + + const char pols[] = "XY"; + const char stokes[] = "IQUV"; + + const int beam = itsSubbandNumber / itsOutputConfig.nrSubbeams; + const int subbeam = itsSubbandNumber % itsOutputConfig.nrSubbeams; + + string name = itsParset.isDefined("Observation.BeamDirMask") + ? itsParset.getString("Observation.BeamDirMask") + "/" + itsOutputConfig.filename + : dirName( itsParset.getString("Observation.MSNameMask") ) + itsOutputConfig.filename; + string startTime = itsParset.getString("Observation.startTime"); + vector<string> splitStartTime; + split(splitStartTime, startTime, is_any_of("- :")); + + replace_all(name, "${YEAR}", splitStartTime[0]); + replace_all(name, "${MONTH}", splitStartTime[1]); + replace_all(name, "${DAY}", splitStartTime[2]); + replace_all(name, "${HOURS}", splitStartTime[3]); + replace_all(name, "${MINUTES}", splitStartTime[4]); + replace_all(name, "${SECONDS}", splitStartTime[5]); + + replace_all(name, "${MSNUMBER}", str(format("%05u") % itsParset.observationID())); + replace_all(name, "${BEAM}", str(format("%u") % itsParset.subbandToBeamMapping()[itsSubbandNumber])); + replace_all(name, "${PBEAM}", str(format("%u") % beam)); + replace_all(name, "${POL}", str(format("%c") % pols[subbeam])); + replace_all(name, "${STOKES}", str(format("%c") % stokes[subbeam])); + + if (itsParset.isDefined("OLAP.PencilInfo.storageRaidList")) + replace_all(name, "${RAID}", str(format("%s") % itsParset.getStringVector("OLAP.PencilInfo.storageRaidList", true)[itsSubbandNumber])); + + return name; +} + + void OutputThread::writeLogMessage(unsigned sequenceNumber) { LOG_INFO_STR(itsLogPrefix << "Written block with seqno = " << sequenceNumber); diff --git a/RTCP/Storage/src/Storage_main.cc b/RTCP/Storage/src/Storage_main.cc index 401006fe0bfb0857035d3b2b7007383a1fb7ed95..3d88e16f48eb7646a106dbbd63f0f09aab15b2f6 100644 --- a/RTCP/Storage/src/Storage_main.cc +++ b/RTCP/Storage/src/Storage_main.cc @@ -150,7 +150,6 @@ int main(int argc, char *argv[]) using namespace log4cplus; using namespace log4cplus::helpers; - lofarLoggerInitNode(); helpers::Properties traceProp; traceProp.setProperty("log4cplus.rootLogger", "DEBUG, STDOUT"); traceProp.setProperty("log4cplus.logger.TRC", "DEBUG"); @@ -242,14 +241,6 @@ int main(int argc, char *argv[]) logPrefix = str(format("[obs %u] ") % parset.observationID()); - // create root directory of the observation tree - LOG_INFO_STR(logPrefix << "Creating directory " << parset.getMSBaseDir()); - - if (mkdir(parset.getMSBaseDir().c_str(), 0777) != 0 && errno != EEXIST) { - unsigned savedErrno = errno; // first argument below clears errno - throw SystemCallException(("mkdir " + parset.getMSBaseDir()).c_str(), savedErrno, THROW_ARGS); - } - // start all writers for (unsigned output = 0; output < plan.nrOutputTypes(); output ++) { ProcessingPlan::distribution_t distribution = plan.plan[output].distribution; diff --git a/RTCP/Storage/src/SubbandWriter.cc b/RTCP/Storage/src/SubbandWriter.cc index 7433704192d88220d079e5a7df9b9969db9eb960..c391a029aa726d3165fefb36898a7b78d7e45609 100644 --- a/RTCP/Storage/src/SubbandWriter.cc +++ b/RTCP/Storage/src/SubbandWriter.cc @@ -30,7 +30,6 @@ #include <Interface/CN_Configuration.h> #include <Interface/CN_ProcessingPlan.h> #include <Storage/SubbandWriter.h> -#include <Storage/MeasurementSetFormat.h> #include <boost/lexical_cast.hpp> @@ -47,18 +46,7 @@ namespace RTCP { SubbandWriter::SubbandWriter(const Parset &parset, unsigned subband, unsigned outputType, ProcessingPlan::planlet &outputConfig, bool isBigEndian) { const std::string logPrefix = str(format("[obs %u output %u subband %3u] ") % parset.observationID() % outputType % subband); - -#if defined HAVE_AIPSPP - if (outputType == 0 && parset.outputCorrelatedData()) { - MeasurementSetFormat myFormat(&parset, 512); - - /// Make MeasurementSet filestructures and required tables - myFormat.addSubband(subband, isBigEndian); - - LOG_INFO_STR(logPrefix << "MeasurementSet created"); - } -#endif // defined HAVE_AIPSPP - + StreamableData *dataTemplate = outputConfig.source; for (unsigned i = 0; i < maxReceiveQueueSize; i ++) { @@ -69,7 +57,7 @@ SubbandWriter::SubbandWriter(const Parset &parset, unsigned subband, unsigned ou } itsInputThread = new InputThread(parset, subband, outputType, outputConfig, itsFreeQueue, itsReceiveQueue); - itsOutputThread = new OutputThread(parset, subband, outputType, outputConfig, itsFreeQueue, itsReceiveQueue); + itsOutputThread = new OutputThread(parset, subband, outputType, outputConfig, itsFreeQueue, itsReceiveQueue, isBigEndian); }