diff --git a/RTCP/CNProc/src/ArenaMapping.h b/RTCP/CNProc/src/ArenaMapping.h index d3ec1c884d8227e26e3c48c91cfa819378ef312f..b8b4dfc3726b4c1ba2d7470a911eaae99290e760 100644 --- a/RTCP/CNProc/src/ArenaMapping.h +++ b/RTCP/CNProc/src/ArenaMapping.h @@ -26,6 +26,7 @@ //# Never #include <config.h> or #include <lofar_config.h> in a header file! #include <Interface/Allocator.h> +#include <Interface/StreamableData.h> #include <boost/noncopyable.hpp> namespace LOFAR { @@ -35,18 +36,19 @@ class ArenaMapping: boost::noncopyable { public: virtual ~ArenaMapping(); - void addDataset( const unsigned dataset, const size_t size, const unsigned arena ); - void moveDataset( const unsigned dataset, const unsigned arena ); + void addDataset( StreamableData *dataset, const unsigned arena ); + void moveDataset( const StreamableData *dataset, const unsigned arena ); unsigned nrArenas() const; unsigned nrDatasets() const; - void createAllocators(); - Allocator *allocatorOf( const unsigned dataset ) const; + void allocate(); + + Allocator *allocatorOf( const StreamableData *dataset ) const; private: struct mapping { - unsigned dataset; + StreamableData *dataset; size_t size; unsigned arena; @@ -72,20 +74,25 @@ inline ArenaMapping::~ArenaMapping() itsArenas.clear(); } -inline void ArenaMapping::addDataset( const unsigned dataset, const size_t size, const unsigned arena ) +inline void ArenaMapping::addDataset( StreamableData *dataset, const unsigned arena ) { struct mapping m; + if( !dataset ) { + return; + }; + m.dataset = dataset; - m.size = size; + m.size = dataset->requiredSize(); m.arena = arena; m.allocator = 0; itsMapping.push_back( m ); } -inline void ArenaMapping::moveDataset( const unsigned dataset, const unsigned arena ) +inline void ArenaMapping::moveDataset( const StreamableData *dataset, const unsigned arena ) { + // if dataset==0, nothing happens for( unsigned i = 0; i < itsMapping.size(); i++ ) { if( dataset == itsMapping[i].dataset ) { itsMapping[i].arena = arena; @@ -138,7 +145,7 @@ inline size_t ArenaMapping::arenaSize( const unsigned arena ) const return neededSize; } -inline void ArenaMapping::createAllocators() +inline void ArenaMapping::allocate() { const size_t arenas = nrArenas(); const size_t datasets = nrDatasets(); @@ -153,10 +160,16 @@ inline void ArenaMapping::createAllocators() for( unsigned dataset = 0; dataset < datasets; dataset++ ) { itsMapping[dataset].allocator = new SparseSetAllocator( *itsArenas[itsMapping[dataset].arena] ); } + + // allocate data sets + for( unsigned dataset = 0; dataset < datasets; dataset++ ) { + itsMapping[dataset].dataset->allocate( *itsMapping[dataset].allocator ); + } } -inline Allocator *ArenaMapping::allocatorOf( const unsigned dataset ) const +inline Allocator *ArenaMapping::allocatorOf( const StreamableData *dataset ) const { + // if dataset==0, nothing happens for( unsigned i = 0; i < itsMapping.size(); i++ ) { if( itsMapping[i].dataset == dataset ) { return itsMapping[i].allocator; diff --git a/RTCP/CNProc/src/AsyncTranspose.cc b/RTCP/CNProc/src/AsyncTranspose.cc index 02b41e1ed3a0cea6355054ce36de51f572b35f56..333481354044b9bf129fe3c49da19df27311693c 100644 --- a/RTCP/CNProc/src/AsyncTranspose.cc +++ b/RTCP/CNProc/src/AsyncTranspose.cc @@ -17,10 +17,10 @@ namespace RTCP { #define MAX_TAG 100000 // The maximum tag we use to represent a data message. // Higher tags are metadata. -template <typename SAMPLE_TYPE> AsyncTranspose<SAMPLE_TYPE>::AsyncTranspose(bool isTransposeInput, bool isTransposeOutput, unsigned nrCoresPerPset, +template <typename SAMPLE_TYPE> AsyncTranspose<SAMPLE_TYPE>::AsyncTranspose(const bool isTransposeInput, const bool isTransposeOutput, unsigned nrCoresPerPset, const LocationInfo &locationInfo, const std::vector<unsigned> &inputPsets, - const std::vector<unsigned> &outputPsets, unsigned nrSamplesToCNProc) + const std::vector<unsigned> &outputPsets, const unsigned nrSamplesToCNProc) : itsIsTransposeInput(isTransposeInput), itsIsTransposeOutput(isTransposeOutput), @@ -28,9 +28,11 @@ template <typename SAMPLE_TYPE> AsyncTranspose<SAMPLE_TYPE>::AsyncTranspose(bool itsOutputPsets(outputPsets), itsLocationInfo(locationInfo) { + InputData<SAMPLE_TYPE> oneSample( 1, nrSamplesToCNProc ); + itsGroupNumber = -1; for(unsigned core = 0; core < nrCoresPerPset; core++) { - unsigned rank = locationInfo.remapOnTree(locationInfo.psetNumber(), core); + const unsigned rank = locationInfo.remapOnTree(locationInfo.psetNumber(), core); if(rank == locationInfo.rank()) { itsGroupNumber = core; break; @@ -38,11 +40,11 @@ template <typename SAMPLE_TYPE> AsyncTranspose<SAMPLE_TYPE>::AsyncTranspose(bool } for(unsigned i=0; i<inputPsets.size(); i++) { - unsigned rank = locationInfo.remapOnTree(inputPsets[i], itsGroupNumber); + const unsigned rank = locationInfo.remapOnTree(inputPsets[i], itsGroupNumber); itsRankToPsetIndex[rank] = i; } - itsMessageSize = InputData<SAMPLE_TYPE>::requiredSize(1, nrSamplesToCNProc); + itsMessageSize = oneSample.requiredSize(); dataHandles.resize(inputPsets.size()); metaDataHandles.resize(inputPsets.size()); itsAsyncComm = new AsyncCommunication(); @@ -57,29 +59,29 @@ template <typename SAMPLE_TYPE> AsyncTranspose<SAMPLE_TYPE>::~AsyncTranspose() template <typename SAMPLE_TYPE> void AsyncTranspose<SAMPLE_TYPE>::postAllReceives(TransposedData<SAMPLE_TYPE> *transposedData) { - for(unsigned i=0; i<itsInputPsets.size(); i++) { - void* buf = (void*) transposedData->samples[i].origin(); - unsigned pset = itsInputPsets[i]; - unsigned rank = itsLocationInfo.remapOnTree(pset, itsGroupNumber); // TODO cache this? maybe in locationInfo itself? - dataHandles[i] = itsAsyncComm->asyncRead(buf, itsMessageSize, rank, rank); - metaDataHandles[i] = itsAsyncComm->asyncRead(&transposedData->metaData[i], sizeof(SubbandMetaData), rank, rank + MAX_TAG); - } + for(unsigned i=0; i<itsInputPsets.size(); i++) { + void* buf = (void*) transposedData->samples[i].origin(); + const unsigned pset = itsInputPsets[i]; + const unsigned rank = itsLocationInfo.remapOnTree(pset, itsGroupNumber); // TODO cache this? maybe in locationInfo itself? + dataHandles[i] = itsAsyncComm->asyncRead(buf, itsMessageSize, rank, rank); + metaDataHandles[i] = itsAsyncComm->asyncRead(&transposedData->metaData[i], sizeof(SubbandMetaData), rank, rank + MAX_TAG); + } } // returns station number (= pset index) template <typename SAMPLE_TYPE> unsigned AsyncTranspose<SAMPLE_TYPE>::waitForAnyReceive() { - void* buf; - unsigned size, source; - int tag; - while(true) { + void* buf; + unsigned size, source; + int tag; + // This read could return either a data message, or a meta data message. itsAsyncComm->waitForAnyRead(buf, size, source, tag); // source is the real rank, calc pset index - unsigned psetIndex = itsRankToPsetIndex[source]; + const unsigned psetIndex = itsRankToPsetIndex[source]; if(tag < MAX_TAG) { // real data message dataHandles[psetIndex] = -1; // record that we have received the data @@ -96,11 +98,11 @@ template <typename SAMPLE_TYPE> unsigned AsyncTranspose<SAMPLE_TYPE>::waitForAny } -template <typename SAMPLE_TYPE> void AsyncTranspose<SAMPLE_TYPE>::asyncSend(unsigned outputPsetNr, const InputData<SAMPLE_TYPE> *inputData) +template <typename SAMPLE_TYPE> void AsyncTranspose<SAMPLE_TYPE>::asyncSend(const unsigned outputPsetNr, const InputData<SAMPLE_TYPE> *inputData) { - unsigned pset = itsOutputPsets[outputPsetNr]; - unsigned rank = itsLocationInfo.remapOnTree(pset, itsGroupNumber); - int tag = itsLocationInfo.rank(); + const unsigned pset = itsOutputPsets[outputPsetNr]; + const unsigned rank = itsLocationInfo.remapOnTree(pset, itsGroupNumber); + const int tag = itsLocationInfo.rank(); itsAsyncComm->asyncWrite(inputData->samples[outputPsetNr].origin(), itsMessageSize, rank, tag); itsAsyncComm->asyncWrite(&inputData->metaData[outputPsetNr], sizeof(SubbandMetaData), rank, tag + MAX_TAG); diff --git a/RTCP/CNProc/src/AsyncTranspose.h b/RTCP/CNProc/src/AsyncTranspose.h index f32dd0e9e96ff684dc5f2e402f86e355fb873b56..cb337ffdb25895352e745e6ea8c08b44e4800e0b 100644 --- a/RTCP/CNProc/src/AsyncTranspose.h +++ b/RTCP/CNProc/src/AsyncTranspose.h @@ -35,8 +35,8 @@ namespace RTCP { template <typename SAMPLE_TYPE> class AsyncTranspose { public: - AsyncTranspose(bool isTransposeInput, bool isTransposeOutput, unsigned nrCoresPerPset, const LocationInfo &, - const std::vector<unsigned> &inputPsets, const std::vector<unsigned> &outputPsets, unsigned nrSamplesToCNProc); + AsyncTranspose(const bool isTransposeInput, const bool isTransposeOutput, const unsigned nrCoresPerPset, const LocationInfo &, + const std::vector<unsigned> &inputPsets, const std::vector<unsigned> &outputPsets, const unsigned nrSamplesToCNProc); ~AsyncTranspose(); @@ -47,14 +47,14 @@ template <typename SAMPLE_TYPE> class AsyncTranspose unsigned waitForAnyReceive(); // Asynchronously send a subband. - void asyncSend(unsigned outputPsetNr, const InputData<SAMPLE_TYPE> *inputData); + void asyncSend(const unsigned outputPsetNr, const InputData<SAMPLE_TYPE> *inputData); // Make sure all async sends have finished. void waitForAllSends(); private: - bool itsIsTransposeInput, itsIsTransposeOutput; + const bool itsIsTransposeInput, itsIsTransposeOutput; // the size of a data message unsigned itsMessageSize; diff --git a/RTCP/CNProc/src/CN_Processing.cc b/RTCP/CNProc/src/CN_Processing.cc index ec361777f7fb0002251425d1ab028ca27471fa04..f6ff0b63fb681c99a5b20cb21cb46725d31ba2a6 100644 --- a/RTCP/CNProc/src/CN_Processing.cc +++ b/RTCP/CNProc/src/CN_Processing.cc @@ -88,6 +88,7 @@ template <typename SAMPLE_TYPE> CN_Processing<SAMPLE_TYPE>::CN_Processing(Stream itsPencilBeamData(0), itsStokesData(0), itsIncoherentStokesIData(0), + itsStokesDataIntegratedChannels(0), itsMode(), #if defined HAVE_BGL || defined HAVE_BGP itsAsyncTranspose(0), @@ -263,6 +264,7 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::preprocess(CN_C itsNrSubbands = configuration.nrSubbands(); itsMode = configuration.mode(); itsOutputIncoherentStokesI = configuration.outputIncoherentStokesI(); + itsStokesIntegrateChannels = configuration.stokesIntegrateChannels(); itsOutputPsetSize = outputPsets.size(); const unsigned nrChannels = configuration.nrChannelsPerSubband(); const unsigned nrSamplesPerIntegration = configuration.nrSamplesPerIntegration(); @@ -288,34 +290,71 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::preprocess(CN_C // Since some buffers (arenas) are used multiple times, we use multiple // Allocators for a single arena. - const size_t inputDataSize = itsIsTransposeInput ? InputData<SAMPLE_TYPE>::requiredSize(outputPsets.size(), nrSamplesToCNProc) : 0; - const size_t transposedDataSize = itsIsTransposeOutput ? TransposedData<SAMPLE_TYPE>::requiredSize(itsNrStations, nrSamplesToCNProc) : 0; - const size_t filteredDataSize = itsIsTransposeOutput ? FilteredData::requiredSize(itsNrStations, nrChannels, nrSamplesPerIntegration) : 0; - const size_t correlatedDataSize = itsIsTransposeOutput ? CorrelatedData::requiredSize(nrBaselines, nrChannels) : 0; - const size_t pencilBeamDataSize = itsIsTransposeOutput ? PencilBeamData::requiredSize(pencilCoordinates.size(), nrChannels, nrSamplesPerIntegration) : 0; - const size_t stokesDataSize = itsIsTransposeOutput ? StokesData::requiredSize(itsMode.isCoherent(), itsMode.nrStokes(), pencilCoordinates.size(), nrChannels, nrSamplesPerIntegration, nrSamplesPerStokesIntegration) : 0; - const size_t incoherentStokesIDataSize = itsIsTransposeOutput ? StokesData::requiredSize(false, 1, 1, nrChannels, nrSamplesPerIntegration, nrSamplesPerStokesIntegration) : 0; - - itsMapping.addDataset( 0, inputDataSize, 0 ); - itsMapping.addDataset( 1, transposedDataSize, 1 ); - itsMapping.addDataset( 2, filteredDataSize, 2 ); - itsMapping.addDataset( 3, correlatedDataSize, 1 ); - itsMapping.addDataset( 4, pencilBeamDataSize, 1 ); - itsMapping.addDataset( 5, stokesDataSize, 2 ); - itsMapping.addDataset( 6, incoherentStokesIDataSize, 1 ); + if (itsIsTransposeInput) { + itsInputData = new InputData<SAMPLE_TYPE>(outputPsets.size(), nrSamplesToCNProc); + } - if( !itsMode.isCoherent() ) { - // for incoherent modes, the filtered data is used for stokes, so they cannot overlap. - itsMapping.moveDataset( 5, 1 ); // stokesData + if (itsIsTransposeOutput) { + // create only the data structures that are used by the pipeline + + itsTransposedData = new TransposedData<SAMPLE_TYPE>(itsNrStations, nrSamplesToCNProc); + itsFilteredData = new FilteredData(itsNrStations, nrChannels, nrSamplesPerIntegration); + + switch( itsMode.mode() ) { + case CN_Mode::FILTER: + // we have everything already + break; + + case CN_Mode::CORRELATE: + itsCorrelatedData = new CorrelatedData(nrBaselines, nrChannels); + break; + + case CN_Mode::COHERENT_COMPLEX_VOLTAGES: + itsPencilBeamData = new PencilBeamData(pencilCoordinates.size(), nrChannels, nrSamplesPerIntegration); + break; + + case CN_Mode::COHERENT_STOKES_I: + case CN_Mode::COHERENT_ALLSTOKES: + itsPencilBeamData = new PencilBeamData(pencilCoordinates.size(), nrChannels, nrSamplesPerIntegration); + // fallthrough + + case CN_Mode::INCOHERENT_STOKES_I: + case CN_Mode::INCOHERENT_ALLSTOKES: + itsStokesData = new StokesData(itsMode.isCoherent(), itsMode.nrStokes(), pencilCoordinates.size(), nrChannels, nrSamplesPerIntegration, nrSamplesPerStokesIntegration); + break; + + default: + std::clog << "Invalid mode: " << itsMode << endl; + break; + } + + if( itsOutputIncoherentStokesI ) { + itsIncoherentStokesIData = new StokesData(false, 1, 1, nrChannels, nrSamplesPerIntegration, nrSamplesPerStokesIntegration); + } + + if( itsStokesIntegrateChannels ) { + itsStokesDataIntegratedChannels = new StokesDataIntegratedChannels(itsMode.isCoherent(), itsMode.nrStokes(), pencilCoordinates.size(), nrSamplesPerIntegration, nrSamplesPerStokesIntegration); + } } - // create the arenas - itsMapping.createAllocators(); + itsMapping.addDataset( itsInputData, 0 ); + itsMapping.addDataset( itsTransposedData, 1 ); + itsMapping.addDataset( itsFilteredData, 2 ); + itsMapping.addDataset( itsCorrelatedData, 1 ); + itsMapping.addDataset( itsPencilBeamData, 1 ); + itsMapping.addDataset( itsStokesData, 2 ); + itsMapping.addDataset( itsIncoherentStokesIData, 1 ); + itsMapping.addDataset( itsStokesDataIntegratedChannels, 1 ); - if (itsIsTransposeInput) { - itsInputData = new InputData<SAMPLE_TYPE>(outputPsets.size(), nrSamplesToCNProc, *(itsMapping.allocatorOf(0))); + if( !itsMode.isCoherent() ) { + // for incoherent modes, the filtered data is used for stokes, so they cannot overlap. + itsMapping.moveDataset( itsStokesData, 1 ); + itsMapping.moveDataset( itsStokesDataIntegratedChannels, 2 ); } + // create the arenas and allocate the data sets + itsMapping.allocate(); + if (itsIsTransposeOutput) { const unsigned nrSubbandsPerPset = configuration.nrSubbandsPerPset(); const unsigned logicalNode = usedCoresPerPset * (outputPsetIndex - outputPsets.begin()) + myCore; @@ -331,13 +370,6 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::preprocess(CN_C printSubbandList(); #endif // HAVE_MPI - itsTransposedData = new TransposedData<SAMPLE_TYPE>(itsNrStations, nrSamplesToCNProc, *(itsMapping.allocatorOf(1))); - itsFilteredData = new FilteredData(itsNrStations, nrChannels, nrSamplesPerIntegration, *(itsMapping.allocatorOf(2))); - itsCorrelatedData = new CorrelatedData(nrBaselines, nrChannels, *(itsMapping.allocatorOf(3))); - itsPencilBeamData = new PencilBeamData(pencilCoordinates.size(), nrChannels, nrSamplesPerIntegration, *(itsMapping.allocatorOf(4))); - itsStokesData = new StokesData(itsMode.isCoherent(), itsMode.nrStokes(), pencilCoordinates.size(), nrChannels, nrSamplesPerIntegration, nrSamplesPerStokesIntegration, *(itsMapping.allocatorOf(5))); - itsIncoherentStokesIData = new StokesData(false, 1, pencilCoordinates.size(), nrChannels, nrSamplesPerIntegration, nrSamplesPerStokesIntegration, *(itsMapping.allocatorOf(6))); - itsPPF = new PPF<SAMPLE_TYPE>(itsNrStations, nrChannels, nrSamplesPerIntegration, configuration.sampleRate() / nrChannels, configuration.delayCompensation(), itsLocationInfo.rank() == 0); itsPencilBeamFormer = new PencilBeams(pencilCoordinates, itsNrStations, nrChannels, nrSamplesPerIntegration, itsCenterFrequencies[itsCurrentSubband], configuration.sampleRate() / nrChannels, configuration.refPhaseCentre(), configuration.phaseCentres(), configuration.correctBandPass() ); @@ -391,7 +423,7 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::transpose() } #else // NO MPI readTimer.start(); - itsInputData->read(itsStream); + itsInputData->readAll(itsStream); readTimer.stop(); #endif } // itsIsTransposeInput @@ -555,13 +587,23 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::process() case CN_Mode::COHERENT_ALLSTOKES: formPencilBeams(); calculateCoherentStokes(); - sendOutput( itsStokesData ); + if( itsStokesIntegrateChannels ) { + itsStokes->compressStokes( itsStokesData, itsStokesDataIntegratedChannels, itsPencilBeamFormer->nrCoordinates() ); + sendOutput( itsStokesDataIntegratedChannels ); + } else { + sendOutput( itsStokesData ); + } break; case CN_Mode::INCOHERENT_STOKES_I: case CN_Mode::INCOHERENT_ALLSTOKES: calculateIncoherentStokes(); - sendOutput( itsStokesData ); + if( itsStokesIntegrateChannels ) { + itsStokes->compressStokes( itsStokesData, itsStokesDataIntegratedChannels, 1 ); + sendOutput( itsStokesDataIntegratedChannels ); + } else { + sendOutput( itsStokesData ); + } break; default: diff --git a/RTCP/CNProc/src/CN_Processing.h b/RTCP/CNProc/src/CN_Processing.h index 4fdba60ea3502b550ed3d47df8713998275a1301..1bcb81402b3c35e6d51c97cb6aa55a82cf28fe9c 100644 --- a/RTCP/CNProc/src/CN_Processing.h +++ b/RTCP/CNProc/src/CN_Processing.h @@ -124,6 +124,7 @@ template <typename SAMPLE_TYPE> class CN_Processing : public CN_Processing_Base, std::vector<double> itsCenterFrequencies; unsigned itsFirstSubband, itsCurrentSubband, itsLastSubband, itsSubbandIncrement; bool itsIsTransposeInput, itsIsTransposeOutput; + bool itsStokesIntegrateChannels; ArenaMapping itsMapping; @@ -134,6 +135,7 @@ template <typename SAMPLE_TYPE> class CN_Processing : public CN_Processing_Base, PencilBeamData *itsPencilBeamData; StokesData *itsStokesData; StokesData *itsIncoherentStokesIData; + StokesDataIntegratedChannels *itsStokesDataIntegratedChannels; CN_Mode itsMode; bool itsOutputIncoherentStokesI; diff --git a/RTCP/CNProc/src/InputData.h b/RTCP/CNProc/src/InputData.h index 15489fbf5231729bfeb6a660add54ba9723acfd6..3b536d27a760b82d7c5f9734e16a7c0b05950f22 100644 --- a/RTCP/CNProc/src/InputData.h +++ b/RTCP/CNProc/src/InputData.h @@ -7,6 +7,7 @@ #include <Interface/MultiDimArray.h> #include <Interface/Config.h> #include <Interface/SubbandMetaData.h> +#include <Interface/StreamableData.h> #include <Stream/Stream.h> #include <Interface/Allocator.h> @@ -17,42 +18,43 @@ namespace LOFAR { namespace RTCP { -template <typename SAMPLE_TYPE> class InputData +template <typename SAMPLE_TYPE> class InputData: public SampleData<SAMPLE_TYPE,3> { public: - InputData(unsigned nrSubbands, unsigned nrSamplesToCNProc, Allocator &allocator = heapAllocator); + typedef SampleData<SAMPLE_TYPE,3> SuperType; - void read(Stream *); + InputData(const unsigned nrSubbands, const unsigned nrSamplesToCNProc); + + virtual void allocate( Allocator &allocator = heapAllocator ); // used for asynchronous transpose void readMetaData(Stream *str); void readOne(Stream *str); - static size_t requiredSize(unsigned nrSubbands, unsigned nrSamplesToCNProc); + // used for synchronous transfer + void readAll(Stream *str); private: - unsigned itsNrSubbands; + const unsigned itsNrSubbands; unsigned itsSubbandIndex; public: - Cube<SAMPLE_TYPE> samples; //[outputPsets.size()][itsPS->nrSamplesToCNProc()][NR_POLARIZATIONS] Vector<SubbandMetaData> metaData; //[outputPsets.size()] }; -template <typename SAMPLE_TYPE> inline size_t InputData<SAMPLE_TYPE>::requiredSize(unsigned nrSubbands, unsigned nrSamplesToCNProc) +template <typename SAMPLE_TYPE> inline InputData<SAMPLE_TYPE>::InputData(const unsigned nrSubbands, const unsigned nrSamplesToCNProc) +: + SuperType( false, boost::extents[nrSubbands][nrSamplesToCNProc][NR_POLARIZATIONS], 0 ), + itsNrSubbands(nrSubbands), + itsSubbandIndex(0) { - return align(sizeof(SAMPLE_TYPE) * nrSubbands * nrSamplesToCNProc * NR_POLARIZATIONS, 32); } - -template <typename SAMPLE_TYPE> inline InputData<SAMPLE_TYPE>::InputData(unsigned nrSubbands, unsigned nrSamplesToCNProc, Allocator &allocator) -: - itsNrSubbands(nrSubbands), - itsSubbandIndex(0), - samples(boost::extents[nrSubbands][nrSamplesToCNProc][NR_POLARIZATIONS], 32, allocator), - metaData(nrSubbands) +template <typename SAMPLE_TYPE> inline void InputData<SAMPLE_TYPE>::allocate( Allocator &allocator ) { + SuperType::allocate( allocator ); + metaData.resize( itsNrSubbands ); } @@ -66,24 +68,24 @@ template <typename SAMPLE_TYPE> inline void InputData<SAMPLE_TYPE>::readMetaData // used for asynchronous transpose template <typename SAMPLE_TYPE> inline void InputData<SAMPLE_TYPE>::readOne(Stream *str) { - str->read(samples[itsSubbandIndex].origin(), samples[itsSubbandIndex].num_elements() * sizeof(SAMPLE_TYPE)); + str->read(SuperType::samples[itsSubbandIndex].origin(), SuperType::samples[itsSubbandIndex].num_elements() * sizeof(SAMPLE_TYPE)); #if defined C_IMPLEMENTATION && defined WORDS_BIGENDIAN - dataConvert(LittleEndian, samples[itsSubbandIndex].origin(), samples[itsSubbandIndex].num_elements()); + dataConvert(LittleEndian, SuperType::samples[itsSubbandIndex].origin(), SuperType::samples[itsSubbandIndex].num_elements()); #endif if (++ itsSubbandIndex == itsNrSubbands) // we have read all data itsSubbandIndex = 0; } -template <typename SAMPLE_TYPE> inline void InputData<SAMPLE_TYPE>::read(Stream *str) +template <typename SAMPLE_TYPE> inline void InputData<SAMPLE_TYPE>::readAll(Stream *str) { // read all metadata str->read(&metaData[0], metaData.size() * sizeof(SubbandMetaData)); // now read all subbands using one recvBlocking call, even though the ION // sends all subbands one at a time - str->read(samples.origin(), samples.num_elements() * sizeof(SAMPLE_TYPE)); + str->read(SuperType::samples.origin(), SuperType::samples.num_elements() * sizeof(SAMPLE_TYPE)); #if defined C_IMPLEMENTATION && defined WORDS_BIGENDIAN dataConvert(LittleEndian, samples[itsSubbandIndex].origin(), samples[itsSubbandIndex].num_elements()); diff --git a/RTCP/CNProc/src/PencilBeams.cc b/RTCP/CNProc/src/PencilBeams.cc index 1ba83755fe5bd446dcdac35ae26ee266938994c1..cef7142fda5127c81d6f96589aa550ccdac9a9ad 100644 --- a/RTCP/CNProc/src/PencilBeams.cc +++ b/RTCP/CNProc/src/PencilBeams.cc @@ -183,6 +183,7 @@ PencilBeams::PencilBeams(PencilCoordinates &coordinates, const unsigned nrStatio const double z = phaseCentres[stat][2]; PencilCoord3D phaseCentre( x, y, z ); + //std::clog << "phase center #" << stat << ": " << phaseCentre; PencilCoord3D baseLine = phaseCentre - refPhaseCentre; PencilCoord3D baseLineSeconds = baseLine * (1.0/speedOfLight); @@ -192,6 +193,7 @@ PencilBeams::PencilBeams(PencilCoordinates &coordinates, const unsigned nrStatio for( unsigned beam = 0; beam < itsCoordinates.size(); beam++ ) { itsDelayOffsets[stat][beam] = baseLine * itsCoordinates[beam] * (1.0/speedOfLight); + //std::clog << "delay offset for beam " << beam << " station " << stat << itsDelayOffsets[stat][beam] << " = " << baseLine << " * " << itsCoordinates[beam] << " * " << (1.0/speedOfLight) << std::endl; } } } @@ -200,6 +202,8 @@ void PencilBeams::calculateDelays( unsigned stat, const PencilCoord3D &beamDir ) { const double compensatedDelay = itsDelayOffsets[stat][0] - itsBaselinesSeconds[stat] * beamDir; + //std::clog << "station " << stat << " beam 0 has an absolute delay of " << compensatedDelay << std::endl; + // centre beam does not need compensation itsDelays[stat][0] = 0.0; @@ -210,6 +214,10 @@ void PencilBeams::calculateDelays( unsigned stat, const PencilCoord3D &beamDir ) // // further reduced by the delay we already compensate for when doing regular beam forming (the centre of the beam). that compensation is done at the IONode (sample shift) and the PPF (phase shift) itsDelays[stat][i] = itsDelayOffsets[stat][i] - itsBaselinesSeconds[stat] * beamDir - compensatedDelay; + + //std::clog << "station " << stat << " beam " << i << "has an additional delay of " << itsDelays[stat][i] << std::endl; + //std::clog << itsDelayOffsets[stat][i] << " " << itsBaselinesSeconds[stat] << " " << beamDir << " " << compensatedDelay << std::endl; + //std::clog << "example shift: " << phaseShift( itsBaseFrequency + itsNrChannels/2*itsChannelBandwidth, itsDelays[stat][i] ) << std::endl; } } diff --git a/RTCP/CNProc/src/PencilBeams.h b/RTCP/CNProc/src/PencilBeams.h index d978c1036dca4330a373c60a9bf5789558e9e71c..552c6ee54111c6b9c884d79c93402cc9fde2b394 100644 --- a/RTCP/CNProc/src/PencilBeams.h +++ b/RTCP/CNProc/src/PencilBeams.h @@ -15,10 +15,32 @@ const double speedOfLight = 299792458; class PencilCoord3D { public: - PencilCoord3D(const double x, const double y) { + PencilCoord3D(const double ra, const double dec) { + /* itsXYZ[0] = x; itsXYZ[1] = y; itsXYZ[2] = sqrt( 1.0 - x*x - y*y ); + */ + // (ra,dec) is a spherical direction, but the station positions + // and phase centers are cartesian (x,y,z with origin close to the geocenter). + // Spherical coordinates are converted to cartesian as follows: + // + // phi = .5pi - DEC, theta = RA (in parset: angle1=RA, angle2=DEC) + // rho = 1 (distance), since we need to construct a unit vector + // + // then: x = rho*sin(phi)*cos(theta), y = rho*sin(phi)*sin(theta), z = rho*cos(theta) */ + // + // NOTE: The use of the letters phi and theta differ or are swapped between sources. + + // in this case, phi is relative to the original beam, so .5pi is already compensated for. The + // direction of DEC is still important, so we have to use phi = -dec to get the proper relative change + // in angle. + const double phi = -dec; + const double theta = ra; + + itsXYZ[0] = sin(phi)*cos(theta); + itsXYZ[1] = sin(phi)*sin(theta); + itsXYZ[2] = cos(theta); } PencilCoord3D(const double x, const double y, const double z) { @@ -212,8 +234,8 @@ inline ostream& operator<<(ostream& os, const PencilCoord3D &c) inline fcomplex PencilBeams::phaseShift( const float frequency, const float delay ) const { - float phaseShift = delay * frequency; - float phi = -2 * M_PI * phaseShift; + const float phaseShift = delay * frequency; + const float phi = -2 * M_PI * phaseShift; return makefcomplex( std::cos(phi), std::sin(phi) ); } diff --git a/RTCP/CNProc/src/Stokes.cc b/RTCP/CNProc/src/Stokes.cc index 22a5d57d45ff94aea29c46d1c082cffcd25ede75..a47cc754a80662f9481ca5f5aeb1246940e681af 100644 --- a/RTCP/CNProc/src/Stokes.cc +++ b/RTCP/CNProc/src/Stokes.cc @@ -56,7 +56,6 @@ void Stokes::calculateIncoherent( const FilteredData *filteredData, StokesData * } // Compress Stokes values by summing over all channels -/* void Stokes::compressStokes( const StokesData *in, StokesDataIntegratedChannels *out, const unsigned nrBeams ) { const unsigned timeSteps = itsNrSamplesPerIntegration / itsNrSamplesPerStokesIntegration; @@ -80,7 +79,6 @@ void Stokes::compressStokes( const StokesData *in, StokesDataIntegratedChannels } } } -*/ void Stokes::computeCoherentStokes( const MultiDimArray<fcomplex,4> &in, const SparseSet<unsigned> *inflags, StokesData *out, const unsigned nrBeams ) { diff --git a/RTCP/CNProc/src/Stokes.h b/RTCP/CNProc/src/Stokes.h index ad05e94565b12ff8d1ec8134d5a0856f6dac0f53..31506ebd78a15248876e6e43f17baf79c4257eff 100644 --- a/RTCP/CNProc/src/Stokes.h +++ b/RTCP/CNProc/src/Stokes.h @@ -19,7 +19,7 @@ class Stokes void calculateCoherent( const PencilBeamData *filteredData, StokesData *stokesData, const unsigned nrBeams ); void calculateIncoherent( const FilteredData *filteredData, StokesData *stokesData, const unsigned nrStations ); - //void compressStokes( const StokesData *in, StokesDataIntegratedChannels *out, const unsigned nrBeams ); + void compressStokes( const StokesData *in, StokesDataIntegratedChannels *out, const unsigned nrBeams ); private: const unsigned itsNrChannels; diff --git a/RTCP/CNProc/src/TransposedData.h b/RTCP/CNProc/src/TransposedData.h index 2b9551381494133d586d1dadca16922c0d4ab648..8aad180da8f27c62e713311f3728b62ebfc802e5 100644 --- a/RTCP/CNProc/src/TransposedData.h +++ b/RTCP/CNProc/src/TransposedData.h @@ -5,6 +5,7 @@ #include <Interface/Align.h> #include <Interface/Config.h> #include <Interface/MultiDimArray.h> +#include <Interface/StreamableData.h> #include <Interface/SubbandMetaData.h> #include <vector> @@ -13,30 +14,41 @@ namespace LOFAR { namespace RTCP { -template <typename SAMPLE_TYPE> class TransposedData +template <typename SAMPLE_TYPE> class TransposedData: public SampleData<SAMPLE_TYPE,3> { public: - TransposedData(unsigned nrStations, unsigned nrSamplesToCNProc, Allocator &allocator = heapAllocator); + typedef SampleData<SAMPLE_TYPE,3> SuperType; - static size_t requiredSize(unsigned nrStations, unsigned nrSamplesToCNProc); + TransposedData(const unsigned nrStations, const unsigned nrSamplesToCNProc); + + virtual size_t requiredSize() const; + virtual void allocate( Allocator &allocator = heapAllocator ); - Cube<SAMPLE_TYPE> samples; //[itsNrStations][itsPS->nrSamplesToCNProc()][NR_POLARIZATIONS] Vector<SubbandMetaData> metaData; //[itsNrStations] + private: + const unsigned itsNrStations; + const unsigned itsNrSamplesToCNProc; }; -template <typename SAMPLE_TYPE> inline TransposedData<SAMPLE_TYPE>::TransposedData(unsigned nrStations, unsigned nrSamplesToCNProc, Allocator &allocator) +template <typename SAMPLE_TYPE> inline TransposedData<SAMPLE_TYPE>::TransposedData(const unsigned nrStations, const unsigned nrSamplesToCNProc) : - samples(boost::extents[nrStations][nrSamplesToCNProc][NR_POLARIZATIONS], 32, allocator), - metaData(nrStations, 32, allocator) + SuperType(false,boost::extents[nrStations][nrSamplesToCNProc][NR_POLARIZATIONS],0), + itsNrStations(nrStations), + itsNrSamplesToCNProc(nrSamplesToCNProc) { } +template <typename SAMPLE_TYPE> inline void TransposedData<SAMPLE_TYPE>::allocate( Allocator &allocator ) +{ + SuperType::allocate( allocator ); + metaData.resize(itsNrStations, 32, allocator); +} -template <typename SAMPLE_TYPE> inline size_t TransposedData<SAMPLE_TYPE>::requiredSize(unsigned nrStations, unsigned nrSamplesToCNProc) +template <typename SAMPLE_TYPE> inline size_t TransposedData<SAMPLE_TYPE>::requiredSize() const { - return align(sizeof(SAMPLE_TYPE) * nrStations * nrSamplesToCNProc * NR_POLARIZATIONS, 32) + - align(sizeof(SubbandMetaData) * nrStations, 32); + return SuperType::requiredSize() + + align(sizeof(SubbandMetaData) * itsNrStations, 32); } } // namespace RTCP diff --git a/RTCP/IONProc/src/ION_main.cc b/RTCP/IONProc/src/ION_main.cc index 0b89bda9299fb5f383982793e208487bf87a07b2..f86b2149de162eb6af1011791355073a2b25842c 100644 --- a/RTCP/IONProc/src/ION_main.cc +++ b/RTCP/IONProc/src/ION_main.cc @@ -167,7 +167,7 @@ static void deleteClientStreams() static void configureCNs(const Parset &parset) { CN_Command command(CN_Command::PREPROCESS); - CN_Configuration configuration( parset, myPsetNumber ); + CN_Configuration configuration( parset ); std::stringstream logStr; diff --git a/RTCP/Interface/include/Interface/CN_Configuration.h b/RTCP/Interface/include/Interface/CN_Configuration.h index 44e8150115d3f0dce81ee755348f24a892668826..f445cba63c752dc4133f516aadab6c757b08cc4e 100644 --- a/RTCP/Interface/include/Interface/CN_Configuration.h +++ b/RTCP/Interface/include/Interface/CN_Configuration.h @@ -38,7 +38,7 @@ class CN_Configuration { public: CN_Configuration() {} - CN_Configuration( const Parset &parset, const unsigned myPsetNumber ); + CN_Configuration( const Parset &parset ); unsigned &nrStations(); unsigned &nrBitsPerSample(); @@ -62,6 +62,7 @@ class CN_Configuration Matrix<double> &phaseCentres(); CN_Mode &mode(); bool &outputIncoherentStokesI(); + bool &stokesIntegrateChannels(); unsigned nrPencilBeams() { return 3 * nrPencilRings() * (nrPencilRings() + 1) + 1 + nrManualPencilBeams(); } @@ -106,6 +107,7 @@ class CN_Configuration unsigned itsNrManualPencilBeams; double itsManualPencilBeams[MAX_PENCILBEAMS * 2]; bool itsOutputIncoherentStokesI; + bool itsStokesIntegrateChannels; } itsMarshalledData; }; @@ -230,6 +232,11 @@ inline bool &CN_Configuration::outputIncoherentStokesI() return itsMarshalledData.itsOutputIncoherentStokesI; } +inline bool &CN_Configuration::stokesIntegrateChannels() +{ + return itsMarshalledData.itsStokesIntegrateChannels; +} + } // namespace RTCP } // namespace LOFAR diff --git a/RTCP/Interface/include/Interface/CorrelatedData.h b/RTCP/Interface/include/Interface/CorrelatedData.h index aec0736ff03a5b4865a1f1f7db69b1253a1a29d6..e16efd9d02e1080ad0b0719c5a84e986f4e4422b 100644 --- a/RTCP/Interface/include/Interface/CorrelatedData.h +++ b/RTCP/Interface/include/Interface/CorrelatedData.h @@ -22,9 +22,10 @@ namespace RTCP { class CorrelatedData: public StreamableData { public: - CorrelatedData(unsigned nrBaselines, unsigned nrChannels, Allocator &allocator = heapAllocator); + CorrelatedData(const unsigned nrBaselines, const unsigned nrChannels); - static size_t requiredSize(unsigned nrBaselines, unsigned nrChannels); + virtual size_t requiredSize() const; + virtual void allocate( Allocator &allocator = heapAllocator ); virtual StreamableData &operator += (const StreamableData &); @@ -37,45 +38,54 @@ class CorrelatedData: public StreamableData virtual void writeData(Stream *); private: + const unsigned itsNrBaselines; + const unsigned itsNrChannels; + void checkEndianness(); - static size_t visibilitiesSize(unsigned nrBaselines, unsigned nrChannels); - static size_t nrValidSamplesSize(unsigned nrBaselines, unsigned nrChannels); - static size_t centroidSize(unsigned nrBaselines); + size_t visibilitiesSize() const; + size_t nrValidSamplesSize() const; + size_t centroidSize() const; }; -inline size_t CorrelatedData::visibilitiesSize(unsigned nrBaselines, unsigned nrChannels) +inline size_t CorrelatedData::visibilitiesSize() const { - return align(sizeof(fcomplex) * nrBaselines * nrChannels * NR_POLARIZATIONS * NR_POLARIZATIONS, 32); + return align(sizeof(fcomplex) * itsNrBaselines * itsNrChannels * NR_POLARIZATIONS * NR_POLARIZATIONS, 32); } -inline size_t CorrelatedData::nrValidSamplesSize(unsigned nrBaselines, unsigned nrChannels) +inline size_t CorrelatedData::nrValidSamplesSize() const { - return align(sizeof(unsigned short) * nrBaselines * nrChannels, 32); + return align(sizeof(unsigned short) * itsNrBaselines * itsNrChannels, 32); } -inline size_t CorrelatedData::centroidSize(unsigned nrBaselines) +inline size_t CorrelatedData::centroidSize() const { - return align(sizeof(float) * nrBaselines, 32); + return align(sizeof(float) * itsNrBaselines, 32); } -inline size_t CorrelatedData::requiredSize(unsigned nrBaselines, unsigned nrChannels) +inline size_t CorrelatedData::requiredSize() const { - return visibilitiesSize(nrBaselines, nrChannels) + nrValidSamplesSize(nrBaselines, nrChannels) + centroidSize(nrBaselines); + return visibilitiesSize() + nrValidSamplesSize() + centroidSize(); } -inline CorrelatedData::CorrelatedData(unsigned nrBaselines, unsigned nrChannels, Allocator &allocator) +inline CorrelatedData::CorrelatedData(const unsigned nrBaselines, const unsigned nrChannels) : StreamableData(true), - visibilities(boost::extents[nrBaselines][nrChannels][NR_POLARIZATIONS][NR_POLARIZATIONS], 32, allocator), - nrValidSamples(boost::extents[nrBaselines][nrChannels], 32, allocator), - centroids(nrBaselines, 32, allocator) + itsNrBaselines(nrBaselines), + itsNrChannels(nrChannels) +{ +} + +inline void CorrelatedData::allocate( Allocator &allocator ) { + visibilities.resize(boost::extents[itsNrBaselines][itsNrChannels][NR_POLARIZATIONS][NR_POLARIZATIONS], 32, allocator); + nrValidSamples.resize(boost::extents[itsNrBaselines][itsNrChannels], 32, allocator); + centroids.resize(itsNrBaselines, 32, allocator); } diff --git a/RTCP/Interface/include/Interface/FilteredData.h b/RTCP/Interface/include/Interface/FilteredData.h index 5fa088b55930c25185f7e5910779243bcded7d44..0975a50230a83fc49ad1e38094c50c0728ead2ba 100644 --- a/RTCP/Interface/include/Interface/FilteredData.h +++ b/RTCP/Interface/include/Interface/FilteredData.h @@ -18,35 +18,48 @@ class FilteredData: public SampleData<fcomplex,4> public: typedef SampleData<fcomplex,4> SuperType; - FilteredData(unsigned nrStations, unsigned nrChannels, unsigned nrSamplesPerIntegration, Allocator &allocator = heapAllocator); + FilteredData(const unsigned nrStations, const unsigned nrChannels, const unsigned nrSamplesPerIntegration); - static size_t requiredSize(unsigned nrStations, unsigned nrChannels, unsigned nrSamplesPerIntegration); + virtual size_t requiredSize() const; + virtual void allocate( Allocator &allocator = heapAllocator ); Vector<SubbandMetaData> metaData; //[itsNrStations] protected: + const unsigned itsNrStations; + const unsigned itsNrChannels; + const unsigned itsNrSamplesPerIntegration; virtual void readData( Stream* ); virtual void writeData( Stream* ); }; -inline size_t FilteredData::requiredSize(unsigned nrStations, unsigned nrChannels, unsigned nrSamplesPerIntegration) -{ - return align(sizeof(fcomplex) * nrChannels * nrStations * (nrSamplesPerIntegration | 2) * NR_POLARIZATIONS, 32) + - align(sizeof(SubbandMetaData) * nrStations, 32); -} - -inline FilteredData::FilteredData(unsigned nrStations, unsigned nrChannels, unsigned nrSamplesPerIntegration, Allocator &allocator) +inline FilteredData::FilteredData(const unsigned nrStations, const unsigned nrChannels, const unsigned nrSamplesPerIntegration) : // The "| 2" significantly improves transpose speeds for particular // numbers of stations due to cache conflict effects. The extra memory // is not used. - SuperType::SampleData(false,boost::extents[nrChannels][nrStations][nrSamplesPerIntegration | 2][NR_POLARIZATIONS], nrStations, allocator), - metaData(nrStations, 32, allocator) + SuperType::SampleData(false,boost::extents[nrChannels][nrStations][nrSamplesPerIntegration | 2][NR_POLARIZATIONS], nrStations), + itsNrStations(nrStations), + itsNrChannels(nrChannels), + itsNrSamplesPerIntegration(nrSamplesPerIntegration) +{ +} + +inline size_t FilteredData::requiredSize() const { + return SuperType::requiredSize() + + align(sizeof(SubbandMetaData) * itsNrStations, 32); } +inline void FilteredData::allocate( Allocator &allocator ) +{ + SuperType::allocate( allocator ); + metaData.resize( itsNrStations, 32, allocator ); +} + + inline void FilteredData::readData(Stream *str) { str->read(&metaData[0], metaData.size() * sizeof(SubbandMetaData)); diff --git a/RTCP/Interface/include/Interface/MultiDimArray.h b/RTCP/Interface/include/Interface/MultiDimArray.h index a93eecd31cb00661ce0d38cbf5cd5020b7cc1372..b8b367665e971b2bbb27c61759d13c8756b449ff 100644 --- a/RTCP/Interface/include/Interface/MultiDimArray.h +++ b/RTCP/Interface/include/Interface/MultiDimArray.h @@ -28,23 +28,23 @@ template <typename T, unsigned DIM> class MultiDimArray : public boost::multi_ar MultiDimArray(Allocator &allocator = heapAllocator) : SuperType(0, boost::detail::multi_array::extent_gen<DIM>()), - allocator(allocator) + allocator(&allocator) { } MultiDimArray(const ExtentList &extents, size_t alignment = defaultAlignment(), Allocator &allocator = heapAllocator) : SuperType(static_cast<T *>(allocator.allocate(nrElements(extents) * sizeof(T), alignment)), extents), - allocator(allocator) + allocator(&allocator) { } ~MultiDimArray() { - allocator.deallocate(this->origin()); + allocator->deallocate(this->origin()); } - void resize(const ExtentList &extents, size_t alignment = defaultAlignment()) + void resize(const ExtentList &extents, const size_t alignment, Allocator &allocator) { MultiDimArray newArray(extents, alignment, allocator); std::swap(this->base_, newArray.base_); @@ -55,6 +55,12 @@ template <typename T, unsigned DIM> class MultiDimArray : public boost::multi_ar std::swap(this->origin_offset_, newArray.origin_offset_); std::swap(this->directional_offset_, newArray.directional_offset_); std::swap(this->num_elements_, newArray.num_elements_); + std::swap(this->allocator, newArray.allocator); + } + + void resize(const ExtentList &extents, const size_t alignment = defaultAlignment()) + { + resize( extents, alignment, *allocator ); } static size_t defaultAlignment() @@ -62,8 +68,6 @@ template <typename T, unsigned DIM> class MultiDimArray : public boost::multi_ar return sizeof(T) < 16 ? 8 : sizeof(T) < 32 ? 16 : 32; } - private: - Allocator &allocator; static size_t nrElements(const ExtentList &extents) { @@ -74,6 +78,10 @@ template <typename T, unsigned DIM> class MultiDimArray : public boost::multi_ar return size; } + + private: + // needs to be a pointer to be swappable in resize() + Allocator *allocator; }; @@ -89,13 +97,13 @@ template <typename T> class Vector : public MultiDimArray<T, 1> { } - Vector(size_t x, size_t alignment = SuperType::defaultAlignment(), Allocator &allocator = heapAllocator) + Vector(const size_t x, const size_t alignment = SuperType::defaultAlignment(), Allocator &allocator = heapAllocator) : SuperType(boost::extents[x], alignment, allocator) { } - Vector(const ExtentList &extents, size_t alignment = SuperType::defaultAlignment(), Allocator &allocator = heapAllocator) + Vector(const ExtentList &extents, const size_t alignment = SuperType::defaultAlignment(), Allocator &allocator = heapAllocator) : SuperType(extents, alignment, allocator) { @@ -103,9 +111,9 @@ template <typename T> class Vector : public MultiDimArray<T, 1> using SuperType::resize; - void resize(size_t x, size_t alignment = SuperType::defaultAlignment()) + void resize(const size_t x, const size_t alignment = SuperType::defaultAlignment(), Allocator &allocator = heapAllocator ) { - SuperType::resize(boost::extents[x], alignment); + SuperType::resize(boost::extents[x], alignment, allocator); } }; @@ -122,13 +130,13 @@ template <typename T> class Matrix : public MultiDimArray<T, 2> { } - Matrix(size_t x, size_t y, size_t alignment = SuperType::defaultAlignment(), Allocator &allocator = heapAllocator) + Matrix(const size_t x, const size_t y, const size_t alignment = SuperType::defaultAlignment(), Allocator &allocator = heapAllocator) : SuperType(boost::extents[x][y], alignment, allocator) { } - Matrix(const ExtentList &extents, size_t alignment = SuperType::defaultAlignment(), Allocator &allocator = heapAllocator) + Matrix(const ExtentList &extents, const size_t alignment = SuperType::defaultAlignment(), Allocator &allocator = heapAllocator) : SuperType(extents, alignment, allocator) { @@ -136,9 +144,9 @@ template <typename T> class Matrix : public MultiDimArray<T, 2> using SuperType::resize; - void resize(size_t x, size_t y, size_t alignment = SuperType::defaultAlignment()) + void resize(const size_t x, const size_t y, const size_t alignment = SuperType::defaultAlignment(), Allocator &allocator = heapAllocator) { - SuperType::resize(boost::extents[x][y], alignment); + SuperType::resize(boost::extents[x][y], alignment, allocator); } }; @@ -155,13 +163,13 @@ template <typename T> class Cube : public MultiDimArray<T, 3> { } - Cube(size_t x, size_t y, size_t z, size_t alignment = SuperType::defaultAlignment(), Allocator &allocator = heapAllocator) + Cube(const size_t x, const size_t y, const size_t z, const size_t alignment = SuperType::defaultAlignment(), Allocator &allocator = heapAllocator) : SuperType(boost::extents[x][y][z], alignment, allocator) { } - Cube(const ExtentList &extents, size_t alignment = SuperType::defaultAlignment(), Allocator &allocator = heapAllocator) + Cube(const ExtentList &extents, const size_t alignment = SuperType::defaultAlignment(), Allocator &allocator = heapAllocator) : SuperType(extents, alignment, allocator) { @@ -169,9 +177,9 @@ template <typename T> class Cube : public MultiDimArray<T, 3> using SuperType::resize; - void resize(size_t x, size_t y, size_t z, size_t alignment = SuperType::defaultAlignment()) + void resize(const size_t x, const size_t y, const size_t z, const size_t alignment = SuperType::defaultAlignment(), Allocator &allocator = heapAllocator ) { - SuperType::resize(boost::extents[x][y][z], alignment); + SuperType::resize(boost::extents[x][y][z], alignment, allocator); } }; diff --git a/RTCP/Interface/include/Interface/Parset.h b/RTCP/Interface/include/Interface/Parset.h index c3d5f08567e47a230c774934d52df6cc9c77528f..d8502d3b06907a9972757d59489264dd038d761d 100644 --- a/RTCP/Interface/include/Interface/Parset.h +++ b/RTCP/Interface/include/Interface/Parset.h @@ -102,6 +102,7 @@ public: string getTransportType(const string& prefix) const; string getModeName() const; bool outputIncoherentStokesI() const; + bool stokesIntegrateChannels() const; uint32 nrManualPencilBeams() const; vector<double> getManualPencilBeam( const unsigned pencil ) const; @@ -420,6 +421,11 @@ inline bool Parset::outputIncoherentStokesI() const return getBool("Observation.outputIncoherentStokesI"); } +inline bool Parset::stokesIntegrateChannels() const +{ + return getBool("Observation.stokesIntegrateChannels"); +} + } // namespace RTCP } // namespace LOFAR diff --git a/RTCP/Interface/include/Interface/PencilBeamData.h b/RTCP/Interface/include/Interface/PencilBeamData.h index 4e027a558e91549f30b0460a49eacd822888da34..5076a0a2b0e6694e04b53d4ad3bcd687f4b4ca40 100644 --- a/RTCP/Interface/include/Interface/PencilBeamData.h +++ b/RTCP/Interface/include/Interface/PencilBeamData.h @@ -18,21 +18,15 @@ class PencilBeamData: public SampleData<fcomplex,4> public: typedef SampleData<fcomplex,4> SuperType; - PencilBeamData(unsigned nrStations, unsigned nrChannels, unsigned nrSamplesPerIntegration, Allocator &allocator = heapAllocator); - static size_t requiredSize(unsigned nrCoordinates, unsigned nrChannels, unsigned nrSamplesPerIntegration); + PencilBeamData(const unsigned nrStations, const unsigned nrChannels, const unsigned nrSamplesPerIntegration); }; -inline size_t PencilBeamData::requiredSize(unsigned nrCoordinates, unsigned nrChannels, unsigned nrSamplesPerIntegration) -{ - return align(sizeof(fcomplex) * nrChannels * nrCoordinates * (nrSamplesPerIntegration | 2) * NR_POLARIZATIONS, 32); -} - -inline PencilBeamData::PencilBeamData(unsigned nrCoordinates, unsigned nrChannels, unsigned nrSamplesPerIntegration, Allocator &allocator) +inline PencilBeamData::PencilBeamData(const unsigned nrCoordinates, const unsigned nrChannels, const unsigned nrSamplesPerIntegration) // The "| 2" significantly improves transpose speeds for particular // numbers of stations due to cache conflict effects. The extra memory // is not used. : - SuperType::SampleData(false, boost::extents[nrChannels][nrCoordinates][nrSamplesPerIntegration | 2][NR_POLARIZATIONS], nrCoordinates, allocator ) + SuperType::SampleData(false, boost::extents[nrChannels][nrCoordinates][nrSamplesPerIntegration | 2][NR_POLARIZATIONS], nrCoordinates ) { } diff --git a/RTCP/Interface/include/Interface/PipelineOutput.h b/RTCP/Interface/include/Interface/PipelineOutput.h index 3870dd7cae6c7b53974847728e207b89ee890cc0..d26a90ced1108e450fc70f8cadd8a961882c3dd8 100644 --- a/RTCP/Interface/include/Interface/PipelineOutput.h +++ b/RTCP/Interface/include/Interface/PipelineOutput.h @@ -30,6 +30,7 @@ class PipelineOutput: boost::noncopyable FILTEREDDATA, PENCILBEAMDATA, STOKESDATA, + STOKESDATAINTEGRATEDCHANNELS, RAWDATA = -1 }; @@ -112,13 +113,13 @@ inline PipelineOutputSet::PipelineOutputSet( const Parset &ps, Allocator &alloca // !!! This section should match the actual pipeline as used by CNProcs !!! unsigned id = 0; - CN_Mode mode = CN_Mode(ps); + const CN_Mode mode = CN_Mode(ps); PipelineOutput *o; // add optional incoherentStokesI output if( ps.outputIncoherentStokesI() ) { o = new PipelineOutput( id++, PipelineOutput::STOKESDATA ); - o->itsData = new StokesData( false, 1, ps.nrPencilBeams(), ps.nrChannelsPerSubband(), ps.CNintegrationSteps(), ps.stokesIntegrationSteps(), allocator ); + o->itsData = new StokesData( false, 1, ps.nrPencilBeams(), ps.nrChannelsPerSubband(), ps.CNintegrationSteps(), ps.stokesIntegrationSteps() ); o->itsFilenameSuffix = ".incoherentStokesI"; itsOutputs.push_back( o ); } @@ -127,26 +128,31 @@ inline PipelineOutputSet::PipelineOutputSet( const Parset &ps, Allocator &alloca switch( mode.mode() ) { case CN_Mode::FILTER: o = new PipelineOutput( id++, PipelineOutput::FILTEREDDATA ); - o->itsData = new FilteredData( ps.nrStations(), ps.nrChannelsPerSubband(), ps.CNintegrationSteps(), allocator ); + o->itsData = new FilteredData( ps.nrStations(), ps.nrChannelsPerSubband(), ps.CNintegrationSteps() ); break; case CN_Mode::CORRELATE: o = new PipelineOutput( id++, PipelineOutput::CORRELATEDDATA ); o->itsWriterType = PipelineOutput::CASAWRITER; - o->itsData = new CorrelatedData( ps.nrBaselines(), ps.nrChannelsPerSubband(), allocator ); + o->itsData = new CorrelatedData( ps.nrBaselines(), ps.nrChannelsPerSubband() ); break; case CN_Mode::COHERENT_COMPLEX_VOLTAGES: o = new PipelineOutput( id++, PipelineOutput::PENCILBEAMDATA ); - o->itsData = new PencilBeamData( ps.nrPencilBeams(), ps.nrChannelsPerSubband(), ps.CNintegrationSteps(), allocator ); + o->itsData = new PencilBeamData( ps.nrPencilBeams(), ps.nrChannelsPerSubband(), ps.CNintegrationSteps() ); break; case CN_Mode::COHERENT_STOKES_I: case CN_Mode::COHERENT_ALLSTOKES: case CN_Mode::INCOHERENT_STOKES_I: case CN_Mode::INCOHERENT_ALLSTOKES: - o = new PipelineOutput( id++, PipelineOutput::STOKESDATA ); - o->itsData = new StokesData( mode.isCoherent(), mode.nrStokes(), ps.nrPencilBeams(), ps.nrChannelsPerSubband(), ps.CNintegrationSteps(), ps.stokesIntegrationSteps(), allocator ); + if( ps.stokesIntegrateChannels() ) { + o = new PipelineOutput( id++, PipelineOutput::STOKESDATAINTEGRATEDCHANNELS ); + o->itsData = new StokesDataIntegratedChannels( mode.isCoherent(), mode.nrStokes(), ps.nrPencilBeams(), ps.CNintegrationSteps(), ps.stokesIntegrationSteps() ); + } else { + o = new PipelineOutput( id++, PipelineOutput::STOKESDATA ); + o->itsData = new StokesData( mode.isCoherent(), mode.nrStokes(), ps.nrPencilBeams(), ps.nrChannelsPerSubband(), ps.CNintegrationSteps(), ps.stokesIntegrationSteps() ); + } break; default: @@ -160,6 +166,10 @@ inline PipelineOutputSet::PipelineOutputSet( const Parset &ps, Allocator &alloca } itsOutputs.push_back( o ); + + for( unsigned i = 0; i < itsOutputs.size(); i++ ) { + itsOutputs[i]->data()->allocate( allocator ); + } } } // namespace RTCP diff --git a/RTCP/Interface/include/Interface/StokesData.h b/RTCP/Interface/include/Interface/StokesData.h index 026aa649c5ec82ff1f94d1338933a2395d52ff36..37e6f2da7e0d9d0f8c767d9492cbd7e5a5c56b4a 100644 --- a/RTCP/Interface/include/Interface/StokesData.h +++ b/RTCP/Interface/include/Interface/StokesData.h @@ -18,22 +18,33 @@ class StokesData: public SampleData<float,4> public: typedef SampleData<float,4> SuperType; - StokesData(bool coherent, unsigned nrStokes, unsigned nrPencilBeams, unsigned nrChannels, unsigned nrSamplesPerIntegration, unsigned nrSamplesPerStokesIntegration, Allocator &allocator = heapAllocator); - - static size_t requiredSize(bool coherent, unsigned nrStokes, unsigned nrPencilBeams, unsigned nrChannels, unsigned nrSamplesPerIntegration, unsigned nrSamplesPerStokesIntegration); + StokesData(const bool coherent, const unsigned nrStokes, const unsigned nrPencilBeams, const unsigned nrChannels, const unsigned nrSamplesPerIntegration, const unsigned nrSamplesPerStokesIntegration); }; -inline size_t StokesData::requiredSize(bool coherent, unsigned nrStokes, unsigned nrPencilBeams, unsigned nrChannels, unsigned nrSamplesPerIntegration, unsigned nrSamplesPerStokesIntegration) +inline StokesData::StokesData(const bool coherent, const unsigned nrStokes, const unsigned nrPencilBeams, const unsigned nrChannels, const unsigned nrSamplesPerIntegration, const unsigned nrSamplesPerStokesIntegration) +: + // The "| 2" significantly improves transpose speeds for particular + // numbers of stations due to cache conflict effects. The extra memory + // is not used. + SuperType::SampleData(false, boost::extents[nrChannels][coherent ? nrPencilBeams : 1][(nrSamplesPerIntegration/nrSamplesPerStokesIntegration) | 2][nrStokes], coherent ? nrPencilBeams : 1) { - return align(sizeof(fcomplex) * (coherent ? nrPencilBeams : 1) * nrStokes * nrChannels * ((nrSamplesPerIntegration/nrSamplesPerStokesIntegration) | 2), 32); } -inline StokesData::StokesData(bool coherent, unsigned nrStokes, unsigned nrPencilBeams, unsigned nrChannels, unsigned nrSamplesPerIntegration, unsigned nrSamplesPerStokesIntegration, Allocator &allocator) +// StokesData, but summed over all channels +class StokesDataIntegratedChannels: public SampleData<float,3> +{ + public: + typedef SampleData<float,3> SuperType; + + StokesDataIntegratedChannels(const bool coherent, const unsigned nrStokes, const unsigned nrPencilBeams, const unsigned nrSamplesPerIntegration, const unsigned nrSamplesPerStokesIntegration); +}; + +inline StokesDataIntegratedChannels::StokesDataIntegratedChannels(const bool coherent, const unsigned nrStokes, const unsigned nrPencilBeams, const unsigned nrSamplesPerIntegration, const unsigned nrSamplesPerStokesIntegration) : // The "| 2" significantly improves transpose speeds for particular // numbers of stations due to cache conflict effects. The extra memory // is not used. - SuperType::SampleData(false, boost::extents[nrChannels][coherent ? nrPencilBeams : 1][(nrSamplesPerIntegration/nrSamplesPerStokesIntegration) | 2][nrStokes], coherent ? nrPencilBeams : 1, allocator) + SuperType::SampleData(false, boost::extents[coherent ? nrPencilBeams : 1][(nrSamplesPerIntegration/nrSamplesPerStokesIntegration) | 2][nrStokes], coherent ? nrPencilBeams : 1) { } diff --git a/RTCP/Interface/include/Interface/StreamableData.h b/RTCP/Interface/include/Interface/StreamableData.h index 7a98cabe48d456cbad0b773a56a37477f4cfbd29..83383c04df55eb21e737f5f9dbad1ff3aabe4aaa 100644 --- a/RTCP/Interface/include/Interface/StreamableData.h +++ b/RTCP/Interface/include/Interface/StreamableData.h @@ -39,6 +39,9 @@ class StreamableData { // suppress warning by defining a virtual destructor virtual ~StreamableData() {} + virtual size_t requiredSize() const = 0; + virtual void allocate( Allocator &allocator = heapAllocator ) = 0; + virtual void read(Stream*, const bool withSequenceNumber); virtual void write(Stream*, const bool withSequenceNumber); @@ -63,9 +66,12 @@ template <typename T, unsigned DIM> class SampleData: public StreamableData, boo public: typedef typename MultiDimArray<T,DIM>::ExtentList ExtentList; - SampleData( const bool isIntegratable, const ExtentList &extents, const unsigned nrFlags, Allocator &allocator = heapAllocator ); + SampleData( const bool isIntegratable, const ExtentList &extents, const unsigned nrFlags ); virtual ~SampleData(); + virtual size_t requiredSize() const; + virtual void allocate( Allocator &allocator = heapAllocator ); + MultiDimArray<T,DIM> samples; SparseSet<unsigned> *flags; @@ -76,6 +82,10 @@ template <typename T, unsigned DIM> class SampleData: public StreamableData, boo virtual void writeData(Stream*); private: + // copy the ExtentList instead of using a reference, as boost by default uses a global one (boost::extents) + const ExtentList extents; + const unsigned nrFlags; + bool itsHaveWarnedLittleEndian; }; @@ -108,19 +118,33 @@ inline void StreamableData::write( Stream *str, const bool withSequenceNumber ) writeData( str ); } -template <typename T, unsigned DIM> inline SampleData<T,DIM>::SampleData( const bool isIntegratable, const ExtentList &extents, const unsigned nrFlags, Allocator &allocator ): +template <typename T, unsigned DIM> inline SampleData<T,DIM>::SampleData( const bool isIntegratable, const ExtentList &extents, const unsigned nrFlags ): StreamableData( isIntegratable ), - samples( extents, 32, allocator ), - flags( new SparseSet<unsigned>[nrFlags] ), + flags( 0 ), + extents( extents ), + nrFlags( nrFlags ), itsHaveWarnedLittleEndian( false ) { } -template <typename T, unsigned DIM> inline SampleData<T,DIM>::~SampleData() +template <typename T, unsigned DIM> inline size_t SampleData<T,DIM>::requiredSize() const { - delete [] flags; + return align(MultiDimArray<T,DIM>::nrElements( extents ) * sizeof(T),32); } +template <typename T, unsigned DIM> inline void SampleData<T,DIM>::allocate( Allocator &allocator ) +{ + samples.resize( extents, 32, allocator ); + flags = new SparseSet<unsigned>[nrFlags]; +} + +template <typename T, unsigned DIM> inline SampleData<T,DIM>::~SampleData() +{ + if( flags ) { + delete [] flags; + flags = 0; + } +} template <typename T, unsigned DIM> inline void SampleData<T,DIM>::checkEndianness() { diff --git a/RTCP/Interface/src/CN_Configuration.cc b/RTCP/Interface/src/CN_Configuration.cc index 4d0a9f7aab881742ae9a962c04c9391e89387bbc..c4473ec6ad52e8a1ce8e0bc5405ad3d3d376b31a 100644 --- a/RTCP/Interface/src/CN_Configuration.cc +++ b/RTCP/Interface/src/CN_Configuration.cc @@ -27,10 +27,8 @@ namespace LOFAR { namespace RTCP { -CN_Configuration::CN_Configuration( const Parset &parset, unsigned myPsetNumber ) +CN_Configuration::CN_Configuration( const Parset &parset ) { - std::vector<Parset::StationRSPpair> inputs = parset.getStationNamesAndRSPboardNumbers(myPsetNumber); - nrStations() = parset.nrStations(); nrBitsPerSample() = parset.nrBitsPerSample(); nrSubbands() = parset.nrSubbands(); @@ -43,7 +41,7 @@ CN_Configuration::CN_Configuration( const Parset &parset, unsigned myPsetNumber delayCompensation() = parset.delayCompensation(); correctBandPass() = parset.correctBandPass(); sampleRate() = parset.sampleRate(); - inputPsets() = parset.getUint32Vector("OLAP.CNProc.inputPsets"); + inputPsets() = parset.inputPsets(); outputPsets() = parset.getUint32Vector("OLAP.CNProc.outputPsets"); tabList() = parset.getUint32Vector("OLAP.CNProc.tabList"); refFreqs() = parset.subbandToFrequencyMapping(); @@ -53,13 +51,19 @@ CN_Configuration::CN_Configuration( const Parset &parset, unsigned myPsetNumber refPhaseCentre() = parset.getRefPhaseCentres(); mode() = CN_Mode(parset); outputIncoherentStokesI() = parset.outputIncoherentStokesI(); + stokesIntegrateChannels() = parset.stokesIntegrateChannels(); - itsPhaseCentres.resize( inputs.size(), 3 ); - for( unsigned stat = 0; stat < inputs.size(); stat++ ) { - std::vector<double> phaseCentre = parset.getPhaseCentresOf( inputs[stat].station ); + // Get the phase centres of all station, not just the one we receive input from. The compute nodes + // need the phase centres for beam forming, which is after the transpose so all stations are present. + // The order of the stations is the order in which they are defined in inputPsets and parset.getStationNamesAndRSPboardNumbers. + // The CNProc/src/AsyncTranspose module should honor the same order. + itsPhaseCentres.resize( parset.nrStations(), 3 ); + std::vector<double> positions = parset.positions(); + + for( unsigned stat = 0; stat < parset.nrStations(); stat++ ) { for( unsigned dim = 0; dim < 3; dim++ ) { - itsPhaseCentres[stat][dim] = phaseCentre[dim]; + itsPhaseCentres[stat][dim] = positions[stat*3+dim]; } } @@ -129,7 +133,6 @@ void CN_Configuration::write(Stream *str) memcpy(&itsMarshalledData.itsPhaseCentres[stat*3], &itsPhaseCentres[stat][0], 3 * sizeof(double)); } - itsManualPencilBeams.resize(nrManualPencilBeams(),2); for( unsigned beam = 0; beam < nrManualPencilBeams(); beam++ ) { memcpy(&itsMarshalledData.itsManualPencilBeams[beam*2], &itsManualPencilBeams[beam][0], 2 * sizeof(double)); } diff --git a/RTCP/Run/src/RTCP.parset b/RTCP/Run/src/RTCP.parset index 76d5780a6475ce201e6a12dee02359cb1e1d92a6..662bd31d0a7785ca2467516e9eaf0b5ad73ca807 100644 --- a/RTCP/Run/src/RTCP.parset +++ b/RTCP/Run/src/RTCP.parset @@ -38,7 +38,8 @@ OLAP.correctBandPass = T Observation.mode = Correlate Observation.nrSlotsInFrame = 36 -Observation.stokesIntegrationSteps = 16 +Observation.stokesIntegrationSteps = 1 +Observation.stokesIntegrateChannels = F Observation.outputIncoherentStokesI = F #Observation.bandFilter = LBL_10_80