From fb8fddee1e3b4a3a5a58912c1328f21c9ee77b5a Mon Sep 17 00:00:00 2001 From: Jan David Mol <mol@astron.nl> Date: Thu, 17 Sep 2009 08:06:46 +0000 Subject: [PATCH] bug 1303: merged beamformer and pencilbeamformer, detached metadata from samples to avoid copying them, changed some arrays into vectors --- .gitattributes | 2 - RTCP/CNProc/src/AsyncTranspose.cc | 7 +- RTCP/CNProc/src/AsyncTranspose.h | 4 +- RTCP/CNProc/src/BeamFormer.cc | 220 ----------- RTCP/CNProc/src/BeamFormer.h | 86 ---- RTCP/CNProc/src/CMakeLists.txt | 1 - RTCP/CNProc/src/CN_Processing.cc | 78 ++-- RTCP/CNProc/src/CN_Processing.h | 12 +- RTCP/CNProc/src/Correlator.cc | 74 ++-- RTCP/CNProc/src/Correlator.h | 18 +- RTCP/CNProc/src/InputData.h | 33 +- RTCP/CNProc/src/Makefile.am | 2 - RTCP/CNProc/src/PPF.cc | 15 +- RTCP/CNProc/src/PPF.h | 5 +- RTCP/CNProc/src/PencilBeams.cc | 367 ++++++++++++++---- RTCP/CNProc/src/PencilBeams.h | 75 +++- RTCP/CNProc/src/Stokes.cc | 90 +++-- RTCP/CNProc/src/Stokes.h | 9 +- RTCP/CNProc/src/TransposedData.h | 9 +- .../include/Interface/FilteredData.h | 29 +- .../include/Interface/MultiDimArray.h | 5 + .../include/Interface/PencilBeamData.h | 7 +- .../include/Interface/PipelineOutput.h | 4 +- .../include/Interface/StreamableData.h | 14 +- 24 files changed, 522 insertions(+), 644 deletions(-) delete mode 100644 RTCP/CNProc/src/BeamFormer.cc delete mode 100644 RTCP/CNProc/src/BeamFormer.h diff --git a/.gitattributes b/.gitattributes index f9821ccc015..cc6d94946b4 100644 --- a/.gitattributes +++ b/.gitattributes @@ -1130,8 +1130,6 @@ MAC/_System/lofar30.sysconf -text svneol=native#application/octet-stream MAC/bootstrap -text svneol=native#application/octet-stream RTCP/CNProc/bootstrap -text RTCP/CNProc/src/ArenaMapping.h -text -RTCP/CNProc/src/BeamFormer.cc -text -RTCP/CNProc/src/BeamFormer.h -text RTCP/CNProc/src/BeamFormerAsm3St6Bm.inc -text RTCP/CNProc/src/BeamFormerAsm6St3Bm.inc -text RTCP/CNProc/src/CN_Math.h -text diff --git a/RTCP/CNProc/src/AsyncTranspose.cc b/RTCP/CNProc/src/AsyncTranspose.cc index d85a244e08c..dac00cd8211 100644 --- a/RTCP/CNProc/src/AsyncTranspose.cc +++ b/RTCP/CNProc/src/AsyncTranspose.cc @@ -38,7 +38,7 @@ template <typename SAMPLE_TYPE> AsyncTranspose<SAMPLE_TYPE>::AsyncTranspose( } } -template <typename SAMPLE_TYPE> void AsyncTranspose<SAMPLE_TYPE>::postAllReceives(TransposedData<SAMPLE_TYPE> *transposedData) +template <typename SAMPLE_TYPE> void AsyncTranspose<SAMPLE_TYPE>::postAllReceives(SubbandMetaData *metaData, TransposedData<SAMPLE_TYPE> *transposedData) { // there must be something to receive ASSERT(itsInputPsets.size() > 0); @@ -53,7 +53,7 @@ template <typename SAMPLE_TYPE> void AsyncTranspose<SAMPLE_TYPE>::postAllReceive size_t size; } toRead[itsNrCommunications] = { { transposedData->samples[i].origin(), transposedData->samples[i].num_elements() * sizeof(SAMPLE_TYPE) }, - { &transposedData->metaData.subbandInfo(i), transposedData->metaData.itsSubbandInfoSize } + { &metaData->subbandInfo(i), metaData->itsSubbandInfoSize } }; // read it @@ -103,6 +103,7 @@ template <typename SAMPLE_TYPE> unsigned AsyncTranspose<SAMPLE_TYPE>::waitForAny template <typename SAMPLE_TYPE> void AsyncTranspose<SAMPLE_TYPE>::asyncSend(unsigned outputPsetIndex, + const SubbandMetaData *metaData, const InputData<SAMPLE_TYPE> *inputData) { unsigned pset = itsOutputPsets[outputPsetIndex]; @@ -115,7 +116,7 @@ template <typename SAMPLE_TYPE> void AsyncTranspose<SAMPLE_TYPE>::asyncSend(unsi const size_t size; } toWrite[itsNrCommunications] = { { inputData->samples[outputPsetIndex].origin(), inputData->samples[outputPsetIndex].num_elements() * sizeof(SAMPLE_TYPE) }, - { &inputData->metaData.subbandInfo(outputPsetIndex), inputData->metaData.itsSubbandInfoSize }, + { &metaData->subbandInfo(outputPsetIndex), metaData->itsSubbandInfoSize }, }; // write it diff --git a/RTCP/CNProc/src/AsyncTranspose.h b/RTCP/CNProc/src/AsyncTranspose.h index 6dffce21290..679510bff5e 100644 --- a/RTCP/CNProc/src/AsyncTranspose.h +++ b/RTCP/CNProc/src/AsyncTranspose.h @@ -37,13 +37,13 @@ template <typename SAMPLE_TYPE> class AsyncTranspose const std::vector<unsigned> &inputPsets, const std::vector<unsigned> &outputPsets ); // Post all async receives for the transpose. - void postAllReceives(TransposedData<SAMPLE_TYPE> *transposedData); + void postAllReceives(SubbandMetaData *metaData, TransposedData<SAMPLE_TYPE> *transposedData); // Wait for a data message. Returns the station number where the message originates. unsigned waitForAnyReceive(); // Asynchronously send a subband. - void asyncSend(const unsigned outputPsetNr, const InputData<SAMPLE_TYPE> *inputData); + void asyncSend(const unsigned outputPsetNr, const SubbandMetaData *metaData, const InputData<SAMPLE_TYPE> *inputData); // Make sure all async sends have finished. void waitForAllSends(); diff --git a/RTCP/CNProc/src/BeamFormer.cc b/RTCP/CNProc/src/BeamFormer.cc deleted file mode 100644 index 71c8b8b076c..00000000000 --- a/RTCP/CNProc/src/BeamFormer.cc +++ /dev/null @@ -1,220 +0,0 @@ -//# Always #include <lofar_config.h> first! -#include <lofar_config.h> - -#include <Common/Timer.h> - -#include <BeamFormer.h> - -#include <map> - -#define VERBOSE 0 - -namespace LOFAR { -namespace RTCP { - -static NSTimer beamFormTimer("BeamFormer::formBeams()", true, true); - -BeamFormer::BeamFormer(const unsigned nrStations, const unsigned nrSamplesPerIntegration, - const std::vector<unsigned> &station2BeamFormedStation, const unsigned nrChannels) -: - itsNrStations(nrStations), - itsNrChannels(nrChannels), - itsNrSamplesPerIntegration(nrSamplesPerIntegration), - itsStation2BeamFormedStation(station2BeamFormedStation) -{ - itsNrBeamFormedStations = calcNrBeamFormedStations(); - if(itsNrBeamFormedStations == 0) { -#if VERBOSE - LOG_ERROR("BeamForming disabled"); -#endif - itsStationMapping = new unsigned[nrStations]; - } else { -#if VERBOSE - LOG_ERROR_STR("BeamForming enabled, " << itsNrBeamFormedStations << " station(s)"); -#endif - itsStationMapping = new unsigned[itsNrBeamFormedStations]; - } - - calcMapping(); -} - -unsigned BeamFormer::calcNrBeamFormedStations() -{ - if(itsStation2BeamFormedStation.size() == 0) return 0; - - unsigned max = 0; - for(unsigned i=0; i<itsStation2BeamFormedStation.size(); i++) { - if(itsStation2BeamFormedStation[i] > max) { - max = itsStation2BeamFormedStation[i]; - } - } - - return max + 1; -} - -BeamFormer::~BeamFormer() -{ - delete[] itsStationMapping; -} - -void BeamFormer::calcMapping() -{ - if(itsNrBeamFormedStations == 0) { - // beamforming disabled - for(unsigned i=0; i<itsNrStations; i++) { - itsStationMapping[i] = i; - } - } else { - unsigned nrStationsInBeam[itsNrBeamFormedStations]; - memset(nrStationsInBeam, 0, itsNrBeamFormedStations * sizeof(unsigned)); - - itsBeamFormedStations.resize(itsNrBeamFormedStations); - for(unsigned i=0; i<itsNrStations; i++) { - unsigned id = itsStation2BeamFormedStation[i]; - - itsBeamFormedStations[id].resize(nrStationsInBeam[id] + 1); - itsBeamFormedStations[id][nrStationsInBeam[id]] = i; - nrStationsInBeam[id]++; - } - - for(unsigned i=0; i<itsNrBeamFormedStations; i++) { - itsStationMapping[i] = itsBeamFormedStations[i][0]; - } - -#if VERBOSE - // dump the mapping - LOG_ERROR("*** BeamForming mapping START"); - std::stringstream logStr; - for(unsigned i=0; i<itsNrBeamFormedStations; i++) { - logStr << "BeamFormed Station " << i << ": "; - for (unsigned j=0; j<itsBeamFormedStations[i].size(); j++) { - logStr << itsBeamFormedStations[i][j] << " "; - } - LOG_ERROR(logStr.str()); - } - LOG_ERROR("*** BeamForming mapping END"); -#endif - } -} - -void BeamFormer::beamFormStation(FilteredData *filteredData, const unsigned beamFormedStation) -{ - const std::vector<unsigned> &stationList = itsBeamFormedStations[beamFormedStation]; - const unsigned destStation = stationList[0]; - const unsigned nrStationsInBeam = stationList.size(); - -#if VERBOSE - std::stringstream logStr; - logStr << "Beam forming station " << beamFormedStation << ", size is " << nrStationsInBeam << " ("; - for(unsigned statIndex=0; statIndex<nrStationsInBeam; statIndex++) { - logStr << stationList[statIndex] << " "; - } - logStr << ")"; - LOG_ERROR(logStr.str()); -#endif - - // Flagging strategy: if an entire station (or more than x % of a station) - // is flagged away, just drop that entire station and correct for it. - // If only a fraction is flagged, do a union of all flags, and flag that data away. - // Also, we have to set the resulting values to zero, the correlator requires this. - - bool validStation[nrStationsInBeam]; - const unsigned upperBound = static_cast<unsigned>(itsNrSamplesPerIntegration * MAX_FLAGGED_PERCENTAGE); - unsigned nrValidStations = 0; - - for(unsigned i=0; i<nrStationsInBeam; i++) { - if(filteredData->flags[stationList[i]].count() > upperBound) { - // many samples have been flagged away, drop entire station -#if VERBOSE - LOG_ERROR_STR( "dropping station " << stationList[i] << ", " << filteredData->flags[destStation].count() << - " samples were flagged away, upper bound = " << upperBound); -#endif - validStation[i] = false; - } else { - // ok, use station - validStation[i] = true; - nrValidStations++; - } - } - - if(nrValidStations == 0) { - filteredData->flags[destStation].include(0, itsNrSamplesPerIntegration); - return; - } - - // LOG_ERROR_STR("total Stations in beam = " << nrStationsInBeam << ", valid = " << nrValidStations); - const float factor = 1.0 / nrValidStations; - - // Now, we just flag everything that is flagged in one of the stations away. - // We only do this for valid stations. Station 0 (the destination) is a special case. - // If it was not valid, we have to clear its flags. - if(!validStation[0]) { - filteredData->flags[destStation].reset(); - } - for(unsigned statIndex = 1; statIndex < nrStationsInBeam; statIndex++) { - if(validStation[statIndex]) { - unsigned other = stationList[statIndex]; - filteredData->flags[destStation] |= filteredData->flags[other]; - } - } - - // Add the data of all stations to the destination station. - for (unsigned ch = 0; ch < itsNrChannels; ch ++) { - if (ch > 0 /* && !itsRFIflags[stat1][ch] && !itsRFIflags[stat2][ch] */) { - for (unsigned time = 0; time < itsNrSamplesPerIntegration; time ++) { - if(!filteredData->flags[destStation].test(time)) { - for (unsigned pol = 0; pol < NR_POLARIZATIONS; pol ++) { - fcomplex sample = filteredData->samples[ch][destStation][time][pol]; - - for (unsigned statIndex = 1; statIndex < nrStationsInBeam; statIndex++) { - if(validStation[statIndex]) { - sample += filteredData->samples[ch][stationList[statIndex]][time][pol]; - } - } - - // We need to correct the total power here: divide by the number of - // stations in this beam formed station. - filteredData->samples[ch][destStation][time][pol] = sample * factor; - } - } else { // data was flagged away - for (unsigned pol = 0; pol < NR_POLARIZATIONS; pol ++) { - filteredData->samples[ch][destStation][time][pol] = makefcomplex(0, 0); - } - } - } - } - } - - // There are two ways of adding the data. - // 1) load sample, iterate over the stations, add values, store sample. - // 2) do the additions pair-wise. Just add two stations together. - // The number of operations is the same, but the latter requires many - // more memory loads. However, option 2 walks linerarly through memory, and - // may have a better cache behaviour, or may be easier to prefetch. - // On a PC, option 1 was much faster. -} - -void BeamFormer::formBeams(FilteredData *filteredData) -{ -#if VERBOSE - LOG_ERROR("beam forming START"); -#endif - - beamFormTimer.start(); - - for (unsigned beamFormedStation=0; beamFormedStation<itsNrBeamFormedStations; beamFormedStation++) { - if(itsBeamFormedStations[beamFormedStation].size() > 1) { - beamFormStation(filteredData, beamFormedStation); - } - } - - beamFormTimer.stop(); - -#if VERBOSE - LOG_ERROR("beam forming DONE"); -#endif -} - - -} // namespace RTCP -} // namespace LOFAR diff --git a/RTCP/CNProc/src/BeamFormer.h b/RTCP/CNProc/src/BeamFormer.h deleted file mode 100644 index 8b6b1704bf7..00000000000 --- a/RTCP/CNProc/src/BeamFormer.h +++ /dev/null @@ -1,86 +0,0 @@ -#ifndef LOFAR_CNPROC_BEAM_FORMER_H -#define LOFAR_CNPROC_BEAM_FORMER_H - -#include <Interface/FilteredData.h> - -#include <cassert> -#include <boost/noncopyable.hpp> - -namespace LOFAR { -namespace RTCP { - -/* - -Example with 8 stations and 5 beam formed stations: - -station2BeamFormedStation list: - -0 1 2 3 4 5 6 7 (array index) -0 0 1 0 2 3 3 4 (beam formed station id) - --> 5 beam formed stations - -Beam formed station id -> itsBeamFormedStations list -0 -> 0, 1, 3 -1 -> 2 -2 -> 4 -3 -> 5, 6 -4 -> 7 - -The data is always beamformed to the first station in the beamFormedStations list. -So, in the example, there is data for five stations, at positions - -0, 2, 4, 5, 7 - -in the FilteredData. - -*/ - -class BeamFormer: boost::noncopyable -{ - public: - static const float MAX_FLAGGED_PERCENTAGE = 0.9f; - - BeamFormer(const unsigned nrStations, const unsigned nrSamplesPerIntegration, - const std::vector<unsigned> &station2BeamFormedStation, const unsigned nrChannels); - - ~BeamFormer(); - - // reads from and writes to FilteredData - void formBeams(FilteredData *); - - // Return the number of beam formed stations. - // If beamforming is not used, this is the total number of stations. - unsigned getNrBeamFormedStations(); - - // return the station mapping - unsigned* getStationMapping(); - - private: - const unsigned itsNrStations; - const unsigned itsNrChannels; - const unsigned itsNrSamplesPerIntegration; - unsigned itsNrBeamFormedStations; - const std::vector<unsigned> &itsStation2BeamFormedStation; - std::vector<std::vector<unsigned> > itsBeamFormedStations; - unsigned* itsStationMapping; // same as itsBeamFormedStations, but only contains the first (=destination) station - - unsigned calcNrBeamFormedStations(); - void calcMapping(); - void beamFormStation(FilteredData *filteredData, const unsigned beamFormedStation); -}; - - -inline unsigned BeamFormer::getNrBeamFormedStations() { - if(itsNrBeamFormedStations == 0) return itsNrStations; - return itsNrBeamFormedStations; -} - -inline unsigned* BeamFormer::getStationMapping() { - return itsStationMapping; -} - -} // namespace SRCP -} // namespace LOFAR - -#endif // LOFAR_CNPROC_BEAM_FORMER_H diff --git a/RTCP/CNProc/src/CMakeLists.txt b/RTCP/CNProc/src/CMakeLists.txt index 5b8b06f1960..8597ee71ab9 100644 --- a/RTCP/CNProc/src/CMakeLists.txt +++ b/RTCP/CNProc/src/CMakeLists.txt @@ -34,7 +34,6 @@ set(cnproc_LIB_SRCS AsyncCommunication.cc AsyncTranspose.cc BandPass.cc - BeamFormer.cc CN_Processing.cc Correlator.cc FCNP_ClientStream.cc diff --git a/RTCP/CNProc/src/CN_Processing.cc b/RTCP/CNProc/src/CN_Processing.cc index c06839df7ce..03e764b873e 100644 --- a/RTCP/CNProc/src/CN_Processing.cc +++ b/RTCP/CNProc/src/CN_Processing.cc @@ -29,7 +29,6 @@ #include <Interface/CN_Configuration.h> #include <Interface/PencilCoordinates.h> #include <Interface/CN_Mapping.h> -#include <cassert> #include <complex> #include <cmath> #include <iomanip> @@ -68,10 +67,12 @@ template <typename SAMPLE_TYPE> CN_Processing<SAMPLE_TYPE>::CN_Processing(Stream itsStream(str), itsLocationInfo(locationInfo), itsInputData(0), + itsInputSubbandMetaData(0), + itsSubbandMetaData(0), itsTransposedData(0), itsFilteredData(0), itsCorrelatedData(0), - itsPencilBeamData(0), + itsBeamFormedData(0), itsStokesData(0), itsIncoherentStokesIData(0), itsStokesDataIntegratedChannels(0), @@ -81,7 +82,6 @@ template <typename SAMPLE_TYPE> CN_Processing<SAMPLE_TYPE>::CN_Processing(Stream #endif itsPPF(0), itsBeamFormer(0), - itsPencilBeamFormer(0), itsStokes(0), itsIncoherentStokesI(0), itsCorrelator(0) @@ -148,6 +148,7 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::preprocess(CN_C itsOutputIncoherentStokesI = configuration.outputIncoherentStokesI(); itsStokesIntegrateChannels = configuration.stokesIntegrateChannels(); itsOutputPsetSize = outputPsets.size(); + itsCenterFrequencies = configuration.refFreqs(); unsigned nrChannels = configuration.nrChannelsPerSubband(); unsigned nrSamplesPerIntegration = configuration.nrSamplesPerIntegration(); @@ -158,8 +159,8 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::preprocess(CN_C // We have to create the Beam Former first, it knows the number of beam-formed stations. // The number of baselines depends on this. // If beam forming is disabled, nrBeamFormedStations will be equal to nrStations. - itsBeamFormer = new BeamFormer(itsNrStations, nrSamplesPerIntegration, station2BeamFormedStation, nrChannels); - const unsigned nrBeamFormedStations = itsBeamFormer->getNrBeamFormedStations(); + itsBeamFormer = new BeamFormer(itsNrPencilBeams, itsNrStations, nrChannels, nrSamplesPerIntegration, configuration.sampleRate() / nrChannels, station2BeamFormedStation); + const unsigned nrBeamFormedStations = itsBeamFormer->getStationMapping().size(); const unsigned nrBaselines = nrBeamFormedStations * (nrBeamFormedStations + 1) / 2; // Each phase (e.g., transpose, PPF, correlator) reads from an input data @@ -168,16 +169,18 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::preprocess(CN_C // buffer and the other as output buffer. // Since some buffers (arenas) are used multiple times, we use multiple // Allocators for a single arena. - + // if (itsIsTransposeInput) { - itsInputData = new InputData<SAMPLE_TYPE>(outputPsets.size(), nrSamplesToCNProc, itsNrPencilBeams); + itsInputData = new InputData<SAMPLE_TYPE>(outputPsets.size(), nrSamplesToCNProc); + itsInputSubbandMetaData = new SubbandMetaData( outputPsets.size(), itsNrPencilBeams, 32 ); } if (itsIsTransposeOutput) { // create only the data structures that are used by the pipeline - itsTransposedData = new TransposedData<SAMPLE_TYPE>(itsNrStations, nrSamplesToCNProc, itsNrPencilBeams); - itsFilteredData = new FilteredData(itsNrStations, nrChannels, nrSamplesPerIntegration, itsNrPencilBeams); + itsSubbandMetaData = new SubbandMetaData( itsNrStations, itsNrPencilBeams, 32 ); + itsTransposedData = new TransposedData<SAMPLE_TYPE>(itsNrStations, nrSamplesToCNProc); + itsFilteredData = new FilteredData(itsNrStations, nrChannels, nrSamplesPerIntegration); switch( itsMode.mode() ) { case CN_Mode::FILTER: @@ -185,16 +188,17 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::preprocess(CN_C break; case CN_Mode::CORRELATE: + itsBeamFormedData = new BeamFormedData(itsNrPencilBeams, nrChannels, nrSamplesPerIntegration); itsCorrelatedData = new CorrelatedData(nrBaselines, nrChannels); break; case CN_Mode::COHERENT_COMPLEX_VOLTAGES: - itsPencilBeamData = new PencilBeamData(itsNrPencilBeams, nrChannels, nrSamplesPerIntegration); + itsBeamFormedData = new BeamFormedData(itsNrPencilBeams, nrChannels, nrSamplesPerIntegration); break; case CN_Mode::COHERENT_STOKES_I: case CN_Mode::COHERENT_ALLSTOKES: - itsPencilBeamData = new PencilBeamData(itsNrPencilBeams, nrChannels, nrSamplesPerIntegration); + itsBeamFormedData = new BeamFormedData(itsNrPencilBeams, nrChannels, nrSamplesPerIntegration); // fallthrough case CN_Mode::INCOHERENT_STOKES_I: @@ -217,8 +221,8 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::preprocess(CN_C itsMapping.addDataset(itsInputData, 0); itsMapping.addDataset(itsTransposedData, 1); itsMapping.addDataset(itsFilteredData, 2); + itsMapping.addDataset(itsBeamFormedData, 1); itsMapping.addDataset(itsCorrelatedData, 1); - itsMapping.addDataset(itsPencilBeamData, 1); itsMapping.addDataset(itsStokesData, 2); itsMapping.addDataset(itsIncoherentStokesIData, 1); itsMapping.addDataset(itsStokesDataIntegratedChannels, 1); @@ -238,7 +242,6 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::preprocess(CN_C unsigned myCoreIndex = std::find(usedCoresInPset.begin(), usedCoresInPset.end(), myCoreInPset) - usedCoresInPset.begin(); unsigned logicalNode = usedCoresPerPset * (outputPsetIndex - outputPsets.begin()) + myCoreIndex; - itsCenterFrequencies = configuration.refFreqs(); itsFirstSubband = (logicalNode / usedCoresPerPset) * itsNrSubbandsPerPset; itsLastSubband = itsFirstSubband + itsNrSubbandsPerPset; itsCurrentSubband = itsFirstSubband + logicalNode % usedCoresPerPset % itsNrSubbandsPerPset; @@ -250,11 +253,10 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::preprocess(CN_C itsPPF = new PPF<SAMPLE_TYPE>(itsNrStations, nrChannels, nrSamplesPerIntegration, configuration.sampleRate() / nrChannels, configuration.delayCompensation(), itsLocationInfo.rank() == 0); - itsPencilBeamFormer = new PencilBeams(itsNrPencilBeams, itsNrStations, nrChannels, nrSamplesPerIntegration, itsCenterFrequencies[itsCurrentSubband], configuration.sampleRate() / nrChannels); itsStokes = new Stokes(itsMode.isCoherent(), itsMode.nrStokes(), nrChannels, nrSamplesPerIntegration, nrSamplesPerStokesIntegration); itsIncoherentStokesI = new Stokes(false, 1, nrChannels, nrSamplesPerIntegration, nrSamplesPerStokesIntegration); - itsCorrelator = new Correlator(nrBeamFormedStations, itsBeamFormer->getStationMapping(), nrChannels, nrSamplesPerIntegration, configuration.correctBandPass()); + itsCorrelator = new Correlator(itsBeamFormer->getStationMapping(), nrChannels, nrSamplesPerIntegration, configuration.correctBandPass()); } #if defined HAVE_MPI @@ -270,13 +272,13 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::transpose() #if defined HAVE_MPI if (itsIsTransposeInput) { - itsInputData->readMetaData(itsStream); // sync read the meta data + itsInputSubbandMetaData->read(itsStream); // sync read the meta data } if(itsIsTransposeOutput && itsCurrentSubband < itsNrSubbands) { NSTimer postAsyncReceives("post async receives", LOG_CONDITION, true); postAsyncReceives.start(); - itsAsyncTranspose->postAllReceives(itsTransposedData); + itsAsyncTranspose->postAllReceives(itsSubbandMetaData,itsTransposedData); postAsyncReceives.stop(); } @@ -302,7 +304,7 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::transpose() asyncSendTimer.start(); // LOG_DEBUG("transpose: send subband " << subband << " to pset id " << i); - itsAsyncTranspose->asyncSend(i, itsInputData); // Asynchronously send one subband to another pset. + itsAsyncTranspose->asyncSend(i, itsInputSubbandMetaData, itsInputData); // Asynchronously send one subband to another pset. asyncSendTimer.stop(); } } @@ -313,7 +315,8 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::transpose() if (itsIsTransposeInput) { static NSTimer readTimer("receive timer", true, true); readTimer.start(); - itsInputData->readAll(itsStream); + itsInputSubbandMetaData->read(itsStream); + itsInputData->read(itsStream); readTimer.stop(); } @@ -336,15 +339,15 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::filter() // LOG_DEBUG("transpose: received subband " << itsCurrentSubband << " from " << stat); computeTimer.start(); - itsPPF->computeFlags(stat, itsTransposedData, itsFilteredData); - itsPPF->filter(stat, itsCenterFrequencies[itsCurrentSubband], itsTransposedData, itsFilteredData); + itsPPF->computeFlags(stat, itsSubbandMetaData, itsFilteredData); + itsPPF->filter(stat, itsCenterFrequencies[itsCurrentSubband], itsSubbandMetaData, itsTransposedData, itsFilteredData); computeTimer.stop(); } #else // NO MPI for (unsigned stat = 0; stat < itsNrStations; stat ++) { computeTimer.start(); - itsPPF->computeFlags(stat, itsTransposedData, itsFilteredData); - itsPPF->filter(stat, itsCenterFrequencies[itsCurrentSubband], itsTransposedData, itsFilteredData); + itsPPF->computeFlags(stat, itsSubbandMetaData, itsFilteredData); + itsPPF->filter(stat, itsCenterFrequencies[itsCurrentSubband], itsSubbandMetaData, itsTransposedData, itsFilteredData); computeTimer.stop(); } #endif // HAVE_MPI @@ -357,18 +360,7 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::formBeams() LOG_DEBUG(std::setprecision(12) << "core " << itsLocationInfo.rank() << ": start beam forming at " << MPI_Wtime()); #endif // HAVE_MPI computeTimer.start(); - itsBeamFormer->formBeams(itsFilteredData); - computeTimer.stop(); -} - -template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::formPencilBeams() -{ -#if defined HAVE_MPI - if (LOG_CONDITION) - LOG_DEBUG(std::setprecision(12) << "core " << itsLocationInfo.rank() << ": start pencil-beam forming at " << MPI_Wtime()); -#endif // HAVE_MPI - computeTimer.start(); - itsPencilBeamFormer->formPencilBeams(itsFilteredData,itsPencilBeamData); + itsBeamFormer->formBeams(itsSubbandMetaData,itsFilteredData,itsBeamFormedData, itsCenterFrequencies[itsCurrentSubband]); computeTimer.stop(); } @@ -401,7 +393,7 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::calculateCohere LOG_DEBUG(std::setprecision(12) << "core " << itsLocationInfo.rank() << ": start calculating coherent Stokes at " << MPI_Wtime()); #endif // HAVE_MPI computeTimer.start(); - itsStokes->calculateCoherent(itsPencilBeamData,itsStokesData,itsNrPencilBeams); + itsStokes->calculateCoherent(itsBeamFormedData,itsStokesData,itsNrPencilBeams); computeTimer.stop(); } @@ -475,13 +467,13 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::process() break; case CN_Mode::COHERENT_COMPLEX_VOLTAGES: - formPencilBeams(); - sendOutput(itsPencilBeamData); + formBeams(); + sendOutput(itsBeamFormedData); break; case CN_Mode::COHERENT_STOKES_I: case CN_Mode::COHERENT_ALLSTOKES: - formPencilBeams(); + formBeams(); calculateCoherentStokes(); if( itsStokesIntegrateChannels ) { itsStokes->compressStokes(itsStokesData, itsStokesDataIntegratedChannels, itsNrPencilBeams); @@ -538,8 +530,11 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::process() template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::postprocess() { + delete itsBeamFormer; + if (itsIsTransposeInput) { delete itsInputData; + delete itsInputSubbandMetaData; } if (itsIsTransposeInput || itsIsTransposeOutput) { @@ -549,12 +544,11 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::postprocess() } if (itsIsTransposeOutput) { + delete itsSubbandMetaData; delete itsTransposedData; delete itsPPF; delete itsFilteredData; - delete itsBeamFormer; - delete itsPencilBeamFormer; - delete itsPencilBeamData; + delete itsBeamFormedData; delete itsCorrelator; delete itsCorrelatedData; delete itsStokes; diff --git a/RTCP/CNProc/src/CN_Processing.h b/RTCP/CNProc/src/CN_Processing.h index dddd633a002..bda7a76480d 100644 --- a/RTCP/CNProc/src/CN_Processing.h +++ b/RTCP/CNProc/src/CN_Processing.h @@ -46,10 +46,9 @@ #include <Interface/StreamableData.h> #include <AsyncTranspose.h> +#include <PencilBeams.h> #include <PPF.h> #include <Correlator.h> -#include <BeamFormer.h> -#include <PencilBeams.h> #include <Stokes.h> #include <LocationInfo.h> @@ -86,7 +85,7 @@ template <typename SAMPLE_TYPE> class CN_Processing : public CN_Processing_Base, void transpose(); void filter(); void formBeams(); - void formPencilBeams(); + void formBeamFormer(); void calculateIncoherentStokesI(); void calculateCoherentStokes(); void calculateIncoherentStokes(); @@ -119,10 +118,12 @@ template <typename SAMPLE_TYPE> class CN_Processing : public CN_Processing_Base, ArenaMapping itsMapping; InputData<SAMPLE_TYPE> *itsInputData; + SubbandMetaData *itsInputSubbandMetaData; + SubbandMetaData *itsSubbandMetaData; TransposedData<SAMPLE_TYPE> *itsTransposedData; FilteredData *itsFilteredData; CorrelatedData *itsCorrelatedData; - PencilBeamData *itsPencilBeamData; + BeamFormedData *itsBeamFormedData; StokesData *itsStokesData; StokesData *itsIncoherentStokesIData; StokesDataIntegratedChannels *itsStokesDataIntegratedChannels; @@ -134,8 +135,7 @@ template <typename SAMPLE_TYPE> class CN_Processing : public CN_Processing_Base, #endif PPF<SAMPLE_TYPE> *itsPPF; - BeamFormer *itsBeamFormer; - PencilBeams *itsPencilBeamFormer; + BeamFormer *itsBeamFormer; Stokes *itsStokes, *itsIncoherentStokesI; Correlator *itsCorrelator; }; diff --git a/RTCP/CNProc/src/Correlator.cc b/RTCP/CNProc/src/Correlator.cc index e1392270b51..3e75b1a3f6d 100644 --- a/RTCP/CNProc/src/Correlator.cc +++ b/RTCP/CNProc/src/Correlator.cc @@ -19,13 +19,13 @@ static NSTimer weightTimer("Correlator::weight()", true, true); // nrStations is the number of superstations in case we use TAB. // Stations that are not beam formed count as a station. -Correlator::Correlator(const unsigned nrStations, const unsigned* stationMapping, const unsigned nrChannels, const unsigned nrSamplesPerIntegration, const bool correctBandPass) +Correlator::Correlator(const std::vector<unsigned> &stationMapping, const unsigned nrChannels, const unsigned nrSamplesPerIntegration, const bool correctBandPass) : - itsNrStations(nrStations), - itsNrBaselines(nrStations * (nrStations + 1) / 2), + itsNrStations(stationMapping.size()), + itsNrBaselines(itsNrStations * (itsNrStations + 1) / 2), itsNrChannels(nrChannels), itsNrSamplesPerIntegration(nrSamplesPerIntegration), - itsCorrelationWeights(new float[nrSamplesPerIntegration + 1]), + itsCorrelationWeights(nrSamplesPerIntegration + 1), itsBandPass(correctBandPass, nrChannels), itsStationMapping(stationMapping) { @@ -35,13 +35,6 @@ Correlator::Correlator(const unsigned nrStations, const unsigned* stationMapping itsCorrelationWeights[i] = 1.0e-6 / i; } - -Correlator::~Correlator() -{ - delete [] itsCorrelationWeights; -} - - #if 1 double Correlator::computeCentroidAndValidSamples(const SparseSet<unsigned> &flags, unsigned &nrValidSamples) const @@ -59,7 +52,7 @@ double Correlator::computeCentroidAndValidSamples(const SparseSet<unsigned> &fla } -void Correlator::computeFlagsAndCentroids(const FilteredData *filteredData, CorrelatedData *correlatedData) +void Correlator::computeFlagsAndCentroids(const SampleData<> *sampleData, CorrelatedData *correlatedData) { computeFlagsTimer.start(); @@ -68,8 +61,8 @@ void Correlator::computeFlagsAndCentroids(const FilteredData *filteredData, Corr unsigned nrValidSamples; const unsigned bl = baseline(stat1, stat2); - correlatedData->centroids[bl] = computeCentroidAndValidSamples(filteredData->flags[itsStationMapping[stat1]] - | filteredData->flags[itsStationMapping[stat2]], nrValidSamples); + correlatedData->centroids[bl] = computeCentroidAndValidSamples(sampleData->flags[itsStationMapping[stat1]] + | sampleData->flags[itsStationMapping[stat2]], nrValidSamples); correlatedData->nrValidSamples[bl][0] = 0; // channel 0 does not contain valid data for (unsigned ch = 1; ch < itsNrChannels; ch ++) @@ -82,15 +75,15 @@ void Correlator::computeFlagsAndCentroids(const FilteredData *filteredData, Corr #else -void Correlator::computeFlags(const FilteredData *filteredData, CorrelatedData *correlatedData) +void Correlator::computeFlags(const SampleData<> *sampleData, CorrelatedData *correlatedData) { computeFlagsTimer.start(); for (unsigned stat2 = 0; stat2 < itsNrStations; stat2 ++) { for (unsigned stat1 = 0; stat1 <= stat2; stat1 ++) { unsigned bl = baseline(stat1, stat2); - unsigned nrValidSamples = itsNrSamplesPerIntegration - (filteredData->flags[itsStationMapping[stat1]] - | filteredData->flags[itsStationMapping[stat2]]).count(); + unsigned nrValidSamples = itsNrSamplesPerIntegration - (sampleData->flags[itsStationMapping[stat1]] + | sampleData->flags[itsStationMapping[stat2]]).count(); correlatedData->nrValidSamples[bl][0] = 0; // channel 0 does not contain valid data @@ -105,8 +98,13 @@ void Correlator::computeFlags(const FilteredData *filteredData, CorrelatedData * #endif -void Correlator::correlate(const FilteredData *filteredData, CorrelatedData *correlatedData) +void Correlator::correlate(const SampleData<> *sampleData, CorrelatedData *correlatedData) { + ASSERT( sampleData->samples.shape()[0] == itsNrChannels ); + /* sampleData->samples.shape()[1] needs to be valid for any itsStationMapping */ + ASSERT( sampleData->samples.shape()[2] >= itsNrSamplesPerIntegration ); + ASSERT( sampleData->samples.shape()[3] == NR_POLARIZATIONS ); + #if 0 LOG_DEBUG_STR("correlating " << itsNrStations << " stations"); for (unsigned stat = 0; stat < itsNrStations; stat ++) { @@ -128,8 +126,8 @@ void Correlator::correlate(const FilteredData *filteredData, CorrelatedData *cor for (unsigned pol2 = 0; pol2 < NR_POLARIZATIONS; pol2 ++) { dcomplex sum = makedcomplex(0, 0); for (unsigned time = 0; time < itsNrSamplesPerIntegration; time ++) { - sum += filteredData->samples[ch][itsStationMapping[stat1]][time][pol1] - * conj(filteredData->samples[ch][itsStationMapping[stat2]][time][pol2]); + sum += sampleData->samples[ch][itsStationMapping[stat1]][time][pol1] + * conj(sampleData->samples[ch][itsStationMapping[stat2]][time][pol2]); } sum *= itsCorrelationWeights[nrValid] * itsBandPass.squaredCorrectionFactors()[ch]; correlatedData->visibilities[bl][ch][pol1][pol2] = sum; @@ -180,8 +178,8 @@ void Correlator::correlate(const FilteredData *filteredData, CorrelatedData *cor if (nrValidStations % 2 == 0) { unsigned stat10 = map[0], stat11 = map[1]; - _auto_correlate_2(filteredData->samples[ch][itsStationMapping[stat10]].origin(), - filteredData->samples[ch][itsStationMapping[stat11]].origin(), + _auto_correlate_2(sampleData->samples[ch][itsStationMapping[stat10]].origin(), + sampleData->samples[ch][itsStationMapping[stat11]].origin(), correlatedData->visibilities[baseline(stat10, stat10)][ch].origin(), correlatedData->visibilities[baseline(stat10, stat11)][ch].origin(), correlatedData->visibilities[baseline(stat11, stat11)][ch].origin(), @@ -189,7 +187,7 @@ void Correlator::correlate(const FilteredData *filteredData, CorrelatedData *cor } else { unsigned stat10 = map[0]; - _auto_correlate_1(filteredData->samples[ch][itsStationMapping[stat10]].origin(), + _auto_correlate_1(sampleData->samples[ch][itsStationMapping[stat10]].origin(), correlatedData->visibilities[baseline(stat10, stat10)][ch].origin(), itsNrSamplesPerIntegration); } @@ -203,11 +201,11 @@ void Correlator::correlate(const FilteredData *filteredData, CorrelatedData *cor const unsigned stat10 = map[stat1], stat11 = map[stat1+1], stat12 = map[stat1+2]; const unsigned stat20 = map[stat2], stat21 = map[stat2+1]; - _correlate_3x2(filteredData->samples[ch][itsStationMapping[stat10]].origin(), - filteredData->samples[ch][itsStationMapping[stat11]].origin(), - filteredData->samples[ch][itsStationMapping[stat12]].origin(), - filteredData->samples[ch][itsStationMapping[stat20]].origin(), - filteredData->samples[ch][itsStationMapping[stat21]].origin(), + _correlate_3x2(sampleData->samples[ch][itsStationMapping[stat10]].origin(), + sampleData->samples[ch][itsStationMapping[stat11]].origin(), + sampleData->samples[ch][itsStationMapping[stat12]].origin(), + sampleData->samples[ch][itsStationMapping[stat20]].origin(), + sampleData->samples[ch][itsStationMapping[stat21]].origin(), correlatedData->visibilities[baseline(stat10, stat20)][ch].origin(), correlatedData->visibilities[baseline(stat10, stat21)][ch].origin(), correlatedData->visibilities[baseline(stat11, stat20)][ch].origin(), @@ -223,10 +221,10 @@ void Correlator::correlate(const FilteredData *filteredData, CorrelatedData *cor const unsigned stat10 = map[stat1], stat11 = map[stat1+1]; const unsigned stat20 = map[stat2], stat21 = map[stat2+1]; - _correlate_2x2(filteredData->samples[ch][itsStationMapping[stat10]].origin(), - filteredData->samples[ch][itsStationMapping[stat11]].origin(), - filteredData->samples[ch][itsStationMapping[stat20]].origin(), - filteredData->samples[ch][itsStationMapping[stat21]].origin(), + _correlate_2x2(sampleData->samples[ch][itsStationMapping[stat10]].origin(), + sampleData->samples[ch][itsStationMapping[stat11]].origin(), + sampleData->samples[ch][itsStationMapping[stat20]].origin(), + sampleData->samples[ch][itsStationMapping[stat21]].origin(), correlatedData->visibilities[baseline(stat10, stat20)][ch].origin(), correlatedData->visibilities[baseline(stat10, stat21)][ch].origin(), correlatedData->visibilities[baseline(stat11, stat20)][ch].origin(), @@ -238,8 +236,8 @@ void Correlator::correlate(const FilteredData *filteredData, CorrelatedData *cor if (stat1 == stat2) { const unsigned stat10 = map[stat1], stat11 = map[stat1+1]; - _auto_correlate_2(filteredData->samples[ch][itsStationMapping[stat10]].origin(), - filteredData->samples[ch][itsStationMapping[stat11]].origin(), + _auto_correlate_2(sampleData->samples[ch][itsStationMapping[stat10]].origin(), + sampleData->samples[ch][itsStationMapping[stat11]].origin(), correlatedData->visibilities[baseline(stat10,stat10)][ch].origin(), correlatedData->visibilities[baseline(stat10,stat11)][ch].origin(), correlatedData->visibilities[baseline(stat11,stat11)][ch].origin(), @@ -247,9 +245,9 @@ void Correlator::correlate(const FilteredData *filteredData, CorrelatedData *cor } else { const unsigned stat10 = map[stat1], stat11 = map[stat1+1], stat12 = map[stat1+2]; - _auto_correlate_3(filteredData->samples[ch][itsStationMapping[stat10]].origin(), - filteredData->samples[ch][itsStationMapping[stat11]].origin(), - filteredData->samples[ch][itsStationMapping[stat12]].origin(), + _auto_correlate_3(sampleData->samples[ch][itsStationMapping[stat10]].origin(), + sampleData->samples[ch][itsStationMapping[stat11]].origin(), + sampleData->samples[ch][itsStationMapping[stat12]].origin(), correlatedData->visibilities[baseline(stat10,stat11)][ch].origin(), correlatedData->visibilities[baseline(stat10,stat12)][ch].origin(), correlatedData->visibilities[baseline(stat11,stat11)][ch].origin(), @@ -272,7 +270,7 @@ void Correlator::correlate(const FilteredData *filteredData, CorrelatedData *cor } } #else - _weigh_visibilities(correlatedData->visibilities.origin(), correlatedData->nrValidSamples.origin(), itsCorrelationWeights, itsBandPass.squaredCorrectionFactors(), itsNrBaselines, itsNrChannels); // FIXME + _weigh_visibilities(correlatedData->visibilities.origin(), correlatedData->nrValidSamples.origin(), &itsCorrelationWeights[0], itsBandPass.squaredCorrectionFactors(), itsNrBaselines, itsNrChannels); // FIXME #endif weightTimer.stop(); #endif diff --git a/RTCP/CNProc/src/Correlator.h b/RTCP/CNProc/src/Correlator.h index c4733a5c6e1..3944be2e856 100644 --- a/RTCP/CNProc/src/Correlator.h +++ b/RTCP/CNProc/src/Correlator.h @@ -7,8 +7,8 @@ #include <BandPass.h> -#include <Interface/FilteredData.h> #include <Interface/CorrelatedData.h> +#include <Interface/StreamableData.h> #include <cassert> @@ -22,23 +22,23 @@ namespace RTCP { class Correlator { public: - // TODO make stationMapping a vector? --Rob - Correlator(const unsigned nrStations, const unsigned* stationMapping, const unsigned nrChannels, const unsigned nrSamplesPerIntegration, const bool correctBandPass); - ~Correlator(); + Correlator(const std::vector<unsigned> &stationMapping, const unsigned nrChannels, const unsigned nrSamplesPerIntegration, const bool correctBandPass); - void correlate(const FilteredData *, CorrelatedData *); - void computeFlagsAndCentroids(const FilteredData *, CorrelatedData *); + // We can correlate arrays of size + // samples[nrChannels][nrStations][nrSamplesPerIntegration][nrPolarizations] + void correlate(const SampleData<> *, CorrelatedData *); + void computeFlagsAndCentroids(const SampleData<> *, CorrelatedData *); static unsigned baseline(const unsigned station1, const unsigned station2); private: const unsigned itsNrStations, itsNrBaselines, itsNrChannels, itsNrSamplesPerIntegration; - float *itsCorrelationWeights; //[itsNrSamplesPerIntegration + 1] + std::vector<float> itsCorrelationWeights; //[itsNrSamplesPerIntegration + 1] const BandPass itsBandPass; - // A list indexed by station number, result is the station position in the Filtered data. + // A list indexed by station number, result is the station position in the input data. // This is needed in case of tied array beam forming. - const unsigned* itsStationMapping; //[itsNrStations] + const std::vector<unsigned> &itsStationMapping; //[itsNrStations] double computeCentroidAndValidSamples(const SparseSet<unsigned> &flags, unsigned &nrValidSamples) const; }; diff --git a/RTCP/CNProc/src/InputData.h b/RTCP/CNProc/src/InputData.h index 3b661f30186..954f9c79413 100644 --- a/RTCP/CNProc/src/InputData.h +++ b/RTCP/CNProc/src/InputData.h @@ -6,7 +6,6 @@ #include <Interface/Align.h> #include <Interface/MultiDimArray.h> #include <Interface/Config.h> -#include <Interface/SubbandMetaData.h> #include <Interface/StreamableData.h> #include <Stream/Stream.h> @@ -23,37 +22,24 @@ template <typename SAMPLE_TYPE> class InputData: public SampleData<SAMPLE_TYPE,3 public: typedef SampleData<SAMPLE_TYPE,3> SuperType; - InputData(const unsigned nrSubbands, const unsigned nrSamplesToCNProc, const unsigned nrPencilBeams); + InputData(const unsigned nrSubbands, const unsigned nrSamplesToCNProc); // used for asynchronous transpose - void readMetaData(Stream *str); void readOne(Stream *str, unsigned subbandPosition); - // used for synchronous transfer - void readAll(Stream *str); + protected: + virtual void checkEndianness(); private: const unsigned itsNrSubbands; - const unsigned itsNrPencilBeams; - - public: - SubbandMetaData metaData; // with one subband for every outputPset }; -template <typename SAMPLE_TYPE> inline InputData<SAMPLE_TYPE>::InputData(const unsigned nrSubbands, const unsigned nrSamplesToCNProc, const unsigned nrPencilBeams) +template <typename SAMPLE_TYPE> inline InputData<SAMPLE_TYPE>::InputData(const unsigned nrSubbands, const unsigned nrSamplesToCNProc) : SuperType( false, boost::extents[nrSubbands][nrSamplesToCNProc][NR_POLARIZATIONS], 0 ), - itsNrSubbands(nrSubbands), - itsNrPencilBeams(nrPencilBeams), - metaData(nrSubbands,nrPencilBeams,32) -{ -} - -// used for asynchronous transpose -template <typename SAMPLE_TYPE> inline void InputData<SAMPLE_TYPE>::readMetaData(Stream *str) + itsNrSubbands(nrSubbands) { - metaData.read( str ); } // used for asynchronous transpose @@ -66,15 +52,8 @@ template <typename SAMPLE_TYPE> inline void InputData<SAMPLE_TYPE>::readOne(Stre #endif } -template <typename SAMPLE_TYPE> inline void InputData<SAMPLE_TYPE>::readAll(Stream *str) +template <typename SAMPLE_TYPE> inline void InputData<SAMPLE_TYPE>::checkEndianness() { - // read all metadata - readMetaData( str ); - - // now read all subbands using one recvBlocking call, even though the ION - // sends all subbands one at a time - str->read(SuperType::samples.origin(), SuperType::samples.num_elements() * sizeof(SAMPLE_TYPE)); - #if defined C_IMPLEMENTATION && defined WORDS_BIGENDIAN dataConvert(LittleEndian, SuperType::samples.origin(), SuperType::samples.num_elements()); #endif diff --git a/RTCP/CNProc/src/Makefile.am b/RTCP/CNProc/src/Makefile.am index 6010b63ceff..1da9da2706b 100644 --- a/RTCP/CNProc/src/Makefile.am +++ b/RTCP/CNProc/src/Makefile.am @@ -10,7 +10,6 @@ TransposedData.h \ FIR.h \ PPF.h \ AsyncTranspose.h \ -BeamFormer.h \ BeamFormerAsm.h \ Correlator.h \ CN_Processing.h \ @@ -48,7 +47,6 @@ FIR_Asm.S \ FFT_Asm.S \ FIR.cc \ AsyncTranspose.cc \ -BeamFormer.cc \ PPF.cc \ Correlator.cc \ CN_Processing_main.cc \ diff --git a/RTCP/CNProc/src/PPF.cc b/RTCP/CNProc/src/PPF.cc index 8d0d3ec771e..2c1ad967531 100644 --- a/RTCP/CNProc/src/PPF.cc +++ b/RTCP/CNProc/src/PPF.cc @@ -145,12 +145,12 @@ template <typename SAMPLE_TYPE> void PPF<SAMPLE_TYPE>::destroy_fft() } -template <typename SAMPLE_TYPE> void PPF<SAMPLE_TYPE>::computeFlags(unsigned stat, const TransposedData<SAMPLE_TYPE> *transposedData, FilteredData *filteredData) +template <typename SAMPLE_TYPE> void PPF<SAMPLE_TYPE>::computeFlags(unsigned stat, const SubbandMetaData *metaData, FilteredData *filteredData) { computeFlagsTimer.start(); filteredData->flags[stat].reset(); - SparseSet<unsigned> flags = transposedData->metaData.getFlags( stat ); + SparseSet<unsigned> flags = metaData->getFlags( stat ); const SparseSet<unsigned>::Ranges &ranges = flags.getRanges(); for (SparseSet<unsigned>::const_iterator it = ranges.begin(); it != ranges.end(); it ++) { @@ -204,13 +204,13 @@ template <typename SAMPLE_TYPE> void PPF<SAMPLE_TYPE>::computePhaseShifts(struct #endif // PPF_C_IMPLEMENTATION -template <typename SAMPLE_TYPE> void PPF<SAMPLE_TYPE>::filter(unsigned stat, double centerFrequency, const TransposedData<SAMPLE_TYPE> *transposedData, FilteredData *filteredData) +template <typename SAMPLE_TYPE> void PPF<SAMPLE_TYPE>::filter(unsigned stat, double centerFrequency, const SubbandMetaData *metaData, const TransposedData<SAMPLE_TYPE> *transposedData, FilteredData *filteredData) { PPFtimer.start(); double baseFrequency = centerFrequency - (itsNrChannels / 2) * itsChannelBandwidth; - const unsigned alignmentShift = transposedData->metaData.alignmentShift( stat ); + const unsigned alignmentShift = metaData->alignmentShift( stat ); #if 0 LOG_DEBUG_STR(setprecision(15) << "stat " << stat << ", basefreq " << baseFrequency << ": delay from " << delays[stat].delayAtBegin << " to " << delays[stat].delayAfterEnd << " sec"); @@ -257,7 +257,7 @@ template <typename SAMPLE_TYPE> void PPF<SAMPLE_TYPE>::filter(unsigned stat, dou for (unsigned chan = 0; chan < itsNrChannels; chan ++) { if (itsDelayCompensation) - fftOutData[chan] *= phaseShift(time, chan, baseFrequency, transposedData->metaData.beams(stat)[0].delayAtBegin, transposedData->metaData.beams(stat)[0].delayAfterEnd); + fftOutData[chan] *= phaseShift(time, chan, baseFrequency, metaData->beams(stat)[0].delayAtBegin, metaData->beams(stat)[0].delayAfterEnd); filteredData->samples[chan][stat][time][pol] = fftOutData[chan]; } @@ -296,10 +296,7 @@ template <typename SAMPLE_TYPE> void PPF<SAMPLE_TYPE>::filter(unsigned stat, dou struct phase_shift phaseShifts[itsNrSamplesPerIntegration]; - computePhaseShifts(phaseShifts, transposedData->metaData.beams(stat)[0].delayAtBegin, transposedData->metaData.beams(stat)[0].delayAfterEnd, baseFrequency); - - // forward (pencil) beam forming information - memcpy( &filteredData->metaData.subbandInfo(stat), &transposedData->metaData.subbandInfo(stat), transposedData->metaData.itsSubbandInfoSize ); + computePhaseShifts(phaseShifts, metaData->beams(stat)[0].delayAtBegin, metaData->beams(stat)[0].delayAfterEnd, baseFrequency); const SparseSet<unsigned>::Ranges &ranges = filteredData->flags[stat].getRanges(); SparseSet<unsigned>::const_iterator it = ranges.begin(); diff --git a/RTCP/CNProc/src/PPF.h b/RTCP/CNProc/src/PPF.h index e448f7fa157..9cad46823a4 100644 --- a/RTCP/CNProc/src/PPF.h +++ b/RTCP/CNProc/src/PPF.h @@ -10,6 +10,7 @@ #include <FIR.h> #include <TransposedData.h> #include <Interface/FilteredData.h> +#include <Interface/SubbandMetaData.h> #include <Interface/AlignedStdAllocator.h> #include <boost/multi_array.hpp> @@ -33,8 +34,8 @@ template <typename SAMPLE_TYPE> class PPF: boost::noncopyable PPF(unsigned nrStations, unsigned nrChannels, unsigned nrSamplesPerIntegration, double channelBandwidth, bool delayCompensation, bool verbose); ~PPF(); - void computeFlags(unsigned stat, const TransposedData<SAMPLE_TYPE> *, FilteredData *); - void filter(unsigned stat, double centerFrequency, const TransposedData<SAMPLE_TYPE> *, FilteredData *); + void computeFlags(unsigned stat, const SubbandMetaData *metaData, FilteredData *); + void filter(unsigned stat, double centerFrequency, const SubbandMetaData *metaData, const TransposedData<SAMPLE_TYPE> *, FilteredData *); private: void init_fft(), destroy_fft(); diff --git a/RTCP/CNProc/src/PencilBeams.cc b/RTCP/CNProc/src/PencilBeams.cc index a932a118258..d6697a09381 100644 --- a/RTCP/CNProc/src/PencilBeams.cc +++ b/RTCP/CNProc/src/PencilBeams.cc @@ -9,6 +9,7 @@ #include <Common/Timer.h> #include <Common/LofarLogger.h> #include <cassert> +#include <algorithm> #ifndef PENCILBEAMS_C_IMPLEMENTATION #include <BeamFormerAsm.h> @@ -17,36 +18,133 @@ namespace LOFAR { namespace RTCP { -static NSTimer pencilBeamFormTimer("PencilBeamFormer::formBeams()", true, true); +static NSTimer beamFormTimer("BeamFormer::formBeams()", true, true); -PencilBeams::PencilBeams(const unsigned nrPencilBeams, const unsigned nrStations, const unsigned nrChannels, const unsigned nrSamplesPerIntegration, const double centerFrequency, const double channelBandwidth ) +BeamFormer::BeamFormer(const unsigned nrPencilBeams, const unsigned nrStations, const unsigned nrChannels, const unsigned nrSamplesPerIntegration, const double channelBandwidth, const std::vector<unsigned> &station2BeamFormedStation ) : itsNrStations(nrStations), itsNrPencilBeams(nrPencilBeams), itsNrChannels(nrChannels), itsNrSamplesPerIntegration(nrSamplesPerIntegration), - itsCenterFrequency(centerFrequency), itsChannelBandwidth(channelBandwidth), - itsBaseFrequency(centerFrequency - (nrChannels/2) * channelBandwidth), itsDelays( nrStations, nrPencilBeams ), + itsStation2BeamFormedStation(station2BeamFormedStation), itsNrValidStations( 0 ), itsValidStations( itsNrStations ) { + initStationMergeMap(); } -void PencilBeams::computeFlags( const FilteredData *in, PencilBeamData *out ) +void BeamFormer::initStationMergeMap() { - const unsigned upperBound = static_cast<unsigned>(itsNrSamplesPerIntegration * PencilBeams::MAX_FLAGGED_PERCENTAGE); + if(itsStation2BeamFormedStation.empty()) { + // beamforming disabled -- assignment is 1:1 + itsMergeSourceStations.resize(itsNrStations); + itsMergeDestStations.resize(itsNrStations); + + for(unsigned i=0; i<itsNrStations; i++) { + itsMergeSourceStations[i].push_back( i ); + itsMergeDestStations[i] = i; + } + } else { + // beamforming enabled + ASSERT( itsStation2BeamFormedStation.size() == itsNrStations ); + + const unsigned nrMergedStations = *std::max_element( itsStation2BeamFormedStation.begin(), itsStation2BeamFormedStation.end() ) + 1; + + itsMergeSourceStations.resize( nrMergedStations ); + itsMergeDestStations.resize( nrMergedStations ); + + for(unsigned i=0; i<itsNrStations; i++) { + unsigned id = itsStation2BeamFormedStation[i]; + + itsMergeSourceStations[id].push_back(i); + } + + for(unsigned i=0; i<nrMergedStations; i++) { + itsMergeDestStations[i] = itsMergeSourceStations[i][0]; + } + } + + // reserve the same sizes for the vectors of valid stations + itsValidMergeSourceStations.resize( itsMergeSourceStations.size() ); + for (unsigned i=0; i<itsValidMergeSourceStations.size(); i++) { + itsValidMergeSourceStations[i].reserve( itsMergeSourceStations[i].size() ); + } +} + +// functor for determining whether a station should be included based on +// its amount of flags +class stationValidator +{ + public: + stationValidator( const SampleData<> *samples, const unsigned upperBound ): + samples(samples), upperBound(upperBound) {} + + bool operator ()(unsigned stationNr) const + { + return samples->flags[stationNr].count() > upperBound; + } + private: + const SampleData<> *samples; + const unsigned upperBound; +}; + +void BeamFormer::mergeStationFlags( const SampleData<> *in, SampleData<> *out ) +{ + const unsigned upperBound = static_cast<unsigned>(itsNrSamplesPerIntegration * BeamFormer::MAX_FLAGGED_PERCENTAGE); + const stationValidator isValid( in, upperBound ); + + for( unsigned i = 0; i < itsMergeDestStations.size(); i++ ) { + const unsigned destStation = itsMergeDestStations[i]; + const std::vector<unsigned> &sourceStations = itsMergeSourceStations[i]; + std::vector<unsigned> &validSourceStations = itsValidMergeSourceStations[i]; + + validSourceStations.clear(); + + // copy valid stations from sourceStations -> validSourceStations + for( unsigned i = 0; i < sourceStations.size(); i++ ) { + if( isValid( sourceStations[i] ) ) { + validSourceStations.push_back( i ); + } + } + + // conservative flagging: flag output if any input was flagged + if( validSourceStations.empty() ) { + // no valid stations: flag everything + out->flags[destStation].include(0, itsNrSamplesPerIntegration); + } else { + // some valid stations: merge flags + + if( validSourceStations[0] != destStation || in != out ) { + // dest station, which should be first in the list, was not valid + out->flags[destStation] = in->flags[validSourceStations[0]]; + } + + for (unsigned stat = 1; stat < validSourceStations.size(); stat++ ) { + out->flags[destStation] |= in->flags[validSourceStations[stat]]; + } + } + } +} + + +void BeamFormer::computeFlags( const SampleData<> *in, SampleData<> *out ) +{ + const unsigned upperBound = static_cast<unsigned>(itsNrSamplesPerIntegration * BeamFormer::MAX_FLAGGED_PERCENTAGE); + const stationValidator isValid( in, upperBound ); // determine which stations have too much flagged data + // also, source stations from a merge are set as invalid, since the ASM implementation + // can only combine consecutive stations, we have to consider them. itsNrValidStations = 0; - for(unsigned stat = 0; stat < itsNrStations; stat++) { - if( in->flags[stat].count() > upperBound ) { - // too much flagged data -- drop station - itsValidStations[stat] = false; - } else { - // keep station - itsValidStations[stat] = true; + for(unsigned i = 0; i < itsNrStations; i++ ) { + itsValidStations[i] = false; + } + + for(std::vector<unsigned>::const_iterator stat = itsMergeDestStations.begin(); stat != itsMergeDestStations.end(); stat++ ) { + if( isValid( *stat ) ) { + itsValidStations[*stat] = true; itsNrValidStations++; } } @@ -64,15 +162,51 @@ void PencilBeams::computeFlags( const FilteredData *in, PencilBeamData *out ) } #ifdef PENCILBEAMS_C_IMPLEMENTATION -void PencilBeams::computeComplexVoltages( const FilteredData *in, PencilBeamData *out ) +void BeamFormer::mergeStations( const SampleData<> *in, SampleData<> *out ); { - const double averagingFactor = 1.0 / itsNrValidStations; + for (unsigned i = 0; i < itsValidMergeSourceStations.size(); i++ ) { + const unsigned destStation = itsMergeDestStations[i]; + const std::vector<unsigned> &validSourceStations = itsValidMergeSourceStations[i]; + + if( validSourceStations.empty() ) { + continue; + } + + if( validSourceStations.size() == 1 && validSourceStations[0] == destStation ) { + continue; + } - for( unsigned beam = 0; beam < itsNrPencilBeams; beam++ ) { for (unsigned ch = 0; ch < itsNrChannels; ch ++) { - const double frequency = itsBaseFrequency + ch * itsChannelBandwidth; + const double frequency = baseFrequency + ch * itsChannelBandwidth; const float factor = averagingFactor; + for (unsigned time = 0; time < itsNrSamplesPerIntegration; time ++) { + if( !out->flags[destStation].test(time) ) { + for (unsigned pol = 0; pol < NR_POLARIZATIONS; pol ++) { + fcomplex &dest = inout->samples[ch][destStation][time][pol]; + + if( validSourceStations[0] != destStation ) { + // first station is somewhere else; copy it + dest = inout->samples[ch][0][time][pol]; + } + + // combine the stations + for( unsigned stat = 1; stat < validSourceStations[i].size(); stat++ ) { + dest += inout->samples[ch][validSourceStations[stat]][time][pol]; + } + } + } + } + } + } +} + +void BeamFormer::computeComplexVoltages( const SampleData<> *in, SampleData *out, double baseFrequency ) +{ + for( unsigned beam = 0; beam < itsNrPencilBeams; beam++ ) { + for (unsigned ch = 0; ch < itsNrChannels; ch ++) { + const double frequency = baseFrequency + ch * itsChannelBandwidth; + for (unsigned time = 0; time < itsNrSamplesPerIntegration; time ++) { if( !out->flags[beam].test(time) ) { for (unsigned pol = 0; pol < NR_POLARIZATIONS; pol ++) { @@ -90,7 +224,6 @@ void PencilBeams::computeComplexVoltages( const FilteredData *in, PencilBeamData sample += in->samples[ch][stat][time][pol] * shift; } } - dest = sample * factor; } } else { // data is flagged @@ -117,19 +250,112 @@ static const unsigned NRBEAMS = 3; // TIMESTEPSIZE and itsNrSamplesPerIntegration need to be a multiple of 16, as the assembly code requires it static const unsigned TIMESTEPSIZE = 128; -void PencilBeams::computeComplexVoltages( const FilteredData *in, PencilBeamData *out ) +inline void BeamFormer::addUnweighedStations( const SampleData<> *in, SampleData<> *out, const unsigned stationIndices[], unsigned nrStations, unsigned channel, unsigned beamIndex, unsigned timeOffset, unsigned timeLength, bool replace ) { - ASSERT( TIMESTEPSIZE % 16 == 0 ); + // central beam (#0) has no weights, we can simply add the stations + switch( nrStations ) { + case 0: + default: + THROW(CNProcException,"Requested to add " << nrStations << " stations, but can only add 1-6."); + break; - if( itsNrSamplesPerIntegration % 16 > 0 ) { - THROW(CNProcException, "nrSamplesPerIntegration (" << itsNrSamplesPerIntegration << ") needs to be a multiple of 16" ); +// possible inputs +#define OUTPUT (reinterpret_cast<float*>(out->samples[channel][beamIndex][timeOffset].origin())) +#define STATION(nr) (reinterpret_cast<const float*>(in->samples[channel][stationIndices[nr]][timeOffset].origin())) + +// shorthand for the add functions +#define ADDGENERIC(nr,...) ADDFUNC(nr)( OUTPUT, __VA_ARGS__, timeLength * NR_POLARIZATIONS * 2 ) /* 2 is for real/imag */ + +// adds stations, and the subtotal if needed (if stat!=0) +#define ADD(nr,nrplusone,...) do { \ + if( replace ) { \ + ADDGENERIC(nr,__VA_ARGS__); \ + } else { \ + ADDGENERIC(nrplusone,OUTPUT,__VA_ARGS__); \ + } \ + } while(0); + + case 1: + ADD( 1, 2, STATION(0) ); + break; + + case 2: + ADD( 2, 3, STATION(0), STATION(1) ); + break; + + case 3: + ADD( 3, 4, STATION(0), STATION(1), STATION(2) ); + break; + + case 4: + ADD( 4, 5, STATION(0), STATION(1), STATION(2), STATION(3) ); + break; + + case 5: + ADD( 5, 6, STATION(0), STATION(1), STATION(2), STATION(3), STATION(4) ); + break; + + case 6: + ADD( 6, 7, STATION(0), STATION(1), STATION(2), STATION(3), STATION(4), STATION(5) ); + break; + } +} + +void BeamFormer::mergeStations( const SampleData<> *in, SampleData<> *out ) +{ + for (unsigned i = 0; i < itsValidMergeSourceStations.size(); i++ ) { + const unsigned destStation = itsMergeDestStations[i]; + const std::vector<unsigned> &validSourceStations = itsValidMergeSourceStations[i]; + + if( validSourceStations.empty() ) { + continue; + } + + if( validSourceStations.size() == 1 && validSourceStations[0] == destStation ) { + continue; + } + + const unsigned nrStations = validSourceStations.size(); + const bool destValid = validSourceStations[0] == destStation; + + // do the actual beamforming + for (unsigned ch = 0; ch < itsNrChannels; ch ++) { + unsigned processStations = NRSTATIONS; + unsigned processTime = TIMESTEPSIZE; + bool replaceDest = !destValid && in == out; // if true, ignore values at destStation + + // add everything to the first station in the list + for( unsigned stat = replaceDest ? 0 : 1; stat < nrStations; stat += processStations ) { + processStations = std::min( NRSTATIONS, nrStations - stat ); + + for( unsigned time = 0; time < itsNrSamplesPerIntegration; time += processTime ) { + processTime = std::min( TIMESTEPSIZE, itsNrSamplesPerIntegration - time ); + + addUnweighedStations( in, out, &validSourceStations[stat], processStations, ch, destStation, time, processTime, replaceDest ); + } + + replaceDest = false; + } + } } +} +void BeamFormer::computeComplexVoltages( const SampleData<> *in, SampleData<> *out, double baseFrequency ) +{ const double averagingFactor = 1.0 / itsNrValidStations; + const unsigned nrStations = itsNrStations; + //const unsigned nrStations = itsMergeDestStations.size(); + // ^^^ is faster, but can only work once BEAMFORMFUNC can skip over invalid stations + std::vector<unsigned> allStations( itsNrStations ); + + for( unsigned i = 0; i < itsNrStations; i++ ) { + allStations[i] = i; + } + // do the actual beamforming for (unsigned ch = 0; ch < itsNrChannels; ch ++) { - const double frequency = itsBaseFrequency + ch * itsChannelBandwidth; + const double frequency = baseFrequency + ch * itsChannelBandwidth; const double factor = averagingFactor; // add multiplication factors as needed // construct the weights, with zeroes for unused data @@ -155,60 +381,14 @@ void PencilBeams::computeComplexVoltages( const FilteredData *in, PencilBeamData // Iterate over the same portions of the input data as many times as possible to // fully exploit the caches. - for( unsigned stat = 0; stat < itsNrStations; stat += processStations ) { - processStations = std::min( NRSTATIONS, itsNrStations - stat ); + for( unsigned stat = 0; stat < nrStations; stat += processStations ) { + processStations = std::min( NRSTATIONS, nrStations - stat ); for( unsigned time = 0; time < itsNrSamplesPerIntegration; time += processTime ) { processTime = std::min( TIMESTEPSIZE, itsNrSamplesPerIntegration - time ); // central beam (#0) has no weights, we can simply add the stations - - switch( processStations ) { - case 0: - default: - THROW(CNProcException,"Requested to add " << processStations << " stations, but can only add 1-6."); - break; - -// possible inputs -#define OUTPUT (reinterpret_cast<float*>(out->samples[ch][0][time].origin())) -#define STATION(nr) (reinterpret_cast<const float*>(in->samples[ch][stat+nr][time].origin())) - -// shorthand for the add functions -#define ADDGENERIC(nr,...) ADDFUNC(nr)( OUTPUT, __VA_ARGS__, processTime * NR_POLARIZATIONS * 2 ) - -// adds stations, and the subtotal if needed (if stat!=0) -#define ADD(nr,nrplusone,...) do { \ - if( stat ) { \ - ADDGENERIC(nrplusone,OUTPUT,__VA_ARGS__); \ - } else { \ - ADDGENERIC(nr,__VA_ARGS__); \ - } \ - } while(0); - - case 1: - ADD( 1, 2, STATION(0) ); - break; - - case 2: - ADD( 2, 3, STATION(0), STATION(1) ); - break; - - case 3: - ADD( 3, 4, STATION(0), STATION(1), STATION(2) ); - break; - - case 4: - ADD( 4, 5, STATION(0), STATION(1), STATION(2), STATION(3) ); - break; - - case 5: - ADD( 5, 6, STATION(0), STATION(1), STATION(2), STATION(3), STATION(4) ); - break; - - case 6: - ADD( 6, 7, STATION(0), STATION(1), STATION(2), STATION(3), STATION(4), STATION(5) ); - break; - } + addUnweighedStations( in, out, &allStations[stat], processStations, ch, 0, time, processTime, stat == 0 ); // non-central beams for( unsigned beam = 1; beam < itsNrPencilBeams; beam += processBeams ) { @@ -237,11 +417,9 @@ void PencilBeams::computeComplexVoltages( const FilteredData *in, PencilBeamData } } -#undef FUNC - #endif -void PencilBeams::computeDelays( const FilteredData *filteredData ) +void BeamFormer::computeDelays( const SubbandMetaData *metaData ) { // Calculate the delays for each station for this integration period. @@ -251,14 +429,14 @@ void PencilBeams::computeDelays( const FilteredData *filteredData ) for( unsigned stat = 0; stat < itsNrStations; stat++ ) { // we already compensated for the delay for the central beam - const SubbandMetaData::beamInfo ¢ralBeamInfo = filteredData->metaData.beams(stat)[0]; + const SubbandMetaData::beamInfo ¢ralBeamInfo = metaData->beams(stat)[0]; const double compensatedDelay = (centralBeamInfo.delayAfterEnd + centralBeamInfo.delayAtBegin) * 0.5; itsDelays[stat][0] = 0.0; // non-central beams for( unsigned pencil = 1; pencil < itsNrPencilBeams; pencil++ ) { - const SubbandMetaData::beamInfo &beamInfo = filteredData->metaData.beams(stat)[pencil]; + const SubbandMetaData::beamInfo &beamInfo = metaData->beams(stat)[pencil]; // subtract the delay that was already compensated for itsDelays[stat][pencil] = (beamInfo.delayAfterEnd + beamInfo.delayAtBegin) * 0.5 - compensatedDelay; @@ -266,18 +444,41 @@ void PencilBeams::computeDelays( const FilteredData *filteredData ) } } -void PencilBeams::formPencilBeams( const FilteredData *filteredData, PencilBeamData *pencilBeamData ) +void BeamFormer::formBeams( const SubbandMetaData *metaData, SampleData<> *sampleData, BeamFormedData *beamFormedData, double centerFrequency ) { + ASSERT( sampleData->samples.shape()[0] == itsNrChannels ); + ASSERT( sampleData->samples.shape()[1] == itsNrStations ); + ASSERT( sampleData->samples.shape()[2] >= itsNrSamplesPerIntegration ); + ASSERT( sampleData->samples.shape()[3] == NR_POLARIZATIONS ); + +#if !defined PENCILBEAMS_C_IMPLEMENTATION + ASSERT( TIMESTEPSIZE % 16 == 0 ); + + if( itsNrSamplesPerIntegration % 16 > 0 ) { + THROW(CNProcException, "nrSamplesPerIntegration (" << itsNrSamplesPerIntegration << ") needs to be a multiple of 16" ); + } +#endif + // TODO: fetch a list of stations to beam form. for now, we assume // we use all stations + // - pencilBeamFormTimer.start(); + const double baseFrequency = centerFrequency - (itsNrChannels/2) * itsChannelBandwidth; - computeDelays( filteredData ); - computeFlags( filteredData, pencilBeamData ); - computeComplexVoltages( filteredData, pencilBeamData ); + beamFormTimer.start(); + + // merging has to happen first! + mergeStationFlags( sampleData, sampleData ); + mergeStations( sampleData, sampleData ); + + // perform beam forming + if( itsNrPencilBeams > 0 && beamFormedData ) { // TODO: implement itsNrPencilBeams == 0 if nothing needs to be done + computeDelays( metaData ); + computeFlags( sampleData, beamFormedData ); + computeComplexVoltages( sampleData, beamFormedData, baseFrequency ); + } - pencilBeamFormTimer.stop(); + beamFormTimer.stop(); } } // namespace RTCP diff --git a/RTCP/CNProc/src/PencilBeams.h b/RTCP/CNProc/src/PencilBeams.h index b9bbf67f7d3..e0e165bb4ae 100644 --- a/RTCP/CNProc/src/PencilBeams.h +++ b/RTCP/CNProc/src/PencilBeams.h @@ -4,8 +4,9 @@ #include <vector> #include <cmath> -#include <Interface/FilteredData.h> +#include <Interface/StreamableData.h> #include <Interface/PencilBeamData.h> +#include <Interface/SubbandMetaData.h> #include <Interface/PencilCoordinates.h> #include <BandPass.h> #include <CN_Math.h> @@ -17,42 +18,83 @@ namespace LOFAR { namespace RTCP { -class PencilBeams +/* + + This beam former supports two modes: + + 1) merging stations, as indicated by the station2BeamFormedStation array. + 2) creating pencil beams, as indicated by the nrPencilBeams and metaData parameters. + + Merging stations + ------------------------- + + Stations are merged in-place according to the station2BeamFormedStation array, which is a mapping + source -> dest of length nrStations. Multiple sources with the same dest are added and stored at dest. + If the station2BeamFormedStation array is empty, source and dest are mapped 1:1 and no stations are merged. + + Creating pencil beams + ------------------------- + + Pencil beams are created by specifying their number as nrPencilBeams upon construction, and by the + delays as provided by the metaData given to formBeams. If nrPencilBeams = 0, the target data structure + remains untouched. + +*/ + +class BeamFormer { public: static const float MAX_FLAGGED_PERCENTAGE = 0.9f; - PencilBeams(const unsigned nrPencilBeams, const unsigned nrStations, const unsigned nrChannels, const unsigned nrSamplesPerIntegration, const double centerFrequency, const double channelBandwidth ); + BeamFormer(const unsigned nrPencilBeams, const unsigned nrStations, const unsigned nrChannels, const unsigned nrSamplesPerIntegration, const double channelBandwidth, const std::vector<unsigned> &station2BeamFormedStation ); - void formPencilBeams( const FilteredData *filteredData, PencilBeamData *pencilBeamData ); + // merges stations into superstations in sampleData, and fills beamFormedData with pencil beams + void formBeams( const SubbandMetaData *metaData, SampleData<> *sampleData, BeamFormedData *beamFormedData, double centerFrequency ); + // return the station mapping + std::vector<unsigned> &getStationMapping(); private: + unsigned calcNrBeamFormedStations(); + void initStationMergeMap(); + // extracts the delays from the metaData, and transforms them if necessary - void computeDelays( const FilteredData *filteredData ); + void computeDelays( const SubbandMetaData *metaData ); dcomplex phaseShift( const double frequency, const double delay ) const; - // sets the flags in pencilBeamData, and decides which stations should be added - void computeFlags( const FilteredData *filteredData, PencilBeamData *pencilBeamData ); + void addUnweighedStations( const SampleData<> *in, SampleData<> *out, const unsigned stationIndices[], unsigned nrStations, unsigned channel, unsigned beamIndex, unsigned timeOffset, unsigned timeLength, bool first ); + + // sets the flags in beamFormedData, and decides which stations should be added + void computeFlags( const SampleData<> *sampleData, SampleData<> *beamFormedData ); + void mergeStationFlags( const SampleData<> *in, SampleData<> *out ); // the actual beam former - void computeComplexVoltages( const FilteredData *filteredData, PencilBeamData *pencilBeamData ); + void mergeStations( const SampleData<> *in, SampleData<> *out ); + void computeComplexVoltages( const SampleData<> *in, SampleData<> *out, double baseFrequency ); const unsigned itsNrStations; const unsigned itsNrPencilBeams; const unsigned itsNrChannels; const unsigned itsNrSamplesPerIntegration; - const double itsCenterFrequency; const double itsChannelBandwidth; - const double itsBaseFrequency; Matrix<double> itsDelays; // [itsNrStations][itsNrPencilBeams] - // the following are filled by computeFlags() - unsigned itsNrValidStations; // the number - std::vector<bool> itsValidStations; + // a station is 'valid' if the samples do not contain too much flagged data. invalid stations + // are ignored by the beamformer. + + // variables for station merging + const std::vector<unsigned> &itsStation2BeamFormedStation; // [s] = i => station s belongs to beam i + std::vector<std::vector<unsigned> > itsMergeSourceStations; // [i] = [a,b,c] => beam i is a+b+c + std::vector<unsigned> itsMergeDestStations; // [i] = a => beam i is stored at a + std::vector<std::vector<unsigned> > itsValidMergeSourceStations; // subset of itsMergeSourceStations, + // containing only valid stations + + // variables for pencil beam forming + unsigned itsNrValidStations; // number of 'true' values in itsValidStations + std::vector<bool> itsValidStations; // [itsNrStations] whether each station is valid }; -inline dcomplex PencilBeams::phaseShift( const double frequency, const double delay ) const +inline dcomplex BeamFormer::phaseShift( const double frequency, const double delay ) const { const double phaseShift = delay * frequency; const double phi = -2 * M_PI * phaseShift; @@ -60,6 +102,11 @@ inline dcomplex PencilBeams::phaseShift( const double frequency, const double de return cosisin(phi); } +inline std::vector<unsigned> &BeamFormer::getStationMapping() { + return itsMergeDestStations; +} + + } // namespace RTCP } // namespace LOFAR diff --git a/RTCP/CNProc/src/Stokes.cc b/RTCP/CNProc/src/Stokes.cc index ad383e674d8..8619903f46f 100644 --- a/RTCP/CNProc/src/Stokes.cc +++ b/RTCP/CNProc/src/Stokes.cc @@ -48,50 +48,20 @@ static inline void addStokes( struct stokes &stokes, const fcomplex &polX, const // Calculate coherent stokes values from pencil beams. -void Stokes::calculateCoherent( const PencilBeamData *pencilBeamData, StokesData *stokesData, const unsigned nrBeams ) +void Stokes::calculateCoherent( const SampleData<> *sampleData, StokesData *stokesData, const unsigned nrBeams ) { - stokesTimer.start(); - computeCoherentStokes( pencilBeamData->samples, pencilBeamData->flags, stokesData, nrBeams ); - stokesTimer.stop(); -} - -// Calculate incoherent stokes values from (filtered) station data. -void Stokes::calculateIncoherent( const FilteredData *filteredData, StokesData *stokesData, const unsigned nrStations ) -{ - stokesTimer.start(); - computeIncoherentStokes( filteredData->samples, filteredData->flags, stokesData, nrStations ); - stokesTimer.stop(); -} - -// Compress Stokes values by summing over all channels -void Stokes::compressStokes( const StokesData *in, StokesDataIntegratedChannels *out, const unsigned nrBeams ) -{ - const unsigned timeSteps = itsNrSamplesPerIntegration / itsNrSamplesPerStokesIntegration; - - // copy flags - for(unsigned beam = 0; beam < nrBeams; beam++) { - out->flags[beam] = in->flags[beam]; - } - - for (unsigned beam = 0; beam < nrBeams; beam++ ) { - for (unsigned time = 0; time < timeSteps; time++ ) { - for (unsigned stokes = 0; stokes < itsNrStokes; stokes++ ) { - float channelSum = 0.0f; - - for (unsigned ch = 0; ch < itsNrChannels; ch ++) { - channelSum += in->samples[ch][beam][time][stokes]; - } - - out->samples[beam][time][stokes] = channelSum; - } - } - } -} + ASSERT( sampleData->samples.shape()[0] == itsNrChannels ); + ASSERT( sampleData->samples.shape()[1] == nrBeams ); + ASSERT( sampleData->samples.shape()[2] >= itsNrSamplesPerIntegration ); + ASSERT( sampleData->samples.shape()[3] == NR_POLARIZATIONS ); -void Stokes::computeCoherentStokes( const MultiDimArray<fcomplex,4> &in, const SparseSet<unsigned> *inflags, StokesData *out, const unsigned nrBeams ) -{ const unsigned &integrationSteps = itsNrSamplesPerStokesIntegration; const bool allStokes = itsNrStokes == 4; + const MultiDimArray<fcomplex,4> &in = sampleData->samples; + const std::vector<SparseSet<unsigned> > &inflags = sampleData->flags; + StokesData *out = stokesData; + + stokesTimer.start(); // copy flags from beams for(unsigned beam = 0; beam < nrBeams; beam++) { @@ -123,15 +93,27 @@ void Stokes::computeCoherentStokes( const MultiDimArray<fcomplex,4> &in, const S } } } + stokesTimer.stop(); } -void Stokes::computeIncoherentStokes( const MultiDimArray<fcomplex,4> &in, const SparseSet<unsigned> *inflags, StokesData *out, const unsigned nrStations ) +// Calculate incoherent stokes values from (filtered) station data. +void Stokes::calculateIncoherent( const SampleData<> *sampleData, StokesData *stokesData, const unsigned nrStations ) { + ASSERT( sampleData->samples.shape()[0] == itsNrChannels ); + ASSERT( sampleData->samples.shape()[1] == nrStations ); + ASSERT( sampleData->samples.shape()[2] >= itsNrSamplesPerIntegration ); + ASSERT( sampleData->samples.shape()[3] == NR_POLARIZATIONS ); + const unsigned &integrationSteps = itsNrSamplesPerStokesIntegration; const bool allStokes = itsNrStokes == 4; const unsigned upperBound = static_cast<unsigned>(itsNrSamplesPerIntegration * Stokes::MAX_FLAGGED_PERCENTAGE); bool validStation[nrStations]; unsigned nrValidStations = 0; + const MultiDimArray<fcomplex,4> &in = sampleData->samples; + const std::vector< SparseSet<unsigned> > &inflags = sampleData->flags; + StokesData *out = stokesData; + + stokesTimer.start(); out->flags[0].reset(); @@ -180,6 +162,32 @@ void Stokes::computeIncoherentStokes( const MultiDimArray<fcomplex,4> &in, const #undef dest } } + stokesTimer.stop(); +} + +// Compress Stokes values by summing over all channels +void Stokes::compressStokes( const StokesData *in, StokesDataIntegratedChannels *out, const unsigned nrBeams ) +{ + const unsigned timeSteps = itsNrSamplesPerIntegration / itsNrSamplesPerStokesIntegration; + + // copy flags + for(unsigned beam = 0; beam < nrBeams; beam++) { + out->flags[beam] = in->flags[beam]; + } + + for (unsigned beam = 0; beam < nrBeams; beam++ ) { + for (unsigned time = 0; time < timeSteps; time++ ) { + for (unsigned stokes = 0; stokes < itsNrStokes; stokes++ ) { + float channelSum = 0.0f; + + for (unsigned ch = 0; ch < itsNrChannels; ch ++) { + channelSum += in->samples[ch][beam][time][stokes]; + } + + out->samples[beam][time][stokes] = channelSum; + } + } + } } } // namespace RTCP diff --git a/RTCP/CNProc/src/Stokes.h b/RTCP/CNProc/src/Stokes.h index 31506ebd78a..bdcfb095336 100644 --- a/RTCP/CNProc/src/Stokes.h +++ b/RTCP/CNProc/src/Stokes.h @@ -2,7 +2,7 @@ #define LOFAR_CNPROC_STOKES_H #include <Interface/FilteredData.h> -#include <Interface/PencilBeamData.h> +#include <Interface/StreamableData.h> #include <Interface/StokesData.h> #include <Interface/MultiDimArray.h> @@ -17,8 +17,8 @@ class Stokes Stokes(const bool coherent, const int nrStokes, const unsigned nrChannels, const unsigned nrSamplesPerIntegration, const unsigned nrSamplesPerOutputIntegration); - void calculateCoherent( const PencilBeamData *filteredData, StokesData *stokesData, const unsigned nrBeams ); - void calculateIncoherent( const FilteredData *filteredData, StokesData *stokesData, const unsigned nrStations ); + void calculateCoherent( const SampleData<> *sampleData, StokesData *stokesData, const unsigned nrBeams ); + void calculateIncoherent( const SampleData<> *sampleData, StokesData *stokesData, const unsigned nrStations ); void compressStokes( const StokesData *in, StokesDataIntegratedChannels *out, const unsigned nrBeams ); private: @@ -27,9 +27,6 @@ class Stokes const unsigned itsNrSamplesPerStokesIntegration; const unsigned itsNrStokes; const bool itsIsCoherent; - - void computeCoherentStokes( const MultiDimArray<fcomplex,4> &in, const SparseSet<unsigned> *inflags, StokesData *out, const unsigned nrBeams ); - void computeIncoherentStokes( const MultiDimArray<fcomplex,4> &in, const SparseSet<unsigned> *inflags, StokesData *out, const unsigned nrBeams ); }; } // namespace RTCP diff --git a/RTCP/CNProc/src/TransposedData.h b/RTCP/CNProc/src/TransposedData.h index b323fceb67b..d4ab84a7413 100644 --- a/RTCP/CNProc/src/TransposedData.h +++ b/RTCP/CNProc/src/TransposedData.h @@ -20,22 +20,17 @@ template <typename SAMPLE_TYPE> class TransposedData: public SampleData<SAMPLE_T public: typedef SampleData<SAMPLE_TYPE,3> SuperType; - TransposedData(const unsigned nrStations, const unsigned nrSamplesToCNProc, const unsigned nrPencilBeams); - - SubbandMetaData metaData; // with one subband for every station + TransposedData(const unsigned nrStations, const unsigned nrSamplesToCNProc); private: const unsigned itsNrStations; - const unsigned itsNrPencilBeams; const unsigned itsNrSamplesToCNProc; }; -template <typename SAMPLE_TYPE> inline TransposedData<SAMPLE_TYPE>::TransposedData(const unsigned nrStations, const unsigned nrSamplesToCNProc, const unsigned nrPencilBeams) +template <typename SAMPLE_TYPE> inline TransposedData<SAMPLE_TYPE>::TransposedData(const unsigned nrStations, const unsigned nrSamplesToCNProc) : SuperType(false,boost::extents[nrStations][nrSamplesToCNProc][NR_POLARIZATIONS],0), - metaData(nrStations,nrPencilBeams,32), itsNrStations(nrStations), - itsNrPencilBeams(nrPencilBeams), itsNrSamplesToCNProc(nrSamplesToCNProc) { } diff --git a/RTCP/Interface/include/Interface/FilteredData.h b/RTCP/Interface/include/Interface/FilteredData.h index 561a0dbac94..6db6b232b34 100644 --- a/RTCP/Interface/include/Interface/FilteredData.h +++ b/RTCP/Interface/include/Interface/FilteredData.h @@ -8,7 +8,6 @@ #include <Interface/MultiDimArray.h> #include <Interface/SparseSet.h> #include <Interface/StreamableData.h> -#include <Interface/SubbandMetaData.h> namespace LOFAR { namespace RTCP { @@ -18,50 +17,26 @@ class FilteredData: public SampleData<fcomplex,4> public: typedef SampleData<fcomplex,4> SuperType; - FilteredData(unsigned nrStations, unsigned nrChannels, unsigned nrSamplesPerIntegration, unsigned nrPencilBeams); - - SubbandMetaData metaData; // with one subband for every station + FilteredData(unsigned nrStations, unsigned nrChannels, unsigned nrSamplesPerIntegration); protected: const unsigned itsNrStations; - const unsigned itsNrPencilBeams; const unsigned itsNrChannels; const unsigned itsNrSamplesPerIntegration; - virtual void readData(Stream *); - virtual void writeData(Stream *); }; - - -inline FilteredData::FilteredData(unsigned nrStations, unsigned nrChannels, unsigned nrSamplesPerIntegration, unsigned nrPencilBeams) +inline FilteredData::FilteredData(unsigned nrStations, unsigned nrChannels, unsigned nrSamplesPerIntegration) : // The "| 2" significantly improves transpose speeds for particular // numbers of stations due to cache conflict effects. The extra memory // is not used. SuperType::SampleData(false,boost::extents[nrChannels][nrStations][nrSamplesPerIntegration | 2][NR_POLARIZATIONS], nrStations), - metaData(nrStations,nrPencilBeams,32), itsNrStations(nrStations), - itsNrPencilBeams(nrPencilBeams), itsNrChannels(nrChannels), itsNrSamplesPerIntegration(nrSamplesPerIntegration) { } - -inline void FilteredData::readData(Stream *str) -{ - metaData.read(str); - SuperType::readData(str); -} - - -inline void FilteredData::writeData(Stream *str) -{ - metaData.write(str); - SuperType::writeData(str); -} - - } // namespace RTCP } // namespace LOFAR diff --git a/RTCP/Interface/include/Interface/MultiDimArray.h b/RTCP/Interface/include/Interface/MultiDimArray.h index bb2bc4153de..495e32b2e3c 100644 --- a/RTCP/Interface/include/Interface/MultiDimArray.h +++ b/RTCP/Interface/include/Interface/MultiDimArray.h @@ -94,6 +94,11 @@ template <typename T, unsigned DIM> class MultiDimArray : public boost::multi_ar resize(extents, alignment, *allocator); } + bool hasShape(const ExtentList &extents) + { + return std::equal( this->extent_list_.begin(), this->extent_list_.end(), extents.begin() ); + } + static size_t defaultAlignment() { return sizeof(T) < 16 ? 8 : sizeof(T) < 32 ? 16 : 32; diff --git a/RTCP/Interface/include/Interface/PencilBeamData.h b/RTCP/Interface/include/Interface/PencilBeamData.h index 59803f9160d..2474a9f8ccd 100644 --- a/RTCP/Interface/include/Interface/PencilBeamData.h +++ b/RTCP/Interface/include/Interface/PencilBeamData.h @@ -8,20 +8,19 @@ #include <Interface/MultiDimArray.h> #include <Interface/SparseSet.h> #include <Interface/StreamableData.h> -#include <Interface/SubbandMetaData.h> namespace LOFAR { namespace RTCP { -class PencilBeamData: public SampleData<fcomplex,4> +class BeamFormedData: public SampleData<fcomplex,4> { public: typedef SampleData<fcomplex,4> SuperType; - PencilBeamData(unsigned nrBeams, unsigned nrChannels, unsigned nrSamplesPerIntegration); + BeamFormedData(unsigned nrBeams, unsigned nrChannels, unsigned nrSamplesPerIntegration); }; -inline PencilBeamData::PencilBeamData(unsigned nrBeams, unsigned nrChannels, unsigned nrSamplesPerIntegration) +inline BeamFormedData::BeamFormedData(unsigned nrBeams, unsigned nrChannels, unsigned nrSamplesPerIntegration) // The "| 2" significantly improves transpose speeds for particular // numbers of stations due to cache conflict effects. The extra memory // is not used. diff --git a/RTCP/Interface/include/Interface/PipelineOutput.h b/RTCP/Interface/include/Interface/PipelineOutput.h index f1d8be6a6c1..64eb6c1eae3 100644 --- a/RTCP/Interface/include/Interface/PipelineOutput.h +++ b/RTCP/Interface/include/Interface/PipelineOutput.h @@ -128,7 +128,7 @@ inline PipelineOutputSet::PipelineOutputSet(const Parset &ps, Allocator &allocat switch (mode.mode()) { case CN_Mode::FILTER : o = new PipelineOutput(id ++, PipelineOutput::FILTEREDDATA); - o->itsData = new FilteredData(ps.nrStations(), ps.nrChannelsPerSubband(), ps.CNintegrationSteps(), ps.nrPencilBeams()); + o->itsData = new FilteredData(ps.nrStations(), ps.nrChannelsPerSubband(), ps.CNintegrationSteps()); o->itsFilenameSuffix = ".filtered"; break; @@ -140,7 +140,7 @@ inline PipelineOutputSet::PipelineOutputSet(const Parset &ps, Allocator &allocat case CN_Mode::COHERENT_COMPLEX_VOLTAGES : o = new PipelineOutput(id ++, PipelineOutput::PENCILBEAMDATA); - o->itsData = new PencilBeamData(ps.nrPencilBeams(), ps.nrChannelsPerSubband(), ps.CNintegrationSteps()); + o->itsData = new BeamFormedData(ps.nrPencilBeams(), ps.nrChannelsPerSubband(), ps.CNintegrationSteps()); o->itsFilenameSuffix = ".complexvoltages"; break; diff --git a/RTCP/Interface/include/Interface/StreamableData.h b/RTCP/Interface/include/Interface/StreamableData.h index e5e310d322d..dc5dc77a09f 100644 --- a/RTCP/Interface/include/Interface/StreamableData.h +++ b/RTCP/Interface/include/Interface/StreamableData.h @@ -63,19 +63,18 @@ class StreamableData { }; // A typical data set contains a MultiDimArray of tuples and a set of flags. -template <typename T, unsigned DIM> class SampleData : public StreamableData +template <typename T = fcomplex, unsigned DIM = 4> class SampleData : public StreamableData { public: typedef typename MultiDimArray<T,DIM>::ExtentList ExtentList; SampleData(bool isIntegratable, const ExtentList &extents, unsigned nrFlags); - virtual ~SampleData(); virtual size_t requiredSize() const; virtual void allocate(Allocator &allocator = heapAllocator); MultiDimArray<T,DIM> samples; - SparseSet<unsigned> *flags; + std::vector<SparseSet<unsigned> > flags; protected: virtual void checkEndianness(); @@ -177,13 +176,7 @@ template <typename T, unsigned DIM> inline size_t SampleData<T,DIM>::requiredSiz template <typename T, unsigned DIM> inline void SampleData<T,DIM>::allocate(Allocator &allocator) { samples.resize(extents, 32, allocator); - flags = new SparseSet<unsigned>[nrFlags]; -} - -template <typename T, unsigned DIM> inline SampleData<T,DIM>::~SampleData() -{ - delete [] flags; - flags = 0; + flags.resize( nrFlags ); } template <typename T, unsigned DIM> inline void SampleData<T,DIM>::checkEndianness() @@ -214,7 +207,6 @@ template <typename T, unsigned DIM> inline void SampleData<T,DIM>::writeData(Str str->write(samples.origin(), samples.num_elements() * sizeof (T)); } - } // namespace RTCP } // namespace LOFAR -- GitLab