diff --git a/RTCP/CNProc/src/CN_Processing.cc b/RTCP/CNProc/src/CN_Processing.cc index 380c723d0192c1753ee00857c018215096cf8348..1896c7f1a9daa8ccc12e49743fd4fb076bbbc259 100644 --- a/RTCP/CNProc/src/CN_Processing.cc +++ b/RTCP/CNProc/src/CN_Processing.cc @@ -516,14 +516,17 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::formPencilBeams computeTimer.stop(); } -template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::calculateStokes() +template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::calculateIncoherentStokes() { computeTimer.start(); - if( itsMode.isCoherent() ) { - itsStokes->calculateCoherent(itsPencilBeamData,itsStokesData,itsPencilBeamFormer->nrCoordinates()); - } else { - itsStokes->calculateIncoherent(itsFilteredData,itsStokesData,itsNrStations); - } + itsStokes->calculateIncoherent(itsFilteredData,itsStokesData,itsNrStations); + computeTimer.stop(); +} + +template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::calculateCoherentStokes() +{ + computeTimer.start(); + itsStokes->calculateCoherent(itsPencilBeamData,itsStokesData,itsPencilBeamFormer->nrCoordinates()); computeTimer.stop(); } @@ -591,11 +594,13 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::process() case CN_Mode::COHERENT_STOKES_I: case CN_Mode::COHERENT_ALLSTOKES: formPencilBeams(); - // fallthrough to incoherent modes + calculateCoherentStokes(); + sendOutput( itsStokesData ); + break; case CN_Mode::INCOHERENT_STOKES_I: case CN_Mode::INCOHERENT_ALLSTOKES: - calculateStokes(); + calculateIncoherentStokes(); sendOutput( itsStokesData ); break; diff --git a/RTCP/CNProc/src/PencilBeams.cc b/RTCP/CNProc/src/PencilBeams.cc index 6c5b0f2f34061fdd03e3160cac377bd018931731..4410fea5f809662ae17b3faaa77f5f6ff38e1b41 100644 --- a/RTCP/CNProc/src/PencilBeams.cc +++ b/RTCP/CNProc/src/PencilBeams.cc @@ -220,25 +220,43 @@ fcomplex PencilBeams::phaseShift( double frequency, double delay ) return makefcomplex( std::cos(phi), std::sin(phi) ); } -void PencilBeams::computeComplexVoltages( MultiDimArray<fcomplex,4> &in, MultiDimArray<fcomplex,4> &out, std::vector<unsigned> stations ) +void PencilBeams::computeComplexVoltages( FilteredData *in, PencilBeamData *out, std::vector<unsigned> stations ) { float factor = 1.0 / stations.size(); + /* TODO: filter out stations with too much flagged data */ + + /* conservative flagging: flag output if any input was flagged */ + for( unsigned beam = 0; beam < itsCoordinates.size(); beam++ ) { + out->flags[beam].reset(); + + for (unsigned stat = 0; stat < stations.size(); stat++ ) { + out->flags[beam] |= in->flags[stations[stat]]; + } + } + for (unsigned ch = 0; ch < itsNrChannels; ch ++) { double frequency = itsBaseFrequency + ch * itsChannelBandwidth; for (unsigned time = 0; time < itsNrSamplesPerIntegration; time ++) { for (unsigned pol = 0; pol < NR_POLARIZATIONS; pol ++) { for( unsigned beam = 0; beam < itsCoordinates.size(); beam++ ) { - fcomplex &dest = out[ch][beam][time][pol]; - fcomplex sample = makefcomplex( 0, 0 ); - - for( unsigned stat = 0; stat < stations.size(); stat++ ) { - // note: for beam #0 (central beam), the shift is 1 - fcomplex shift = phaseShift( frequency, itsDelays[stations[stat]][beam] ); - sample += in[ch][stations[stat]][time][pol] * shift; + fcomplex &dest = out->samples[ch][beam][time][pol]; + + if( !out->flags[beam].test(time) ) { + /* combine the stations for this beam */ + fcomplex sample = makefcomplex( 0, 0 ); + + for( unsigned stat = 0; stat < stations.size(); stat++ ) { + // note: for beam #0 (central beam), the shift is 1 + fcomplex shift = phaseShift( frequency, itsDelays[stations[stat]][beam] ); + sample += in->samples[ch][stations[stat]][time][pol] * shift; + } + + dest = sample * factor; + } else { + /* data is flagged */ + dest = makefcomplex( 0, 0 ); } - - dest = sample * factor; } } } @@ -282,9 +300,7 @@ void PencilBeams::formPencilBeams( FilteredData *filteredData, PencilBeamData *p stations.push_back( stat ); } - std::clog << "Forming beams for stations " << stations << std::endl; - - computeComplexVoltages( filteredData->samples, pencilBeamData->samples, stations ); + computeComplexVoltages( filteredData, pencilBeamData, stations ); } } // namespace RTCP diff --git a/RTCP/CNProc/src/PencilBeams.h b/RTCP/CNProc/src/PencilBeams.h index 0f1a295a4c899eeef12f5144ec90e655ef68ce06..78b680091922657ca037d0dfbe286130cf0f9701 100644 --- a/RTCP/CNProc/src/PencilBeams.h +++ b/RTCP/CNProc/src/PencilBeams.h @@ -154,7 +154,7 @@ class PencilBeams void calculateDelays( unsigned stat, const PencilCoord3D &beamDir ); void calculateAllDelays( FilteredData *filteredData ); - void computeComplexVoltages( MultiDimArray<fcomplex,4> &in, MultiDimArray<fcomplex,4> &out, std::vector<unsigned> stations ); + void computeComplexVoltages( FilteredData *filteredData, PencilBeamData *pencilBeamData, std::vector<unsigned> stations ); std::vector<PencilCoord3D> itsCoordinates; unsigned itsNrStations; diff --git a/RTCP/CNProc/src/Stokes.cc b/RTCP/CNProc/src/Stokes.cc index f91153dd89c45b2cdfaf6a61027015a6cb6c5d76..2e0ee08a842946d16150a43a26b93f4678074965 100644 --- a/RTCP/CNProc/src/Stokes.cc +++ b/RTCP/CNProc/src/Stokes.cc @@ -18,34 +18,55 @@ Stokes::Stokes( CN_Mode &mode, unsigned nrChannels, unsigned nrSamplesPerIntegra void Stokes::calculateCoherent( PencilBeamData *pencilBeamData, StokesData *stokesData, unsigned nrBeams ) { - computeStokes( pencilBeamData->samples, stokesData->samples, nrBeams ); + computeStokes( pencilBeamData->samples, pencilBeamData->flags, stokesData, nrBeams ); } void Stokes::calculateIncoherent( FilteredData *filteredData, StokesData *stokesData, unsigned nrStations ) { - computeStokes( filteredData->samples, stokesData->samples, nrStations ); + computeStokes( filteredData->samples, filteredData->flags, stokesData, nrStations ); } static float sqr( float x ) { return x * x; } -void Stokes::computeStokes( MultiDimArray<fcomplex,4> &in, MultiDimArray<float,4> &out, unsigned nrBeams ) +void Stokes::computeStokes( MultiDimArray<fcomplex,4> &in, SparseSet<unsigned> *inflags, StokesData *out, unsigned nrBeams ) { unsigned &integrationSteps = itsNrSamplesPerStokesIntegration; bool allStokes = itsNrStokes == 4; bool coherent = itsIsCoherent; + unsigned nrOutputs = coherent ? nrBeams : 1; - std::clog << "Calculating " << itsNrStokes << " Stokes for " << nrBeams << " beam(s)." << std::endl; + for(unsigned beam = 0; beam < nrOutputs; beam++) { + out->flags[beam].reset(); + } + + /* conservative flagging; flag output if any of the inputs are flagged */ + /* TODO: fix for integrationsteps > 1 */ + for(unsigned beam = 0; beam < nrBeams; beam++) { + out->flags[coherent ? beam : 0] |= inflags[beam]; + } + + for(unsigned beam = 0; beam < nrOutputs; beam++) { + out->flags[beam] /= integrationSteps; + } for (unsigned ch = 0; ch < itsNrChannels; ch ++) { for (unsigned time = 0; time < itsNrSamplesPerIntegration; time += integrationSteps ) { float stokesI = 0, stokesQ = 0, stokesU = 0, stokesV = 0; + float nrValidSamples = 0; + bool flagged = false; for( unsigned beam = 0; beam < nrBeams; beam++ ) { if( coherent ) { stokesI = stokesQ = stokesU = stokesV = 0; + nrValidSamples = 0; + flagged = false; } for( unsigned fractime = 0; fractime < integrationSteps; fractime++ ) { + if( inflags[beam].test( time+fractime ) ) { + continue; + } + // assert: two polarizations fcomplex &polX = in[ch][beam][time+fractime][0]; @@ -59,34 +80,42 @@ void Stokes::computeStokes( MultiDimArray<fcomplex,4> &in, MultiDimArray<float,4 stokesU += real(polX * conj(polY)); stokesV += imag(polX * conj(polY)); } + nrValidSamples++; } - #define dest out[ch][beam][time / integrationSteps] if( coherent ) { - dest[0] = stokesI; + if( flagged ) { + /* hack: if no valid samples, insert zeroes */ + if( nrValidSamples == 0 ) { nrValidSamples = 1; } + } + #define dest out->samples[ch][beam][time / integrationSteps] + dest[0] = stokesI / nrValidSamples; if( allStokes ) { - dest[1] = stokesQ; - dest[2] = stokesU; - dest[3] = stokesV; + dest[1] = stokesQ / nrValidSamples; + dest[2] = stokesU / nrValidSamples; + dest[3] = stokesV / nrValidSamples; } + #undef dest } - #undef dest } if( !coherent ) { - #define dest out[ch][0][time / integrationSteps] - dest[0] = stokesI; + if( flagged ) { + /* hack: if no valid samples, insert zeroes */ + if( nrValidSamples == 0 ) { nrValidSamples = 1; } + } + + #define dest out->samples[ch][0][time / integrationSteps] + dest[0] = stokesI / nrValidSamples; if( allStokes ) { - dest[1] = stokesQ; - dest[2] = stokesU; - dest[3] = stokesV; + dest[1] = stokesQ / nrValidSamples; + dest[2] = stokesU / nrValidSamples; + dest[3] = stokesV / nrValidSamples; } #undef dest } } } - - std::clog << "Done calculating Stokes" << std::endl; } } // namespace RTCP diff --git a/RTCP/CNProc/src/Stokes.h b/RTCP/CNProc/src/Stokes.h index 893982a46a41d838ef8f250eb846bce968fc751e..ff8b6000d1f1eb2673fdbedaa510fe2e1a230289 100644 --- a/RTCP/CNProc/src/Stokes.h +++ b/RTCP/CNProc/src/Stokes.h @@ -26,7 +26,7 @@ class Stokes unsigned itsNrStokes; bool itsIsCoherent; - void computeStokes( MultiDimArray<fcomplex,4> &in, MultiDimArray<float,4> &out, unsigned nrBeams ); + void computeStokes( MultiDimArray<fcomplex,4> &in, SparseSet<unsigned> *inflags, StokesData *out, unsigned nrBeams ); }; } // namespace RTCP diff --git a/RTCP/Interface/include/Interface/FilteredData.h b/RTCP/Interface/include/Interface/FilteredData.h index cbd4db9d470422e5c99d6a4552902d62d57eafce..5fa088b55930c25185f7e5910779243bcded7d44 100644 --- a/RTCP/Interface/include/Interface/FilteredData.h +++ b/RTCP/Interface/include/Interface/FilteredData.h @@ -13,27 +13,20 @@ namespace LOFAR { namespace RTCP { -class FilteredData: public StreamableData +class FilteredData: public SampleData<fcomplex,4> { public: + typedef SampleData<fcomplex,4> SuperType; + FilteredData(unsigned nrStations, unsigned nrChannels, unsigned nrSamplesPerIntegration, Allocator &allocator = heapAllocator); - ~FilteredData(); static size_t requiredSize(unsigned nrStations, unsigned nrChannels, unsigned nrSamplesPerIntegration); - // The "| 2" significantly improves transpose speeds for particular - // numbers of stations due to cache conflict effects. The extra memory - // is not used. - MultiDimArray<fcomplex, 4> samples; //[itsNrChannels][itsNrStations][itsNrSamplesPerIntegration | 2][NR_POLARIZATIONS] CACHE_ALIGNED - SparseSet<unsigned> *flags; //[itsNrStations] Vector<SubbandMetaData> metaData; //[itsNrStations] protected: virtual void readData( Stream* ); virtual void writeData( Stream* ); - - private: - void checkEndianness(); }; @@ -46,45 +39,25 @@ inline size_t FilteredData::requiredSize(unsigned nrStations, unsigned nrChannel inline FilteredData::FilteredData(unsigned nrStations, unsigned nrChannels, unsigned nrSamplesPerIntegration, Allocator &allocator) : - StreamableData(false), - samples(boost::extents[nrChannels][nrStations][nrSamplesPerIntegration | 2][NR_POLARIZATIONS], 32, allocator), - flags(new SparseSet<unsigned>[nrStations]), + // The "| 2" significantly improves transpose speeds for particular + // numbers of stations due to cache conflict effects. The extra memory + // is not used. + SuperType::SampleData(false,boost::extents[nrChannels][nrStations][nrSamplesPerIntegration | 2][NR_POLARIZATIONS], nrStations, allocator), metaData(nrStations, 32, allocator) { } - -inline FilteredData::~FilteredData() -{ - delete [] flags; -} - - -inline void FilteredData::checkEndianness() -{ -#if !defined WORDS_BIGENDIAN - dataConvert(LittleEndian, samples.origin(), samples.num_elements()); -#endif -} - - inline void FilteredData::readData(Stream *str) { - str->read(samples.origin(), samples.num_elements() * sizeof(fcomplex)); str->read(&metaData[0], metaData.size() * sizeof(SubbandMetaData)); - - checkEndianness(); + SuperType::readData(str); } inline void FilteredData::writeData(Stream *str) { -#if !defined WORDS_BIGENDIAN - THROW(AssertError, "not implemented: think about endianness"); -#endif - - str->write(samples.origin(), samples.num_elements() * sizeof(fcomplex)); str->write(&metaData[0], metaData.size() * sizeof(SubbandMetaData)); + SuperType::writeData(str); } diff --git a/RTCP/Interface/include/Interface/PencilBeamData.h b/RTCP/Interface/include/Interface/PencilBeamData.h index 9a2f7eff3eb96ca2385119cdeef6e5bcf271f338..abf9e55bfbc57509e799ce6c0070704947c7c3fb 100644 --- a/RTCP/Interface/include/Interface/PencilBeamData.h +++ b/RTCP/Interface/include/Interface/PencilBeamData.h @@ -13,64 +13,29 @@ namespace LOFAR { namespace RTCP { -class PencilBeamData: public StreamableData +class PencilBeamData: public SampleData<fcomplex,4> { public: - PencilBeamData(unsigned nrStations, unsigned nrChannels, unsigned nrSamplesPerIntegration, Allocator &allocator = heapAllocator); + typedef SampleData<fcomplex,4> SuperType; + PencilBeamData(unsigned nrStations, unsigned nrChannels, unsigned nrSamplesPerIntegration, Allocator &allocator = heapAllocator); static size_t requiredSize(unsigned nrCoordinates, unsigned nrChannels, unsigned nrSamplesPerIntegration); - - // The "| 2" significantly improves transpose speeds for particular - // numbers of stations due to cache conflict effects. The extra memory - // is not used. - MultiDimArray<fcomplex, 4> samples; //[itsNrChannels][itsNrCoordinates][itsNrSamplesPerIntegration | 2][NR_POLARIZATIONS] CACHE_ALIGNED - - protected: - virtual void readData( Stream* ); - virtual void writeData( Stream* ); - - private: - void checkEndianness(); }; - inline size_t PencilBeamData::requiredSize(unsigned nrCoordinates, unsigned nrChannels, unsigned nrSamplesPerIntegration) { return align(sizeof(fcomplex) * nrChannels * nrCoordinates * (nrSamplesPerIntegration | 2) * NR_POLARIZATIONS, 32); } inline PencilBeamData::PencilBeamData(unsigned nrCoordinates, unsigned nrChannels, unsigned nrSamplesPerIntegration, Allocator &allocator) + // The "| 2" significantly improves transpose speeds for particular + // numbers of stations due to cache conflict effects. The extra memory + // is not used. : - StreamableData(false), - samples(boost::extents[nrChannels][nrCoordinates][nrSamplesPerIntegration | 2][NR_POLARIZATIONS], 32, allocator) + SuperType::SampleData(false, boost::extents[nrChannels][nrCoordinates][nrSamplesPerIntegration | 2][NR_POLARIZATIONS], nrCoordinates, allocator ) { } -inline void PencilBeamData::checkEndianness() -{ -#if !defined WORDS_BIGENDIAN - dataConvert(LittleEndian, samples.origin(), samples.num_elements()); -#endif -} - - -inline void PencilBeamData::readData(Stream *str) -{ - str->read(samples.origin(), samples.num_elements() * sizeof(fcomplex)); - - checkEndianness(); -} - - -inline void PencilBeamData::writeData(Stream *str) -{ -#if !defined WORDS_BIGENDIAN - THROW(AssertError, "not implemented: think about endianness"); -#endif - - str->write(samples.origin(), samples.num_elements() * sizeof(fcomplex)); -} - } // namespace RTCP } // namespace LOFAR diff --git a/RTCP/Interface/include/Interface/StokesData.h b/RTCP/Interface/include/Interface/StokesData.h index dcd73a9c211a40cde73dfbbaebac46650435dce7..e0d8d411db8ba4dbf3d93f024f2144f92108cf97 100644 --- a/RTCP/Interface/include/Interface/StokesData.h +++ b/RTCP/Interface/include/Interface/StokesData.h @@ -14,26 +14,14 @@ namespace LOFAR { namespace RTCP { -class StokesData: public StreamableData +class StokesData: public SampleData<float,4> { public: + typedef SampleData<float,4> SuperType; + StokesData(CN_Mode &mode, unsigned nrPencilBeams, unsigned nrChannels, unsigned nrSamplesPerIntegration, unsigned nrSamplesPerStokesIntegration, Allocator &allocator = heapAllocator); static size_t requiredSize(CN_Mode &mode, unsigned nrPencilBeams, unsigned nrChannels, unsigned nrSamplesPerIntegration, unsigned nrSamplesPerStokesIntegration); - - // The "| 2" significantly improves transpose speeds for particular - // numbers of stations due to cache conflict effects. The extra memory - // is not used. - MultiDimArray<float, 4> samples; //[itsNrChannels][nrBeams()][itsNrSamplesPerIntegration | 2][nrStokes()] CACHE_ALIGNED - - protected: - virtual void readData( Stream* ); - virtual void writeData( Stream* ); - - private: - bool itsHaveWarnedLittleEndian; - - void checkEndianness(); }; inline size_t StokesData::requiredSize(CN_Mode &mode, unsigned nrPencilBeams, unsigned nrChannels, unsigned nrSamplesPerIntegration, unsigned nrSamplesPerStokesIntegration) @@ -43,43 +31,13 @@ inline size_t StokesData::requiredSize(CN_Mode &mode, unsigned nrPencilBeams, un inline StokesData::StokesData(CN_Mode &mode, unsigned nrPencilBeams, unsigned nrChannels, unsigned nrSamplesPerIntegration, unsigned nrSamplesPerStokesIntegration, Allocator &allocator) : - StreamableData(false), - samples(boost::extents[nrChannels][mode.isCoherent() ? nrPencilBeams : 1][(nrSamplesPerIntegration/nrSamplesPerStokesIntegration) | 2][mode.nrStokes()], 32, allocator), - itsHaveWarnedLittleEndian(false) -{ -} - -inline void StokesData::checkEndianness() -{ -#if !defined WORDS_BIGENDIAN - dataConvert(LittleEndian, samples.origin(), samples.num_elements()); -#endif -} - - -inline void StokesData::readData(Stream *str) + // The "| 2" significantly improves transpose speeds for particular + // numbers of stations due to cache conflict effects. The extra memory + // is not used. + SuperType::SampleData(false, boost::extents[nrChannels][mode.isCoherent() ? nrPencilBeams : 1][(nrSamplesPerIntegration/nrSamplesPerStokesIntegration) | 2][mode.nrStokes()], mode.isCoherent() ? nrPencilBeams : 1, allocator) { - str->read(samples.origin(), samples.num_elements() * sizeof(float)); - - checkEndianness(); } - -inline void StokesData::writeData(Stream *str) -{ -#if !defined WORDS_BIGENDIAN - if( !itsHaveWarnedLittleEndian ) { - itsHaveWarnedLittleEndian = true; - - std::clog << "Warning: writing data in little endian." << std::endl; - } - //THROW(AssertError, "not implemented: think about endianness"); -#endif - - str->write(samples.origin(), samples.num_elements() * sizeof(float)); -} - - } // namespace RTCP } // namespace LOFAR diff --git a/RTCP/Interface/include/Interface/StreamableData.h b/RTCP/Interface/include/Interface/StreamableData.h index 56c29fae5926a026772c85a25652a463d28def93..e910fcfd8d78fe6e4bbc6e84841cc0ca15a369d6 100644 --- a/RTCP/Interface/include/Interface/StreamableData.h +++ b/RTCP/Interface/include/Interface/StreamableData.h @@ -4,6 +4,8 @@ #include <Stream/Stream.h> #include <Common/LofarTypes.h> #include <Interface/Parset.h> +#include <Interface/MultiDimArray.h> +#include <Interface/SparseSet.h> #include <Interface/Allocator.h> #include <Common/DataConvert.h> @@ -54,6 +56,27 @@ class StreamableData { virtual void writeData(Stream*) = 0; }; +// A typical data set contains a MultiDimArray of tuples and a set of flags. +template <typename T, unsigned DIM> class SampleData: public StreamableData { + public: + typedef typename MultiDimArray<T,DIM>::ExtentList ExtentList; + + SampleData( bool isIntegratable, const ExtentList &extents, unsigned nrFlags, Allocator &allocator = heapAllocator ); + virtual ~SampleData(); + + MultiDimArray<T,DIM> samples; + SparseSet<unsigned> *flags; + + protected: + virtual void checkEndianness(); + + virtual void readData(Stream*); + virtual void writeData(Stream*); + + private: + bool itsHaveWarnedLittleEndian; +}; + inline void StreamableData::read( Stream *str, bool withSequenceNumber ) { if( withSequenceNumber ) { @@ -83,6 +106,49 @@ inline void StreamableData::write( Stream *str, bool withSequenceNumber ) writeData( str ); } +template <typename T, unsigned DIM> inline SampleData<T,DIM>::SampleData( bool isIntegratable, const ExtentList &extents, unsigned nrFlags, Allocator &allocator ): + StreamableData( isIntegratable ), + samples( extents, 32, allocator ), + flags( new SparseSet<unsigned>[nrFlags] ), + itsHaveWarnedLittleEndian( false ) +{ +} + +template <typename T, unsigned DIM> inline SampleData<T,DIM>::~SampleData() +{ + delete [] flags; +} + + +template <typename T, unsigned DIM> inline void SampleData<T,DIM>::checkEndianness() +{ +#if !defined WORDS_BIGENDIAN + dataConvert(LittleEndian, samples.origin(), samples.num_elements()); +#endif +} + +template <typename T, unsigned DIM> inline void SampleData<T,DIM>::readData( Stream *str ) +{ + str->read(samples.origin(), samples.num_elements() * sizeof (T) ); + + checkEndianness(); +} + +template <typename T, unsigned DIM> inline void SampleData<T,DIM>::writeData( Stream *str ) +{ +#if !defined WORDS_BIGENDIAN + if( !itsHaveWarnedLittleEndian ) { + itsHaveWarnedLittleEndian = true; + + std::clog << "Warning: writing data in little endian." << std::endl; + } + //THROW(AssertError, "not implemented: think about endianness"); +#endif + + str->write(samples.origin(), samples.num_elements() * sizeof (T) ); +} + + } // namespace RTCP } // namespace LOFAR