From 1cbf78a34caec1017ccda50ccfe3291465fc2655 Mon Sep 17 00:00:00 2001
From: Ger van Diepen <diepen@astron.nl>
Date: Mon, 31 Jan 2011 10:50:26 +0000
Subject: [PATCH] bug 1638: First version of the PhaseShift step

---
 CEP/DP3/DPPP/include/DPPP/CMakeLists.txt |   2 +-
 CEP/DP3/DPPP/include/DPPP/DPInfo.h       |  14 ++
 CEP/DP3/DPPP/include/DPPP/PhaseShift.h   |  16 +-
 CEP/DP3/DPPP/src/CMakeLists.txt          |   2 +-
 CEP/DP3/DPPP/src/DPInfo.cc               |   3 +-
 CEP/DP3/DPPP/src/DPRun.cc                |   5 +
 CEP/DP3/DPPP/src/MSReader.cc             |   1 +
 CEP/DP3/DPPP/src/PhaseShift.cc           | 104 ++++++++----
 CEP/DP3/DPPP/test/CMakeLists.txt         |   1 +
 CEP/DP3/DPPP/test/tPhaseShift.cc         | 191 +++++++++++++++++++++++
 10 files changed, 298 insertions(+), 41 deletions(-)
 create mode 100644 CEP/DP3/DPPP/test/tPhaseShift.cc

