Skip to content
Snippets Groups Projects
Commit f1f84c87 authored by Jan David Mol's avatar Jan David Mol
Browse files

Task #4315: Added delay compensation class

parent b23d3f87
No related branches found
No related tags found
No related merge requests found
......@@ -12,6 +12,7 @@ lofar_add_library(inputproc
Buffer/Ranges.cc
Buffer/SharedMemory.cc
Buffer/StationID.cc
Delays/Delays.cc
Station/Generator.cc
Station/PacketFactory.cc
Station/PacketReader.cc
......
//# Delays.cc: Workholder for the delay compensation.
//# Copyright (C) 2012-2013 ASTRON (Netherlands Institute for Radio Astronomy)
//# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
//#
//# This file is part of the LOFAR software suite.
//# The LOFAR software suite is free software: you can redistribute it and/or
//# modify it under the terms of the GNU General Public License as published
//# by the Free Software Foundation, either version 3 of the License, or
//# (at your option) any later version.
//#
//# The LOFAR software suite is distributed in the hope that it will be useful,
//# but WITHOUT ANY WARRANTY; without even the implied warranty of
//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//# GNU General Public License for more details.
//#
//# You should have received a copy of the GNU General Public License along
//# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
//#
//# $Id$
//# Always #include <lofar_config.h> first!
#include <lofar_config.h>
#include "Delays.h"
#include <Common/LofarLogger.h>
#include <Common/PrettyUnits.h>
#include <Common/Thread/Mutex.h>
#include <Common/Thread/Cancellation.h>
#include <CoInterface/Exceptions.h>
#include <CoInterface/BeamCoordinates.h>
#include <measures/Measures/MEpoch.h>
#include <measures/Measures/MCDirection.h>
#include <casa/Exceptions/Error.h>
namespace LOFAR
{
namespace Cobalt
{
using namespace casa;
static LOFAR::Mutex casacoreMutex; // casacore is not thread safe
//##---------------- Public methods ----------------##//
Delays::Delays(const Parset &parset, const std::string &stationName, const TimeStamp &startTime)
:
itsParset(parset),
stop(false),
// we need an extra entry for the central beam
itsBuffer(bufferSize, parset.nrBeams(), parset.maxNrTABs() + 1),
head(0),
tail(0),
bufferFree(bufferSize),
bufferUsed(0),
itsNrCalcDelays(parset.nrCalcDelays()),
itsNrBeams(parset.nrBeams()),
itsMaxNrTABs(parset.maxNrTABs()),
itsNrTABs(parset.nrTABs()),
itsStartTime(startTime),
itsNrSamplesPerSec(parset.nrSamplesPerSubband()),
itsSampleDuration(parset.sampleDuration()),
itsStationName(stationName),
itsDelayTimer("delay producer", true, true)
{
}
void Delays::start()
{
itsThread = new Thread(this, &Delays::mainLoop, "[DelayCompensation] ");
}
Delays::~Delays()
{
ScopedDelayCancellation dc; // Semaphores provide cancellation points
// trigger mainLoop and force it to stop
stop = true;
bufferFree.up(itsNrCalcDelays);
}
// convert a time in samples to a (day,fraction) pair in UTC in a CasaCore format
MVEpoch Delays::toUTC(int64 timeInSamples)
{
double utc_sec = (timeInSamples * itsSampleDuration) / MVEpoch::secInDay;
double day = floor(utc_sec);
double frac = utc_sec - day;
// (40587 modify Julian day number = 00:00:00 January 1, 1970, GMT)
return MVEpoch(day + 40587., frac);
}
void Delays::init()
{
ScopedLock lock(casacoreMutex);
ScopedDelayCancellation dc;
setBeamDirections(itsParset);
setPositionDiff(itsParset);
// We need bufferSize to be a multiple of batchSize to avoid wraparounds in
// the middle of the batch calculations. This makes life a lot easier and there is no
// need to support other cases.
if (bufferSize % itsNrCalcDelays > 0)
THROW(GPUProcException, "nrCalcDelays (" << itsNrCalcDelays << ") must divide bufferSize (" << bufferSize << ")");
// Set an initial epoch for the itsFrame
itsFrame.set(MEpoch(toUTC(itsStartTime), MEpoch::UTC));
// Set the position for the itsFrame.
itsFrame.set(itsPhaseCentre);
// Set-up the conversion engines, using reference direction ITRF.
for (unsigned beam = 0; beam < itsNrBeams; beam++) {
const casa::MDirection::Types &dirtype = itsDirections[beam].type;
if (itsConverters.find(dirtype) == itsConverters.end())
itsConverters[dirtype] = MDirection::Convert(dirtype, MDirection::Ref(MDirection::ITRF, itsFrame));
}
}
void Delays::mainLoop()
{
LOG_DEBUG("Delay compensation thread running");
init();
// the current time, in samples
int64 currentTime = itsStartTime;
try {
while (!stop) {
bufferFree.down(itsNrCalcDelays);
itsDelayTimer.start();
// Calculate itsNrCalcDelays seconds worth of delays. Technically, we do not have
// to calculate that many at the end of the run, but there is no need to
// prevent the few excess delays from being calculated.
{
ScopedLock lock(casacoreMutex);
ScopedDelayCancellation dc;
// For each given moment in time ...
for (uint i = 0; i < itsNrCalcDelays; i++) {
// Set the instant in time in the itsFrame (40587 modify Julian day number = 00:00:00 January 1, 1970, GMT)
itsFrame.resetEpoch(toUTC(currentTime));
// Check whether we will store results in a valid place
ASSERTSTR(tail < bufferSize, tail << " < " << bufferSize);
// For each given direction in the sky ...
for (uint b = 0; b < itsNrBeams; b++) {
MDirection::Convert &converter = itsConverters[itsDirections[b].type];
for (uint p = 0; p < itsNrTABs[b] + 1; p++) {
// Define the astronomical direction as a J2000 direction.
MVDirection &sky = p == 0 ? itsDirections[b].SAP : itsDirections[b].TABs[p - 1];
// Convert this direction, using the conversion engine.
MDirection dir = converter(sky);
// Add to the return vector
itsBuffer[tail][b][p] = dir.getValue();
}
}
// Advance time for the next calculation
currentTime += itsNrSamplesPerSec;
// Advance to the next result set.
// since bufferSize % itsNrCalcDelays == 0, wrap
// around can only occur between runs
++tail;
}
}
// check for wrap around for the next run
if (tail >= bufferSize)
tail = 0;
itsDelayTimer.stop();
bufferUsed.up(itsNrCalcDelays);
}
} catch (AipsError &ex) {
// trigger getNextDelays and force it to stop
stop = true;
bufferUsed.up(1);
THROW(Exception, "AipsError: " << ex.what());
}
LOG_DEBUG("Delay compensation thread stopped");
}
void Delays::getNextDelays(Matrix<MVDirection> &directions, Matrix<double> &delays)
{
ASSERTSTR(directions.num_elements() == itsNrBeams * (itsMaxNrTABs + 1),
directions.num_elements() << " == " << itsNrBeams << "*" << (itsMaxNrTABs + 1));
ASSERTSTR(delays.num_elements() == itsNrBeams * (itsMaxNrTABs + 1),
delays.num_elements() << " == " << itsNrBeams << "*" << (itsMaxNrTABs + 1));
ASSERT(itsThread);
bufferUsed.down();
if (stop)
THROW(Exception, "Cannot obtain delays -- delay thread stopped running");
// copy the directions at itsBuffer[head] into the provided buffer,
// and calculate the respective delays
for (unsigned b = 0; b < itsNrBeams; b++) {
for (unsigned p = 0; p < itsNrTABs[b] + 1; p++) {
const MVDirection &dir = itsBuffer[head][b][p];
directions[b][p] = dir;
delays[b][p] = dir * itsPhasePositionDiff * (1.0 / speedOfLight);
}
}
// increment the head pointer
if (++head == bufferSize)
head = 0;
bufferFree.up();
}
void Delays::setBeamDirections(const Parset &parset)
{
// TODO: For now, we include pencil beams for all regular beams,
// and use the pencil beam offsets as offsets in J2000.
// To do the coordinates properly, the offsets should be applied
// in today's coordinates (JMEAN/JTRUE?), not J2000.
//
itsDirections.resize(itsNrBeams);
for (unsigned beam = 0; beam < itsNrBeams; beam++) {
const std::string type = toUpper(parset.getBeamDirectionType(beam));
if (!MDirection::getType(itsDirections[beam].type, type))
THROW(Exception, "Beam direction type unknown: " << type);
}
// Get the source directions from the parameter set.
// Split the \a dir vector into separate Direction objects.
for (unsigned beam = 0; beam < itsNrBeams; beam++) {
const std::vector<double> beamDir = parset.getBeamDirection(beam);
const BeamCoordinates& TABs = parset.TABs(beam);
// add central beam coordinates for non-beamforming pipelines
itsDirections[beam].SAP = MVDirection(beamDir[0], beamDir[1]);
itsDirections[beam].TABs.resize(itsNrTABs[beam]);
for (unsigned pencil = 0; pencil < itsNrTABs[beam]; pencil++) {
// obtain pencil coordinate
const BeamCoord3D &pencilCoord = TABs[pencil];
// apply angle modification
const double angle1 = beamDir[0] + pencilCoord[0];
const double angle2 = beamDir[1] + pencilCoord[1];
// store beam
itsDirections[beam].TABs[pencil] = MVDirection(angle1, angle2);
}
}
}
void Delays::setPositionDiff(const Parset &parset)
{
// Calculate the station to reference station position difference of apply station.
// Station positions must be given in ITRF
std::string str = toUpper(parset.positionType());
if (str != "ITRF")
THROW(GPUProcException, "OLAP.DelayComp.positionType must be ITRF");
// Get the antenna positions from the parameter set. The antenna
// positions are stored as one large vector of doubles.
const MVPosition pRef(parset.getRefPhaseCentre());
const MVPosition phaseCentre(parset.getPhaseCentreOf(itsStationName));
itsPhaseCentre = MPosition(phaseCentre, MPosition::ITRF);
itsPhasePositionDiff = phaseCentre - pRef;
}
} // namespace Cobalt
} // namespace LOFAR
//# Delays.h: Calculate delay compensation for all stations.
//# Copyright (C) 2012-2013 ASTRON (Netherlands Institute for Radio Astronomy)
//# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
//#
//# This file is part of the LOFAR software suite.
//# The LOFAR software suite is free software: you can redistribute it and/or
//# modify it under the terms of the GNU General Public License as published
//# by the Free Software Foundation, either version 3 of the License, or
//# (at your option) any later version.
//#
//# The LOFAR software suite is distributed in the hope that it will be useful,
//# but WITHOUT ANY WARRANTY; without even the implied warranty of
//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//# GNU General Public License for more details.
//#
//# You should have received a copy of the GNU General Public License along
//# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
//#
//# $Id$
#ifndef LOFAR_INPUTPROC_DELAYS_H
#define LOFAR_INPUTPROC_DELAYS_H
// \file
// Calculate delay compensation for all stations.
//# Never #include <config.h> or #include <lofar_config.h> in a header file!
//# Includes
#include <string>
#include <vector>
#include <map>
#include <Common/LofarTypes.h>
#include <Common/Timer.h>
#include <Common/Thread/Thread.h>
#include <Common/Thread/Semaphore.h>
#include <CoInterface/MultiDimArray.h>
#include <CoInterface/Parset.h>
#include <CoInterface/RSPTimeStamp.h>
#include <CoInterface/SmartPtr.h>
#include <measures/Measures/MeasConvert.h>
#include <measures/Measures/MDirection.h>
#include <measures/Measures/MPosition.h>
#include <casa/Quanta/MVDirection.h>
#include <casa/Quanta/MVPosition.h>
#include <casa/Quanta/MVEpoch.h>
namespace LOFAR
{
namespace Cobalt
{
// Speed of light in vacuum, in m/s.
const double speedOfLight = 299792458;
// Workholder for calculating the delay compensation that must be applied
// per beam per station. We start by calculating the path length
// difference for beam \f$\mathbf{b}_i\f$ between station \f$j\f$ at
// position \f$\mathbf{p}_j\f$ and the reference station 0 at position
// \f$\mathbf{p}_0\f$.
// \f[
// d_{ij} - d_{i0} = \mathbf{b}_i \cdot \mathbf{p}_j
// - \mathbf{b}_i \cdot \mathbf{p}_0
// = \mathbf{b}_i \cdot (\mathbf{p}_j - \mathbf{p}_0)
// \f]
// The choice of reference station is arbitrary, so we simply choose the
// first station from the parameter set. From the equation above it is
// clear that we can reduce the number of dot products if we precalculate
// the position difference vectors \f$\mathbf{p}_j - \mathbf{p}_0$\f,
// which we will store in \c itsPositionDiffs.
//
// The geometrical delay is easily obtained by dividing the path length
// difference by the speed of light in vacuum. We don't need to know the
// speed of light in the atmosphere, because the AZEL directions that
// we've calculated are valid for vacuum (only!). This is the delay that
// must be compensated for.
//
// The calculated delay compensation must be split into a coarse (whole
// sample) delay and a fine (subsample) delay. The coarse delay will be
// applied in the input section as a true time delay, by shifting the
// input samples. The fine delay will be applied in the correlator as a
// phase shift in each frequency channel.
class Delays
{
public:
Delays(const Parset &ps, const std::string &stationName, const TimeStamp &startTime);
~Delays();
void start();
// get the set of directions (ITRF) and delays for the beams, for the next CN integration time
// Both matrices must have dimensions [itsNrBeams][itsMaxNrTABs+1]
void getNextDelays(Matrix<casa::MVDirection> &directions, Matrix<double> &delays);
private:
casa::MVEpoch toUTC(int64 timeInSamples);
void init();
// do the delay compensation calculations in a separate thread to allow bulk
// calculations and to avoid blocking other threads
void mainLoop();
const Parset &itsParset;
volatile bool stop;
// the circular buffer to hold the moving beam directions for every second of data
Cube<casa::MVDirection> itsBuffer;
size_t head, tail;
// two semaphores are used: one to trigger the producer that free space is available,
// another to trigger the consumer that data is available.
Semaphore bufferFree, bufferUsed;
// the number of seconds to maintain in the buffer
static const size_t bufferSize = 128;
// the number of delays to calculate in a single run
const unsigned itsNrCalcDelays;
// Get the source directions from the parameter file and initialize \c
// itsBeamDirections. Beam positions must be specified as
// <tt>(longitude, latitude, direction-type)</tt>. The direction angles
// are in radians; the direction type must be one of J2000, ITRF, or
// AZEL.
void setBeamDirections(const Parset &);
// Set the station to reference station position differences for
// all stations. CS002LBA is the reference station, even if it
// does not take part in the observation. The position
// differences are stored in \c itsPositionDiffs. In other
// words: we store \f$\mathbf{p}_j - \mathbf{p}_0\f$, where
// \f$\mathbf{p}_0\f$ is the position of the reference station
// and \f$\mathbf{p}_j\f$ is the position of station \f$j\f$.
void setPositionDiff(const Parset &);
// Beam info.
const unsigned itsNrBeams;
const unsigned itsMaxNrTABs;
const std::vector<unsigned> itsNrTABs;
struct BeamDirection {
casa::MVDirection SAP;
std::vector<casa::MVDirection> TABs;
casa::MDirection::Types type;
};
std::vector<struct BeamDirection> itsDirections; // [itsNrBeams]
// Sample timings.
const TimeStamp itsStartTime;
const unsigned itsNrSamplesPerSec;
const double itsSampleDuration;
// Station Name.
const std::string itsStationName;
casa::MeasFrame itsFrame;
std::map<casa::MDirection::Types, casa::MDirection::Convert> itsConverters;
// Station phase centre.
casa::MPosition itsPhaseCentre;
// Station to reference station position difference vector.
casa::MVPosition itsPhasePositionDiff;
NSTimer itsDelayTimer;
SmartPtr<Thread> itsThread;
};
} // namespace Cobalt
} // namespace LOFAR
#endif
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment