diff --git a/RTCP/CNProc/src/AsyncTransposeBeams.cc b/RTCP/CNProc/src/AsyncTransposeBeams.cc index 5ab32929afedc9f6a9a9243cc2fdef0748b67f66..cfdfbe9b810150e4801ffba158c6b52a11bf22b1 100644 --- a/RTCP/CNProc/src/AsyncTransposeBeams.cc +++ b/RTCP/CNProc/src/AsyncTransposeBeams.cc @@ -46,11 +46,12 @@ AsyncTransposeBeams::AsyncTransposeBeams( itsOutputCores(outputCores), itsLocationInfo(locationInfo), itsCommHandles(itsNrCommunications,nrSubbands), + itsLocalSubbands(nrSubbands), itsNrSubbeams(nrSubbeams) { } -template <typename T,unsigned DIM> void AsyncTransposeBeams::postReceive(SampleData<T,DIM> *transposedData, unsigned subband, unsigned beam, unsigned psetIndex, unsigned coreIndex) +template <typename T,unsigned DIM> void AsyncTransposeBeams::postReceive(SampleData<T,DIM> *transposedData, unsigned localSubband, unsigned globalSubband, unsigned beam, unsigned psetIndex, unsigned coreIndex) { unsigned pset = itsInputPsets[psetIndex]; unsigned core = itsInputCores[coreIndex]; @@ -62,9 +63,11 @@ template <typename T,unsigned DIM> void AsyncTransposeBeams::postReceive(SampleD void *ptr; size_t size; } toRead[itsNrCommunications] = { - { transposedData->samples[subband].origin(), transposedData->samples[subband].num_elements() * sizeof(T) } + { transposedData->samples[localSubband].origin(), transposedData->samples[localSubband].num_elements() * sizeof(T) } }; + itsLocalSubbands[globalSubband] = localSubband; + // read it for (unsigned h = 0; h < itsNrCommunications; h ++) { Tag t; @@ -72,12 +75,12 @@ template <typename T,unsigned DIM> void AsyncTransposeBeams::postReceive(SampleD t.info.sourceRank = rank; t.info.comm = h; t.info.beam = beam; - t.info.subband = subband; + t.info.subband = globalSubband; #ifdef DEBUG - LOG_DEBUG_STR( "Posting to receive beam " << beam << " subband " << subband << " from pset " << pset << " core " << core << " = rank " << rank << ", tag " << t.nr ); + LOG_DEBUG_STR( "Posting to receive beam " << beam << " subband " << globalSubband << " (local: subband " << localSubband << ") from pset " << pset << " core " << core << " = rank " << rank << ", tag " << t.nr ); #endif - itsCommHandles[h][subband] = itsAsyncComm.asyncRead(toRead[h].ptr, toRead[h].size, rank, t.nr); + itsCommHandles[h][globalSubband] = itsAsyncComm.asyncRead(toRead[h].ptr, toRead[h].size, rank, t.nr); } } @@ -116,12 +119,12 @@ unsigned AsyncTransposeBeams::waitForAnyReceive() } if (haveAll) - return subband; + return itsLocalSubbands[subband]; } } -template <typename T, unsigned DIM> void AsyncTransposeBeams::asyncSend(unsigned outputPsetIndex, unsigned coreIndex, unsigned subband, unsigned beam, unsigned subbeam, const SampleData<T,DIM> *inputData) +template <typename T, unsigned DIM> void AsyncTransposeBeams::asyncSend(unsigned outputPsetIndex, unsigned coreIndex, unsigned subband, unsigned localBeam, unsigned subbeam, unsigned globalBeam, const SampleData<T,DIM> *inputData) { unsigned pset = itsOutputPsets[outputPsetIndex]; unsigned core = itsOutputCores[coreIndex]; @@ -132,7 +135,7 @@ template <typename T, unsigned DIM> void AsyncTransposeBeams::asyncSend(unsigned const void *ptr; const size_t size; } toWrite[itsNrCommunications] = { - { inputData->samples[beam][subbeam].origin(), inputData->samples[beam][subbeam].num_elements() * sizeof(T) } + { inputData->samples[localBeam][subbeam].origin(), inputData->samples[localBeam][subbeam].num_elements() * sizeof(T) } }; // write it @@ -142,22 +145,22 @@ template <typename T, unsigned DIM> void AsyncTransposeBeams::asyncSend(unsigned t.info.sourceRank = itsLocationInfo.rank(); t.info.comm = h; t.info.subband = subband; - t.info.beam = beam * itsNrSubbeams + subbeam; + t.info.beam = globalBeam; #ifdef DEBUG - LOG_DEBUG_STR( "Sending beam " << beam << " subband " << subband << " to pset " << pset << " core " << core << " = rank " << rank << ", tag " << t.nr ); + LOG_DEBUG_STR( "Sending beam " << globalBeam << " (local: beam " << localBeam << " subbeam " << subbeam << ") subband " << subband << " to pset " << pset << " core " << core << " = rank " << rank << ", tag " << t.nr ); #endif itsAsyncComm.asyncWrite(toWrite[h].ptr, toWrite[h].size, rank, t.nr); } } // specialisation for StokesData -template void AsyncTransposeBeams::postReceive(SampleData<float,4> *, unsigned, unsigned, unsigned, unsigned); -template void AsyncTransposeBeams::asyncSend(unsigned, unsigned, unsigned, unsigned, unsigned, const SampleData<float,4> *); +template void AsyncTransposeBeams::postReceive(SampleData<float,4> *, unsigned, unsigned, unsigned, unsigned, unsigned); +template void AsyncTransposeBeams::asyncSend(unsigned, unsigned, unsigned, unsigned, unsigned, unsigned, const SampleData<float,4> *); // specialisation for BeamFormedData -template void AsyncTransposeBeams::postReceive(SampleData<fcomplex,3> *, unsigned, unsigned, unsigned, unsigned); -template void AsyncTransposeBeams::asyncSend(unsigned, unsigned, unsigned, unsigned, unsigned, const SampleData<fcomplex,4> *); +template void AsyncTransposeBeams::postReceive(SampleData<fcomplex,3> *, unsigned, unsigned, unsigned, unsigned, unsigned); +template void AsyncTransposeBeams::asyncSend(unsigned, unsigned, unsigned, unsigned, unsigned, unsigned, const SampleData<fcomplex,4> *); void AsyncTransposeBeams::waitForAllSends() { diff --git a/RTCP/CNProc/src/AsyncTransposeBeams.h b/RTCP/CNProc/src/AsyncTransposeBeams.h index 958de71f1439f99d35af30efc11dba671034f2c5..549c83d7fe8dcc48d933dd337f44c911551fea00 100644 --- a/RTCP/CNProc/src/AsyncTransposeBeams.h +++ b/RTCP/CNProc/src/AsyncTransposeBeams.h @@ -35,13 +35,17 @@ class AsyncTransposeBeams const std::vector<unsigned> &inputPsets, const std::vector<unsigned> &inputCores, const std::vector<unsigned> &outputPsets, const std::vector<unsigned> &outputCores); // Post all async receives for the transpose. - template <typename T, unsigned DIM> void postReceive( SampleData<T,DIM> *transposedData, unsigned subband, unsigned beam, unsigned psetIndex, unsigned coreIndex); + // localSubband is the subband index for local data structures, + // globalSubband is the subband index used globally in the system (0..247) + template <typename T, unsigned DIM> void postReceive( SampleData<T,DIM> *transposedData, unsigned localSubband, unsigned globalSubband, unsigned beam, unsigned psetIndex, unsigned coreIndex); // Wait for a data message. Returns the station number where the message originates. unsigned waitForAnyReceive(); // Asynchronously send a subband. - template <typename T, unsigned DIM> void asyncSend(unsigned outputPsetIndex, unsigned coreIndex, unsigned subband, unsigned beam, unsigned subbeam, const SampleData<T,DIM> *inputData); + // localBeam is the beam index used locally (=0..nrBeams), in conjunction with subbeam (=polarisation or stokes) + // globalBeam is the beam index for the output backend, which does not differentiate between beams, subbeams, filesperstokes, etc. + template <typename T, unsigned DIM> void asyncSend(unsigned outputPsetIndex, unsigned coreIndex, unsigned subband, unsigned localBeam, unsigned subbeam, unsigned globalBeam, const SampleData<T,DIM> *inputData); // Make sure all async sends have finished. void waitForAllSends(); @@ -62,6 +66,8 @@ class AsyncTransposeBeams // The value is -1 if the read finished. Matrix<int> itsCommHandles; // [itsNrCommunications][itsNrInputPsets] + Vector<int> itsLocalSubbands; + const unsigned itsNrSubbeams; }; diff --git a/RTCP/CNProc/src/CN_Processing.cc b/RTCP/CNProc/src/CN_Processing.cc index 66890012f54efe8aec55cca51b3bd191d5a8ef68..778e1aa69b6ca02e893c3ab97580aad4a3e499ef 100644 --- a/RTCP/CNProc/src/CN_Processing.cc +++ b/RTCP/CNProc/src/CN_Processing.cc @@ -1,4 +1,4 @@ -//# + //# //# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl //# //# This program is free software; you can redistribute it and/or modify @@ -151,23 +151,22 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::preprocess(CN_C itsNrPencilBeams = configuration.nrPencilBeams(); itsNrSubbands = configuration.nrSubbands(); itsNrSubbandsPerPset = configuration.nrSubbandsPerPset(); + itsNrSubbandsPerBeam = configuration.nrSubbandsPerBeam(); + itsNrFilesPerStokes = configuration.nrFilesPerStokes(); itsNrBeamsPerPset = configuration.nrBeamsPerPset(); - itsNrStokes = configuration.nrStokes(); itsCenterFrequencies = configuration.refFreqs(); itsFlysEye = configuration.flysEye(); itsNrBeams = itsFlysEye ? itsNrBeamFormedStations : itsNrPencilBeams; - unsigned nrsubbeams = 0; - if (configuration.outputBeamFormedData()) { - nrsubbeams = NR_POLARIZATIONS; + itsNrStokes = NR_POLARIZATIONS; } else if (configuration.outputCoherentStokes()) { - nrsubbeams = configuration.nrStokes(); + itsNrStokes = configuration.nrStokes(); + } else { + itsNrStokes = 0; } - itsNrSubbeams = nrsubbeams; - unsigned nrChannels = configuration.nrChannelsPerSubband(); unsigned nrSamplesPerIntegration = configuration.nrSamplesPerIntegration(); unsigned nrSamplesPerStokesIntegration = configuration.nrSamplesPerStokesIntegration(); @@ -260,7 +259,7 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::preprocess(CN_C } if (itsPhaseThreeExists && (itsHasPhaseTwo || itsHasPhaseThree)) { - itsAsyncTransposeBeams = new AsyncTransposeBeams(itsHasPhaseTwo, itsHasPhaseThree, itsNrSubbands, nrsubbeams, itsLocationInfo, phaseTwoPsets, phaseOneTwoCores, phaseThreePsets, phaseThreeCores ); + itsAsyncTransposeBeams = new AsyncTransposeBeams(itsHasPhaseTwo, itsHasPhaseThree, itsNrSubbands, itsNrStokes, itsLocationInfo, phaseTwoPsets, phaseOneTwoCores, phaseThreePsets, phaseThreeCores ); } #endif // HAVE_MPI @@ -326,9 +325,10 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::transposeInput( #endif // HAVE_MPI } -template <typename SAMPLE_TYPE> bool CN_Processing<SAMPLE_TYPE>::transposeBeams(unsigned block) +template <typename SAMPLE_TYPE> int CN_Processing<SAMPLE_TYPE>::transposeBeams(unsigned block) { bool beamToProcess = false; + unsigned myBeam = 0; #if defined HAVE_MPI if (itsHasPhaseThree) { @@ -338,13 +338,11 @@ template <typename SAMPLE_TYPE> bool CN_Processing<SAMPLE_TYPE>::transposeBeams( NSTimer postAsyncReceives("post async beam receives", LOG_CONDITION, true); postAsyncReceives.start(); - unsigned myBeam; - if (itsPhaseThreeDisjunct) { // the phase 3 psets are dedicated to phase 3 myBeam = *itsCurrentBeam; - beamToProcess = myBeam < itsNrBeams * itsNrSubbeams; + beamToProcess = myBeam < itsNrBeams * itsNrStokes * itsNrFilesPerStokes; //LOG_DEBUG_STR(itsLogPrefix << "transpose: my beam = " << myBeam << " process? " << beamToProcess << " my coreindex = " << itsCurrentBeam->core ); @@ -364,20 +362,24 @@ template <typename SAMPLE_TYPE> bool CN_Processing<SAMPLE_TYPE>::transposeBeams( myBeam = firstBeamOfPset + relativeCoreIndex; - beamToProcess = myBeam < itsNrBeams * itsNrSubbeams && relativeCoreIndex < itsNrBeamsPerPset; + beamToProcess = myBeam < itsNrBeams * itsNrStokes * itsNrFilesPerStokes && relativeCoreIndex < itsNrBeamsPerPset; } if (beamToProcess) { - for (unsigned sb = 0; sb < itsNrSubbands; sb ++) { + unsigned stokesFile = myBeam % itsNrFilesPerStokes; + unsigned firstSubband = stokesFile * itsNrSubbandsPerBeam; + unsigned lastSubband = std::min( (stokesFile+1) * itsNrSubbandsPerBeam, itsNrSubbands ); + + for (unsigned sb = firstSubband; sb < lastSubband; sb ++) { // calculate which (pset,core) produced subband sb unsigned pset = sb / itsNrSubbandsPerPset; unsigned core = (block * itsNrSubbandsPerPset + sb % itsNrSubbandsPerPset) % itsNrPhaseOneTwoCores; - //LOG_DEBUG_STR(itsLogPrefix << "transpose: receive subband " << sb << " of beam " << myBeam << " from pset " << pset << " core " << core); + LOG_DEBUG_STR(itsLogPrefix << "transpose: receive subband " << sb << " of beam " << myBeam << " part " << stokesFile << " from pset " << pset << " core " << core); if (itsPlan->calculate( itsPlan->itsTransposedCoherentStokesData )) { - itsAsyncTransposeBeams->postReceive(itsPlan->itsTransposedCoherentStokesData, sb, myBeam, pset, core); + itsAsyncTransposeBeams->postReceive(itsPlan->itsTransposedCoherentStokesData, sb - firstSubband, sb, myBeam, pset, core); } else { - itsAsyncTransposeBeams->postReceive(itsPlan->itsTransposedBeamFormedData, sb, myBeam, pset, core); + itsAsyncTransposeBeams->postReceive(itsPlan->itsTransposedBeamFormedData, sb - firstSubband, sb, myBeam, pset, core); } } } @@ -412,9 +414,9 @@ template <typename SAMPLE_TYPE> bool CN_Processing<SAMPLE_TYPE>::transposeBeams( } asyncSendTimer.start(); - for (unsigned j = 0; j < itsNrSubbeams; j++) { + for (unsigned j = 0; j < itsNrStokes; j++) { // calculate which (pset,core) needs beam i - unsigned beam = i * itsNrSubbeams + j; + unsigned beam = i * itsNrStokes + j; unsigned pset = beam / itsNrBeamsPerPset; unsigned core = (firstCore + beam % itsNrBeamsPerPset) % itsNrPhaseThreeCores; @@ -428,7 +430,7 @@ template <typename SAMPLE_TYPE> bool CN_Processing<SAMPLE_TYPE>::transposeBeams( asyncSendTimer.stop(); } #else - /* overlap computation and transpose */ + /* don't overlap computation and transpose */ for (unsigned i = 0; i < itsNrBeams; i ++) { if(calculateCoherentStokesData) { calculateCoherentStokes( i ); @@ -437,19 +439,21 @@ template <typename SAMPLE_TYPE> bool CN_Processing<SAMPLE_TYPE>::transposeBeams( } } + unsigned stokesFile = *itsCurrentSubband / itsNrSubbandsPerBeam; + for (unsigned i = 0; i < itsNrBeams; i ++) { asyncSendTimer.start(); - for (unsigned j = 0; j < itsNrSubbeams; j++) { + for (unsigned j = 0; j < itsNrStokes; j++) { // calculate which (pset,core) needs beam i - unsigned beam = i * itsNrSubbeams + j; + unsigned beam = (i * itsNrStokes + j) * itsNrFilesPerStokes + stokesFile; unsigned pset = beam / itsNrBeamsPerPset; unsigned core = (firstCore + beam % itsNrBeamsPerPset) % itsNrPhaseThreeCores; - //LOG_DEBUG_STR(itsLogPrefix << "transpose: send subband " << *itsCurrentSubband << " of beam " << i << " to pset " << pset << " core " << core); + LOG_DEBUG_STR(itsLogPrefix << "transpose: send subband " << *itsCurrentSubband << " of beam " << i << " pol/sgtokes " << j << " part " << stokesFile << " to pset " << pset << " core " << core); if (itsPlan->calculate( itsPlan->itsCoherentStokesData )) { - itsAsyncTransposeBeams->asyncSend(pset, core, *itsCurrentSubband, i, j, itsPlan->itsCoherentStokesData); // Asynchronously send one beam to another pset. + itsAsyncTransposeBeams->asyncSend(pset, core, *itsCurrentSubband, i, j, beam, itsPlan->itsCoherentStokesData); // Asynchronously send one beam to another pset. } else { - itsAsyncTransposeBeams->asyncSend(pset, core, *itsCurrentSubband, i, j, itsPlan->itsPreTransposeBeamFormedData); // Asynchronously send one beam to another pset. + itsAsyncTransposeBeams->asyncSend(pset, core, *itsCurrentSubband, i, j, beam, itsPlan->itsPreTransposeBeamFormedData); // Asynchronously send one beam to another pset. } } asyncSendTimer.stop(); @@ -459,7 +463,7 @@ template <typename SAMPLE_TYPE> bool CN_Processing<SAMPLE_TYPE>::transposeBeams( #endif // HAVE_MPI - return beamToProcess; + return beamToProcess ? myBeam : -1; } template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::filter() @@ -644,8 +648,12 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::finishSendingBe #endif } -template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::receiveBeam() +template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::receiveBeam( unsigned beamToProcess ) { + unsigned stokesFile = beamToProcess % itsNrFilesPerStokes; + unsigned firstSubband = stokesFile * itsNrSubbandsPerBeam; + unsigned lastSubband = std::min( (stokesFile+1) * itsNrSubbandsPerBeam, itsNrSubbands ); + #if defined HAVE_MPI NSTimer asyncReceiveTimer("wait for any async beam receive", LOG_CONDITION, true); @@ -657,13 +665,13 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::receiveBeam() #if 1 /* Overlap transpose and computations */ - for (unsigned i = 0; i < itsNrSubbands; i++) { + for (unsigned i = firstSubband; i < lastSubband; i++) { asyncReceiveTimer.start(); const unsigned subband = itsAsyncTransposeBeams->waitForAnyReceive(); asyncReceiveTimer.stop(); if (LOG_CONDITION) - LOG_DEBUG_STR( itsLogPrefix << "Received subband " << subband ); + LOG_DEBUG_STR( itsLogPrefix << "Received subband " << (firstSubband+subband) ); if (calculateBeamFormedData) { postTransposeBeams( subband ); @@ -673,7 +681,7 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::receiveBeam() } #else /* Don't overlap transpose and computations */ - for (unsigned i = 0; i < itsNrSubbands; i++) { + for (unsigned i = firstSubband; i < lastSubband; i++) { asyncReceiveTimer.start(); const unsigned subband = itsAsyncTransposeBeams->waitForAnyReceive(); asyncReceiveTimer.stop(); @@ -682,7 +690,7 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::receiveBeam() LOG_DEBUG_STR( itsLogPrefix << "Received subband " << subband ); } - for (unsigned i = 0; i < itsNrSubbands; i++) { + for (unsigned i = firstSubband; i < lastSubband; i++) { if (calculateBeamFormedData) { postTransposeBeams( i ); } else if (calculateCoherentStokesData) { @@ -754,10 +762,11 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::process(unsigne */ if ( (itsHasPhaseThree && itsPhaseThreeDisjunct) || (itsHasPhaseTwo && *itsCurrentSubband < itsNrSubbands && itsPhaseThreeExists)) { - bool doPhaseThree = transposeBeams(block); + int beamToProcess = transposeBeams(block); + bool doPhaseThree = beamToProcess >= 0; if (doPhaseThree) { - receiveBeam(); + receiveBeam( beamToProcess ); sendOutput( itsPlan->itsFinalBeamFormedData ); sendOutput( itsPlan->itsFinalCoherentStokesData ); diff --git a/RTCP/CNProc/src/CN_Processing.h b/RTCP/CNProc/src/CN_Processing.h index 4bf4ba98a2757d1af2be84b396e25d48ba6ad2d1..a55d496f80b8fa81606d099f1b8967e48f706e82 100644 --- a/RTCP/CNProc/src/CN_Processing.h +++ b/RTCP/CNProc/src/CN_Processing.h @@ -76,11 +76,11 @@ template <typename SAMPLE_TYPE> class CN_Processing : public CN_Processing_Base, private: void transposeInput(); - bool transposeBeams(unsigned block); + int transposeBeams(unsigned block); void filter(); void mergeStations(); void formBeams(); - void receiveBeam(); + void receiveBeam(unsigned beam); void preTransposeBeams(unsigned beam); void postTransposeBeams(unsigned subband); void postTransposeStokes(unsigned subband); @@ -104,9 +104,10 @@ template <typename SAMPLE_TYPE> class CN_Processing : public CN_Processing_Base, unsigned itsNrPencilBeams; unsigned itsNrSubbands; unsigned itsNrSubbandsPerPset; + unsigned itsNrSubbandsPerBeam; + unsigned itsNrFilesPerStokes; unsigned itsNrBeams; - unsigned itsNrSubbeams; // the number of polarizations/stokes that will be split off per beam during the transpose - unsigned itsMyNrBeams; + unsigned itsNrStokes; // the number of polarizations/stokes that will be split off per beam during the transpose unsigned itsNrBeamsPerPset; unsigned itsComputeGroupRank; unsigned itsPhaseTwoPsetSize, itsPhaseThreePsetSize; @@ -121,7 +122,6 @@ template <typename SAMPLE_TYPE> class CN_Processing : public CN_Processing_Base, Ring *itsCurrentSubband, *itsCurrentBeam; bool itsHasPhaseOne, itsHasPhaseTwo, itsHasPhaseThree; bool itsStokesIntegrateChannels; - bool itsNrStokes; CN_ProcessingPlan<SAMPLE_TYPE> *itsPlan; ArenaMapping itsMapping; // needs to be a member to ensure that its lifetime extends beyond that of its data sets diff --git a/RTCP/IONProc/src/Job.cc b/RTCP/IONProc/src/Job.cc index c32ed3f803e985e973d2335b3f5407abc76235d1..b13637ebb986b8347926b0d458ffeb8f4162ed52 100644 --- a/RTCP/IONProc/src/Job.cc +++ b/RTCP/IONProc/src/Job.cc @@ -549,14 +549,14 @@ template <typename SAMPLE_TYPE> void Job::doObservation() std::vector<unsigned> cores; std::string type; - unsigned nrsubbeams = 0; + unsigned nrstokes = 0; if (itsParset.outputBeamFormedData()) - nrsubbeams = NR_POLARIZATIONS; + nrstokes = NR_POLARIZATIONS; else if (itsParset.outputCoherentStokes()) - nrsubbeams = itsParset.nrStokes(); + nrstokes = itsParset.nrStokes(); - unsigned nrbeams = (itsParset.flysEye() ? itsParset.nrMergedStations() : itsParset.nrPencilBeams()) * nrsubbeams; + unsigned nrbeams = (itsParset.flysEye() ? itsParset.nrMergedStations() : itsParset.nrPencilBeams()) * nrstokes * itsParset.nrFilesPerStokes(); switch (p.distribution) { case ProcessingPlan::DIST_SUBBAND: diff --git a/RTCP/Interface/include/Interface/CN_Configuration.h b/RTCP/Interface/include/Interface/CN_Configuration.h index 7040d9babb533d8d08e56a3f0d7c76772a3eef2d..745bf49e4ee002e3d727954fefdf0ac7088334fe 100644 --- a/RTCP/Interface/include/Interface/CN_Configuration.h +++ b/RTCP/Interface/include/Interface/CN_Configuration.h @@ -51,6 +51,8 @@ class CN_Configuration unsigned &nrSamplesPerStokesIntegration(); unsigned &nrSamplesToCNProc(); unsigned &nrSubbandsPerPset(); + unsigned &nrSubbandsPerBeam(); + unsigned &nrFilesPerStokes(); unsigned &nrBeamsPerPset(); bool &delayCompensation(); bool &correctBandPass(); @@ -108,6 +110,8 @@ class CN_Configuration unsigned itsNrPhaseOneTwoCores; unsigned itsNrPhaseThreeCores; unsigned itsNrSubbandsPerPset; + unsigned itsNrSubbandsPerBeam; + unsigned itsNrFilesPerStokes; unsigned itsNrBeamsPerPset; bool itsDelayCompensation; bool itsCorrectBandPass; @@ -177,6 +181,16 @@ inline unsigned &CN_Configuration::nrSubbandsPerPset() return itsMarshalledData.itsNrSubbandsPerPset; } +inline unsigned &CN_Configuration::nrSubbandsPerBeam() +{ + return itsMarshalledData.itsNrSubbandsPerBeam; +} + +inline unsigned &CN_Configuration::nrFilesPerStokes() +{ + return itsMarshalledData.itsNrFilesPerStokes; +} + inline unsigned &CN_Configuration::nrBeamsPerPset() { return itsMarshalledData.itsNrBeamsPerPset; diff --git a/RTCP/Interface/include/Interface/Parset.h b/RTCP/Interface/include/Interface/Parset.h index a3ac7fe6ed55a275619c6d3ee45b4f93f324b033..c4bdec8cd00fed2f3de634fa44dd739ca944504d 100644 --- a/RTCP/Interface/include/Interface/Parset.h +++ b/RTCP/Interface/include/Interface/Parset.h @@ -88,6 +88,8 @@ public: double IONintegrationTime() const; uint32 nrSubbandSamples() const; uint32 nrSubbandsPerPset() const; + uint32 nrSubbandsPerBeam() const; + uint32 nrFilesPerStokes() const; uint32 nrBeamsPerPset() const; uint32 nrHistorySamples() const; uint32 nrSamplesToCNProc() const; @@ -406,6 +408,16 @@ inline uint32 Parset::nrSubbandsPerPset() const return getUint32("OLAP.subbandsPerPset"); } +inline uint32 Parset::nrSubbandsPerBeam() const +{ + return getUint32("OLAP.Storage.nrSubbandsPerBeam"); +} + +inline uint32 Parset::nrFilesPerStokes() const +{ + return getUint32("OLAP.Storage.nrFilesPerStokes"); +} + inline uint32 Parset::nrBeamsPerPset() const { return getUint32("OLAP.PencilInfo.beamsPerPset"); diff --git a/RTCP/Interface/src/CN_Configuration.cc b/RTCP/Interface/src/CN_Configuration.cc index 1f24cc7522e1c42b177ac0df8d58fe4cc21ae635..b66ad6588ea73d3dcfe108d6009f8b2916789db0 100644 --- a/RTCP/Interface/src/CN_Configuration.cc +++ b/RTCP/Interface/src/CN_Configuration.cc @@ -54,6 +54,8 @@ CN_Configuration::CN_Configuration(const Parset &parset) nrSamplesPerStokesIntegration() = parset.stokesIntegrationSteps(); nrSamplesToCNProc() = parset.nrSamplesToCNProc(); nrSubbandsPerPset() = parset.nrSubbandsPerPset(); + nrSubbandsPerBeam() = parset.nrSubbandsPerBeam(); + nrFilesPerStokes() = parset.nrFilesPerStokes(); nrBeamsPerPset() = parset.nrBeamsPerPset(); delayCompensation() = parset.delayCompensation() || parset.nrPencilBeams() > 1 || parset.correctClocks(); correctBandPass() = parset.correctBandPass(); diff --git a/RTCP/Interface/src/CN_ProcessingPlan.cc b/RTCP/Interface/src/CN_ProcessingPlan.cc index 63060022c21a726ecdc674be0601e795a9ba560e..4e39f5aae6b2a86dc1335d855b1efa133379b12e 100644 --- a/RTCP/Interface/src/CN_ProcessingPlan.cc +++ b/RTCP/Interface/src/CN_ProcessingPlan.cc @@ -51,6 +51,8 @@ template <typename SAMPLE_TYPE> CN_ProcessingPlan<SAMPLE_TYPE>::CN_ProcessingPla const unsigned nrBaselines = configuration.nrMergedStations() * (configuration.nrMergedStations() + 1)/2; + const bool multipleBeamFiles = configuration.nrFilesPerStokes() > 1; + if (hasPhaseOne) { std::vector<unsigned> &phaseTwoPsets = configuration.phaseTwoPsets(); @@ -160,14 +162,15 @@ template <typename SAMPLE_TYPE> CN_ProcessingPlan<SAMPLE_TYPE>::CN_ProcessingPla } if (hasPhaseThree) { + const unsigned nrSubbands = std::min( configuration.nrSubbands(), configuration.nrSubbandsPerBeam() ); itsTransposedBeamFormedData = new TransposedBeamFormedData( - configuration.nrSubbands(), + nrSubbands, configuration.nrChannelsPerSubband(), configuration.nrSamplesPerIntegration() ); itsFinalBeamFormedData = new FinalBeamFormedData( - configuration.nrSubbands(), + nrSubbands, configuration.nrChannelsPerSubband(), configuration.nrSamplesPerIntegration() ); @@ -175,7 +178,7 @@ template <typename SAMPLE_TYPE> CN_ProcessingPlan<SAMPLE_TYPE>::CN_ProcessingPla itsTransposedCoherentStokesData = new StokesData( true, 1, - configuration.nrSubbands(), + nrSubbands, configuration.nrChannelsPerSubband(), configuration.nrSamplesPerIntegration(), configuration.nrSamplesPerStokesIntegration() @@ -183,7 +186,7 @@ template <typename SAMPLE_TYPE> CN_ProcessingPlan<SAMPLE_TYPE>::CN_ProcessingPla itsFinalCoherentStokesData = new FinalStokesData( true, - configuration.nrSubbands(), + nrSubbands, configuration.nrChannelsPerSubband(), configuration.nrSamplesPerIntegration(), configuration.nrSamplesPerStokesIntegration() @@ -196,10 +199,18 @@ template <typename SAMPLE_TYPE> CN_ProcessingPlan<SAMPLE_TYPE>::CN_ProcessingPla TRANSFORM( itsTransposedCoherentStokesData, itsFinalCoherentStokesData ); if( configuration.outputBeamFormedData() ) { - send( 4, itsFinalBeamFormedData, "L${MSNUMBER}_B${PBEAM}_S${SUBBEAM}_bf.raw", ProcessingPlan::DIST_BEAM, NR_POLARIZATIONS ); + if (multipleBeamFiles) { + send( 4, itsFinalBeamFormedData, "L${MSNUMBER}_B${PBEAM}_S${SUBBEAM}_P${BEAMFILE}_bf.raw", ProcessingPlan::DIST_BEAM, NR_POLARIZATIONS ); + } else { + send( 4, itsFinalBeamFormedData, "L${MSNUMBER}_B${PBEAM}_S${SUBBEAM}_bf.raw", ProcessingPlan::DIST_BEAM, NR_POLARIZATIONS ); + } } if( configuration.outputCoherentStokes() ) { - send( 5, itsFinalCoherentStokesData, "L${MSNUMBER}_B${PBEAM}_S${SUBBEAM}_bf.raw", ProcessingPlan::DIST_BEAM, configuration.nrStokes() ); + if (multipleBeamFiles) { + send( 5, itsFinalCoherentStokesData, "L${MSNUMBER}_B${PBEAM}_S${SUBBEAM}_P${BEAMFILE}_bf.raw", ProcessingPlan::DIST_BEAM, configuration.nrStokes() ); + } else { + send( 5, itsFinalCoherentStokesData, "L${MSNUMBER}_B${PBEAM}_S${SUBBEAM}_bf.raw", ProcessingPlan::DIST_BEAM, configuration.nrStokes() ); + } } } } diff --git a/RTCP/Run/src/LOFAR/Parset.py b/RTCP/Run/src/LOFAR/Parset.py index f1a7c894fb6770e5deb68934081079f4eee37ac0..e8220f0b1d2d37f6792a2c48ddaf9bb57fb9b1d8 100644 --- a/RTCP/Run/src/LOFAR/Parset.py +++ b/RTCP/Run/src/LOFAR/Parset.py @@ -260,6 +260,8 @@ class Parset(util.Parset.Parset): # subband configuration if "Observation.subbandList" in self: nrSubbands = len(self.getInt32Vector("Observation.subbandList")) + else: + nrSubbands = 248 for nrBeams in count(): if "Observation.Beam[%s].angle1" % (nrBeams,) not in self: @@ -271,6 +273,9 @@ class Parset(util.Parset.Parset): self['OLAP.CNProc.partition'] = self.partition self['OLAP.IONProc.psetList'] = self.psets + self.setdefault('OLAP.Storage.nrSubbandsPerBeam', nrSubbands); + self['OLAP.Storage.nrFilesPerStokes'] = int( math.ceil( 1.0 * nrSubbands / int(self["OLAP.Storage.nrSubbandsPerBeam"]) ) ) + nrPsets = len(self.psets) nrStorageNodes = self.getNrUsedStorageNodes() nrBeamFiles = self.getNrBeamFiles() @@ -467,17 +472,22 @@ class Parset(util.Parset.Parset): return max(tabList) + 1 def getNrBeamFiles( self ): + nrSubbands = len(self.getInt32Vector("Observation.subbandList")) + nrFilesPerStokes = int(self["OLAP.Storage.nrFilesPerStokes"]) + if self.getBool("OLAP.outputBeamFormedData"): - files = 2 + nrStokes = 2 elif self.getBool("OLAP.outputCoherentStokes"): - files = len(self["OLAP.Stokes.which"]) + nrStokes = len(self["OLAP.Stokes.which"]) else: - files = 0 + nrStokes = 0 if self.getBool("OLAP.PencilInfo.flysEye"): - return self.getNrMergedStations() * files - - return self.getNrBeams() * files + nrBeams = self.getNrMergedStations() + else: + nrBeams = self.getNrBeams() + + return nrBeams * nrStokes * nrFilesPerStokes def phaseThreeExists( self ): # NO support for mixing with Observation.mode and Observation.outputIncoherentStokesI diff --git a/RTCP/Storage/src/OutputThread.cc b/RTCP/Storage/src/OutputThread.cc index bc8deb3d60f1c25e0af39b779e181be822830d7e..2be6ae70c55d058d8c870595c6944641a6dda5bb 100644 --- a/RTCP/Storage/src/OutputThread.cc +++ b/RTCP/Storage/src/OutputThread.cc @@ -183,8 +183,10 @@ string OutputThread::getMSname() const const char pols[] = "XY"; const char stokes[] = "IQUV"; - const int beam = itsSubbandNumber / itsOutputConfig.nrFilesPerBeam; - const int subbeam = itsSubbandNumber % itsOutputConfig.nrFilesPerBeam; + const unsigned nrBeamFiles = itsParset.nrFilesPerStokes(); + const int beam = itsSubbandNumber / itsOutputConfig.nrFilesPerBeam / nrBeamFiles; + const int subbeam = (itsSubbandNumber / nrBeamFiles) % itsOutputConfig.nrFilesPerBeam; + const int beamfile = itsSubbandNumber % nrBeamFiles; string name = dirName( itsParset.getString("Observation.MSNameMask") ) + itsOutputConfig.filename; string startTime = itsParset.getString("Observation.startTime"); @@ -201,6 +203,7 @@ string OutputThread::getMSname() const replace_all(name, "${MSNUMBER}", str(format("%05u") % itsParset.observationID())); replace_all(name, "${BEAM}", str(format("%02u") % itsParset.subbandToSAPmapping()[itsSubbandNumber])); replace_all(name, "${SUBBAND}", str(format("%03u") % itsSubbandNumber)); + replace_all(name, "${BEAMFILE}", str(format("%03u") % beamfile)); replace_all(name, "${PBEAM}", str(format("%03u") % beam)); replace_all(name, "${POL}", str(format("%c") % pols[subbeam])); replace_all(name, "${STOKES}", str(format("%c") % stokes[subbeam]));