From c3b0c74c0216d9d1dfaea6dbf4252baf6b4b8256 Mon Sep 17 00:00:00 2001
From: Tammo Jan Dijkema <dijkema@astron.nl>
Date: Mon, 3 Sep 2018 20:13:20 +0000
Subject: [PATCH] Task #11562: add split step to DPPP

---
 .gitattributes                           |   2 +
 CEP/DP3/DPPP/include/DPPP/CMakeLists.txt |   2 +-
 CEP/DP3/DPPP/include/DPPP/DPInput.h      |   3 +
 CEP/DP3/DPPP/include/DPPP/DPRun.h        |   8 +-
 CEP/DP3/DPPP/include/DPPP/MSWriter.h     |   2 +-
 CEP/DP3/DPPP/include/DPPP/Split.h        |  81 +++++++++++++
 CEP/DP3/DPPP/src/CMakeLists.txt          |   2 +-
 CEP/DP3/DPPP/src/DPRun.cc                | 126 ++++++++++----------
 CEP/DP3/DPPP/src/Split.cc                | 140 +++++++++++++++++++++++
 9 files changed, 302 insertions(+), 64 deletions(-)
 create mode 100644 CEP/DP3/DPPP/include/DPPP/Split.h
 create mode 100644 CEP/DP3/DPPP/src/Split.cc

diff --git a/.gitattributes b/.gitattributes
index b6d8c0c3b19..3a0e74b5f61 100644
--- a/.gitattributes
+++ b/.gitattributes
@@ -330,6 +330,7 @@ CEP/DP3/DPPP/include/DPPP/ScaleData.h -text
 CEP/DP3/DPPP/include/DPPP/Simulate.h -text
 CEP/DP3/DPPP/include/DPPP/Simulator.h -text
 CEP/DP3/DPPP/include/DPPP/SourceDBUtil.h -text
+CEP/DP3/DPPP/include/DPPP/Split.h -text
 CEP/DP3/DPPP/include/DPPP/StManParsetKeys.h -text
 CEP/DP3/DPPP/include/DPPP/StefCal.h -text
 CEP/DP3/DPPP/include/DPPP/Stokes.h -text
@@ -367,6 +368,7 @@ CEP/DP3/DPPP/src/Simulate.cc -text
 CEP/DP3/DPPP/src/Simulator.cc -text
 CEP/DP3/DPPP/src/SolTab.cc -text
 CEP/DP3/DPPP/src/SourceDBUtil.cc -text
+CEP/DP3/DPPP/src/Split.cc -text
 CEP/DP3/DPPP/src/StefCal.cc -text
 CEP/DP3/DPPP/src/Stokes.cc -text
 CEP/DP3/DPPP/src/SubtractMixed.cc -text
diff --git a/CEP/DP3/DPPP/include/DPPP/CMakeLists.txt b/CEP/DP3/DPPP/include/DPPP/CMakeLists.txt
index d61fff2899d..d760372fb01 100644
--- a/CEP/DP3/DPPP/include/DPPP/CMakeLists.txt
+++ b/CEP/DP3/DPPP/include/DPPP/CMakeLists.txt
@@ -18,7 +18,7 @@ set(inst_HEADERS
   Predict.h OneApplyCal.h
   GainCal.h StefCal.h PhaseFitter.h
   StManParsetKeys.h H5Parm.h DummyStep.h H5ParmPredict.h GridInterpolate.h
-  Upsample.h
+  Upsample.h Split.h
 )
 
 # Create symbolic link to include directory.
diff --git a/CEP/DP3/DPPP/include/DPPP/DPInput.h b/CEP/DP3/DPPP/include/DPPP/DPInput.h
index 2a439714ad3..4f1370ca9d6 100644
--- a/CEP/DP3/DPPP/include/DPPP/DPInput.h
+++ b/CEP/DP3/DPPP/include/DPPP/DPInput.h
@@ -63,6 +63,9 @@ namespace LOFAR {
     class DPInput: public DPStep
     {
     public:
+      // Define the shared pointer for this type.
+      typedef shared_ptr<DPInput> ShPtr;
+
       virtual ~DPInput();
 
       // Read the UVW at the given row numbers into the buffer.
diff --git a/CEP/DP3/DPPP/include/DPPP/DPRun.h b/CEP/DP3/DPPP/include/DPPP/DPRun.h
index 06121de8e4b..53a85043bbd 100644
--- a/CEP/DP3/DPPP/include/DPPP/DPRun.h
+++ b/CEP/DP3/DPPP/include/DPPP/DPRun.h
@@ -64,10 +64,12 @@ namespace LOFAR {
       static void execute (const std::string& parsetName,
                            int argc=0, char* argv[] = 0);
 
-    private:
       // Create the step objects.
-      static DPStep::ShPtr makeSteps (const ParameterSet& parset);
+      static DPStep::ShPtr makeSteps (const ParameterSet& parset,
+                                      const std::string& prefix,
+                                      DPInput* reader);
 
+    private:
       // Create an output step, either an MSWriter or an MSUpdater
       // If no data are modified (for example if only count was done),
       // still an MSUpdater is created, but it will not write anything.
@@ -79,7 +81,7 @@ namespace LOFAR {
       // If there is a writer, the reader needs to read the visibility data.
       // reader should be the original reader
       static DPStep::ShPtr makeOutputStep(MSReader* reader,
-          const ParameterSet& parset, const string& prefix, bool multipleInputs,
+          const ParameterSet& parset, const string& prefix,
           casacore::String& currentMSName);
 
       // The map to create a step object from its type name.
diff --git a/CEP/DP3/DPPP/include/DPPP/MSWriter.h b/CEP/DP3/DPPP/include/DPPP/MSWriter.h
index 49004f9d779..08500b34fe7 100644
--- a/CEP/DP3/DPPP/include/DPPP/MSWriter.h
+++ b/CEP/DP3/DPPP/include/DPPP/MSWriter.h
@@ -170,7 +170,7 @@ namespace LOFAR {
       string          itsOutName;
       DPBuffer        itsBuffer;
       casacore::Table     itsMS;
-      const ParameterSet&   itsParset; //# parset for writing history
+      ParameterSet    itsParset; //# parset for writing history
       casacore::String    itsDataColName;
       casacore::String    itsWeightColName;
       double          itsInterval;
diff --git a/CEP/DP3/DPPP/include/DPPP/Split.h b/CEP/DP3/DPPP/include/DPPP/Split.h
new file mode 100644
index 00000000000..f35b956e1ff
--- /dev/null
+++ b/CEP/DP3/DPPP/include/DPPP/Split.h
@@ -0,0 +1,81 @@
+//# Split.h: DPPP step class to Split visibilities from a source model
+//# Copyright (C) 2013
+//# ASTRON (Netherlands Institute for Radio Astronomy)
+//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
+//#
+//# This file is part of the LOFAR software suite.
+//# The LOFAR software suite is free software: you can redistribute it and/or
+//# modify it under the terms of the GNU General Public License as published
+//# by the Free Software Foundation, either version 3 of the License, or
+//# (at your option) any later version.
+//#
+//# The LOFAR software suite is distributed in the hope that it will be useful,
+//# but WITHOUT ANY WARRANTY; without even the implied warranty of
+//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+//# GNU General Public License for more details.
+//#
+//# You should have received a copy of the GNU General Public License along
+//# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
+//#
+//# $Id:
+//#
+//# @author Tammo Jan Dijkema
+
+#ifndef DPPP_Split_H
+#define DPPP_Split_H
+
+// @file
+// @brief DPPP step class to Split visibilities from a source model
+
+#include <DPPP/DPInput.h>
+#include <DPPP/DPBuffer.h>
+
+#include <utility>
+
+namespace LOFAR {
+
+  class ParameterSet;
+
+  namespace DPPP {
+    // @ingroup NDPPP
+
+    // This class is an empty DPStep subclass to use as implementation template
+
+    class Split: public DPStep
+    {
+    public:
+      // Construct the object.
+      // Parameters are obtained from the parset using the given prefix.
+      Split (DPInput*, const ParameterSet&, const string& prefix);
+
+      virtual ~Split();
+
+      // Process the data.
+      // It keeps the data.
+      // When processed, it invokes the process function of the next step.
+      virtual bool process (const DPBuffer&);
+
+      // Finish the processing of this step and subsequent steps.
+      virtual void finish();
+
+      // Update the general info.
+      virtual void updateInfo (const DPInfo&);
+
+      // Show the step parameters.
+      virtual void show (std::ostream&) const;
+
+      // Show the timings.
+      virtual void showTimings (std::ostream&, double duration) const;
+
+    private:
+      //# Data members.
+      string           itsName;
+
+      std::vector<std::string>   itsReplaceParms; // The names of the parameters that differ along the substeps
+      std::vector<DPStep::ShPtr> itsSubsteps;
+    };
+
+  } //# end namespace
+}
+
+#endif
diff --git a/CEP/DP3/DPPP/src/CMakeLists.txt b/CEP/DP3/DPPP/src/CMakeLists.txt
index 029322f29ce..cb4eb74985e 100644
--- a/CEP/DP3/DPPP/src/CMakeLists.txt
+++ b/CEP/DP3/DPPP/src/CMakeLists.txt
@@ -21,7 +21,7 @@ lofar_add_library(dppp
   Predict.cc OneApplyCal.cc
   ApplyBeam.cc
   PhaseFitter.cc H5Parm.cc SolTab.cc DummyStep.cc H5ParmPredict.cc GridInterpolate.cc
-  Upsample.cc
+  Upsample.cc Split.cc
 )
 
 lofar_add_bin_program(NDPPP NDPPP.cc)
diff --git a/CEP/DP3/DPPP/src/DPRun.cc b/CEP/DP3/DPPP/src/DPRun.cc
index f25bb67283b..d22467b0ffb 100644
--- a/CEP/DP3/DPPP/src/DPRun.cc
+++ b/CEP/DP3/DPPP/src/DPRun.cc
@@ -43,6 +43,7 @@
 #include <DPPP/Predict.h>
 #include <DPPP/H5ParmPredict.h>
 #include <DPPP/GainCal.h>
+#include <DPPP/Split.h>
 #include <DPPP/Upsample.h>
 #include <DPPP/Filter.h>
 #include <DPPP/Counter.h>
@@ -129,8 +130,12 @@ namespace LOFAR {
       uint numThreads = parset.getInt("numthreads", OpenMP::maxThreads());
       OpenMP::setNumThreads(numThreads);
 
-      // Create the steps and fill their DPInfo objects.
-      DPStep::ShPtr firstStep = makeSteps (parset);
+      // Create the steps, link them toggether
+      DPStep::ShPtr firstStep = makeSteps (parset, "", 0);
+
+      // Let all steps fill their DPInfo object using the info from the previous step.
+      DPInfo lastInfo = firstStep->setInfo (DPInfo());
+
       // Show the steps.
       DPStep::ShPtr step = firstStep;
       DPStep::ShPtr lastStep;
@@ -226,58 +231,62 @@ namespace LOFAR {
       // The destructors are called automatically at this point.
     }
 
-    DPStep::ShPtr DPRun::makeSteps (const ParameterSet& parset)
+    DPStep::ShPtr DPRun::makeSteps (const ParameterSet& parset,
+                                    const string& prefix,
+                                    DPInput* reader)
     {
       DPStep::ShPtr firstStep;
       DPStep::ShPtr lastStep;
-      // Get input and output MS name.
-      // Those parameters were always called msin and msout.
-      // However, SAS/MAC cannot handle a parameter and a group with the same
-      // name, hence one can also use msin.name and msout.name.
-      vector<string> inNames = parset.getStringVector ("msin.name",
-                                                       vector<string>());
-      if (inNames.empty()) {
-        inNames = parset.getStringVector ("msin");
-      }
-      ASSERTSTR (inNames.size() > 0, "No input MeasurementSets given");
-      // Find all file names matching a possibly wildcarded input name.
-      // This is only possible if a single name is given.
-      if (inNames.size() == 1) {
-        if (inNames[0].find_first_of ("*?{['") != string::npos) {
-          vector<string> names;
-          names.reserve (80);
-          casacore::Path path(inNames[0]);
-          casacore::String dirName(path.dirName());
-          casacore::Directory dir(dirName);
-          // Use the basename as the file name pattern.
-          casacore::DirectoryIterator dirIter (dir,
-                                           casacore::Regex::fromPattern(path.baseName()));
-          while (!dirIter.pastEnd()) {
-            names.push_back (dirName + '/' + dirIter.name());
-            dirIter++;
+      if (!reader) {
+        // Get input and output MS name.
+        // Those parameters were always called msin and msout.
+        // However, SAS/MAC cannot handle a parameter and a group with the same
+        // name, hence one can also use msin.name and msout.name.
+        vector<string> inNames = parset.getStringVector ("msin.name",
+                                                         vector<string>());
+        if (inNames.empty()) {
+          inNames = parset.getStringVector ("msin");
+        }
+        ASSERTSTR (inNames.size() > 0, "No input MeasurementSets given");
+        // Find all file names matching a possibly wildcarded input name.
+        // This is only possible if a single name is given.
+        if (inNames.size() == 1) {
+          if (inNames[0].find_first_of ("*?{['") != string::npos) {
+            vector<string> names;
+            names.reserve (80);
+            casacore::Path path(inNames[0]);
+            casacore::String dirName(path.dirName());
+            casacore::Directory dir(dirName);
+            // Use the basename as the file name pattern.
+            casacore::DirectoryIterator dirIter (dir,
+                                             casacore::Regex::fromPattern(path.baseName()));
+            while (!dirIter.pastEnd()) {
+              names.push_back (dirName + '/' + dirIter.name());
+              dirIter++;
+            }
+            ASSERTSTR (!names.empty(), "No datasets found matching msin "
+                       << inNames[0]);
+            inNames = names;
           }
-          ASSERTSTR (!names.empty(), "No datasets found matching msin "
-                     << inNames[0]);
-          inNames = names;
         }
-      }
 
-      // Get the steps.
-      vector<string> steps = parset.getStringVector ("steps");
-      // Currently the input MS must be given.
-      // In the future it might be possible to have a simulation step instead.
-      // Create MSReader step if input ms given.
-      MSReader* reader = 0;
-      if (inNames.size() == 1) {
-        reader = new MSReader (inNames[0], parset, "msin.");
-      } else {
-        reader = new MultiMSReader (inNames, parset, "msin.");
+        // Get the steps.
+        // Currently the input MS must be given.
+        // In the future it might be possible to have a simulation step instead.
+        // Create MSReader step if input ms given.
+        if (inNames.size() == 1) {
+          reader = new MSReader (inNames[0], parset, "msin.");
+        } else {
+          reader = new MultiMSReader (inNames, parset, "msin.");
+        }
+        firstStep = DPStep::ShPtr (reader);
       }
+
       casacore::Path pathIn (reader->msName());
       casacore::String currentMSName (pathIn.absoluteName());
 
       // Create the other steps.
-      firstStep = DPStep::ShPtr (reader);
+      vector<string> steps = parset.getStringVector (prefix + "steps");
       lastStep = firstStep;
       DPStep::ShPtr step;
       for (vector<string>::const_iterator iter = steps.begin();
@@ -329,27 +338,30 @@ namespace LOFAR {
           step = DPStep::ShPtr(new GainCal (reader, parset, prefix));
         } else if (type == "upsample") {
           step = DPStep::ShPtr(new Upsample (reader, parset, prefix));
-        } else if (type == "out" || type=="output") {
-          step = makeOutputStep(reader, parset, prefix,
-                                inNames.size()>1, currentMSName);
+        } else if (type == "split" || type == "explode") {
+          step = DPStep::ShPtr(new Split (reader, parset, prefix));
+        } else if (type == "out" || type=="output" || type=="msout") {
+          step = makeOutputStep(dynamic_cast<MSReader*>(reader), parset, prefix, currentMSName);
         } else {
           // Maybe the step is defined in a dynamic library.
           step = findStepCtor(type) (reader, parset, prefix);
         }
-        lastStep->setNextStep (step);
+        if (lastStep) {
+          lastStep->setNextStep (step);
+        }
         lastStep = step;
         // Define as first step if not defined yet.
         if (!firstStep) {
           firstStep = step;
         }
       }
-      step = makeOutputStep(reader, parset, "msout.",
-                            inNames.size()>1, currentMSName);
-      lastStep->setNextStep (step);
-      lastStep = step;
-
-      // Let all steps fill their info using the info from the previous step.
-      DPInfo lastInfo = firstStep->setInfo (DPInfo());
+      // Add an output step if not explicitly added in steps (unless last step is a 'split' step)
+      if (steps[steps.size()-1] != "out" && steps[steps.size()-1] != "output" &&
+          steps[steps.size()-1] != "msout" && steps[steps.size()-1] != "split") {
+        step = makeOutputStep(dynamic_cast<MSReader*>(reader), parset, "msout.", currentMSName);
+        lastStep->setNextStep (step);
+        lastStep = step;
+      }
 
       // Add a null step, so the last step can use getNextStep->process().
       DPStep::ShPtr nullStep(new NullStep());
@@ -364,7 +376,6 @@ namespace LOFAR {
     DPStep::ShPtr DPRun::makeOutputStep (MSReader* reader,
                                          const ParameterSet& parset,
                                          const string& prefix,
-                                         bool multipleInputs,
                                          casacore::String& currentMSName)
     {
       DPStep::ShPtr step;
@@ -396,9 +407,8 @@ namespace LOFAR {
         // Create MSUpdater.
         // Take care the history is not written twice.
         // Note that if there is nothing to write, the updater won't do anything.
-        ASSERTSTR (! multipleInputs,
-                   "No update can be done if multiple input MSs are used");
-        step = DPStep::ShPtr(new MSUpdater(reader, outName, parset, prefix,
+        step = DPStep::ShPtr(new MSUpdater(dynamic_cast<MSReader*>(reader),
+                                           outName, parset, prefix,
                                            outName!=currentMSName));
       } else {
         step = DPStep::ShPtr(new MSWriter (reader, outName, parset, prefix));
diff --git a/CEP/DP3/DPPP/src/Split.cc b/CEP/DP3/DPPP/src/Split.cc
new file mode 100644
index 00000000000..4f114295394
--- /dev/null
+++ b/CEP/DP3/DPPP/src/Split.cc
@@ -0,0 +1,140 @@
+//# Split.cc: DPPP step class to Split visibilities
+//# Copyright (C) 2018
+//# ASTRON (Netherlands Institute for Radio Astronomy)
+//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
+//#
+//# This file is part of the LOFAR software suite.
+//# The LOFAR software suite is free software: you can redistribute it and/or
+//# modify it under the terms of the GNU General Public License as published
+//# by the Free Software Foundation, either version 3 of the License, or
+//# (at your option) any later version.
+//#
+//# The LOFAR software suite is distributed in the hope that it will be useful,
+//# but WITHOUT ANY WARRANTY; without even the implied warranty of
+//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+//# GNU General Public License for more details.
+//#
+//# You should have received a copy of the GNU General Public License along
+//# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
+//#
+//# $Id: GainCal.cc 21598 2012-07-16 08:07:34Z diepen $
+//#
+//# @author Tammo Jan Dijkema
+
+#include <lofar_config.h>
+#include <DPPP/Split.h>
+#include <DPPP/DPRun.h>
+
+#include <iostream>
+#include <Common/ParameterSet.h>
+#include <Common/Timer.h>
+
+#include <stddef.h>
+#include <string>
+#include <sstream>
+#include <utility>
+#include <vector>
+
+using namespace casacore;
+
+namespace LOFAR {
+  namespace DPPP {
+
+    Split::Split (DPInput* input,
+                  const ParameterSet& parset,
+                  const string& prefix)
+    {
+      itsReplaceParms = parset.getStringVector(prefix + "replaceparms");
+      vector<vector<string> > replaceParmValues; // For each of the parameters, the values for each substep
+      replaceParmValues.resize(itsReplaceParms.size());
+
+      vector<vector<string> >::iterator replaceParmValueIt = replaceParmValues.begin();
+      vector<string>::iterator replaceParmIt;
+      uint numSteps = 0;
+      for (replaceParmIt = itsReplaceParms.begin();
+           replaceParmIt != itsReplaceParms.end(); ++replaceParmIt) {
+        vector<string> parmValues = parset.getStringVector(*replaceParmIt);
+        *(replaceParmValueIt++) = parmValues;
+        if (numSteps > 0) {
+          ASSERTSTR(parmValues.size() == numSteps, "Each parameter in replaceparms should have the same number of items (expected "<<
+                    numSteps <<", got "<<parmValues.size() <<" for step "<<(*replaceParmIt));
+        } else {
+          numSteps = parmValues.size();
+        }
+      }
+
+      // Make a shallow copy to work around constness of parset
+      ParameterSet parsetCopy(parset);
+
+      // Create the substeps
+      uint numParameters = itsReplaceParms.size();
+      for (uint i = 0; i<numSteps; ++i) {
+        for (uint j = 0; j<numParameters; ++j) {
+          parsetCopy.replace(itsReplaceParms[j], replaceParmValues[j][i]);
+        }
+        DPStep::ShPtr firstStep = DPRun::makeSteps (parsetCopy, prefix, input);
+        firstStep->setPrevStep(this);
+        itsSubsteps.push_back(firstStep);
+      }
+      ASSERT(itsSubsteps.size()>0);
+    }
+
+    Split::~Split()
+    {}
+
+    void Split::updateInfo (const DPInfo& infoIn)
+    {
+      info() = infoIn;
+
+      vector<DPStep::ShPtr>::iterator it;
+      for (it=itsSubsteps.begin(); it!=itsSubsteps.end(); ++it) {
+        (*it)->setInfo(infoIn);
+      }
+    }
+
+    void Split::show (std::ostream& os) const
+    {
+      os << "Split " << itsName << endl;
+      os << "  replace parameters:" << itsReplaceParms << endl;
+      // Show the steps.
+      for (uint i=0; i<itsSubsteps.size(); ++i) {
+        os << "Split substep "<<(i+1)<<" of "<<itsSubsteps.size()<<endl;
+        DPStep::ShPtr step = itsSubsteps[0];
+        DPStep::ShPtr lastStep;
+        while (step) {
+          step->show (os);
+          lastStep = step;
+          step = step->getNextStep();
+        }
+      }
+    }
+
+    void Split::showTimings (std::ostream& os, double duration) const
+    {
+      for (uint i=0; i<itsSubsteps.size(); ++i) {
+        DPStep::ShPtr step = itsSubsteps[i];
+        while (step) {
+          step->showTimings(os, duration);
+          step = step->getNextStep();
+        }
+      }
+    }
+
+    bool Split::process (const DPBuffer& bufin)
+    {
+      for (uint i=0; i<itsSubsteps.size(); ++i) {
+        itsSubsteps[i]->process(bufin);
+      }
+      return false;
+    }
+
+
+    void Split::finish()
+    {
+      // Let the next steps finish.
+      for (uint i=0; i<itsSubsteps.size(); ++i) {
+        itsSubsteps[i]->finish();
+      }
+    }
+  } //# end namespace
+}
-- 
GitLab