diff --git a/CEP/DP3/DPPP/include/DPPP/CMakeLists.txt b/CEP/DP3/DPPP/include/DPPP/CMakeLists.txt
index 292c3b352e8..9ac59d5618a 100644
--- a/CEP/DP3/DPPP/include/DPPP/CMakeLists.txt
+++ b/CEP/DP3/DPPP/include/DPPP/CMakeLists.txt
@@ -6,7 +6,7 @@ set(inst_HEADERS
   ParSet.h DPLogger.h ProgressMeter.h FlagCounter.h
   UVWCalculator.h
   MSReader.h MSWriter.h MSUpdater.h Counter.h
-  Averager.h MedFlagger.h PreFlagger.h UVWFlagger.h
+  Averager.h MedFlagger.h PreFlagger.h UVWFlagger.h PhaseShift.h
   FlaggerStatistics.h
   MsInfo.h
   MsFile.h
diff --git a/CEP/DP3/DPPP/include/DPPP/DPInfo.h b/CEP/DP3/DPPP/include/DPPP/DPInfo.h
index 1c01228831a..06a12bf2d16 100644
--- a/CEP/DP3/DPPP/include/DPPP/DPInfo.h
+++ b/CEP/DP3/DPPP/include/DPPP/DPInfo.h
@@ -28,6 +28,7 @@
 // @brief General info about DPPP data processing attributes like averaging
 
 #include <Common/LofarTypes.h>
+#include <measures/Measures/MDirection.h>
 
 namespace LOFAR {
   namespace DPPP {
@@ -56,6 +57,11 @@ namespace LOFAR {
       // It returns the possibly reset nr of channels to average.
       uint update (uint chanAvg, uint timeAvg);
 
+      // Set the phase center.
+      // If original=true, it is set to the original phase center.
+      void setPhaseCenter (const casa::MDirection& phaseCenter, bool original)
+        { itsPhaseCenter=phaseCenter; itsPhaseCenterIsOriginal = original; }
+
       // Get the info.
       uint ncorr() const
         { return itsNCorr; }
@@ -76,6 +82,12 @@ namespace LOFAR {
       double timeInterval() const
         { return itsTimeInterval; }
 
+      // Get the phase center info.
+      const casa::MDirection& phaseCenter() const
+        { return itsPhaseCenter; }
+      bool phaseCenterIsOriginal() const
+        { return itsPhaseCenterIsOriginal; }
+
     private:
       uint   itsNCorr;
       uint   itsStartChan;
@@ -86,6 +98,8 @@ namespace LOFAR {
       uint   itsNTime;
       uint   itsTimeAvg;
       double itsTimeInterval;
+      casa::MDirection itsPhaseCenter;
+      bool             itsPhaseCenterIsOriginal;
     };
 
   } //# end namespace
diff --git a/CEP/DP3/DPPP/include/DPPP/PhaseShift.h b/CEP/DP3/DPPP/include/DPPP/PhaseShift.h
index b66049a7907..8220610a6da 100644
--- a/CEP/DP3/DPPP/include/DPPP/PhaseShift.h
+++ b/CEP/DP3/DPPP/include/DPPP/PhaseShift.h
@@ -77,14 +77,16 @@ namespace LOFAR {
       virtual void showTimings (std::ostream&, double duration) const;
 
     private:
+      // Interpret the phase center specification.
+      // Currently only J2000 RA and DEC can be given.
+      casa::MDirection PhaseShift::handleCenter();
+      
       //# Data members.
-      DPInput*         itsInput;
-      string           itsName;
-      DPBuffer         itsBuf;
-      string           itsRaStr;
-      string           itsDecStr;
-      casa::UVWMachine itsMachine;
-      NSTimer          itsTimer;
+      DPInput*          itsInput;
+      string            itsName;
+      vector<string>    itsCenter;
+      casa::UVWMachine* itsMachine;
+      NSTimer           itsTimer;
     };
 
   } //# end namespace
diff --git a/CEP/DP3/DPPP/src/CMakeLists.txt b/CEP/DP3/DPPP/src/CMakeLists.txt
index a31e27fc75f..21d62fc4f11 100644
--- a/CEP/DP3/DPPP/src/CMakeLists.txt
+++ b/CEP/DP3/DPPP/src/CMakeLists.txt
@@ -8,7 +8,7 @@ lofar_add_library(dppp
   ParSet.cc DPLogger.cc ProgressMeter.cc FlagCounter.cc
   UVWCalculator.cc
   MSReader.cc MSWriter.cc MSUpdater.cc Counter.cc
-  Averager.cc MedFlagger.cc AORFlagger.cc PreFlagger.cc UVWFlagger.cc
+  Averager.cc MedFlagger.cc AORFlagger.cc PreFlagger.cc UVWFlagger.cc PhaseShift.cc
   FlaggerStatistics.cc
   MsInfo.cc
   MsFile.cc
diff --git a/CEP/DP3/DPPP/src/DPInfo.cc b/CEP/DP3/DPPP/src/DPInfo.cc
index bfef8165fa5..812ce103b97 100644
--- a/CEP/DP3/DPPP/src/DPInfo.cc
+++ b/CEP/DP3/DPPP/src/DPInfo.cc
@@ -37,7 +37,8 @@ namespace LOFAR {
         itsNBl          (0),
         itsNTime        (0),
         itsTimeAvg      (1),
-        itsTimeInterval (0)
+        itsTimeInterval (0),
+        itsPhaseCenterIsOriginal (true)
     {}
 
     void DPInfo::init (uint ncorr, uint startChan, uint nchan,
diff --git a/CEP/DP3/DPPP/src/DPRun.cc b/CEP/DP3/DPPP/src/DPRun.cc
index 13113821877..07d37c5aa90 100644
--- a/CEP/DP3/DPPP/src/DPRun.cc
+++ b/CEP/DP3/DPPP/src/DPRun.cc
@@ -33,6 +33,7 @@
 #include <DPPP/AORFlagger.h>
 #include <DPPP/PreFlagger.h>
 #include <DPPP/UVWFlagger.h>
+#include <DPPP/PhaseShift.h>
 #include <DPPP/Counter.h>
 #include <DPPP/ParSet.h>
 #include <DPPP/ProgressMeter.h>
@@ -168,6 +169,8 @@ namespace LOFAR {
           step = DPStep::ShPtr(new UVWFlagger (reader, parset, prefix));
         } else if (type == "counter"  ||  type == "count") {
           step = DPStep::ShPtr(new Counter (reader, parset, prefix));
+        } else if (type == "phaseshifter"  ||  type == "phaseshift") {
+          step = DPStep::ShPtr(new PhaseShift (reader, parset, prefix));
         } else {
           THROW (LOFAR::Exception, "DPPP step type " << type << " is unknown");
         }
@@ -189,6 +192,8 @@ namespace LOFAR {
       if (outName.empty()) {
         ASSERTSTR (info.nchanAvg() == 1  &&  info.ntimeAvg() == 1,
                    "A new MS has to be given in msout if averaging is done");
+        ASSERTSTR (info.phaseCenterIsOriginal(),
+                   "A new MS has to be given in msout if a phase shift is done");
         step = DPStep::ShPtr(new MSUpdater (reader, parset, "msout."));
       } else {
         step = DPStep::ShPtr(new MSWriter (reader, outName, info,
diff --git a/CEP/DP3/DPPP/src/MSReader.cc b/CEP/DP3/DPPP/src/MSReader.cc
index 07379619b5a..83d5d9845cb 100644
--- a/CEP/DP3/DPPP/src/MSReader.cc
+++ b/CEP/DP3/DPPP/src/MSReader.cc
@@ -250,6 +250,7 @@ namespace LOFAR {
       info.init (itsNrCorr, itsStartChan, itsNrChan, itsNrBl,
                  int((itsLastTime - itsFirstTime)/itsInterval + 1.5),
                  itsInterval);
+      info.setPhaseCenter (itsPhaseCenter, true);
     }
 
     void MSReader::show (std::ostream& os) const
diff --git a/CEP/DP3/DPPP/src/PhaseShift.cc b/CEP/DP3/DPPP/src/PhaseShift.cc
index 5f94bd211ed..912bdd503c3 100644
--- a/CEP/DP3/DPPP/src/PhaseShift.cc
+++ b/CEP/DP3/DPPP/src/PhaseShift.cc
@@ -27,7 +27,11 @@
 #include <DPPP/DPInfo.h>
 #include <DPPP/ParSet.h>
 #include <Common/LofarLogger.h>
+#include <Common/StreamUtil.h>
 #include <casa/Arrays/ArrayMath.h>
+#include <casa/Arrays/VectorIter.h>
+#include <casa/Quanta/Quantum.h>
+#include <casa/Quanta/MVAngle.h>
 #include <iostream>
 #include <iomanip>
 
@@ -38,33 +42,34 @@ namespace LOFAR {
 
     PhaseShift::PhaseShift (DPInput* input,
                             const ParSet& parset, const string& prefix)
-      : itsInput  (input),
-        itsName   (prefix),
-        itsRaStr  (parset.getString(prefix+"ra", "")),
-        itsDecStr (parset.getString(prefix+"dec", ""))
+      : itsInput   (input),
+        itsName    (prefix),
+        itsCenter  (parset.getStringVector(prefix+"center")),
+        itsMachine (0)
     {}
 
     PhaseShift::~PhaseShift()
-    {}
+    {
+      delete itsMachine;
+    }
 
     void PhaseShift::updateInfo (DPInfo& info)
     {
       // Default phase center is the original one.
-      MDirection newDir(info.originalPhaseCenter());
-      if (itsRaStr.empty() && itsDecStr.empty()) {
-        itsRa  = MVTime::read (itsRaStr);
-        itsDec = MVTime::read (itsDecStr);
-        newDir = MDirection(itsRa, itsDec, MDirection::J2000);
+      MDirection newDir(itsInput->phaseCenter());
+      bool original = true;
+      if (! itsCenter.empty()) {
+        newDir = handleCenter();
+        original = false;
       }
-      new casa::UVWMachine(info.phaseCentre(), newDir, false, true);
-      info.setNewPhaseCenter (newDir);
+      itsMachine = new casa::UVWMachine(info.phaseCenter(), newDir, false, true);
+      info.setPhaseCenter (newDir, original);
     }
 
     void PhaseShift::show (std::ostream& os) const
     {
       os << "PhaseShift " << itsName << std::endl;
-      os << "  ra:             " << itsRaStr << std::endl;
-      os << "  dec:            " << itsDecStr << std::endl;
+      os << "  center:         " << itsCenter << std::endl;
     }
 
     void PhaseShift::showTimings (std::ostream& os, double duration) const
@@ -77,29 +82,31 @@ namespace LOFAR {
     bool PhaseShift::process (const DPBuffer& buf)
     {
       itsTimer.start();
-      RefRows rowNrs(buf.getRowNrs());
-      itsBuf.getUVW()  = itsInput->fetchUVW (buf, rowNrs);
-      itsBuf.getData().resize (buf.getData().shape());
-      const Complex* indata = buf.getData().data();
-      Complex* outdata = itsBuf.getData().data();
-      uint ncorr = buf.getData().shape()[0];
-      uint nchan = buf.getData().shape()[1];
-      uint ntime = buf.getData().shape()[2];
-      VectorIterator uvwIter(itsBuf.getUVW());
-      for (uint i=0; i<ntime; ++i) {
+      DPBuffer newBuf(buf);
+      RefRows rowNrs(newBuf.getRowNrs());
+      if (newBuf.getUVW().empty()) {
+        newBuf.getUVW().reference (itsInput->fetchUVW (newBuf, rowNrs));
+      }
+      newBuf.getData().unique();
+      Complex* data = newBuf.getData().data();
+      uint np  = newBuf.getData().shape()[0] * newBuf.getData().shape()[1];
+      uint nbl = newBuf.getData().shape()[2];
+      VectorIterator<double> uvwIter(newBuf.getUVW(), 0);
+      double phase;
+      //# If ever later a time dependent phase center is used, the machine must be reset
+      //# for each new time, thus each new call to process.
+      for (uint i=0; i<nbl; ++i) {
         // Convert the uvw coordinates and get the phase shift term.
-        itsMachine.convertUVW (phase, uvwIter.vector());
+        itsMachine->convertUVW (phase, uvwIter.vector());
         Complex phasor(cos(phase), sin(phase));
-        // Shift the phase of the data.
-        for (uint j=0; j<nchan; ++j) {
-          for (uint k=0; j<ncorr; ++k) {
-            *outdata = *indata * phasor;
-          }
+        // Shift the phase of the data of this baseline.
+        for (uint j=0; j<np; ++j) {
+          *data++ *= phasor;
         }
         uvwIter.next();
      }
      itsTimer.stop();
-     getNextStep()->process (buf);
+     getNextStep()->process (newBuf);
      return true;
     }
 
@@ -109,5 +116,40 @@ namespace LOFAR {
       getNextStep()->finish();
     }
 
+    MDirection PhaseShift::handleCenter()
+    {
+      // The phase center must be given in J2000 as two values (ra,dec).
+      // In the future time dependent frames can be done as in UVWFlagger.
+      ASSERTSTR (itsCenter.size() == 2,
+                 "2 values must be given in PhaseShift phasecenter");
+      ///ASSERTSTR (itsCenter.size() < 4,
+      ///"Up to 3 values can be given in UVWFlagger phasecenter");
+      MDirection phaseCenter;
+      if (itsCenter.size() == 1) {
+        string str = toUpper(itsCenter[0]);
+        MDirection::Types tp;
+        ASSERTSTR (MDirection::getType(tp, str),
+                   str << " is an invalid source type"
+                   " in UVWFlagger phasecenter");
+        return MDirection(tp);
+      }
+      Quantity q0, q1;
+      ASSERTSTR (MVAngle::read (q0, itsCenter[0]),
+                 itsCenter[0] << " is an invalid RA or longitude"
+                 " in UVWFlagger phasecenter");
+      ASSERTSTR (MVAngle::read (q1, itsCenter[1]),
+                 itsCenter[1] << " is an invalid DEC or latitude"
+                 " in UVWFlagger phasecenter");
+      MDirection::Types type = MDirection::J2000;
+      if (itsCenter.size() > 2) {
+        string str = toUpper(itsCenter[2]);
+        MDirection::Types tp;
+        ASSERTSTR (MDirection::getType(tp, str),
+                   str << " is an invalid direction type in UVWFlagger"
+                   " in UVWFlagger phasecenter");
+      }
+      return MDirection(q0, q1, type);
+    }
+
   } //# end namespace
 }
diff --git a/CEP/DP3/DPPP/test/CMakeLists.txt b/CEP/DP3/DPPP/test/CMakeLists.txt
index 1db6d5fa910..425b9dfc400 100644
--- a/CEP/DP3/DPPP/test/CMakeLists.txt
+++ b/CEP/DP3/DPPP/test/CMakeLists.txt
@@ -11,6 +11,7 @@ lofar_add_test(tMedFlagger tMedFlagger.cc)
 lofar_add_test(tPreFlagger tPreFlagger.cc)
 lofar_add_test(tPSet tPSet.cc)
 lofar_add_test(tUVWFlagger tUVWFlagger.cc)
+lofar_add_test(tPhaseShift tPhaseShift.cc)
 lofar_add_test(tNDPPP tNDPPP.cc)
 lofar_add_test(tparse tparse.cc)
 lofar_add_test(tParSet tParSet.cc)
diff --git a/CEP/DP3/DPPP/test/tPhaseShift.cc b/CEP/DP3/DPPP/test/tPhaseShift.cc
new file mode 100644
index 00000000000..d2cec31614c
--- /dev/null
+++ b/CEP/DP3/DPPP/test/tPhaseShift.cc
@@ -0,0 +1,191 @@
+//# tPhaseShift.cc: Test program for class PhaseShift
+//# Copyright (C) 2010
+//# ASTRON (Netherlands Institute for Radio Astronomy)
+//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
+//#
+//# This file is part of the LOFAR software suite.
+//# The LOFAR software suite is free software: you can redistribute it and/or
+//# modify it under the terms of the GNU General Public License as published
+//# by the Free Software Foundation, either version 3 of the License, or
+//# (at your option) any later version.
+//#
+//# The LOFAR software suite is distributed in the hope that it will be useful,
+//# but WITHOUT ANY WARRANTY; without even the implied warranty of
+//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+//# GNU General Public License for more details.
+//#
+//# You should have received a copy of the GNU General Public License along
+//# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
+//#
+//# $Id$
+//#
+//# @author Ger van Diepen
+
+#include <lofar_config.h>
+#include <DPPP/PhaseShift.h>
+#include <DPPP/DPBuffer.h>
+#include <DPPP/DPInfo.h>
+#include <DPPP/ParSet.h>
+#include <Common/StringUtil.h>
+#include <casa/Arrays/ArrayMath.h>
+#include <casa/Arrays/ArrayLogical.h>
+#include <casa/Arrays/ArrayIO.h>
+#include <iostream>
+
+using namespace LOFAR;
+using namespace LOFAR::DPPP;
+using namespace casa;
+using namespace std;
+
+
+// Simple class to generate input arrays.
+// It can only set all flags to true or all false.
+// Weights are always 1.
+// It can be used with different nr of times, channels, etc.
+class TestInput: public DPInput
+{
+public:
+  TestInput(int ntime, int nbl, int nchan, int ncorr, bool flag)
+    : itsCount(0), itsNTime(ntime), itsNBl(nbl), itsNChan(nchan),
+      itsNCorr(ncorr), itsFlag(flag)
+  {}
+private:
+  virtual bool process (const DPBuffer&)
+  {
+    // Stop when all times are done.
+    if (itsCount == itsNTime) {
+      return false;
+    }
+    Cube<Complex> data(itsNCorr, itsNChan, itsNBl);
+    for (int i=0; i<int(data.size()); ++i) {
+      data.data()[i] = Complex(i+itsCount*10,i-1000+itsCount*6);
+    }
+    DPBuffer buf;
+    buf.setTime (itsCount*5 + 2);   //same interval as in updateAveragInfo
+    buf.setData (data);
+    Cube<float> weights(data.shape());
+    weights = 1.;
+    buf.setWeights (weights);
+    Cube<bool> flags(data.shape());
+    flags = itsFlag;
+    buf.setFlags (flags);
+    // The fullRes flags are a copy of the XX flags, but differently shaped.
+    // They are not averaged, thus only 1 time per row.
+    Cube<bool> fullResFlags(itsNChan, 1, itsNBl);
+    fullResFlags = itsFlag;
+    buf.setFullResFlags (fullResFlags);
+    Matrix<double> uvw(3,itsNBl);
+    indgen (uvw, double(itsCount*100));
+    buf.setUVW (uvw);
+    getNextStep()->process (buf);
+    ++itsCount;
+    return true;
+  }
+
+  virtual void finish() {getNextStep()->finish();}
+  virtual void show (std::ostream&) const {}
+  virtual void updateInfo (DPInfo& info)
+    // Use startchan=8 and timeInterval=5
+  {
+    info.init (itsNCorr, 8, itsNChan, itsNBl, itsNTime, 5);
+    MDirection dir(Quantity(45,"deg"), Quantity(30,"deg"), MDirection::J2000);
+    info.setPhaseCenter (dir, true);
+  }
+
+  int itsCount, itsNTime, itsNBl, itsNChan, itsNCorr;
+  bool itsFlag;
+};
+
+// Class to check result of averaging TestInput.
+class TestOutput: public DPStep
+{
+public:
+  TestOutput(int ntime, int nbl, int nchan, int ncorr, bool flag)
+    : itsCount(0), itsNTime(ntime), itsNBl(nbl), itsNChan(nchan),
+      itsNCorr(ncorr), itsFlag(flag)
+  {}
+private:
+  virtual bool process (const DPBuffer& buf)
+  {
+    // Stop when all times are done.
+    if (itsCount == itsNTime) {
+      return false;
+    }
+    Cube<Complex> result(itsNCorr, itsNChan, itsNBl);
+    for (int i=0; i<int(result.size()); ++i) {
+      result.data()[i] = Complex(i+itsCount*10,i-1000+itsCount*6);
+    }
+    Matrix<double> uvw(3,itsNBl);
+    indgen (uvw, double(itsCount*100));
+    // Check the result.
+    ASSERT (allNear(real(buf.getData()), real(result), 1e-5));
+    ///cout << imag(buf.getData()) << endl<<imag(result);
+    ASSERT (allNear(imag(buf.getData()), imag(result), 1e-5));
+    ASSERT (allEQ(buf.getFlags(), itsFlag));
+    ASSERT (near(buf.getTime(), 2.+5*itsCount));
+    ASSERT (allNear(buf.getUVW(), uvw, 1e-5));
+    ++itsCount;
+    return true;
+  }
+
+  virtual void finish() {}
+  virtual void show (std::ostream&) const {}
+  virtual void updateInfo (DPInfo& info)
+  {
+    MVDirection dir = info.phaseCenter().getValue();
+    ASSERT (near(dir.getLong("deg").getValue(), 45.));
+    ASSERT (near(dir.getLat("deg").getValue(), 30.));
+  }
+
+  int itsCount;
+  int itsNTime, itsNBl, itsNChan, itsNCorr;
+  bool itsFlag;
+};
+
+
+// Execute steps.
+void execute (const DPStep::ShPtr& step1)
+{
+  // Set DPInfo.
+  DPInfo info;
+  DPStep::ShPtr step = step1;
+  while (step) {
+    step->updateInfo (info);
+    step = step->getNextStep();
+  }
+  // Execute the steps.
+  DPBuffer buf;
+  while (step1->process(buf));
+  step1->finish();
+}
+
+// Test simple averaging without flagged points.
+void test1(int ntime, int nbl, int nchan, int ncorr, bool flag)
+{
+  cout << "test1: ntime=" << ntime << " nrbl=" << nbl << " nchan=" << nchan
+       << " ncorr=" << ncorr << " flag=" << flag << endl;
+  // Create the steps.
+  TestInput* in = new TestInput(ntime, nbl, nchan, ncorr, flag);
+  DPStep::ShPtr step1(in);
+  ParameterSet parset;
+  // Keep phase center the same to be able to check if data are correct.
+  parset.add ("center", "[45deg, 30deg]");
+  DPStep::ShPtr step2(new PhaseShift(in, parset, ""));
+  DPStep::ShPtr step3(new TestOutput(ntime, nbl, nchan, ncorr, flag));
+  step1->setNextStep (step2);
+  step2->setNextStep (step3);
+  execute (step1);
+}
+
+
+int main()
+{
+  try {
+    test1(10, 3, 32, 4, false);
+    test1(10, 3, 30, 1, true);
+  } catch (std::exception& x) {
+    cout << "Unexpected exception: " << x.what() << endl;
+    return 1;
+  }
+  return 0;
+}
-- 
GitLab