diff --git a/steps/Interpolate.cc b/steps/Interpolate.cc index 44f456ccff1187693ccf97f7ad2f5c1afe6ad6df..8e6a8ce4d8c37969b14b5929ce4b8efd15a25a79 100644 --- a/steps/Interpolate.cc +++ b/steps/Interpolate.cc @@ -23,50 +23,50 @@ namespace dp3 { namespace steps { Interpolate::Interpolate(const common::ParameterSet& parset, - const string& prefix) - : _name(prefix), - _interpolatedPos(0), - _windowSize(parset.getUint(prefix + "windowsize", 15)) { - if (_windowSize % 2 != 1) + const std::string& prefix) + : name_(prefix), + interpolated_pos_(0), + window_size_(parset.getUint(prefix + "windowsize", 15)) { + if (window_size_ % 2 != 1) throw std::runtime_error( "Window size of Interpolate action should be an odd number"); - _kernelLookup.reserve(_windowSize * _windowSize); - for (int t = 0; t != int(_windowSize); ++t) { - int y = t - int(_windowSize / 2); - for (int ch = 0; ch != int(_windowSize); ++ch) { - int x = ch - int(_windowSize / 2); - double windowDist = double(x * x + y * y); + kernel_lookup_.reserve(window_size_ * window_size_); + for (int t = 0; t != int(window_size_); ++t) { + int y = t - int(window_size_ / 2); + for (int ch = 0; ch != int(window_size_); ++ch) { + int x = ch - int(window_size_ / 2); + double window_dist = double(x * x + y * y); // Gaussian function with sigma = 1 // (evaluated with double prec, then converted to floats) - double w = std::exp(windowDist * -0.5); - _kernelLookup.emplace_back(w); + double w = std::exp(window_dist * -0.5); + kernel_lookup_.emplace_back(w); } } } void Interpolate::show(std::ostream& os) const { - os << "Interpolate " << _name << '\n'; - os << " windowsize: " << _windowSize << '\n'; + os << "Interpolate " << name_ << '\n'; + os << " windowsize: " << window_size_ << '\n'; } void Interpolate::showTimings(std::ostream& os, double duration) const { os << " "; - FlagCounter::showPerc1(os, _timer.getElapsed(), duration); - os << " Interpolate " << _name << '\n'; + FlagCounter::showPerc1(os, timer_.getElapsed(), duration); + os << " Interpolate " << name_ << '\n'; } bool Interpolate::process(std::unique_ptr<base::DPBuffer> input_buffer) { - _timer.start(); + timer_.start(); // Collect the data in buffers. - _buffers.emplace_back(std::move(input_buffer)); + buffers_.emplace_back(std::move(input_buffer)); // If we have a full window of data, interpolate everything // up to the middle of the window - if (_buffers.size() >= _windowSize) { - size_t mid = _windowSize / 2; - while (_interpolatedPos <= mid) { - interpolateTimestep(_interpolatedPos); - ++_interpolatedPos; + if (buffers_.size() >= window_size_) { + size_t mid = window_size_ / 2; + while (interpolated_pos_ <= mid) { + interpolateTimestep(interpolated_pos_); + ++interpolated_pos_; } // Buffers are only pushed to the next step when they are completely // out of the window. This is because flags need to be set to false, @@ -74,14 +74,17 @@ bool Interpolate::process(std::unique_ptr<base::DPBuffer> input_buffer) { // interpolation, so these can only be set to false after processing. sendFrontBufferToNextStep(); } - _timer.stop(); + timer_.stop(); return true; } void Interpolate::sendFrontBufferToNextStep() { - auto front_buffer = std::move(_buffers.front()); - const auto& shp = front_buffer->GetData().shape(); - size_t nPol = shp[2], nChan = shp[1], nBl = shp[0], n = nPol * nChan * nBl; + auto front_buffer = std::move(buffers_.front()); + const auto& data_shape = front_buffer->GetData().shape(); + size_t num_pol = data_shape[2]; + size_t num_channels = data_shape[1]; + size_t num_baselines = data_shape[0]; + size_t n = num_pol * num_channels * num_baselines; // Set all flags to false bool* flags = front_buffer->GetFlags().data(); std::complex<float>* data = front_buffer->GetData().data(); @@ -96,27 +99,27 @@ void Interpolate::sendFrontBufferToNextStep() { } } - _timer.stop(); + timer_.stop(); getNextStep()->process(std::move(front_buffer)); - _timer.start(); + timer_.start(); - _buffers.pop_front(); - --_interpolatedPos; + buffers_.pop_front(); + --interpolated_pos_; } void Interpolate::finish() { - _timer.start(); + timer_.start(); // Interpolate everything up to the end of the window - while (_interpolatedPos < _buffers.size()) { - interpolateTimestep(_interpolatedPos); - ++_interpolatedPos; + while (interpolated_pos_ < buffers_.size()) { + interpolateTimestep(interpolated_pos_); + ++interpolated_pos_; } - while (!_buffers.empty()) { + while (!buffers_.empty()) { sendFrontBufferToNextStep(); } - _timer.stop(); + timer_.stop(); getNextStep()->finish(); } @@ -124,22 +127,24 @@ void Interpolate::finish() { #define BUFFER_SIZE 1024 void Interpolate::interpolateTimestep(size_t index) { - const auto& shp = _buffers.front()->GetData().shape(); - const size_t nPol = shp[2], nChan = shp[1], nPerBl = nPol * nChan, - nBl = shp[0]; + const auto& data_shape = buffers_.front()->GetData().shape(); + const size_t num_pol = data_shape[2]; + const size_t num_channels = data_shape[1]; + const size_t num_per_baseline = num_pol * num_channels; + const size_t num_baselines = data_shape[0]; std::vector<std::thread> threads; - size_t nthreads = std::min<size_t>(getInfo().nThreads(), 8); - _lane.resize(nthreads * BUFFER_SIZE); - common::lane_write_buffer<Sample> buflane(&_lane, BUFFER_SIZE); - threads.reserve(nthreads); - for (size_t i = 0; i != nthreads; ++i) + size_t num_threads = std::min<size_t>(getInfo().nThreads(), 8); + lane_.resize(num_threads * BUFFER_SIZE); + common::lane_write_buffer<Sample> buflane(&lane_, BUFFER_SIZE); + threads.reserve(num_threads); + for (size_t i = 0; i != num_threads; ++i) threads.emplace_back(&Interpolate::interpolationThread, this); - for (size_t bl = 0; bl < nBl; ++bl) { - bool* flags = _buffers[index]->GetFlags().data() + bl * nPerBl; - for (size_t ch = 0; ch != nChan; ++ch) { - for (size_t p = 0; p != nPol; ++p) { + for (size_t bl = 0; bl < num_baselines; ++bl) { + bool* flags = buffers_[index]->GetFlags().data() + bl * num_per_baseline; + for (size_t ch = 0; ch != num_channels; ++ch) { + for (size_t p = 0; p != num_pol; ++p) { if (*flags) { buflane.emplace(index, bl, ch, p); } @@ -153,9 +158,9 @@ void Interpolate::interpolateTimestep(size_t index) { } void Interpolate::interpolationThread() { - common::lane_read_buffer<Sample> buflane(&_lane, BUFFER_SIZE); + common::lane_read_buffer<Sample> lane_buffer(&lane_, BUFFER_SIZE); Sample sample; - while (buflane.read(sample)) { + while (lane_buffer.read(sample)) { interpolateSample(sample.timestep, sample.baseline, sample.channel, sample.pol); } @@ -163,48 +168,50 @@ void Interpolate::interpolationThread() { void Interpolate::interpolateSample(size_t timestep, size_t baseline, size_t channel, size_t pol) { - const auto& shp = _buffers.front()->GetData().shape(); - const size_t nPol = shp[2], nChan = shp[1], - timestepBegin = (timestep > _windowSize / 2) - ? (timestep - _windowSize / 2) - : 0, - timestepEnd = - std::min(timestep + _windowSize / 2 + 1, _buffers.size()), - channelBegin = (channel > _windowSize / 2) - ? (channel - _windowSize / 2) - : 0, - channelEnd = std::min(channel + _windowSize / 2 + 1, nChan); - - std::complex<float> valueSum = 0.0; - float windowSum = 0.0; - - for (size_t t = timestepBegin; t != timestepEnd; ++t) { - std::complex<float>* data = _buffers[t]->GetData().data() + - (baseline * nChan + channelBegin) * nPol + pol; - const bool* flags = _buffers[t]->GetFlags().data() + - (baseline * nChan + channelBegin) * nPol + pol; + const auto& data_shape = buffers_.front()->GetData().shape(); + const size_t num_pol = data_shape[2]; + const size_t num_channels = data_shape[1]; + const size_t timestep_begin = + (timestep > window_size_ / 2) ? (timestep - window_size_ / 2) : 0; + const size_t timestep_end = + std::min(timestep + window_size_ / 2 + 1, buffers_.size()); + const size_t channel_begin = + (channel > window_size_ / 2) ? (channel - window_size_ / 2) : 0; + const size_t channel_end = + std::min(channel + window_size_ / 2 + 1, num_channels); + + std::complex<float> value_sum = 0.0; + float window_sum = 0.0; + + for (size_t t = timestep_begin; t != timestep_end; ++t) { + std::complex<float>* data = + buffers_[t]->GetData().data() + + (baseline * num_channels + channel_begin) * num_pol + pol; + const bool* flags = buffers_[t]->GetFlags().data() + + (baseline * num_channels + channel_begin) * num_pol + + pol; const float* row = - &_kernelLookup[_windowSize * (t + int(_windowSize / 2) - timestep)]; - for (size_t ch = channelBegin; ch != channelEnd; ++ch) { + &kernel_lookup_[window_size_ * (t + int(window_size_ / 2) - timestep)]; + for (size_t ch = channel_begin; ch != channel_end; ++ch) { if (!*flags) { - int x = ch + int(_windowSize / 2) - channel; + int x = ch + int(window_size_ / 2) - channel; float w = row[x]; - valueSum += *data * w; - windowSum += w; + value_sum += *data * w; + window_sum += w; } - data += nPol; - flags += nPol; + data += num_pol; + flags += num_pol; } } // This write is multithreaded, but is allowed because this value is never // read from in the loops above (because flagged values are skipped). std::complex<float>& value = - _buffers[timestep] + buffers_[timestep] ->GetData() - .data()[(baseline * nChan + channel) * nPol + pol]; - if (windowSum != 0.0) - value = valueSum / windowSum; + .data()[(baseline * num_channels + channel) * num_pol + pol]; + if (window_sum != 0.0) + value = value_sum / window_sum; else value = std::complex<float>(std::numeric_limits<float>::quiet_NaN(), std::numeric_limits<float>::quiet_NaN()); diff --git a/steps/Interpolate.h b/steps/Interpolate.h index d96f6908bc85cf60d8aa36a3d308c30bbcc8eff4..dc5eccae0140f87744c9be151ff4d20712f694c5 100644 --- a/steps/Interpolate.h +++ b/steps/Interpolate.h @@ -22,7 +22,7 @@ class Interpolate : public Step { public: /// Construct the object. /// Parameters are obtained from the parset using the given prefix. - Interpolate(const common::ParameterSet&, const string& prefix); + Interpolate(const common::ParameterSet&, const std::string& prefix); ~Interpolate() override = default; @@ -68,13 +68,13 @@ class Interpolate : public Step { size_t pol; }; - std::string _name; - size_t _interpolatedPos; - std::deque<std::unique_ptr<base::DPBuffer>> _buffers; - size_t _windowSize; - common::NSTimer _timer; - aocommon::Lane<Sample> _lane; - std::vector<float> _kernelLookup; + std::string name_; + size_t interpolated_pos_; + std::deque<std::unique_ptr<base::DPBuffer>> buffers_; + size_t window_size_; + common::NSTimer timer_; + aocommon::Lane<Sample> lane_; + std::vector<float> kernel_lookup_; }; } // namespace steps diff --git a/steps/test/unit/tInterpolate.cc b/steps/test/unit/tInterpolate.cc index cc014dd25898a54764dfbffb170a67a7084155e2..fbd68f4dd195adafc643cfe76743178817de9c60 100644 --- a/steps/test/unit/tInterpolate.cc +++ b/steps/test/unit/tInterpolate.cc @@ -28,29 +28,29 @@ BOOST_AUTO_TEST_SUITE(interpolate) // It can be used with different nr of times, channels, etc. class TestInput : public dp3::steps::MockInput { public: - TestInput(std::size_t ntime, std::size_t nant, std::size_t nchan, - std::size_t ncorr, bool flag) - : itsCount(0), - itsNTime(ntime), - itsNBl(nant * (nant + 1) / 2), - itsNChan(nchan), - itsNCorr(ncorr), - itsFlag(flag) { + TestInput(std::size_t num_times, std::size_t num_antennas, + std::size_t num_channels, std::size_t num_correlations, bool flag) + : process_count_(0), + num_times_(num_times), + num_baselines_(num_antennas * (num_antennas + 1) / 2), + num_channels_(num_channels), + num_correlations_(num_correlations), + flag_(flag) { using casacore::MPosition; using casacore::Quantum; - info() = DPInfo(itsNCorr, itsNChan); - info().setTimes(100.0, 100.0 + (itsNTime - 1) * 5.0, 5.0); + info() = DPInfo(num_correlations_, num_channels_); + info().setTimes(100.0, 100.0 + (num_times_ - 1) * 5.0, 5.0); // Fill the baseline stations; use 4 stations. // So they are called 00 01 02 03 10 11 12 13 20, etc. - std::vector<int> ant1(itsNBl); - std::vector<int> ant2(itsNBl); + std::vector<int> antenna_1(num_baselines_); + std::vector<int> antenna_2(num_baselines_); int st1 = 0; int st2 = 0; - for (std::size_t i = 0; i < itsNBl; ++i) { - ant1[i] = st1; - ant2[i] = st2; + for (std::size_t i = 0; i < num_baselines_; ++i) { + antenna_1[i] = st1; + antenna_2[i] = st2; if (++st2 == 4) { st2 = 0; if (++st1 == 4) { @@ -58,8 +58,8 @@ class TestInput : public dp3::steps::MockInput { } } } - std::vector<std::string> antNames{"rs01.s01", "rs02.s01", "cs01.s01", - "cs01.s02"}; + std::vector<std::string> antenna_names{"rs01.s01", "rs02.s01", "cs01.s01", + "cs01.s02"}; // Define their positions (more or less WSRT RT0-3). const casacore::Vector<double> vals0( std::vector<int>{3828763, 442449, 5064923}); @@ -69,7 +69,7 @@ class TestInput : public dp3::steps::MockInput { std::vector<int>{3828729, 442735, 5064925}); const casacore::Vector<double> vals3( std::vector<int>{3828713, 442878, 5064926}); - const std::vector<MPosition> antPos{ + const std::vector<MPosition> antenna_positions{ MPosition(Quantum<casacore::Vector<double>>(vals0, "m"), MPosition::ITRF), MPosition(Quantum<casacore::Vector<double>>(vals1, "m"), @@ -78,43 +78,47 @@ class TestInput : public dp3::steps::MockInput { MPosition::ITRF), MPosition(Quantum<casacore::Vector<double>>(vals3, "m"), MPosition::ITRF)}; - const std::vector<double> antDiam(4, 70.); - info().setAntennas(antNames, antDiam, antPos, ant1, ant2); + const std::vector<double> antenna_diameters(4.0, 70.0); + info().setAntennas(antenna_names, antenna_diameters, antenna_positions, + antenna_1, antenna_2); // Define the frequencies. - std::vector<double> chanFreqs; - std::vector<double> chanWidth(nchan, 100000.); - for (std::size_t i = 0; i < nchan; i++) { - chanFreqs.push_back(1050000. + i * 100000.); + std::vector<double> channel_frequencies; + std::vector<double> channel_widths(num_channels, 100000.0); + for (std::size_t i = 0; i < num_channels; ++i) { + channel_frequencies.push_back(1050000.0 + i * 100000.0); } - info().setChannels(std::move(chanFreqs), std::move(chanWidth)); + info().setChannels(std::move(channel_frequencies), + std::move(channel_widths)); } private: bool process(std::unique_ptr<dp3::base::DPBuffer>) override { // Stop when all times are done. - if (itsCount == itsNTime) { + if (process_count_ == num_times_) { return false; } - const std::array<std::size_t, 3> data_shape{itsNBl, itsNChan, itsNCorr}; + const std::array<std::size_t, 3> data_shape{num_baselines_, num_channels_, + num_correlations_}; xt::xtensor<std::complex<float>, 3> data(data_shape); for (std::size_t i = 0; i < data.size(); ++i) { data.data()[i] = std::complex<float>(1.6, 0.9); } - if (itsCount == 5) { - data += std::complex<float>(10., 10.); + if (process_count_ == 5) { + data += std::complex<float>(10.0, 10.0); } auto buffer = std::make_unique<dp3::base::DPBuffer>(); - buffer->setTime(itsCount * 5 + 2); // same interval as in updateAveragInfo + buffer->setTime(process_count_ * 5.0 + + 2.0); // same interval as in updateAveragInfo buffer->ResizeData(data_shape); buffer->GetData() = data; xt::xtensor<float, 3> weights(data_shape, 1.0); buffer->ResizeWeights(data_shape); buffer->GetWeights() = weights; - xt::xtensor<bool, 3> flags(data_shape, itsFlag); + xt::xtensor<bool, 3> flags(data_shape, flag_); buffer->ResizeFlags(data_shape); buffer->GetFlags() = flags; getNextStep()->process(std::move(buffer)); - ++itsCount; + ++process_count_; return true; } @@ -123,55 +127,59 @@ class TestInput : public dp3::steps::MockInput { // Do nothing / keep the info set in the constructor. } - std::size_t itsCount, itsNTime, itsNBl, itsNChan, itsNCorr; - bool itsFlag; + std::size_t process_count_, num_times_, num_baselines_, num_channels_, + num_correlations_; + bool flag_; }; // Class to check result. class TestOutput : public dp3::steps::test::ThrowStep { public: - TestOutput(std::size_t ntime, std::size_t nant, std::size_t nchan, - std::size_t ncorr) - : itsCount(0), - itsNTime(ntime), - itsNBl(nant * (nant + 1) / 2), - itsNChan(nchan), - itsNCorr(ncorr) {} + TestOutput(std::size_t num_times, std::size_t num_antennas, + std::size_t num_channels, std::size_t num_correlations) + : process_count_(0), + num_times_(num_times), + num_baselines_(num_antennas * (num_antennas + 1) / 2), + num_channels_(num_channels), + num_correlations_(num_correlations) {} private: bool process( const std::unique_ptr<dp3::base::DPBuffer> input_buffer) override { // Fill expected result in similar way as TestInput. - const std::array<std::size_t, 3> data_shape{itsNBl, itsNChan, itsNCorr}; - xt::xtensor<std::complex<float>, 3> result(data_shape); - for (std::size_t i = 0; i < result.size(); ++i) { - result.data()[i] = std::complex<float>(1.6, 0.9); + const std::array<std::size_t, 3> data_shape{num_baselines_, num_channels_, + num_correlations_}; + xt::xtensor<std::complex<float>, 3> expected_result(data_shape); + for (std::size_t i = 0; i < expected_result.size(); ++i) { + expected_result.data()[i] = std::complex<float>(1.6, 0.9); } - if (itsCount == 5) { - result += std::complex<float>(10.0, 10.0); + if (process_count_ == 5) { + expected_result += std::complex<float>(10.0, 10.0); } // Check the result. - BOOST_CHECK(xt::allclose(input_buffer->GetData(), result, 1e-10)); - BOOST_CHECK(xt::allclose(input_buffer->getTime(), 2 + 5.0 * itsCount)); - ++itsCount; + BOOST_CHECK(xt::allclose(input_buffer->GetData(), expected_result, 1e-10)); + BOOST_CHECK( + xt::allclose(input_buffer->getTime(), 2 + 5.0 * process_count_)); + ++process_count_; return true; } void finish() override {} void updateInfo(const DPInfo& info) override { - BOOST_CHECK_EQUAL(info.origNChan(), itsNChan); - BOOST_CHECK_EQUAL(info.nchan(), itsNChan); - BOOST_CHECK_EQUAL(info.ntime(), itsNTime); + BOOST_CHECK_EQUAL(info.origNChan(), num_channels_); + BOOST_CHECK_EQUAL(info.nchan(), num_channels_); + BOOST_CHECK_EQUAL(info.ntime(), num_times_); BOOST_CHECK_EQUAL(info.firstTime(), 100.0); BOOST_CHECK_EQUAL(info.timeInterval(), 5.0); BOOST_CHECK_EQUAL(info.nchanAvg(), 1); BOOST_CHECK_EQUAL(info.ntimeAvg(), 1); - BOOST_CHECK_EQUAL(info.chanFreqs().size(), itsNChan); - BOOST_CHECK_EQUAL(info.chanWidths().size(), itsNChan); + BOOST_CHECK_EQUAL(info.chanFreqs().size(), num_channels_); + BOOST_CHECK_EQUAL(info.chanWidths().size(), num_channels_); BOOST_CHECK(info.msName().empty()); } - std::size_t itsCount, itsNTime, itsNBl, itsNChan, itsNCorr; + std::size_t process_count_, num_times_, num_baselines_, num_channels_, + num_correlations_; }; BOOST_AUTO_TEST_CASE(test1) { @@ -181,13 +189,13 @@ BOOST_AUTO_TEST_CASE(test1) { const int kNCorr = 4; const bool kFlag = false; - auto step1 = + auto step_1 = std::make_shared<TestInput>(kNTime, kNAnt, kNChan, kNCorr, kFlag); dp3::common::ParameterSet parset; parset.add("windowsize", "9"); - auto step2 = std::make_shared<dp3::steps::Interpolate>(parset, ""); - auto step3 = std::make_shared<TestOutput>(kNTime, kNAnt, kNChan, kNCorr); - dp3::steps::test::Execute({step1, step2, step3}); + auto step_2 = std::make_shared<dp3::steps::Interpolate>(parset, ""); + auto step_3 = std::make_shared<TestOutput>(kNTime, kNAnt, kNChan, kNCorr); + dp3::steps::test::Execute({step_1, step_2, step_3}); } BOOST_AUTO_TEST_SUITE_END()