diff --git a/base/DPInfo.cc b/base/DPInfo.cc index a0b2bf4461a4b34984925e62d9bb5fd642afabf6..01da262bab52b24ae5bce7ae65eb20178745b109 100644 --- a/base/DPInfo.cc +++ b/base/DPInfo.cc @@ -21,8 +21,6 @@ #include <EveryBeam/correctionmode.h> -#include <aocommon/recursivefor.h> - #include "../common/Epsilon.h" using casacore::MDirection; diff --git a/external/aocommon b/external/aocommon index 505c270d9914190571201887d3bf9315ec5c97f4..f39ef8bc6013e03f7ecd2b8b4df58d4869edb4c5 160000 --- a/external/aocommon +++ b/external/aocommon @@ -1 +1 @@ -Subproject commit 505c270d9914190571201887d3bf9315ec5c97f4 +Subproject commit f39ef8bc6013e03f7ecd2b8b4df58d4869edb4c5 diff --git a/steps/DDECal.cc b/steps/DDECal.cc index 1ab8f45c363cac17ef8af2543d8d4cb1c1890f15..a814793f5945b10661d0aea65bdf9cf9b0e127f0 100644 --- a/steps/DDECal.cc +++ b/steps/DDECal.cc @@ -526,14 +526,6 @@ void DDECal::checkMinimumVisibilities(size_t bufferIndex) { } } -void DDECal::SetPredictThreadingInfo(aocommon::RecursiveFor& recursive_for) { - for (std::shared_ptr<ModelDataStep>& step : itsSteps) { - if (auto predict = std::dynamic_pointer_cast<Predict>(step)) { - predict->SetThreadData(recursive_for, &itsMeasuresMutex); - } - } -} - void DDECal::doSolve() { for (size_t dir = 0; dir < itsDirections.size(); ++dir) { // For directions that reuse model data, the model data of the various @@ -763,7 +755,6 @@ void DDECal::doPrepare() { // Enclose the recursive_for { aocommon::RecursiveFor recursive_for; - SetPredictThreadingInfo(recursive_for); recursive_for.Run(0, itsSteps.size(), [&](size_t dir, size_t) { if (itsSteps[dir]) { // When reusing model data, there is no step. itsSteps[dir]->process( diff --git a/steps/DDECal.h b/steps/DDECal.h index 99ca55679c477751191dd0b5cb551eb638b948a4..ad867ac74d81b4e7b9398a1df2d38ae6dc685fbd 100644 --- a/steps/DDECal.h +++ b/steps/DDECal.h @@ -105,8 +105,6 @@ class DDECal : public Step { /// input data buffer. void CorrectAndSubtractModels(size_t buffer_index); - void SetPredictThreadingInfo(aocommon::RecursiveFor& recursive_for); - const ddecal::Settings itsSettings; /// The input data buffers for the current set of solution intervals. diff --git a/steps/H5ParmPredict.cc b/steps/H5ParmPredict.cc index a5ee9244a8bf83132b40ec52631d51457661baaf..3a91650cf1526e0c74d430753b2fdb67be1d8d63 100644 --- a/steps/H5ParmPredict.cc +++ b/steps/H5ParmPredict.cc @@ -38,8 +38,7 @@ H5ParmPredict::H5ParmPredict(const common::ParameterSet& parset, itsH5ParmName(parset.getString(prefix + "applycal.parmdb")), itsDirections( parset.getStringVector(prefix + "directions", std::vector<string>())), - itsTimer(), - itsThreadPool() { + itsTimer() { H5Parm h5parm = H5Parm(itsH5ParmName, false); std::string soltabName = parset.getString(prefix + "applycal.correction"); if (soltabName == "fulljones") soltabName = "amplitude000"; @@ -82,7 +81,6 @@ H5ParmPredict::H5ParmPredict(const common::ParameterSet& parset, } else { predictStep->SetOperation(operation); } - predictStep->SetThreadData(itsThreadPool, nullptr); predictStep->SetPredictBuffer(itsPredictBuffer); if (!itsPredictSteps.empty()) { diff --git a/steps/H5ParmPredict.h b/steps/H5ParmPredict.h index ae21076d788cd98208842e93c19f894e53df81bd..0803607c9dff28bd78a8e649c0f2114ced7842ef 100644 --- a/steps/H5ParmPredict.h +++ b/steps/H5ParmPredict.h @@ -11,8 +11,6 @@ #ifndef DP3_STEPS_H5PARM_PREDICT_H_ #define DP3_STEPS_H5PARM_PREDICT_H_ -#include <aocommon/recursivefor.h> - #include <dp3/base/DP3.h> #include <dp3/steps/Step.h> @@ -79,7 +77,6 @@ class H5ParmPredict : public Step { std::vector<std::string> itsDirections; common::NSTimer itsTimer; - aocommon::RecursiveFor itsThreadPool; }; } // namespace steps diff --git a/steps/OnePredict.cc b/steps/OnePredict.cc index 15744b2ffa48acda5cc69d6436c7292b41dacab9..02f4378bf0837acacf219bc6134e8b08a5f01686 100644 --- a/steps/OnePredict.cc +++ b/steps/OnePredict.cc @@ -70,7 +70,7 @@ namespace steps { OnePredict::OnePredict(const common::ParameterSet& parset, const std::string& prefix, const std::vector<std::string>& source_patterns) - : recursive_for_(nullptr), measures_mutex_(nullptr) { + : measures_mutex_(nullptr) { if (!source_patterns.empty()) { init(parset, prefix, source_patterns); } else { @@ -386,14 +386,6 @@ bool OnePredict::process(std::unique_ptr<DPBuffer> buffer) { base::Direction(angles.getBaseValue()[0], angles.getBaseValue()[1]); } - std::optional<aocommon::RecursiveFor> local_recursive_for; - aocommon::RecursiveFor* recursive_for = recursive_for_; - if (recursive_for == nullptr) { - // If no ThreadPool was specified, we create a temporary one just - // for execution of this part. - local_recursive_for.emplace(); - recursive_for = &local_recursive_for.value(); - } std::vector<base::Simulator> simulators; simulators.reserve(n_threads); @@ -457,7 +449,7 @@ bool OnePredict::process(std::unique_ptr<DPBuffer> buffer) { } } - recursive_for->Run(0, n_threads, [&](size_t thread_index, size_t) { + aocommon::RecursiveFor::NestedRun(0, n_threads, [&](size_t thread_index) { const std::complex<double> zero(0.0, 0.0); predict_buffer_->GetModel(thread_index).fill(zero); if (apply_beam_) predict_buffer_->GetPatchModel(thread_index).fill(zero); @@ -518,7 +510,7 @@ bool OnePredict::process(std::unique_ptr<DPBuffer> buffer) { aocommon::Barrier barrier(n_threads); // We need to create local threads here because we need to // sync only those using the barrier - recursive_for->Run(0, n_threads, [&](size_t thread_index, size_t) { + aocommon::RecursiveFor::NestedRun(0, n_threads, [&](size_t thread_index) { const common::ScopedMicroSecondAccumulator<decltype(predict_time_)> scoped_time{predict_time_}; // OnePredict the source model and apply beam when an entire patch is @@ -577,7 +569,7 @@ bool OnePredict::process(std::unique_ptr<DPBuffer> buffer) { } }); } else { - recursive_for->Run( + aocommon::RecursiveFor::NestedRun( 0, source_list_.size(), [&](size_t source_index, size_t thread) { const common::ScopedMicroSecondAccumulator<decltype(predict_time_)> scoped_time{predict_time_}; @@ -604,7 +596,7 @@ bool OnePredict::process(std::unique_ptr<DPBuffer> buffer) { }); // Apply beam to the last patch if (apply_beam_) { - recursive_for->Run(0, n_threads, [&](size_t thread, size_t) { + aocommon::RecursiveFor::NestedRun(0, n_threads, [&](size_t thread) { const common::ScopedMicroSecondAccumulator<decltype(predict_time_)> scoped_time{predict_time_}; if (curPatches[thread] != nullptr) { @@ -660,10 +652,6 @@ bool OnePredict::process(std::unique_ptr<DPBuffer> buffer) { buffer->GetData() = std::move(input_data_); } - // Threads must be released before calling next process(), otherwise - // aocommon::*For structures in process() will block since the global thread - // pool is still busy. - local_recursive_for.reset(); timer_.stop(); getNextStep()->process(std::move(buffer)); return false; diff --git a/steps/OnePredict.h b/steps/OnePredict.h index 918404bb8f41772cb4b1c452d714a9417cb4916b..58196c9b48d9926e0b82507a194e784031940d33 100644 --- a/steps/OnePredict.h +++ b/steps/OnePredict.h @@ -23,10 +23,6 @@ #include "../base/PredictBuffer.h" #include "../base/SourceDBUtil.h" -namespace aocommon { -class RecursiveFor; -} // namespace aocommon - namespace dp3 { namespace common { class ParameterSet; @@ -91,15 +87,11 @@ class OnePredict : public ModelDataStep { void SetOperation(const std::string& type); /// When multiple OnePredict steps are running in parallel from multiple - /// threads, they require synchronisation. This is done with these two - /// synchronisation structures. When multiple Predicts steps run serially + /// threads, they require synchronisation. This is done with this mutex. + /// When multiple Predicts steps run serially /// (like currently in H5ParmPredict), this function should not be called, as /// otherwise they will synchronize needlessly. - /// - /// It is also possible to make the predict steps share the same RecursiveFor - /// without further synchronisation, by setting measures_mutex to nullptr. - void SetThreadData(aocommon::RecursiveFor& loop, std::mutex* measures_mutex) { - recursive_for_ = &loop; + void SetThreadData(std::mutex* measures_mutex) { measures_mutex_ = measures_mutex; } @@ -222,7 +214,6 @@ class OnePredict : public ModelDataStep { */ std::atomic<int64_t> apply_beam_time_{0}; - aocommon::RecursiveFor* recursive_for_; std::mutex* measures_mutex_; std::mutex mutex_; }; diff --git a/steps/Predict.cc b/steps/Predict.cc index c4d11e5638dfbbeb408adb87aced758e763ada2d..b62316e051240ebb62374de0d67b9b6b4a446d70 100644 --- a/steps/Predict.cc +++ b/steps/Predict.cc @@ -95,8 +95,8 @@ void Predict::SetOperation(const std::string& operation) { predict_step_->SetOperation(operation); } -void Predict::SetThreadData(aocommon::RecursiveFor& pool, std::mutex* mutex) { - predict_step_->SetThreadData(pool, mutex); +void Predict::SetThreadData(std::mutex* mutex) { + predict_step_->SetThreadData(mutex); } void Predict::SetPredictBuffer( diff --git a/steps/Predict.h b/steps/Predict.h index b50958bf6e7cb65c67a2eb94e34bf738283cf8a6..5d17a4d29bf3af2b798d606792ef36cfc7479244 100644 --- a/steps/Predict.h +++ b/steps/Predict.h @@ -8,10 +8,6 @@ #include <mutex> -namespace aocommon { -class RecursiveFor; -} - namespace dp3 { namespace base { class PredictBuffer; @@ -90,7 +86,7 @@ class Predict : public ModelDataStep { * Forwards thread synchronization structures to its predict sub-step. * @see OnePredict::SetThreadData(). */ - void SetThreadData(aocommon::RecursiveFor& pool, std::mutex* mutex); + void SetThreadData(std::mutex* mutex); void SetPredictBuffer(std::shared_ptr<base::PredictBuffer> predict_buffer); diff --git a/steps/test/unit/tBdaDdeCal.cc b/steps/test/unit/tBdaDdeCal.cc index 30110428700e6e6c6d1c5786751da39926d1d744..87ce4751ff984d6b4d9f70d6b38dd1ebeac987e8 100644 --- a/steps/test/unit/tBdaDdeCal.cc +++ b/steps/test/unit/tBdaDdeCal.cc @@ -3,7 +3,6 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include <boost/test/unit_test.hpp> -#include <aocommon/recursivefor.h> #include "tPredict.h" #include "tStepCommon.h"