Skip to content
Snippets Groups Projects
Select Git revision
  • cc6794ca5c1899e44c8496d712f87c99e41b198c
  • master default protected
  • plot_using_locky
  • poppy_integration_v50
  • optimize_workflow
  • hotfix/disable_shm
  • poppy_integration
  • releases/v5.0 protected
  • use-versioned-releases
  • releases/v5.0rc2 protected
  • releases/v5.0rc1 protected
  • releases/ldv_v407_atdb protected
  • ldv_v407_debug
  • releases/ldv_v406_debug protected
  • releases/ldv_v405 protected
  • releases/ldv_v404 protected
  • v5.0
  • v5.0rc2
  • v5.0rc1
  • ldv_v406_debug
  • ldv_v405_debug
  • ldv_v404
  • ldv_v403
  • ldv_v402
  • v4.0
  • ldv_v401
  • ldv_v40
  • ldv_v031
  • ldv_v03
  • ldv_v01
30 results

check_ateam_separation.cwl

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    OnePredict.cc 30.64 KiB
    // OnePredict.cc: DP3 step class that predicts visibilities.
    // Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
    // SPDX-License-Identifier: GPL-3.0-or-later
    //
    // @author Tammo Jan Dijkema
    
    #include "OnePredict.h"
    #include "ApplyBeam.h"
    
    #include <algorithm>
    #include <cassert>
    #include <iostream>
    
    #include <xtensor/xview.hpp>
    
    #include "../common/ParameterSet.h"
    #include "../common/Timer.h"
    #include "../common/StreamUtil.h"
    
    #include "../parmdb/ParmDBMeta.h"
    #include "../parmdb/PatchInfo.h"
    #include "../parmdb/SkymodelToSourceDB.h"
    
    #include <dp3/base/DPInfo.h>
    #include "../base/FlagCounter.h"
    #include "../base/GaussianSource.h"
    #include "../base/PointSource.h"
    #include "../base/Simulate.h"
    #include "../base/Simulator.h"
    #include "../base/SkyModelCache.h"
    #include "../base/Stokes.h"
    #include "../base/Telescope.h"
    
    #include "../parmdb/SourceDB.h"
    
    #include <aocommon/barrier.h>
    #include <aocommon/logger.h>
    #include <aocommon/recursivefor.h>
    #include <aocommon/staticfor.h>
    
    #include <casacore/casa/Arrays/Array.h>
    #include <casacore/casa/Arrays/Vector.h>
    #include <casacore/casa/OS/File.h>
    #include <casacore/measures/Measures/MDirection.h>
    #include <casacore/measures/Measures/MeasConvert.h>
    #include <casacore/measures/Measures/MEpoch.h>
    #include <casacore/tables/Tables/RefRows.h>
    
    #include <algorithm>
    #include <cstddef>
    #include <numeric>
    #include <optional>
    #include <mutex>
    #include <sstream>
    #include <string>
    #include <utility>
    #include <vector>
    
    #include <boost/algorithm/string/case_conv.hpp>
    
    using casacore::MDirection;
    using casacore::MEpoch;
    using casacore::MVDirection;
    using casacore::MVEpoch;
    using casacore::Quantum;
    
    using dp3::base::DPBuffer;
    using dp3::base::DPInfo;
    using dp3::base::PredictModel;
    using dp3::common::operator<<;
    
    namespace dp3 {
    namespace steps {
    
    OnePredict::OnePredict(const common::ParameterSet& parset,
                           const std::string& prefix,
                           const std::vector<std::string>& source_patterns)
        : measures_mutex_(nullptr) {
      if (!source_patterns.empty()) {
        init(parset, prefix, source_patterns);
      } else {
        const std::vector<std::string> parset_patterns =
            parset.getStringVector(prefix + "sources", std::vector<std::string>());
        init(parset, prefix, parset_patterns);
      }
    }
    
    void OnePredict::init(const common::ParameterSet& parset,
                          const std::string& prefix,
                          const std::vector<std::string>& sourcePatterns) {
      name_ = prefix;
      source_db_name_ = parset.getString(prefix + "sourcedb");
      correct_freq_smearing_ =
          parset.getBool(prefix + "correctfreqsmearing", false);
      SetOperation(parset.getString(prefix + "operation", "replace"));
      output_data_name_ = parset.getString(prefix + "outputmodelname", "");
    
      apply_beam_ = parset.getBool(prefix + "usebeammodel", false);
      thread_over_baselines_ = parset.getBool(prefix + "parallelbaselines", false);
      debug_level_ = parset.getInt(prefix + "debuglevel", 0);
      patch_list_.clear();
    
      // Save directions specifications to pass to applycal
      std::stringstream ss;
      ss << sourcePatterns;
      direction_str_ = ss.str();
    
      aocommon::Logger::Debug << "Loading " << source_db_name_
                              << " in predict step for direction " << direction_str_
                              << ".\n";
      base::SourceDBWrapper source_db =
          base::SkyModelCache::GetInstance().GetSkyModel(source_db_name_);
      source_db.Filter(sourcePatterns, base::SourceDBWrapper::FilterMode::kPattern);
      try {
        patch_list_ = source_db.MakePatchList();
        if (patch_list_.empty()) {
          throw std::runtime_error("Couldn't find patch for direction " +
                                   direction_str_);
        }
      } catch (std::exception& exception) {
        throw std::runtime_error(std::string("Something went wrong while reading "
                                             "the source model. The error was: ") +
                                 exception.what());
      }
    
      if (apply_beam_) {
        use_channel_freq_ = parset.getBool(prefix + "usechannelfreq", true);
        one_beam_per_patch_ = parset.getBool(prefix + "onebeamperpatch", false);
        beam_proximity_limit_ =
            parset.getDouble(prefix + "beamproximitylimit", 60.0) *
            (M_PI / (180.0 * 60.0 * 60.0));
    
        beam_mode_ = everybeam::ParseCorrectionMode(
            parset.getString(prefix + "beammode", "default"));
    
        std::string element_model = boost::to_lower_copy(
            parset.getString(prefix + "elementmodel", "hamaker"));
        if (element_model == "hamaker") {
          element_response_model_ = everybeam::ElementResponseModel::kHamaker;
        } else if (element_model == "lobes") {
          element_response_model_ = everybeam::ElementResponseModel::kLOBES;
        } else if (element_model == "oskar") {
          element_response_model_ =
              everybeam::ElementResponseModel::kOSKARSphericalWave;
        } else if (element_model == "oskardipole") {
          element_response_model_ = everybeam::ElementResponseModel::kOSKARDipole;
        } else {
          throw std::runtime_error(
              "Elementmodel should be HAMAKER, LOBES, OSKAR or OSKARDIPOLE");
        }
    
        // By default, a source model has each direction in one patch. Therefore,
        // if one-beam-per-patch is requested, we don't have to do anything.
        if (!one_beam_per_patch_) {
          if (beam_proximity_limit_ > 0.0) {
            // Rework patch list to cluster proximate sources
            aocommon::Logger::Debug << "Clustering proximate sources for direction "
                                    << direction_str_ << ".\n";
            patch_list_ =
                clusterProximateSources(patch_list_, beam_proximity_limit_);
          } else {
            // Rework patch list to contain a patch for every source
            patch_list_ = makeOnePatchPerComponent(patch_list_);
          }
        }
      }
    
      // If called from h5parmpredict, applycal gets set by that step,
      // so must not be read from parset
      if (parset.isDefined(prefix + "applycal.parmdb") ||
          parset.isDefined(prefix + "applycal.steps")) {
        SetApplyCal(parset, prefix + "applycal.");
      }
    
      source_list_ = makeSourceList(patch_list_);
    
      // Determine whether any sources are polarized. If not, enable
      // Stokes-I-only mode (note that this mode cannot be used with apply_beam_)
      if (apply_beam_ && beam_mode_ != everybeam::CorrectionMode::kArrayFactor) {
        stokes_i_only_ = false;
      } else {
        stokes_i_only_ = !source_db.CheckPolarized();
      }
      any_orientation_is_absolute_ = source_db.CheckAnyOrientationIsAbsolute();
    }
    
    void OnePredict::SetApplyCal(const common::ParameterSet& parset,
                                 const string& prefix) {
      apply_cal_step_ =
          std::make_shared<ApplyCal>(parset, prefix, true, direction_str_);
      if (operation_ != Operation::kReplace &&
          parset.getBool(prefix + "applycal.updateweights", false))
        throw std::invalid_argument(
            "Weights cannot be updated when operation is not replace");
      result_step_ = std::make_shared<ResultStep>();
      apply_cal_step_->setNextStep(result_step_);
    }
    
    OnePredict::~OnePredict() = default;
    
    void OnePredict::initializeThreadData() {
      const size_t nBl = info().nbaselines();
      const size_t nSt = info().nantenna();
      const size_t nCh = info().nchan();
      const size_t nCr = stokes_i_only_ ? 1 : info().ncorr();
      const size_t nThreads = aocommon::ThreadPool::GetInstance().NThreads();
    
      station_uvw_.resize({nSt, 3});
    
      std::vector<std::array<double, 3>> antenna_pos(info().antennaPos().size());
      for (unsigned int i = 0; i < info().antennaPos().size(); ++i) {
        casacore::Quantum<casacore::Vector<double>> pos =
            info().antennaPos()[i].get("m");
        antenna_pos[i][0] = pos.getValue()[0];
        antenna_pos[i][1] = pos.getValue()[1];
        antenna_pos[i][2] = pos.getValue()[2];
      }
    
      uvw_split_index_ = base::nsetupSplitUVW(info().nantenna(), info().getAnt1(),
                                              info().getAnt2(), antenna_pos);
    
      if (!predict_buffer_) {
        predict_buffer_ = std::make_shared<base::PredictBuffer>();
      }
      bool is_dish_telescope = false;
      if (apply_beam_) {
        telescope_ = base::GetTelescope(info().msName(), element_response_model_,
                                        use_channel_freq_);
        is_dish_telescope = base::IsDish(*telescope_);
      }
      predict_buffer_->resize(nThreads, nCr, nCh, nBl,
                              (is_dish_telescope ? 1 : nSt), apply_beam_,
                              !stokes_i_only_);
      // Create the Measure ITRF conversion info given the array position.
      // The time and direction are filled in later.
      meas_convertors_.resize(nThreads);
      meas_frame_.resize(nThreads);
    
      for (size_t thread = 0; thread < nThreads; ++thread) {
        const bool need_meas_converters = moving_phase_ref_ || apply_beam_;
        if (need_meas_converters) {
          // Prepare measures converters
          meas_frame_[thread].set(info().arrayPosCopy());
          meas_frame_[thread].set(
              MEpoch(MVEpoch(info().startTime() / 86400), MEpoch::UTC));
          meas_convertors_[thread].set(
              MDirection::J2000,
              MDirection::Ref(MDirection::ITRF, meas_frame_[thread]));
        }
      }
    }
    
    void OnePredict::updateInfo(const DPInfo& infoIn) {
      Step::updateInfo(infoIn);
      if (operation_ == Operation::kReplace)
        info().setBeamCorrectionMode(
            static_cast<int>(everybeam::CorrectionMode::kNone));
    
      for (size_t bl = 0; bl != info().nbaselines(); ++bl) {
        baselines_.emplace_back(info().getAnt1()[bl], info().getAnt2()[bl]);
      }
    
      try {
        MDirection dirJ2000(
            MDirection::Convert(infoIn.phaseCenter(), MDirection::J2000)());
        Quantum<casacore::Vector<double>> angles = dirJ2000.getAngle();
        moving_phase_ref_ = false;
        phase_ref_ =
            base::Direction(angles.getBaseValue()[0], angles.getBaseValue()[1]);
      } catch (casacore::AipsError&) {
        // Phase direction (in J2000) is time dependent
        moving_phase_ref_ = true;
      }
    
      initializeThreadData();
    
      if (apply_cal_step_) {
        info() = apply_cal_step_->setInfo(info());
      }
    }
    
    base::Direction OnePredict::GetFirstDirection() const {
      return patch_list_.front()->direction();
    }
    
    void OnePredict::SetOperation(const std::string& operation) {
      if (operation == "replace") {
        operation_ = Operation::kReplace;
      } else if (operation == "add") {
        operation_ = Operation::kAdd;
      } else if (operation == "subtract") {
        operation_ = Operation::kSubtract;
      } else {
        throw std::invalid_argument(
            "Operation must be 'replace', 'add' or 'subtract'.");
      }
    }
    
    void OnePredict::show(std::ostream& os) const {
      os << "OnePredict " << name_ << '\n';
      os << "  sourcedb:                " << source_db_name_ << '\n';
      os << "   number of patches:      " << patch_list_.size() << '\n';
      os << "   patches clustered:      " << std::boolalpha
         << (!one_beam_per_patch_ && (beam_proximity_limit_ > 0.0)) << '\n';
      os << "   number of components:   " << source_list_.size() << '\n';
      os << "   absolute orientation:   " << std::boolalpha
         << any_orientation_is_absolute_ << '\n';
      os << "   all unpolarized:        " << std::boolalpha << stokes_i_only_
         << '\n';
      os << "   correct freq smearing:  " << std::boolalpha
         << correct_freq_smearing_ << '\n';
      os << "  apply beam:              " << std::boolalpha << apply_beam_ << '\n';
      if (apply_beam_) {
        os << "   mode:                   " << everybeam::ToString(beam_mode_);
        os << '\n';
        os << "   use channelfreq:        " << std::boolalpha << use_channel_freq_
           << '\n';
        os << "   one beam per patch:     " << std::boolalpha << one_beam_per_patch_
           << '\n';
        os << "   beam proximity limit:   "
           << (beam_proximity_limit_ * (180.0 * 60.0 * 60.0) / M_PI) << " arcsec\n";
      }
      os << "  operation:               ";
      switch (operation_) {
        case Operation::kReplace:
          os << "replace\n";
          break;
        case Operation::kAdd:
          os << "add\n";
          break;
        case Operation::kSubtract:
          os << "subtract\n";
          break;
      }
      if (apply_cal_step_) {
        apply_cal_step_->show(os);
      }
    }
    
    void OnePredict::showTimings(std::ostream& os, double duration) const {
      os << "  ";
      base::FlagCounter::showPerc1(os, timer_.getElapsed(), duration);
      os << " OnePredict " << name_ << '\n';
    
      /*
       * The timer_ measures the time in a single thread. Both predict_time_ and
       * apply_beam_time_ are the sum of time in multiple threads. This makes it
       * hard to determine the exact time spent in these phases. Instead it shows
       * the percentage spent in these two parts.
       */
      const int64_t time{predict_time_ + apply_beam_time_};
      os << "          ";
      base::FlagCounter::showPerc1(os, predict_time_, time);
      os << " of it spent in predict" << '\n';
    
      os << "          ";
      base::FlagCounter::showPerc1(os, apply_beam_time_, time);
      os << " of it spent in apply beam" << '\n';
    }
    
    void OnePredict::CopyPredictBufferToData(
        base::DPBuffer::DataType& destination,
        const aocommon::xt::UTensor<std::complex<double>, 3>& buffer) {
      if (stokes_i_only_) {
        // Add the predicted model to the first and last correlation.
        const size_t n_correlations = info().ncorr();
        auto dest_view = xt::view(destination, xt::all(), xt::all(),
                                  xt::keep(0, n_correlations - 1));
        // Without explicit casts, XTensor does not know what to do.
        dest_view = xt::cast<std::complex<float>>(buffer);
      } else {
        // Without explicit casts, XTensor does not know what to do.
        destination = xt::cast<std::complex<float>>(buffer);
      }
    }
    
    bool OnePredict::process(std::unique_ptr<DPBuffer> buffer) {
      timer_.start();
    
      // Determine the various sizes.
      const size_t nSt = info().nantenna();
      const size_t nBl = info().nbaselines();
      const size_t nCh = info().nchan();
      const size_t nCr = info().ncorr();
      const size_t nThreads = aocommon::ThreadPool::GetInstance().NThreads();
    
      base::nsplitUVW(uvw_split_index_, baselines_, buffer->GetUvw(), station_uvw_);
    
      double time = buffer->GetTime();
      // Set up directions for beam evaluation
      everybeam::vector3r_t refdir, tiledir;
    
      size_t n_threads = aocommon::ThreadPool::GetInstance().NThreads();
      const bool need_meas_converters = moving_phase_ref_ || apply_beam_;
      if (need_meas_converters) {
        // Because multiple predict steps might be predicting simultaneously, and
        // Casacore is not thread safe, this needs synchronization.
        std::unique_lock<std::mutex> lock;
        if (measures_mutex_ != nullptr)
          lock = std::unique_lock<std::mutex>(*measures_mutex_);
        for (size_t thread = 0; thread != n_threads; ++thread) {
          meas_frame_[thread].resetEpoch(
              MEpoch(MVEpoch(time / 86400), MEpoch::UTC));
          // Do a conversion on all threads
          refdir = dir2Itrf(info().delayCenter(), meas_convertors_[thread]);
          tiledir = dir2Itrf(info().tileBeamDir(), meas_convertors_[thread]);
        }
      }
    
      if (moving_phase_ref_) {
        // Convert phase reference to J2000
        MDirection dirJ2000(MDirection::Convert(
            info().phaseCenter(),
            MDirection::Ref(MDirection::J2000, meas_frame_[0]))());
        Quantum<casacore::Vector<double>> angles = dirJ2000.getAngle();
        phase_ref_ =
            base::Direction(angles.getBaseValue()[0], angles.getBaseValue()[1]);
      }
    
      std::vector<base::Simulator> simulators;
      simulators.reserve(n_threads);
    
      std::vector<std::pair<size_t, size_t>> baseline_range;
      std::vector<xt::xtensor<std::complex<double>, 3>> sim_buffer;
      std::vector<std::vector<std::pair<size_t, size_t>>> baselines_split;
      std::vector<std::pair<size_t, size_t>> station_range;
    
      // Take ownership of the input visibilities if we need them later.
      if (operation_ == Operation::kAdd || operation_ == Operation::kSubtract ||
          !output_data_name_.empty()) {
        input_data_ = buffer->TakeData();
      }
    
      // Determine destination of the predicted visibilities
      if (!output_data_name_.empty()) {
        buffer->AddData(output_data_name_);
      }
      DPBuffer::DataType& data = buffer->GetData(output_data_name_);
      data.resize({nBl, nCh, nCr});
      data.fill(std::complex<float>(0.0, 0.0));
    
      const size_t actual_nCr = (stokes_i_only_ ? 1 : nCr);
      if (thread_over_baselines_) {
        std::unique_ptr<PredictModel> model_buffer = std::make_unique<PredictModel>(
            nThreads, stokes_i_only_ ? 1 : info().ncorr(), nCh, nBl, apply_beam_);
    
        // Reduce the number of threads if there are not enough baselines.
        n_threads = std::min(n_threads, nBl);
    
        // All threads process 'baselines_per_thread' baselines.
        // The first 'remaining_baselines' threads process an extra baseline.
        const size_t baselines_per_thread = nBl / n_threads;
        const size_t remaining_baselines = nBl % n_threads;
    
        baseline_range.resize(n_threads);
        sim_buffer.resize(n_threads);
        baselines_split.resize(n_threads);
        if (apply_beam_) {
          station_range.resize(n_threads);
        }
    
        // Index of the first baseline for the current thread. The loop below
        // updates this variable in each iteration.
        size_t first_baseline = 0;
        for (size_t thread_index = 0; thread_index != n_threads; ++thread_index) {
          const size_t chunk_size =
              baselines_per_thread + ((thread_index < remaining_baselines) ? 1 : 0);
    
          baseline_range[thread_index] =
              std::make_pair(first_baseline, first_baseline + chunk_size);
          sim_buffer[thread_index].resize({chunk_size, nCh, actual_nCr});
    
          baselines_split[thread_index].resize(chunk_size);
          std::copy_n(std::next(baselines_.cbegin(), first_baseline), chunk_size,
                      baselines_split[thread_index].begin());
    
          first_baseline += chunk_size;  // Update for the next loop iteration.
        }
        // Verify that all baselines are assigned to threads.
        assert(first_baseline == nBl);
    
        // find min,max station indices for this thread
        if (apply_beam_) {
          const size_t stations_thread = (nSt + n_threads - 1) / n_threads;
          for (size_t thread_index = 0; thread_index != n_threads; ++thread_index) {
            const size_t station_start = thread_index * stations_thread;
            const size_t station_end = station_start + stations_thread < nSt
                                           ? station_start + stations_thread
                                           : nSt;
            if (station_start < nSt) {
              station_range[thread_index] =
                  std::make_pair(station_start, station_end);
            } else {
              // fill an invalid station range
              // so that station_start<nSt for valid range
              station_range[thread_index] = std::make_pair(nSt + 1, nSt + 1);
            }
          }
        }
    
        aocommon::RecursiveFor::NestedRun(0, n_threads, [&](size_t thread_index) {
          const std::complex<double> zero(0.0, 0.0);
          model_buffer->GetModel(thread_index).fill(zero);
          if (apply_beam_) model_buffer->GetPatchModel(thread_index).fill(zero);
          sim_buffer[thread_index].fill(zero);
        });
    
        // Keep this loop single threaded, I'm not sure if Simulator constructor
        // is thread safe.
        for (size_t thread_index = 0; thread_index != n_threads; ++thread_index) {
          // When applying beam, simulate into patch vector
          // Create a Casacore view since the Simulator still uses Casacore.
          xt::xtensor<std::complex<double>, 3>& thread_buffer =
              sim_buffer[thread_index];
          const casacore::IPosition shape(3, thread_buffer.shape(2),
                                          thread_buffer.shape(1),
                                          thread_buffer.shape(0));
          casacore::Cube<std::complex<double>> simulatedest(
              shape, thread_buffer.data(), casacore::SHARE);
    
          simulators.emplace_back(phase_ref_, nSt, baselines_split[thread_index],
                                  casacore::Vector<double>(info().chanFreqs()),
                                  casacore::Vector<double>(info().chanWidths()),
                                  station_uvw_, simulatedest,
                                  correct_freq_smearing_, stokes_i_only_);
        }
    
        std::vector<std::shared_ptr<const base::Patch>> curPatches(n_threads);
    
        aocommon::Barrier barrier(n_threads);
        // We need to create local threads here because we need to
        // sync only those using the barrier
        aocommon::RecursiveFor::NestedRun(0, n_threads, [&](size_t thread_index) {
          const common::ScopedMicroSecondAccumulator<decltype(predict_time_)>
              scoped_time{predict_time_};
          // Predict the source model and apply beam when an entire patch is
          // done
          std::shared_ptr<const base::Patch>& curPatch = curPatches[thread_index];
    
          for (size_t source_index = 0; source_index < source_list_.size();
               ++source_index) {
            const bool patchIsFinished =
                curPatch != source_list_[source_index].second &&
                curPatch != nullptr;
    
            if (apply_beam_ && patchIsFinished) {
              // PatchModel <- SimulBuffer
              aocommon::xt::UTensor<std::complex<double>, 3>& patch_model =
                  model_buffer->GetPatchModel(thread_index);
              xt::view(patch_model,
                       xt::range(baseline_range[thread_index].first,
                                 baseline_range[thread_index].second),
                       xt::all(), xt::all()) = sim_buffer[thread_index];
    
              // Apply the beam and add PatchModel to Model
              addBeamToData(*curPatch, model_buffer->GetModel(thread_index), time,
                            thread_index, patch_model, baseline_range[thread_index],
                            station_range[thread_index], barrier, stokes_i_only_);
              // Initialize patchmodel to zero for the next patch
              sim_buffer[thread_index].fill(std::complex<double>(0.0, 0.0));
            }
            // Depending on apply_beam_, the following call will add to either
            // the Model or the PatchModel of the predict buffer
            simulators[thread_index].simulate(source_list_[source_index].first);
    
            curPatch = source_list_[source_index].second;
          }
          // catch last source
          if (apply_beam_ && curPatch != nullptr) {
            // PatchModel <- SimulBuffer
            aocommon::xt::UTensor<std::complex<double>, 3>& patch_model =
                model_buffer->GetPatchModel(thread_index);
            xt::view(patch_model,
                     xt::range(baseline_range[thread_index].first,
                               baseline_range[thread_index].second),
                     xt::all(), xt::all()) = sim_buffer[thread_index];
    
            addBeamToData(*curPatch, model_buffer->GetModel(thread_index), time,
                          thread_index, patch_model, baseline_range[thread_index],
                          station_range[thread_index], barrier, stokes_i_only_);
          }
          if (!apply_beam_) {
            aocommon::xt::UTensor<std::complex<double>, 3>& model =
                model_buffer->GetModel(thread_index);
            xt::view(model,
                     xt::range(baseline_range[thread_index].first,
                               baseline_range[thread_index].second),
                     xt::all(), xt::all()) = sim_buffer[thread_index];
          }
        });
    
        // Add all thread model data to one buffer
        for (size_t thread = 1; thread < n_threads; ++thread) {
          // Sum thread model data in their own container (doubles) to prevent
          // rounding errors when writing to the data member of the DPBuffer
          // (floats).
          model_buffer->GetModel(0) += model_buffer->GetModel(thread);
        }
    
        CopyPredictBufferToData(data, model_buffer->GetModel(0));
      } else {
        PredictWithSourceParallelization(data, time);
      }
    
      if (apply_cal_step_) {
        apply_cal_step_->process(std::move(buffer));
        buffer = result_step_->take();
      }
    
      if (operation_ == Operation::kAdd) {
        data += input_data_;
      } else if (operation_ == Operation::kSubtract) {
        data = input_data_ - data;
      }
      if (!output_data_name_.empty()) {
        // Put the input visibilities back to the main buffer when needed.
        buffer->GetData() = std::move(input_data_);
      }
    
      timer_.stop();
    
      getNextStep()->process(std::move(buffer));
      return false;
    }
    
    void OnePredict::PredictSourceRange(
        aocommon::xt::UTensor<std::complex<double>, 3>& result, size_t start,
        size_t end, size_t thread_index, std::mutex& mutex, double time) {
      const size_t n_stations = info().nantenna();
      const size_t n_baselines = info().nbaselines();
      const size_t n_channels = info().nchan();
      const size_t n_buffer_correlations = stokes_i_only_ ? 1 : info().ncorr();
    
      aocommon::xt::UTensor<std::complex<double>, 3> model_data(
          {n_baselines, n_channels, n_buffer_correlations},
          std::complex<double>(0.0, 0.0));
    
      aocommon::xt::UTensor<std::complex<double>, 3> patch_model_data;
      if (apply_beam_) {
        patch_model_data.resize({n_baselines, n_channels, n_buffer_correlations});
        patch_model_data.fill(std::complex<double>(0.0, 0.0));
      }
    
      // Create a Casacore view since the Simulator still uses Casacore.
      // model_data.shape() and patch_model_data_data always have the same shape.
      const casacore::IPosition shape(3, model_data.shape(2), model_data.shape(1),
                                      model_data.shape(0));
      std::complex<double>* simulator_data =
          apply_beam_ ? patch_model_data.data() : model_data.data();
      casacore::Cube<std::complex<double>> casacore_data(shape, simulator_data,
                                                         casacore::SHARE);
      base::Simulator simulator(phase_ref_, n_stations, baselines_,
                                casacore::Vector<double>(info().chanFreqs()),
                                casacore::Vector<double>(info().chanWidths()),
                                station_uvw_, casacore_data, correct_freq_smearing_,
                                stokes_i_only_);
    
      const common::ScopedMicroSecondAccumulator<decltype(predict_time_)>
          scoped_time{predict_time_};
      std::shared_ptr<const base::Patch> patch;
    
      for (size_t source_index = start; source_index != end; ++source_index) {
        // Predict the source model and apply beam when an entire patch is
        // done
        const bool patch_is_finished =
            patch != source_list_[source_index].second && patch != nullptr;
        if (apply_beam_ && patch_is_finished) {
          // Apply the beam and add PatchModel to Model
          addBeamToData(*patch, model_data, time, thread_index, patch_model_data,
                        stokes_i_only_);
          // Initialize patchmodel to zero for the next patch
          patch_model_data.fill(std::complex<double>(0.0, 0.0));
        }
        // Depending on apply_beam_, the following call will add to either
        // the Model or the PatchModel of the predict buffer
        simulator.simulate(source_list_[source_index].first);
    
        patch = source_list_[source_index].second;
      }
    
      if (apply_beam_) {
        // Apply beam to the last patch
        const common::ScopedMicroSecondAccumulator<decltype(predict_time_)>
            scoped_time{predict_time_};
        if (patch != nullptr) {
          addBeamToData(*patch, model_data, time, thread_index, patch_model_data,
                        stokes_i_only_);
        }
      }
    
      // Add this thread's data to the global buffer
      std::lock_guard lock(mutex);
      result += model_data;
    }
    
    void OnePredict::PredictWithSourceParallelization(
        base::DPBuffer::DataType& destination, double time) {
      const size_t n_baselines = info().nbaselines();
      const size_t n_channels = info().nchan();
      const size_t buffered_correlations = stokes_i_only_ ? 1 : info().ncorr();
    
      aocommon::xt::UTensor<std::complex<double>, 3> global_data(
          {n_baselines, n_channels, buffered_correlations},
          std::complex<double>(0.0, 0.0));
    
      std::mutex mutex;
      aocommon::StaticFor<size_t> loop;
      // The way source are split into consecutive subranges
      // is important: it makes sure that a single patch is mostly calculated
      // by a single thread, which limits duplicate beam evaluations.
      loop.Run(0, source_list_.size(),
               [&](size_t start, size_t end, size_t thread_index) {
                 PredictSourceRange(global_data, start, end, thread_index, mutex,
                                    time);
               });
    
      CopyPredictBufferToData(destination, global_data);
    }
    
    everybeam::vector3r_t OnePredict::dir2Itrf(const MDirection& dir,
                                               MDirection::Convert& measConverter) {
      const MDirection& itrfDir = measConverter(dir);
      const casacore::Vector<double>& itrf = itrfDir.getValue().getValue();
      everybeam::vector3r_t vec;
      vec[0] = itrf[0];
      vec[1] = itrf[1];
      vec[2] = itrf[2];
      return vec;
    }
    
    void OnePredict::addBeamToData(
        const base::Patch& patch,
        aocommon::xt::UTensor<std::complex<double>, 3>& model_data, double time,
        size_t thread, aocommon::xt::UTensor<std::complex<double>, 3>& data,
        bool stokesIOnly) {
      // Apply beam for a patch, add result to Model
      MDirection dir(MVDirection(patch.direction().ra, patch.direction().dec),
                     MDirection::J2000);
      everybeam::vector3r_t srcdir = dir2Itrf(dir, meas_convertors_[thread]);
    
      if (stokesIOnly) {
        const common::ScopedMicroSecondAccumulator<decltype(apply_beam_time_)>
            scoped_time{apply_beam_time_};
        ApplyBeam::applyBeamStokesIArrayFactor(
            info(), time, data.data(), srcdir, telescope_.get(),
            predict_buffer_->GetScalarBeamValues(thread), false, beam_mode_,
            &mutex_);
      } else {
        const common::ScopedMicroSecondAccumulator<decltype(apply_beam_time_)>
            scoped_time{apply_beam_time_};
        float* dummyweight = nullptr;
        ApplyBeam::applyBeam(info(), time, data.data(), dummyweight, srcdir,
                             telescope_.get(),
                             predict_buffer_->GetFullBeamValues(thread), false,
                             beam_mode_, false, &mutex_);
      }
    
      // Add temporary buffer to Model
      model_data += data;
    }
    
    void OnePredict::addBeamToData(
        const base::Patch& patch,
        aocommon::xt::UTensor<std::complex<double>, 3>& model_data, double time,
        size_t thread, aocommon::xt::UTensor<std::complex<double>, 3>& data,
        const std::pair<size_t, size_t>& baseline_range,
        const std::pair<size_t, size_t>& station_range, aocommon::Barrier& barrier,
        bool stokesIOnly) {
      // Apply beam for a patch, add result to Model
      MDirection dir(MVDirection(patch.direction().ra, patch.direction().dec),
                     MDirection::J2000);
      everybeam::vector3r_t srcdir = dir2Itrf(dir, meas_convertors_[thread]);
    
      // We use a common buffer to calculate beam values
      const size_t common_thread = 0;
      if (stokesIOnly) {
        const common::ScopedMicroSecondAccumulator<decltype(apply_beam_time_)>
            scoped_time{apply_beam_time_};
        ApplyBeam::applyBeamStokesIArrayFactor(
            info(), time, data.data(), srcdir, telescope_.get(),
            predict_buffer_->GetScalarBeamValues(common_thread), baseline_range,
            station_range, barrier, false, beam_mode_, &mutex_);
      } else {
        const common::ScopedMicroSecondAccumulator<decltype(apply_beam_time_)>
            scoped_time{apply_beam_time_};
        float* dummyweight = nullptr;
        ApplyBeam::applyBeam(
            info(), time, data.data(), dummyweight, srcdir, telescope_.get(),
            predict_buffer_->GetFullBeamValues(common_thread), baseline_range,
            station_range, barrier, false, beam_mode_, false, &mutex_);
      }
    
      // Add temporary buffer to Model
      model_data += data;
    }
    
    void OnePredict::finish() {
      // Let the next steps finish.
      getNextStep()->finish();
    }
    }  // namespace steps
    }  // namespace dp3