From f1f84c876a19311796c20f071da8239cb66478f7 Mon Sep 17 00:00:00 2001
From: Jan David Mol <mol@astron.nl>
Date: Sat, 30 Mar 2013 06:16:07 +0000
Subject: [PATCH] Task #4315: Added delay compensation class

---
 RTCP/Cobalt/InputProc/src/CMakeLists.txt   |   1 +
 RTCP/Cobalt/InputProc/src/Delays/Delays.cc | 305 +++++++++++++++++++++
 RTCP/Cobalt/InputProc/src/Delays/Delays.h  | 179 ++++++++++++
 3 files changed, 485 insertions(+)
 create mode 100644 RTCP/Cobalt/InputProc/src/Delays/Delays.cc
 create mode 100644 RTCP/Cobalt/InputProc/src/Delays/Delays.h

diff --git a/RTCP/Cobalt/InputProc/src/CMakeLists.txt b/RTCP/Cobalt/InputProc/src/CMakeLists.txt
index 993479fcead..761258cbe48 100644
--- a/RTCP/Cobalt/InputProc/src/CMakeLists.txt
+++ b/RTCP/Cobalt/InputProc/src/CMakeLists.txt
@@ -12,6 +12,7 @@ lofar_add_library(inputproc
   Buffer/Ranges.cc
   Buffer/SharedMemory.cc
   Buffer/StationID.cc
+  Delays/Delays.cc
   Station/Generator.cc
   Station/PacketFactory.cc
   Station/PacketReader.cc
diff --git a/RTCP/Cobalt/InputProc/src/Delays/Delays.cc b/RTCP/Cobalt/InputProc/src/Delays/Delays.cc
new file mode 100644
index 00000000000..e64f81901c9
--- /dev/null
+++ b/RTCP/Cobalt/InputProc/src/Delays/Delays.cc
@@ -0,0 +1,305 @@
+//# Delays.cc: Workholder for the delay compensation.
+//# Copyright (C) 2012-2013  ASTRON (Netherlands Institute for Radio Astronomy)
+//# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
+//#
+//# This file is part of the LOFAR software suite.
+//# The LOFAR software suite is free software: you can redistribute it and/or
+//# modify it under the terms of the GNU General Public License as published
+//# by the Free Software Foundation, either version 3 of the License, or
+//# (at your option) any later version.
+//#
+//# The LOFAR software suite is distributed in the hope that it will be useful,
+//# but WITHOUT ANY WARRANTY; without even the implied warranty of
+//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+//# GNU General Public License for more details.
+//#
+//# You should have received a copy of the GNU General Public License along
+//# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
+//#
+//# $Id$
+
+//# Always #include <lofar_config.h> first!
+#include <lofar_config.h>
+
+#include "Delays.h"
+
+#include <Common/LofarLogger.h>
+#include <Common/PrettyUnits.h>
+#include <Common/Thread/Mutex.h>
+#include <Common/Thread/Cancellation.h>
+#include <CoInterface/Exceptions.h>
+#include <CoInterface/BeamCoordinates.h>
+
+#include <measures/Measures/MEpoch.h>
+#include <measures/Measures/MCDirection.h>
+#include <casa/Exceptions/Error.h>
+
+
+namespace LOFAR
+{
+  namespace Cobalt
+  {
+
+    using namespace casa;
+
+    static LOFAR::Mutex casacoreMutex; // casacore is not thread safe
+
+    //##----------------  Public methods  ----------------##//
+
+    Delays::Delays(const Parset &parset, const std::string &stationName, const TimeStamp &startTime)
+      :
+      itsParset(parset),
+      stop(false),
+      // we need an extra entry for the central beam
+      itsBuffer(bufferSize, parset.nrBeams(), parset.maxNrTABs() + 1),
+      head(0),
+      tail(0),
+      bufferFree(bufferSize),
+      bufferUsed(0),
+      itsNrCalcDelays(parset.nrCalcDelays()),
+      itsNrBeams(parset.nrBeams()),
+      itsMaxNrTABs(parset.maxNrTABs()),
+      itsNrTABs(parset.nrTABs()),
+      itsStartTime(startTime),
+      itsNrSamplesPerSec(parset.nrSamplesPerSubband()),
+      itsSampleDuration(parset.sampleDuration()),
+      itsStationName(stationName),
+      itsDelayTimer("delay producer", true, true)
+    {
+    }
+
+
+    void Delays::start()
+    {
+      itsThread = new Thread(this, &Delays::mainLoop, "[DelayCompensation] ");
+    }
+
+
+    Delays::~Delays()
+    {
+      ScopedDelayCancellation dc; // Semaphores provide cancellation points
+
+      // trigger mainLoop and force it to stop
+      stop = true;
+      bufferFree.up(itsNrCalcDelays);
+    }
+
+
+    // convert a time in samples to a (day,fraction) pair in UTC in a CasaCore format
+    MVEpoch Delays::toUTC(int64 timeInSamples)
+    {
+      double utc_sec = (timeInSamples * itsSampleDuration) / MVEpoch::secInDay;
+      double day = floor(utc_sec);
+      double frac = utc_sec - day;
+
+      // (40587 modify Julian day number = 00:00:00 January 1, 1970, GMT)
+      return MVEpoch(day + 40587., frac);
+    }
+
+
+    void Delays::init()
+    {
+      ScopedLock lock(casacoreMutex);
+      ScopedDelayCancellation dc;
+
+      setBeamDirections(itsParset);
+      setPositionDiff(itsParset);
+
+      // We need bufferSize to be a multiple of batchSize to avoid wraparounds in
+      // the middle of the batch calculations. This makes life a lot easier and there is no
+      // need to support other cases.
+
+      if (bufferSize % itsNrCalcDelays > 0)
+        THROW(GPUProcException, "nrCalcDelays (" << itsNrCalcDelays << ") must divide bufferSize (" << bufferSize << ")");
+
+      // Set an initial epoch for the itsFrame
+      itsFrame.set(MEpoch(toUTC(itsStartTime), MEpoch::UTC));
+
+      // Set the position for the itsFrame.
+      itsFrame.set(itsPhaseCentre);
+
+      // Set-up the conversion engines, using reference direction ITRF.
+      for (unsigned beam = 0; beam < itsNrBeams; beam++) {
+        const casa::MDirection::Types &dirtype = itsDirections[beam].type;
+
+        if (itsConverters.find(dirtype) == itsConverters.end())
+          itsConverters[dirtype] = MDirection::Convert(dirtype, MDirection::Ref(MDirection::ITRF, itsFrame));
+      }
+    }
+
+
+    void Delays::mainLoop()
+    {
+      LOG_DEBUG("Delay compensation thread running");
+
+      init();
+
+      // the current time, in samples
+      int64 currentTime = itsStartTime;
+
+      try {
+        while (!stop) {
+          bufferFree.down(itsNrCalcDelays);
+
+          itsDelayTimer.start();
+
+          // Calculate itsNrCalcDelays seconds worth of delays. Technically, we do not have
+          // to calculate that many at the end of the run, but there is no need to
+          // prevent the few excess delays from being calculated.
+
+          {
+            ScopedLock lock(casacoreMutex);
+            ScopedDelayCancellation dc;
+
+            // For each given moment in time ...
+            for (uint i = 0; i < itsNrCalcDelays; i++) {
+              // Set the instant in time in the itsFrame (40587 modify Julian day number = 00:00:00 January 1, 1970, GMT)
+              itsFrame.resetEpoch(toUTC(currentTime));
+
+              // Check whether we will store results in a valid place
+              ASSERTSTR(tail < bufferSize, tail << " < " << bufferSize);
+
+              // For each given direction in the sky ...
+              for (uint b = 0; b < itsNrBeams; b++) {
+                MDirection::Convert &converter = itsConverters[itsDirections[b].type];
+
+                for (uint p = 0; p < itsNrTABs[b] + 1; p++) {
+                  // Define the astronomical direction as a J2000 direction.
+                  MVDirection &sky = p == 0 ? itsDirections[b].SAP : itsDirections[b].TABs[p - 1];
+
+                  // Convert this direction, using the conversion engine.
+                  MDirection dir = converter(sky);
+
+                  // Add to the return vector
+                  itsBuffer[tail][b][p] = dir.getValue();
+                }
+              }
+
+              // Advance time for the next calculation
+              currentTime += itsNrSamplesPerSec;
+
+              // Advance to the next result set.
+              // since bufferSize % itsNrCalcDelays == 0, wrap
+              // around can only occur between runs
+              ++tail;
+            }
+          }
+          // check for wrap around for the next run
+          if (tail >= bufferSize)
+            tail = 0;
+
+          itsDelayTimer.stop();
+
+          bufferUsed.up(itsNrCalcDelays);
+        }
+      } catch (AipsError &ex) {
+        // trigger getNextDelays and force it to stop
+        stop = true;
+        bufferUsed.up(1);
+
+        THROW(Exception, "AipsError: " << ex.what());
+      }
+
+      LOG_DEBUG("Delay compensation thread stopped");
+    }
+
+
+    void Delays::getNextDelays(Matrix<MVDirection> &directions, Matrix<double> &delays)
+    {
+      ASSERTSTR(directions.num_elements() == itsNrBeams * (itsMaxNrTABs + 1),
+                directions.num_elements() << " == " << itsNrBeams << "*" << (itsMaxNrTABs + 1));
+
+      ASSERTSTR(delays.num_elements() == itsNrBeams * (itsMaxNrTABs + 1),
+                delays.num_elements() << " == " << itsNrBeams << "*" << (itsMaxNrTABs + 1));
+
+      ASSERT(itsThread);
+
+      bufferUsed.down();
+
+      if (stop)
+        THROW(Exception, "Cannot obtain delays -- delay thread stopped running");
+
+      // copy the directions at itsBuffer[head] into the provided buffer,
+      // and calculate the respective delays
+      for (unsigned b = 0; b < itsNrBeams; b++) {
+        for (unsigned p = 0; p < itsNrTABs[b] + 1; p++) {
+          const MVDirection &dir = itsBuffer[head][b][p];
+
+          directions[b][p] = dir;
+          delays[b][p] = dir * itsPhasePositionDiff * (1.0 / speedOfLight);
+        }
+      }
+
+      // increment the head pointer
+      if (++head == bufferSize)
+        head = 0;
+
+      bufferFree.up();
+    }
+
+
+    void Delays::setBeamDirections(const Parset &parset)
+    {
+      // TODO: For now, we include pencil beams for all regular beams,
+      // and use the pencil beam offsets as offsets in J2000.
+      // To do the coordinates properly, the offsets should be applied
+      // in today's coordinates (JMEAN/JTRUE?), not J2000.
+      //
+      itsDirections.resize(itsNrBeams);
+
+      for (unsigned beam = 0; beam < itsNrBeams; beam++) {
+        const std::string type = toUpper(parset.getBeamDirectionType(beam));
+
+        if (!MDirection::getType(itsDirections[beam].type, type))
+          THROW(Exception, "Beam direction type unknown: " << type);
+      }
+
+      // Get the source directions from the parameter set.
+      // Split the \a dir vector into separate Direction objects.
+
+      for (unsigned beam = 0; beam < itsNrBeams; beam++) {
+        const std::vector<double> beamDir = parset.getBeamDirection(beam);
+        const BeamCoordinates& TABs = parset.TABs(beam);
+
+        // add central beam coordinates for non-beamforming pipelines
+        itsDirections[beam].SAP = MVDirection(beamDir[0], beamDir[1]);
+
+        itsDirections[beam].TABs.resize(itsNrTABs[beam]);
+
+        for (unsigned pencil = 0; pencil < itsNrTABs[beam]; pencil++) {
+          // obtain pencil coordinate
+          const BeamCoord3D &pencilCoord = TABs[pencil];
+
+          // apply angle modification
+          const double angle1 = beamDir[0] + pencilCoord[0];
+          const double angle2 = beamDir[1] + pencilCoord[1];
+
+          // store beam
+          itsDirections[beam].TABs[pencil] = MVDirection(angle1, angle2);
+        }
+      }
+    }
+
+
+    void Delays::setPositionDiff(const Parset &parset)
+    {
+      // Calculate the station to reference station position difference of apply station.
+
+      // Station positions must be given in ITRF
+      std::string str = toUpper(parset.positionType());
+
+      if (str != "ITRF")
+        THROW(GPUProcException, "OLAP.DelayComp.positionType must be ITRF");
+
+      // Get the antenna positions from the parameter set. The antenna
+      // positions are stored as one large vector of doubles.
+      const MVPosition pRef(parset.getRefPhaseCentre());
+      const MVPosition phaseCentre(parset.getPhaseCentreOf(itsStationName));
+
+      itsPhaseCentre = MPosition(phaseCentre, MPosition::ITRF);
+      itsPhasePositionDiff = phaseCentre - pRef;
+    }
+
+  } // namespace Cobalt
+} // namespace LOFAR
+
diff --git a/RTCP/Cobalt/InputProc/src/Delays/Delays.h b/RTCP/Cobalt/InputProc/src/Delays/Delays.h
new file mode 100644
index 00000000000..59dc5e6437b
--- /dev/null
+++ b/RTCP/Cobalt/InputProc/src/Delays/Delays.h
@@ -0,0 +1,179 @@
+//# Delays.h: Calculate delay compensation for all stations.
+//# Copyright (C) 2012-2013  ASTRON (Netherlands Institute for Radio Astronomy)
+//# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
+//#
+//# This file is part of the LOFAR software suite.
+//# The LOFAR software suite is free software: you can redistribute it and/or
+//# modify it under the terms of the GNU General Public License as published
+//# by the Free Software Foundation, either version 3 of the License, or
+//# (at your option) any later version.
+//#
+//# The LOFAR software suite is distributed in the hope that it will be useful,
+//# but WITHOUT ANY WARRANTY; without even the implied warranty of
+//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+//# GNU General Public License for more details.
+//#
+//# You should have received a copy of the GNU General Public License along
+//# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
+//#
+//# $Id$
+
+#ifndef LOFAR_INPUTPROC_DELAYS_H
+#define LOFAR_INPUTPROC_DELAYS_H
+
+// \file
+// Calculate delay compensation for all stations.
+
+//# Never #include <config.h> or #include <lofar_config.h> in a header file!
+
+//# Includes
+#include <string>
+#include <vector>
+#include <map>
+
+#include <Common/LofarTypes.h>
+#include <Common/Timer.h>
+#include <Common/Thread/Thread.h>
+#include <Common/Thread/Semaphore.h>
+#include <CoInterface/MultiDimArray.h>
+#include <CoInterface/Parset.h>
+#include <CoInterface/RSPTimeStamp.h>
+#include <CoInterface/SmartPtr.h>
+
+#include <measures/Measures/MeasConvert.h>
+#include <measures/Measures/MDirection.h>
+#include <measures/Measures/MPosition.h>
+#include <casa/Quanta/MVDirection.h>
+#include <casa/Quanta/MVPosition.h>
+#include <casa/Quanta/MVEpoch.h>
+
+namespace LOFAR
+{
+  namespace Cobalt
+  {
+
+    // Speed of light in vacuum, in m/s.
+    const double speedOfLight = 299792458;
+
+    // Workholder for calculating the delay compensation that must be applied
+    // per beam per station. We start by calculating the path length
+    // difference for beam \f$\mathbf{b}_i\f$ between station \f$j\f$ at
+    // position \f$\mathbf{p}_j\f$ and the reference station 0 at position
+    // \f$\mathbf{p}_0\f$.
+    // \f[
+    //   d_{ij} - d_{i0} = \mathbf{b}_i \cdot \mathbf{p}_j
+    //                   - \mathbf{b}_i \cdot \mathbf{p}_0
+    //                   = \mathbf{b}_i \cdot (\mathbf{p}_j - \mathbf{p}_0)
+    // \f]
+    // The choice of reference station is arbitrary, so we simply choose the
+    // first station from the parameter set. From the equation above it is
+    // clear that we can reduce the number of dot products if we precalculate
+    // the position difference vectors \f$\mathbf{p}_j - \mathbf{p}_0$\f,
+    // which we will store in \c			itsPositionDiffs.
+    //
+    // The geometrical delay is easily obtained by dividing the path length
+    // difference by the speed of light in vacuum. We don't need to know the
+    // speed of light in the atmosphere, because the AZEL directions that
+    // we've calculated are valid for vacuum (only!). This is the delay that
+    // must be compensated for.
+    //
+    // The calculated delay compensation must be split into a coarse (whole
+    // sample) delay and a fine (subsample) delay.  The coarse delay will be
+    // applied in the input section as a true time delay, by shifting the
+    // input samples. The fine delay will be applied in the correlator as a
+    // phase shift in each frequency channel.
+    class Delays
+    {
+    public:
+      Delays(const Parset &ps, const std::string &stationName, const TimeStamp &startTime);
+      ~Delays();
+
+      void start();
+
+      // get the set of directions (ITRF) and delays for the beams, for the next CN integration time
+      // Both matrices must have dimensions [itsNrBeams][itsMaxNrTABs+1]
+      void getNextDelays(Matrix<casa::MVDirection> &directions, Matrix<double> &delays);
+
+    private:
+      casa::MVEpoch                       toUTC(int64 timeInSamples);
+
+      void                                init();
+
+      // do the delay compensation calculations in a separate thread to allow bulk
+      // calculations and to avoid blocking other threads
+      void                                mainLoop();
+
+      const Parset                        &itsParset;
+
+      volatile bool stop;
+
+      // the circular buffer to hold the moving beam directions for every second of data
+      Cube<casa::MVDirection>             itsBuffer;
+      size_t head, tail;
+
+      // two semaphores are used: one to trigger the producer that free space is available,
+      // another to trigger the consumer that data is available.
+      Semaphore bufferFree, bufferUsed;
+
+      // the number of seconds to maintain in the buffer
+      static const size_t bufferSize = 128;
+
+      // the number of delays to calculate in a single run
+      const unsigned itsNrCalcDelays;
+
+      // Get the source directions from the parameter file and initialize \c
+      // itsBeamDirections. Beam positions must be specified as
+      // <tt>(longitude, latitude, direction-type)</tt>. The direction angles
+      // are in radians; the direction type must be one of J2000, ITRF, or
+      // AZEL.
+      void setBeamDirections(const Parset &);
+
+      // Set the station to reference station position differences for
+      // all stations. CS002LBA is the reference station, even if it
+      // does not take part in the observation. The position
+      // differences are stored in \c itsPositionDiffs. In other
+      // words: we store \f$\mathbf{p}_j - \mathbf{p}_0\f$, where
+      // \f$\mathbf{p}_0\f$ is the position of the reference station
+      // and \f$\mathbf{p}_j\f$ is the position of station \f$j\f$.
+      void setPositionDiff(const Parset &);
+
+      // Beam info.
+      const unsigned itsNrBeams;
+      const unsigned itsMaxNrTABs;
+      const std::vector<unsigned>         itsNrTABs;
+
+      struct BeamDirection {
+        casa::MVDirection              SAP;
+        std::vector<casa::MVDirection> TABs;
+
+        casa::MDirection::Types        type;
+      };
+
+      std::vector<struct BeamDirection>   itsDirections; // [itsNrBeams]
+
+      // Sample timings.
+      const TimeStamp itsStartTime;
+      const unsigned itsNrSamplesPerSec;
+      const double itsSampleDuration;
+
+      // Station Name.
+      const std::string itsStationName;
+      casa::MeasFrame itsFrame;
+      std::map<casa::MDirection::Types, casa::MDirection::Convert> itsConverters;
+
+      // Station phase centre.
+      casa::MPosition itsPhaseCentre;
+
+      // Station to reference station position difference vector.
+      casa::MVPosition itsPhasePositionDiff;
+
+      NSTimer itsDelayTimer;
+
+      SmartPtr<Thread>                    itsThread;
+    };
+
+  } // namespace Cobalt
+} // namespace LOFAR
+
+#endif
+
-- 
GitLab