-
Alexander van Amesfoort authoredAlexander van Amesfoort authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
Parset.cc 61.10 KiB
//# Parset.cc
//# Copyright (C) 2008-2017 ASTRON (Netherlands Institute for Radio Astronomy)
//# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
//#
//# This file is part of the LOFAR software suite.
//# The LOFAR software suite is free software: you can redistribute it and/or
//# modify it under the terms of the GNU General Public License as published
//# by the Free Software Foundation, either version 3 of the License, or
//# (at your option) any later version.
//#
//# The LOFAR software suite is distributed in the hope that it will be useful,
//# but WITHOUT ANY WARRANTY; without even the implied warranty of
//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//# GNU General Public License for more details.
//#
//# You should have received a copy of the GNU General Public License along
//# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
//#
//# $Id$
//# Always #include <lofar_config.h> first!
#include <lofar_config.h>
#include <CoInterface/Parset.h>
#include <cstring>
#include <cmath>
#include <set>
#include <algorithm>
#include <memory> // auto_ptr
#include <boost/format.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/algorithm/string/classification.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string.hpp>
#include <Common/LofarLogger.h>
#include <Common/DataConvert.h>
#include <Common/LofarBitModeInfo.h>
#include <Common/StringUtil.h>
#include <ApplCommon/PosixTime.h>
#include <CoInterface/PrintVector.h>
#include <CoInterface/OutputTypes.h>
#include <CoInterface/Align.h>
#include <CoInterface/Config.h>
#include <CoInterface/Exceptions.h>
#include <CoInterface/PrintVector.h>
#include <CoInterface/SetOperations.h>
#include <CoInterface/RingCoordinates.h>
using namespace std;
using boost::format;
namespace LOFAR
{
namespace Cobalt
{
StokesType stokesType( const std::string &name )
{
if (name == "I")
return STOKES_I;
if (name == "IQUV")
return STOKES_IQUV;
if (name == "XXYY")
return STOKES_XXYY;
return INVALID_STOKES;
}
size_t nrStokes( StokesType type )
{
switch(type) {
case STOKES_I:
return 1;
case STOKES_IQUV:
case STOKES_XXYY:
return 4;
case INVALID_STOKES:
default:
return 0;
}
}
string stokesType( StokesType type )
{
switch(type) {
case STOKES_I:
return "I";
case STOKES_IQUV:
return "IQUV";
case STOKES_XXYY:
return "XXYY";
case INVALID_STOKES:
default:
return "";
}
}
unsigned ObservationSettings::nyquistZone() const
{
if (bandFilter == "LBA_10_70" ||
bandFilter == "LBA_30_70" ||
bandFilter == "LBA_10_90" ||
bandFilter == "LBA_30_90" )
return 1;
if (bandFilter == "HBA_110_190")
return 2;
if (bandFilter == "HBA_170_230" ||
bandFilter == "HBA_210_250")
return 3;
THROW(CoInterfaceException, std::string("unknown band filter \"" + bandFilter + '"'));
}
unsigned ObservationSettings::clockHz() const
{
return clockMHz * 1000000;
}
Parset::Parset()
{
}
Parset::Parset(const string &name)
:
ParameterSet(name.c_str()),
itsName(name)
{
// we check the parset once we can communicate any errors
//check();
updateSettings();
}
void readParameterSet(Stream &stream, ParameterSet ¶meterSet)
{
// Read size
uint64 size;
stream.read(&size, sizeof size);
// Read data
std::vector<char> tmp(size + 1);
stream.read(&tmp[0], size);
tmp[size] = '\0';
// Add data to parset
std::string buffer(&tmp[0], size);
parameterSet.adoptBuffer(buffer);
}
Parset::Parset(Stream *stream)
{
readParameterSet(*stream, *this);
// Update the settings
updateSettings();
}
void Parset::write(Stream *stream) const
{
// stream == NULL fills the settings,
// causing subsequent write()s to use it
bool readCache = !itsWriteCache.empty();
bool writeCache = !stream;
std::string newbuffer;
std::string &buffer = readCache || writeCache ? itsWriteCache : newbuffer;
if (buffer.empty())
writeBuffer(buffer);
if (!stream) {
// we only filled the settings
return;
}
uint64 size = buffer.size();
stream->write(&size, sizeof size);
stream->write(buffer.data(), size);
}
ostream &operator<<(ostream &os, const ObservationSettings::AntennaFieldName &fieldName) {
return os << fieldName.fullName();
}
vector<struct ObservationSettings::AntennaFieldName> ObservationSettings::expandAntennaFieldNames(const vector<string> &stations, const string &antennaSet) {
vector<struct AntennaFieldName> result;
for (vector<string>::const_iterator i = stations.begin(); i != stations.end(); ++i) {
const string &station = *i;
bool coreStation = station.substr(0,2) == "CS";
if (station.length() != 5) {
// Backward compatibility: the key
// Observation.VirtualInstrument.stationList could contain full
// antennafield names in the past, such as "CS001LBA".
LOG_WARN_STR("Warning: old (preparsed) station name: " << station);
// Do not assume the standard station name format (silly "S9" test name).
string stName;
string antFieldName;
if (station.length() <= 1)
stName = station; // if stName or antFieldName is empty, writing an MS table will fail
else if (station.length() <= 5) {
stName = station.substr(0, station.length()-1);
antFieldName = station.substr(station.length()-1);
} else {
stName = station.substr(0, 5);
antFieldName = station.substr(5);
}
result.push_back(AntennaFieldName(stName, antFieldName));
continue;
}
if (antennaSet == "LBA" /* used for debugging */
|| antennaSet == "LBA_INNER"
|| antennaSet == "LBA_OUTER"
|| antennaSet == "LBA_X"
|| antennaSet == "LBA_Y"
|| antennaSet == "LBA_SPARSE_EVEN"
|| antennaSet == "LBA_SPARSE_ODD") {
result.push_back(AntennaFieldName(station, "LBA"));
} else if (
antennaSet == "HBA" /* used for debugging */
|| antennaSet == "HBA_JOINED"
|| antennaSet == "HBA_JOINED_INNER") {
result.push_back(AntennaFieldName(station, "HBA"));
} else if (
antennaSet == "HBA_ZERO"
|| antennaSet == "HBA_ZERO_INNER") {
result.push_back(AntennaFieldName(station, coreStation ? "HBA0" : "HBA"));
} else if (
antennaSet == "HBA_ONE"
|| antennaSet == "HBA_ONE_INNER") {
result.push_back(AntennaFieldName(station, coreStation ? "HBA1" : "HBA"));
} else if (
antennaSet == "HBA_DUAL"
|| antennaSet == "HBA_DUAL_INNER") {
if (coreStation) {
result.push_back(AntennaFieldName(station, "HBA0"));
result.push_back(AntennaFieldName(station, "HBA1"));
} else {
result.push_back(AntennaFieldName(station, "HBA"));
}
} else {
THROW(CoInterfaceException, "Unknown antennaSet: " << antennaSet);
}
}
return result;
}
/*
* operator<() for station names.
*
* Sorts in the following order:
* 1. Core stations (CSxxx)
* 2. Remote stations (RSxxx)
* 3. International stations (others)
*
* Within each group, the stations are
* sorted lexicographically. For group 3
* we skip the first 2 chars when sorting.
*/
bool compareStationNames( const string &a, const string &b ) {
if (a.size() >= 5 && b.size() >= 5) { // common case
if ( (a[0] == 'C' || a[0] == 'R') && a[1] == 'S' &&
(b[0] == 'C' || b[0] == 'R') && b[1] == 'S' ) {
return a < b; // both CS/RS stations; 'C'<'R'
} else { // at least 1 non-CS/RS name; cmp (presumed) nrs
return std::strcmp(&a.c_str()[2], &b.c_str()[2]) < 0;
}
}
return a < b; // at least 1 short name
}
struct ObservationSettings Parset::observationSettings() const
{
struct ObservationSettings settings;
// the set of hosts on which outputProc has to run, which will
// be constructed during the parsing of the parset
set<string> outputProcHosts;
// NOTE: Make sure that all keys have defaults, to make test parsets
// a lot shorter.
// --Update: No, instead use tParsetDefault in tests and reject parsets with missing required values.
const vector<string> emptyVectorString;
const vector<unsigned> emptyVectorUnsigned;
const vector<double> emptyVectorDouble;
// Generic information
settings.realTime = getBool("Cobalt.realTime", false);
settings.observationID = getUint32("Observation.ObsID", 0);
settings.momID = getUint32("Observation.momID", 0);
settings.commandStream = getString("Cobalt.commandStream", "null:");
settings.startTime = getTime("Observation.startTime", "2013-01-01 00:00:00");
settings.stopTime = getTime("Observation.stopTime", "2013-01-01 00:01:00");
settings.clockMHz = getUint32("Observation.sampleClock", 200);
settings.nrBitsPerSample = getUint32("Observation.nrBitsPerSample", 16);
settings.nrPolarisations = 2;
settings.corrections.bandPass = getBool("Cobalt.correctBandPass", true);
settings.corrections.clock = getBool("Cobalt.correctClocks", true);
settings.corrections.dedisperse = getBool("Cobalt.BeamFormer.coherentDedisperseChannels", true);
settings.delayCompensation.enabled = getBool("Cobalt.delayCompensation", true);
settings.delayCompensation.referencePhaseCenter = getDoubleVector("Observation.referencePhaseCenter", vector<double>(3,0), true);
if (settings.delayCompensation.referencePhaseCenter == emptyVectorDouble)
LOG_WARN("Parset: Observation.referencePhaseCenter is missing (or (0.0, 0.0, 0.0)).");
// Station information (required by pointing information)
settings.antennaSet = getString("Observation.antennaSet", "LBA_INNER");
settings.bandFilter = getString("Observation.bandFilter", "LBA_30_70");
// Pointing information
size_t nrSAPs = getUint32("Observation.nrBeams", 1);
unsigned subbandOffset = 512 * (settings.nyquistZone() - 1);
settings.SAPs.resize(nrSAPs);
settings.subbands.clear();
for (unsigned sapNr = 0; sapNr < nrSAPs; ++sapNr)
{
struct ObservationSettings::SAP &sap = settings.SAPs[sapNr];
sap.direction.type = getString(str(format("Observation.Beam[%u].directionType") % sapNr), "J2000");
sap.direction.angle1 = getDouble(str(format("Observation.Beam[%u].angle1") % sapNr), 0.0);
sap.direction.angle2 = getDouble(str(format("Observation.Beam[%u].angle2") % sapNr), 0.0);
sap.target = getString(str(format("Observation.Beam[%u].target") % sapNr), "");
// Process the subbands of this SAP
vector<unsigned> subbandList = getUint32Vector(str(format("Observation.Beam[%u].subbandList") % sapNr), emptyVectorUnsigned, true);
ASSERTSTR(!subbandList.empty(), "subband list for SAP " << sapNr << " must be non-empty (Observation.Beam[" << sapNr << "].subbandList)");
vector<double> frequencyList = getDoubleVector(str(format("Observation.Beam[%u].frequencyList") % sapNr), emptyVectorDouble, true);
for (unsigned sb = 0; sb < subbandList.size(); ++sb)
{
struct ObservationSettings::Subband subband;
subband.idx = settings.subbands.size();
subband.stationIdx = subbandList[sb];
subband.SAP = sapNr;
subband.idxInSAP = sb;
subband.centralFrequency = frequencyList.empty()
? settings.subbandWidth() * (subband.stationIdx + subbandOffset)
: frequencyList[sb];
// Register the subband both globally and in the SAP structure
settings.subbands.push_back(subband);
settings.SAPs[sapNr].subbands.push_back(subband);
}
}
settings.anaBeam.enabled = settings.antennaSet.substr(0,3) == "HBA";
if (settings.anaBeam.enabled) {
settings.anaBeam.direction.type = getString("Observation.AnaBeam[0].directionType", "J2000");
settings.anaBeam.direction.angle1 = getDouble("Observation.AnaBeam[0].angle1", 0.0);
settings.anaBeam.direction.angle2 = getDouble("Observation.AnaBeam[0].angle2", 0.0);
}
settings.blockSize = getUint32("Cobalt.blockSize", 196608);
// Station information (used pointing information to verify settings)
vector<string> stations = getStringVector("Observation.VirtualInstrument.stationList", emptyVectorString, true);
ASSERTSTR(!stations.empty(), "station list (Observation.VirtualInstrument.stationList) must be non-empty");
settings.rawStationList = getString("Observation.VirtualInstrument.stationList", "[]");
// Sort stations (CS, RS, int'l), to get a consistent and predictable
// order in the MeasurementSets.
std::sort(stations.begin(), stations.end(), compareStationNames);
// Conversion from station names to antenna field names.
settings.antennaFieldNames = ObservationSettings::expandAntennaFieldNames(stations, settings.antennaSet);
settings.antennaFields.resize(settings.antennaFieldNames.size());
for (unsigned i = 0; i < settings.antennaFields.size(); ++i) {
struct ObservationSettings::AntennaField &antennaField = settings.antennaFields[i];
antennaField.name = settings.antennaFieldNames[i].fullName();
antennaField.inputStreams = getStringVector(str(format("PIC.Core.%s.RSP.ports") % antennaField.name), emptyVectorString, true);
antennaField.receiver = getString(str(format("PIC.Core.%s.RSP.receiver") % antennaField.name), "");
// NOTE: Support for clockCorrectionTime can be phased out when the
// BG/P is gone. delay.X and delay.Y are superior to it, being
// polarisation specific.
antennaField.clockCorrection = getDouble(str(format("PIC.Core.%s.clockCorrectionTime") % antennaField.name), 0.0);
antennaField.phaseCenter = getDoubleVector(str(format("PIC.Core.%s.phaseCenter") % antennaField.name), vector<double>(3, 0), true);
if (antennaField.phaseCenter == emptyVectorDouble)
LOG_WARN_STR("Parset: PIC.Core." << antennaField.name << ".phaseCenter is missing (or (0.0, 0.0, 0.0)).");
antennaField.phase0.x = getDouble(str(format("PIC.Core.%s.%s.%s.phase0.X") % antennaField.name % settings.antennaSet % settings.bandFilter), 0.0);
antennaField.phase0.y = getDouble(str(format("PIC.Core.%s.%s.%s.phase0.Y") % antennaField.name % settings.antennaSet % settings.bandFilter), 0.0);
antennaField.delay.x = getDouble(str(format("PIC.Core.%s.%s.%s.delay.X") % antennaField.name % settings.antennaSet % settings.bandFilter), 0.0);
antennaField.delay.y = getDouble(str(format("PIC.Core.%s.%s.%s.delay.Y") % antennaField.name % settings.antennaSet % settings.bandFilter), 0.0);
if (antennaField.delay.x > 0.0 || antennaField.delay.y > 0.0) {
if (antennaField.clockCorrection != 0.0) {
// Ignore clockCorrectionTime if delay.X or delay.Y are specified.
antennaField.clockCorrection = 0.0;
LOG_WARN_STR("Ignoring PIC.Core." << antennaField.name <<
".clockCorrectionTime in favor of PIC.Core." <<
antennaField.name << "." << settings.antennaSet <<
"." << settings.bandFilter << ".delay.{X,Y}");
}
}
string key = std::string(str(format("Observation.Dataslots.%s.RSPBoardList") % antennaField.name));
if (!isDefined(key))
key = "Observation.rspBoardList";
antennaField.rspBoardMap = getUint32Vector(key, emptyVectorUnsigned, true);
key = std::string(str(format("Observation.Dataslots.%s.DataslotList") % antennaField.name));
if (!isDefined(key))
key = "Observation.rspSlotList";
antennaField.rspSlotMap = getUint32Vector(key, emptyVectorUnsigned, true);
if (antennaField.rspBoardMap.empty() && antennaField.rspSlotMap.empty()) {
// supply identity mapping
const size_t nrDataSlotsPerBoard = maxBeamletsPerRSP(settings.nrBitsPerSample);
LOG_DEBUG_STR("Using identity mapping for rspBoardMap and rspSlotMap for " << antennaField.name << " using " << nrDataSlotsPerBoard << " slots/board");
for (size_t sb = 0; sb < settings.subbands.size(); sb++) {
antennaField.rspBoardMap.push_back(sb / nrDataSlotsPerBoard);
antennaField.rspSlotMap.push_back(sb % nrDataSlotsPerBoard);
}
}
ASSERTSTR(antennaField.rspBoardMap.size() >= settings.subbands.size(),
"Observation has " << settings.subbands.size() <<
" subbands, but antenna field " << antennaField.name <<
" has only board numbers defined for " << antennaField.rspBoardMap.size() <<
" subbands. Please correct either Observation.rspBoardList or Observation.Dataslots." <<
antennaField.name << ".RSPBoardList" );
ASSERTSTR(antennaField.rspSlotMap.size() >= settings.subbands.size(),
"Observation has " << settings.subbands.size() <<
" subbands, but antenna field " << antennaField.name <<
" has only board numbers defined for " << antennaField.rspSlotMap.size() <<
" subbands. Please correct either Observation.rspSlotList or Observation.Dataslots." <<
antennaField.name << ".DataslotList" );
}
// Resource information
settings.outputCluster = getString("Observation.Cluster.ProcessingCluster.clusterName", "");
vector<string> nodes = getStringVector("Cobalt.Nodes", emptyVectorString, true);
// We can restrict cobalt nodes to the set that receives from antenna fields,
// but that will break if that set cannot handle the computations.
bool receivingNodesOnly = getBool("Cobalt.restrictNodesToStationStreams", false);
for (size_t i = 0; i < nodes.size(); ++i) {
if (receivingNodesOnly && !nodeReadsAntennaFieldData(settings, nodes[i]))
continue;
struct ObservationSettings::Node node;
node.rank = i;
node.name = nodes[i];
string prefix = str(format("PIC.Core.Cobalt.%s.") % node.name);
node.hostName = getString(prefix + "host", "localhost");
node.cpu = getInt(prefix + "cpu", -1);
node.mpi_nic = getString(prefix + "mpi_nic", "");
node.gpus = getUint32Vector(prefix + "gpus", vector<unsigned>(1,0)); // default to [0]
// format: [CEP4:ib0,DragNet:ib0]
node.out_nic = "";
vector<string> out_nic = getStringVector(prefix + "out_nic", emptyVectorString, true);
for(size_t n = 0; n < out_nic.size(); n++) {
// search for <outputCluster>:<nic>
const vector<string> cluster_nic = StringUtil::split(out_nic[n], ':');
if (cluster_nic[0] == settings.outputCluster) {
node.out_nic = cluster_nic[1];
break;
}
}
settings.nodes.push_back(node);
}
/* ===============================
* Correlator pipeline information
* ===============================
*/
settings.correlator.enabled = getBool("Observation.DataProducts.Output_Correlated.enabled", false);
if (settings.correlator.enabled) {
settings.correlator.nrChannels = getUint32("Cobalt.Correlator.nrChannelsPerSubband", 64);
//settings.correlator.nrChannels = getUint32("Observation.channelsPerSubband", 64);
settings.correlator.channelWidth = settings.subbandWidth() / settings.correlator.nrChannels;
settings.correlator.nrSamplesPerBlock = settings.blockSize / settings.correlator.nrChannels;
settings.correlator.nrBlocksPerIntegration = getUint32("Cobalt.Correlator.nrBlocksPerIntegration", 1);
settings.correlator.nrIntegrationsPerBlock = getUint32("Cobalt.Correlator.nrIntegrationsPerBlock", 1);
// We either have the integration time spanning multiple blocks, or the integration time being a part
// of a block, but never both.
ASSERT(settings.correlator.nrBlocksPerIntegration == 1 || settings.correlator.nrIntegrationsPerBlock == 1);
settings.correlator.nrIntegrations = settings.nrBlocks()
* settings.correlator.nrIntegrationsPerBlock
/ settings.correlator.nrBlocksPerIntegration;
// super-station beam former
//
// TODO: Super-station beam former is unused, so will likely be
// implemented differently. The code below is only there to show how
// the OLAP.* keys used to be interpreted.
//
// Note: then, also adapt TODO in writeCommonLofarAttributes()
// OLAP.CNProc.tabList[i] = j <=> superstation j contains (input) station i
vector<unsigned> tabList = getUint32Vector("OLAP.CNProc.tabList", emptyVectorUnsigned, true);
// Names for all superstations, including those that are simple copies
// of (input) antenna fields.
vector<string> tabNames = getStringVector("OLAP.tiedArrayStationNames", emptyVectorString, true);
if (tabList.empty()) {
// default: input station list = output station list
settings.correlator.stations.resize(settings.antennaFields.size());
for (size_t i = 0; i < settings.correlator.stations.size(); ++i) {
struct ObservationSettings::Correlator::Station &station = settings.correlator.stations[i];
station.name = settings.antennaFields[i].name;
station.inputStations = vector<size_t>(1, i);
}
} else {
// process super-station beam former list
settings.correlator.stations.resize(tabList.size());
for (size_t i = 0; i < settings.correlator.stations.size(); ++i) {
struct ObservationSettings::Correlator::Station &station = settings.correlator.stations[i];
station.name = tabNames[i];
}
for (size_t i = 0; i < tabList.size(); ++i) {
settings.correlator.stations[tabList[i]].inputStations.push_back(i);
}
}
// Files to output
const vector<ObservationSettings::FileLocation> locations = getFileLocations("Correlated");
settings.correlator.files.resize(settings.subbands.size());
for (size_t i = 0; i < settings.correlator.files.size(); ++i) {
if (i >= locations.size())
THROW(CoInterfaceException, "No correlator filename or location specified for subband " << i);
settings.correlator.files[i].streamNr = i;
settings.correlator.files[i].location = locations[i];
outputProcHosts.insert(settings.correlator.files[i].location.host);
}
}
/* ===============================
* Beamformer pipeline information
* ===============================
*/
// SAP/TAB-crossing counter for the files we generate
size_t bfStreamNr = 0;
bool doCoherentStokes = getBool(
"Observation.DataProducts.Output_CoherentStokes.enabled", false);
bool doIncoherentStokes = getBool(
"Observation.DataProducts.Output_IncoherentStokes.enabled", false);
settings.beamFormer.enabled = doCoherentStokes || doIncoherentStokes;
if (settings.beamFormer.enabled) {
// Parse global settings
// 4096 channels is enough, but allow parset override.
if (!isDefined("Cobalt.BeamFormer.nrHighResolutionChannels")) {
settings.beamFormer.nrHighResolutionChannels = 4096;
} else {
settings.beamFormer.nrHighResolutionChannels =
getUint32("Cobalt.BeamFormer.nrHighResolutionChannels");
ASSERTSTR(powerOfTwo(settings.beamFormer.nrHighResolutionChannels) &&
settings.beamFormer.nrHighResolutionChannels < 65536,
"Parset: Cobalt.BeamFormer.nrHighResolutionChannels must be a power of 2 and < 64k");
}
settings.beamFormer.doFlysEye = getBool("Cobalt.BeamFormer.flysEye", false);
unsigned nrDelayCompCh;
if (!isDefined("Cobalt.BeamFormer.nrDelayCompensationChannels")) {
nrDelayCompCh = calcNrDelayCompensationChannels(settings);
} else {
nrDelayCompCh = getUint32("Cobalt.BeamFormer.nrDelayCompensationChannels");
}
if (nrDelayCompCh > settings.beamFormer.nrHighResolutionChannels) {
nrDelayCompCh = settings.beamFormer.nrHighResolutionChannels;
}
settings.beamFormer.nrDelayCompensationChannels = nrDelayCompCh;
// Derive antennaFields to use for beam forming
settings.beamFormer.antennaFieldNames = getOutputTypeAntennaFieldNames("Cobalt.BeamFormer.stationList",
stations, settings.antennaSet);
LOG_DEBUG_STR("Beamforming " << settings.beamFormer.antennaFieldNames.size() << " fields: " << settings.beamFormer.antennaFieldNames);
ObservationSettings::BeamFormer::StokesSettings
defaultSettings =
{
true, // coherent stokes?
STOKES_I, // StokesType
1, // nrStokes
1, // nrChannels
1, // timeIntegrationFactor
0, // nrSamples
0 // nrSubbandsPerFile
};
settings.beamFormer.coherentSettings = defaultSettings;
settings.beamFormer.incoherentSettings = defaultSettings;
for (unsigned i = 0; i < 2; ++i) {
// Set coherent and incoherent Stokes settings by
// iterating twice.
// TODO: This is an ugly way to do this.
string prefix = "";
struct ObservationSettings::BeamFormer::StokesSettings *stSettings = 0;
// Select coherent or incoherent for this iteration
switch(i) {
case 0:
prefix = "Cobalt.BeamFormer.CoherentStokes";
stSettings = &settings.beamFormer.coherentSettings;
stSettings->coherent = true;
break;
case 1:
prefix = "Cobalt.BeamFormer.IncoherentStokes";
stSettings = &settings.beamFormer.incoherentSettings;
stSettings->coherent = false;
break;
default:
ASSERT(false);
break;
}
// Coherent Stokes
if (i == 0 && !doCoherentStokes)
continue;
// Incoherent Stokes
if (i == 1 && !doIncoherentStokes)
continue;
// Obtain settings of selected stokes
stSettings->type = stokesType(getString(prefix + ".which", "I"));
stSettings->nrStokes = nrStokes(stSettings->type);
stSettings->nrChannels = getUint32(prefix + ".nrChannelsPerSubband", 1);
ASSERT(stSettings->nrChannels > 0);
stSettings->timeIntegrationFactor = getUint32(prefix + ".timeIntegrationFactor", 1);
ASSERT(stSettings->timeIntegrationFactor > 0);
stSettings->nrSubbandsPerFile = getUint32(prefix + ".subbandsPerFile", 0); // 0 or a large nr is interpreted below
stSettings->nrSamples = settings.blockSize / stSettings->timeIntegrationFactor / stSettings->nrChannels;
}
const vector<ObservationSettings::FileLocation> coherent_locations =
getFileLocations("CoherentStokes");
const vector<ObservationSettings::FileLocation> incoherent_locations =
getFileLocations("IncoherentStokes");
size_t coherent_idx = 0;
size_t incoherent_idx = 0;
// Parse all TABs
settings.beamFormer.SAPs.resize(nrSAPs);
for (unsigned i = 0; i < nrSAPs; ++i)
{
struct ObservationSettings::BeamFormer::SAP &sap = settings.beamFormer.SAPs[i];
struct ObservationSettings::SAP &obsSap = settings.SAPs[i];
// Clear counters
sap.nrCoherent = 0;
sap.nrIncoherent = 0;
size_t nrTABs = getUint32(str(format("Observation.Beam[%u].nrTiedArrayBeams") % i), 0);
size_t nrTABSParset = nrTABs;
size_t nrRings = getUint32(str(format("Observation.Beam[%u].nrTabRings") % i), 0);
double ringWidth = getDouble(str(format("Observation.Beam[%u].tabRingSize") % i), 0.0);
// Create a ptr to RingCoordinates object
// If there are tab rings the object will be actuall constructed
// The actual tabs will be extracted after we added all manual tabs
// But we need the number of tabs from rings at this location
std::auto_ptr<RingCoordinates> ptrRingCoords;
if (nrRings > 0) {
const string prefix = str(format("Observation.Beam[%u]") % i);
string directionType = getString(prefix + ".directionType", "J2000");
// Convert to COORDTYPES
RingCoordinates::COORDTYPES type;
if (directionType == "J2000")
type = RingCoordinates::J2000;
else if (directionType == "B1950")
type = RingCoordinates::B1950;
else
type = RingCoordinates::OTHER;
// Create coords object
ptrRingCoords = std::auto_ptr<RingCoordinates>(
new RingCoordinates(nrRings, ringWidth,
RingCoordinates::Coordinate(obsSap.direction.angle1, obsSap.direction.angle2), type));
// Increase the amount of tabs with the number from the coords object
// this might be zero
nrTABs = nrTABSParset + ptrRingCoords->nCoordinates();
} else if (settings.beamFormer.doFlysEye) {
// For Fly's Eye mode we have exactly one TAB per antenna field.
nrTABs = settings.antennaFields.size();
}
sap.TABs.resize(nrTABs);
for (unsigned j = 0; j < nrTABs; ++j)
{
struct ObservationSettings::BeamFormer::TAB &tab = sap.TABs[j];
// Add flys eye tabs
if (settings.beamFormer.doFlysEye)
{
const string prefix = str(format("Observation.Beam[%u]") % i);
tab.direction.type = getString(prefix + ".directionType", "J2000");
tab.direction.angle1 = getDouble(prefix + ".angle1", 0.0);
tab.direction.angle2 = getDouble(prefix + ".angle2", 0.0);
tab.dispersionMeasure = 0.0;
tab.coherent = true;
}
// Add manual tabs and then the tab rings.
else
{
if (j < nrTABSParset) // If we are working on manual tabs
{
const string prefix = str(format("Observation.Beam[%u].TiedArrayBeam[%u]") % i % j);
tab.direction.type = getString(prefix + ".directionType", "J2000");
tab.dispersionMeasure = getDouble(prefix + ".dispersionMeasure", 0.0);
tab.coherent = getBool(prefix + ".coherent", true);
// Incoherent TABs point in the same direction as the SAP by definition. The processing
// pipelines do not use the angles, but the data writer does as part of its annotation.
tab.direction.angle1 = tab.coherent ? getDouble(prefix + ".angle1", 0.0) : obsSap.direction.angle1;
tab.direction.angle2 = tab.coherent ? getDouble(prefix + ".angle2", 0.0) : obsSap.direction.angle2;
}
else
{
// Get the pointing for the tabrings.
// Subtract the number of manual to get index in the ringCoords
RingCoordinates::Coordinate pointing =
ptrRingCoords->coordinates().at(j - nrTABSParset);
// Note that RingCoordinates provide *relative* coordinates, and
// we need absolute ones.
tab.direction.type = ptrRingCoords->coordTypeAsString();
tab.direction.angle1 = obsSap.direction.angle1 + pointing.first; // TODO: missing projection bug (also below for angle2)
tab.direction.angle2 = obsSap.direction.angle2 + pointing.second;
// One dispersion measure for all TABs in rings is inconvenient,
// but not used anyway. Unclear if setting to 0.0 is better/worse.
const string prefix = str(format("Cobalt.Observation.Beam[%u]") % i);
tab.dispersionMeasure = getDouble(prefix + ".tabRingDispersionMeasure", 0.0);
tab.coherent = true; // rings cannot be incoherent, since we use non-(0,0) pointings
}
}
if (tab.coherent)
sap.nrCoherent++;
else
sap.nrIncoherent++;
struct ObservationSettings::BeamFormer::StokesSettings &stSettings =
tab.coherent ? settings.beamFormer.coherentSettings
: settings.beamFormer.incoherentSettings;
// If needed, limit to / apply default: the #subbands in this SAP.
size_t nrSubbandsPerFile = stSettings.nrSubbandsPerFile;
if (nrSubbandsPerFile == 0 ||
nrSubbandsPerFile > settings.SAPs[i].subbands.size()) {
nrSubbandsPerFile = settings.SAPs[i].subbands.size();
}
// Generate file list
unsigned nrParts = max(1UL, ceilDiv(settings.SAPs[i].subbands.size(), nrSubbandsPerFile));
tab.files.resize(stSettings.nrStokes * nrParts);
for (size_t s = 0; s < stSettings.nrStokes; ++s)
{
for (unsigned part = 0; part < nrParts; ++part)
{
struct ObservationSettings::BeamFormer::File file;
file.streamNr = bfStreamNr++;
file.sapNr = i;
file.tabNr = j;
file.stokesNr = s;
file.partNr = part;
file.coherent = tab.coherent;
if (file.coherent) {
file.coherentIdxInSAP = sap.nrCoherent - 1;
if (coherent_idx >= coherent_locations.size())
THROW(CoInterfaceException, "No CoherentStokes filename or location specified for file idx " << file.streamNr);
file.location = coherent_locations[coherent_idx++];
} else {
file.incoherentIdxInSAP = sap.nrIncoherent - 1;
if (incoherent_idx >= incoherent_locations.size())
THROW(CoInterfaceException, "No IncoherentStokes filename or location specified for file idx " << file.streamNr);
file.location = incoherent_locations[incoherent_idx++];
}
file.firstSubbandIdx = settings.SAPs[i].subbands[0].idx +
part * nrSubbandsPerFile;
file.lastSubbandIdx = min(file.firstSubbandIdx + nrSubbandsPerFile,
// last file(s) in part series can have fewer subbands
settings.SAPs[i].subbands[0].idx +
settings.SAPs[i].subbands.size());
ASSERTSTR(file.firstSubbandIdx < file.lastSubbandIdx,
"strmNr=" << file.streamNr << " 1stIdx=" << file.firstSubbandIdx << " lstIdx=" << file.lastSubbandIdx);
ASSERTSTR(file.lastSubbandIdx <= settings.subbands.size(),
"strmNr=" << file.streamNr << " lstIdx=" << file.lastSubbandIdx << " nSb=" << settings.subbands.size());
tab.files[s * nrParts + part] = file;
settings.beamFormer.files.push_back(file);
outputProcHosts.insert(file.location.host);
}
}
}
}
settings.beamFormer.dedispersionFFTsize = getUint32("Cobalt.BeamFormer.dedispersionFFTsize", settings.blockSize);
}
/* ===============================
* RSPRaw collection information
* ===============================
*/
settings.rspRaw.enabled = getBool("Observation.DataProducts.Output_RSPRaw.enabled", false);
if (settings.rspRaw.enabled) {
if (isDefined("Cobalt.RSPRaw.startTime")) {
settings.rspRaw.startTime = getTime("Cobalt.RSPRaw.startTime", "");
} else {
settings.rspRaw.startTime = (time_t)settings.startTime;
}
if (isDefined("Cobalt.RSPRaw.stopTime")) {
settings.rspRaw.stopTime = getTime("Cobalt.RSPRaw.stopTime", "");
} else {
settings.rspRaw.stopTime = (time_t)settings.stopTime;
}
// Read antenna field names (via stationList key) to use for RSP raw output
settings.rspRaw.antennaFieldNames = getOutputTypeAntennaFieldNames("Cobalt.RSPRaw.stationList",
stations, settings.antennaSet);
if (settings.rspRaw.antennaFieldNames.empty()) {
settings.rspRaw.antennaFieldNames = settings.antennaFieldNames; // default
}
LOG_INFO_STR("RSP raw: " << settings.rspRaw.antennaFieldNames.size() << " fields: " << settings.rspRaw.antennaFieldNames);
/*
* RSPRaw is not a supported output type in the rest of the system (it is COBALT-only).
* The Observatory specifies a correlation or beamforming observation, then copies a
* parset override in place. Hence, all RSPRaw settings must be auto-detectable (sane defaults),
* possibly from correlation or beamforming settings, even if those are not enabled (anymore)!
*/
vector<ObservationSettings::FileLocation> locations = getFileLocations("RSPRaw");
const bool locationsAutoDetected = locations.empty();
if (locationsAutoDetected) {
const vector<ObservationSettings::FileLocation> correlated_locations = getFileLocations("Correlated");
const vector<ObservationSettings::FileLocation> coherent_locations = getFileLocations("CoherentStokes");
const vector<ObservationSettings::FileLocation> incoherent_locations = getFileLocations("IncoherentStokes");
locations.insert(locations.end(), correlated_locations.begin(), correlated_locations.end());
locations.insert(locations.end(), coherent_locations.begin(), coherent_locations.end());
locations.insert(locations.end(), incoherent_locations.begin(), incoherent_locations.end());
if (locations.empty()) {
THROW(CoInterfaceException, "No RSP raw locations specified and could not derive any location(s) from correlated, coherent, or incoherent file locations (even if not enabled).");
}
// Purge duplicate hostname locations. Clearing filenames first makes this easier.
for (size_t i = 0; i < locations.size(); ++i) {
locations[i].filename.clear();
}
std::sort(locations.begin(), locations.end());
std::unique(locations.begin(), locations.end());
}
// Assign output file locations
vector<unsigned> maxNrBeamletsPerBoard; // normally we end up with 4 sending RSP boards per ant field
unsigned locationIdx = 0;
for (size_t i = 0; i < settings.rspRaw.antennaFieldNames.size(); ++i) {
const ObservationSettings::AntennaFieldName& afName = settings.rspRaw.antennaFieldNames[i];
vector<ObservationSettings::AntennaFieldName>::iterator nameIt = std::find(settings.antennaFieldNames.begin(),
settings.antennaFieldNames.end(), afName);
ASSERTSTR(nameIt != settings.antennaFieldNames.end(), "RSP raw antenna field name " << afName.fullName() <<
" missing in observation antenna field list"); // getOutputTypeAntennaFieldNames() must have avoided this
vector<ObservationSettings::AntennaField>::iterator afIt = settings.antennaFields.begin() +
std::distance(settings.antennaFieldNames.begin(), nameIt);
size_t nrStreams = afIt->inputStreams.size();
if (maxNrBeamletsPerBoard.size() < nrStreams) {
maxNrBeamletsPerBoard.resize(nrStreams);
}
for (unsigned s = 0; s < nrStreams; ++s) {
if (locationIdx >= locations.size()) {
if (locationsAutoDetected) {
locationIdx = 0; // if auto-detected, assign locations in round-robin order
} else { // Either you specify no locations and we do it all, or you specify a complete list. No half-baked RSP raw settings!
THROW(CoInterfaceException, "No RSP raw filename or location specified for antenna field " <<
afName.fullName() << " stream " << s);
}
}
ObservationSettings::RSPRaw::File file;
file.antennaFieldNameIdx = i;
file.streamNr = s;
locations[locationIdx].filename = str(format("L%u_%s_%u_rsp.raw") % settings.observationID % afName.fullName().c_str() % s);
file.location = locations[locationIdx];
settings.rspRaw.files.push_back(file);
outputProcHosts.insert(file.location.host);
locationIdx += 1;
maxNrBeamletsPerBoard[s] = std::max(maxNrBeamletsPerBoard[s],
(unsigned)std::count(afIt->rspBoardMap.begin(),
afIt->rspBoardMap.end(), s));
}
}
// Read nrBeamletsPerBoardList and apply sane defaults wrt max nr beamlets per board in the obs
settings.rspRaw.nrBeamletsPerBoardList = getUint32Vector("Cobalt.RSPRaw.nrBeamletsPerBoardList",
emptyVectorUnsigned, true);
if (settings.rspRaw.nrBeamletsPerBoardList.size() < maxNrBeamletsPerBoard.size()) {
settings.rspRaw.nrBeamletsPerBoardList.resize(maxNrBeamletsPerBoard.size(), UINT_MAX);
}
for (size_t i = 0; i < settings.rspRaw.nrBeamletsPerBoardList.size(); ++i) {
settings.rspRaw.nrBeamletsPerBoardList[i] = std::min(settings.rspRaw.nrBeamletsPerBoardList[i],
maxNrBeamletsPerBoard[i]);
}
}
// set output hosts
settings.outputProcHosts.clear();
for (set<string>::const_iterator i = outputProcHosts.begin(); i != outputProcHosts.end(); ++i) {
if (i->empty()) {
continue;
}
settings.outputProcHosts.push_back(*i);
}
return settings;
}
vector<struct ObservationSettings::AntennaFieldName>
Parset::getOutputTypeAntennaFieldNames(const string& stationListKey,
const vector<string>& stations,
const string& antennaSet) const
{
vector<string> newStations = getStringVector(stationListKey, vector<string>(), true);
if (newStations.empty()) {
// if [], default to all stations
newStations = stations;
} else {
// filter out stations not in the observation. This allows basic defaults ([CS002..7] etc) to be used, even if
// they contain stations that are not available.
vector<string> filteredList;
for (size_t i = 0; i < newStations.size(); i++) {
if (std::find(stations.begin(), stations.end(),
newStations[i]) != stations.end()) {
filteredList.push_back(newStations[i]);
} else {
LOG_WARN_STR("Removing station " << newStations[i] << " from interpreted " << stationListKey <<
", as it is not participating in the observation: " << settings.rawStationList);
}
}
newStations = filteredList;
}
// Note that this could happen by accident if too many stations are taken out of the observation, leaving none
ASSERTSTR(!newStations.empty(), "No stations left! Aborting.");
// Sort stations (CS, RS, int'l), to get a consistent and predictable order.
std::sort(newStations.begin(), newStations.end(), compareStationNames);
return ObservationSettings::expandAntennaFieldNames(newStations, antennaSet);
}
bool Parset::nodeReadsAntennaFieldData(const struct ObservationSettings& settings,
const string& nodeName) const {
for (size_t i = 0; i < settings.antennaFields.size(); ++i) {
if (settings.antennaFields[i].receiver == nodeName)
return true;
}
return false;
}
// pos and ref must each have at least size 3.
double Parset::distanceVec3(const vector<double>& pos,
const vector<double>& ref) {
double dx = pos.at(0) - ref.at(0);
double dy = pos.at(1) - ref.at(1);
double dz = pos.at(2) - ref.at(2);
return std::sqrt(dx * dx + dy * dy + dz * dz);
}
// max delay distance in meters; static per obs, i.e. unprojected (some upper bound)
double Parset::maxDelayDistance(const struct ObservationSettings& settings) {
// Available in each parset through included StationCalibration.parset.
const vector<double> refPhaseCenter =
settings.delayCompensation.referencePhaseCenter;
double maxDelayDistance = 0.0;
for (unsigned af = 0; af < settings.antennaFields.size(); af++) {
vector<double> phaseCenter = settings.antennaFields[af].phaseCenter;
double delayDist = distanceVec3(phaseCenter, refPhaseCenter);
if (delayDist > maxDelayDistance)
maxDelayDistance = delayDist;
}
return maxDelayDistance;
}
// Top frequency of highest subband observed in Hz.
double Parset::maxObservationFrequency(const struct ObservationSettings& settings,
double subbandWidth) {
double maxCentralFrequency = 0.0;
for (unsigned sb = 0; sb < settings.subbands.size(); sb++) {
if (settings.subbands[sb].centralFrequency > maxCentralFrequency)
maxCentralFrequency = settings.subbands[sb].centralFrequency;
}
return maxCentralFrequency + 0.5 * subbandWidth;
}
// Determine the nr of channels per subband for delay compensation.
// We aim for the visibility samples to be good to about 1 part in 1000.
// See the Cobalt beamformer design doc for more info on how and why.
unsigned Parset::calcNrDelayCompensationChannels(const struct ObservationSettings& settings) {
double d = maxDelayDistance(settings); // in meters
if (d < 400.0)
d = 400.0; // for e.g. CS002LBA only; CS001LBA-CS002LBA is ~441 m
double nu_clk = settings.clockMHz * 1e6; // in Hz
double subbandWidth = nu_clk / 1024.0;
double nu = maxObservationFrequency(settings, subbandWidth); // in Hz
if (nu < 10e6)
nu = 10e6;
// deltaPhi is the phase change over t_u in rad: ~= sqrt(24.0*1e-3) (Taylor approx)
// Design doc states deltaPhi must be <= 0.155
double deltaPhi = 0.15491933384829667540;
const double omegaE = 7.29211585e-5; // sidereal angular velocity of Earth in rad/s
const double speedOfLight = 299792458.0; // in vacuum in m/s
double phi = 2.0 * M_PI * nu * omegaE / speedOfLight * d /* * cos(delta) (=1) */;
// Fringe stopping of the residual delay is done at an interval t_u.
double t_u = deltaPhi / phi;
double max_n_FFT = t_u * subbandWidth;
unsigned max_n_FFT_pow2 = roundUpToPowerOfTwo(((unsigned)max_n_FFT + 1) / 2); // round down to pow2
// Little benefit beyond 256; more work and lower GPU FFT efficiency.
if (max_n_FFT_pow2 > 256)
max_n_FFT_pow2 = 256;
// This lower bound comes from the derivation in the design doc.
// It is pi*cbrt(2.0/(9.0*1e-3)) (also after Taylor approx).
const double min_n_ch = 19.02884235042726617904; // design doc states n_ch >= 19
const unsigned min_n_ch_pow2 = 32; // rounded up to pow2 for efficient FFT
if (max_n_FFT_pow2 < min_n_ch_pow2) {
LOG_WARN_STR("Parset: calcNrDelayCompensationChannels(): upper bound " <<
max_n_FFT << " ends up below lower bound " << min_n_ch <<
". Returning " << min_n_ch_pow2 << ". Stations far from"
" the core may not be delay compensated optimally.");
max_n_FFT_pow2 = min_n_ch_pow2;
}
return max_n_FFT_pow2;
}
size_t ObservationSettings::nrBlocks() const {
return static_cast<size_t>(floor((stopTime - startTime) * subbandWidth() / blockSize));
}
size_t ObservationSettings::nrRspRawBlocks() const {
return static_cast<size_t>(floor((rspRaw.stopTime - rspRaw.startTime) * subbandWidth() / blockSize));
}
double ObservationSettings::subbandWidth() const {
return 1.0 * clockHz() / 1024;
}
double ObservationSettings::sampleDuration() const {
return 1.0 / subbandWidth();
}
unsigned ObservationSettings::nrCrossPolarisations() const {
return nrPolarisations * nrPolarisations;
}
double ObservationSettings::blockDuration() const {
return blockSize * sampleDuration();
}
vector<unsigned> ObservationSettings::SAP::subbandIndices() const {
vector<unsigned> indices;
for (size_t i = 0; i < subbands.size(); ++i) {
indices.push_back(subbands[i].idx);
}
return indices;
}
double ObservationSettings::Correlator::integrationTime() const {
return 1.0 * nrSamplesPerIntegration() / channelWidth;
}
size_t ObservationSettings::Correlator::nrSamplesPerIntegration() const {
return nrSamplesPerBlock / nrIntegrationsPerBlock * nrBlocksPerIntegration;
}
std::vector<struct ObservationSettings::FileLocation> Parset::getFileLocations(const std::string outputType) const {
const string prefix = "Observation.DataProducts.Output_" + outputType;
vector<string> empty;
vector<string> filenames = getStringVector(prefix + ".filenames", empty, true);
vector<string> locations = getStringVector(prefix + ".locations", empty, true);
size_t numValidEntries = std::min(filenames.size(), locations.size());
vector<struct ObservationSettings::FileLocation> result(numValidEntries);
for (size_t i = 0; i < numValidEntries; ++i) {
ObservationSettings::FileLocation &location = result[i];
const vector<string> host_dir = StringUtil::split(locations[i], ':');
location.filename = filenames[i];
if (host_dir.size() == 2) {
location.cluster = "";
location.host = host_dir[0];
location.directory = host_dir[1];
} else if (host_dir.size() == 3) {
location.cluster = host_dir[0];
location.host = host_dir[1];
location.directory = host_dir[2];
} else {
THROW(CoInterfaceException, "Location must adhere to '[cluster:]host:directory' in " << prefix << ".locations: " << locations[i]);
}
}
return result;
}
std::vector<unsigned> ObservationSettings::AntennaFieldName::indices(const std::vector<AntennaFieldName> &subSet, const std::vector<AntennaFieldName> &fullSet)
{
vector<unsigned> mapping(subSet.size());
vector<AntennaFieldName>::const_iterator idx;
for(size_t i = 0; i < subSet.size(); i++) {
idx = std::find(fullSet.begin(),
fullSet.end(),
subSet[i]);
if (idx == fullSet.end())
THROW(CoInterfaceException, "Could not find antenna field " << subSet[i].fullName());
// add to mapping
mapping[i] = idx - fullSet.begin();
}
return mapping;
}
bool ObservationSettings::BeamFormer::anyCoherentTABs() const
{
return enabled && maxNrCoherentTABsPerSAP() > 0;
}
bool ObservationSettings::BeamFormer::anyIncoherentTABs() const
{
return enabled && maxNrIncoherentTABsPerSAP() > 0;
}
size_t ObservationSettings::BeamFormer::maxNrTABsPerSAP() const
{
size_t max = 0;
for (size_t sapNr = 0; sapNr < SAPs.size(); ++sapNr)
max = std::max(max, SAPs[sapNr].TABs.size());
return max;
}
size_t ObservationSettings::BeamFormer::maxNrCoherentTABsPerSAP() const
{
size_t max = 0;
for (size_t sapNr = 0; sapNr < SAPs.size(); ++sapNr)
max = std::max(max, SAPs[sapNr].nrCoherent);
return max;
}
size_t ObservationSettings::BeamFormer::maxNrIncoherentTABsPerSAP() const
{
size_t max = 0;
for (size_t sapNr = 0; sapNr < SAPs.size(); ++sapNr)
max = std::max(max, SAPs[sapNr].nrIncoherent);
return max;
}
void Parset::updateSettings()
{
settings = observationSettings();
}
void Parset::checkInputConsistency() const
{
}
void Parset::check() const
{
}
// The real stop time can be a bit further than the one actually specified,
// because we process in blocks.
double Parset::getRealStopTime() const
{
return settings.startTime +
settings.nrBlocks() * settings.blockDuration();
}
std::string Parset::getHostName(OutputType outputType, unsigned streamNr) const
{
if (outputType == CORRELATED_DATA)
return settings.correlator.files.at(streamNr).location.host;
if (outputType == BEAM_FORMED_DATA)
return settings.beamFormer.files.at(streamNr).location.host;
if (outputType == RSP_RAW_DATA)
return settings.rspRaw.files.at(streamNr).location.host;
return "unknown";
}
std::string Parset::getFileName(OutputType outputType, unsigned streamNr) const
{
if (outputType == CORRELATED_DATA)
return settings.correlator.files.at(streamNr).location.filename;
if (outputType == BEAM_FORMED_DATA)
return settings.beamFormer.files.at(streamNr).location.filename;
if (outputType == RSP_RAW_DATA)
return settings.rspRaw.files.at(streamNr).location.filename;
return "unknown";
}
std::string Parset::getDirectoryName(OutputType outputType, unsigned streamNr) const
{
if (outputType == CORRELATED_DATA)
return settings.correlator.files.at(streamNr).location.directory;
if (outputType == BEAM_FORMED_DATA)
return settings.beamFormer.files.at(streamNr).location.directory;
if (outputType == RSP_RAW_DATA)
return settings.rspRaw.files.at(streamNr).location.directory;
return "unknown";
}
unsigned Parset::nrStreams(OutputType outputType, bool force) const
{
if (!outputThisType(outputType) && !force)
return 0;
switch (outputType) {
case CORRELATED_DATA: return settings.correlator.files.size();
case BEAM_FORMED_DATA: return settings.beamFormer.files.size();
case RSP_RAW_DATA: return settings.rspRaw.files.size();
default: THROW(CoInterfaceException, "Unknown output type");
}
}
unsigned Parset::getRSPRawOutputStreamIdx(const std::string& antennaFieldName,
unsigned boardNr) const
{
const std::vector<ObservationSettings::RSPRaw::File>& files = settings.rspRaw.files;
for (size_t i = 0; i < files.size(); ++i) {
if (settings.rspRaw.antennaFieldNames.at(files[i].antennaFieldNameIdx).fullName() == antennaFieldName &&
files[i].streamNr == boardNr) {
return i;
}
}
THROW(CoInterfaceException, "Unknown output stream for antenna field name " <<
antennaFieldName << " and RSP board nr " << boardNr);
}
size_t Parset::nrBytesPerComplexSample() const
{
return 2 * nrBitsPerSample() / 8;
}
std::vector<double> Parset::centroidPos(const std::string &stations) const
{
std::vector<double> Centroid, posList, pos;
Centroid.resize(3);
vector<string> stationList = StringUtil::split(stations, '+');
for (unsigned i = 0; i < stationList.size(); i++)
{
pos = position(stationList[i]);
posList.insert(posList.end(), pos.begin(), pos.end());
}
for (unsigned i = 0; i < posList.size() / 3; i++)
{
Centroid[0] += posList[3 * i]; // x in m
Centroid[1] += posList[3 * i + 1]; // y in m
Centroid[2] += posList[3 * i + 2]; // z in m
}
Centroid[0] /= posList.size() / 3;
Centroid[1] /= posList.size() / 3;
Centroid[2] /= posList.size() / 3;
return Centroid;
}
vector<double> Parset::position( const std::string &name ) const
{
const string positionKey = "PIC.Core." + name + ".position";
const string phaseCenterKey = "PIC.Core." + name + ".phaseCenter";
if (isDefined(positionKey))
return getDoubleVector(positionKey, true);
else
return getDoubleVector(phaseCenterKey, true);
}
MultiDimArray<double,2> Parset::positions() const
{
const vector<ObservationSettings::Correlator::Station> &stations = settings.correlator.stations;
MultiDimArray<double,2> list(boost::extents[stations.size()][3]);
for (size_t i = 0; i < stations.size(); i++) {
const string &name = stations[i].name;
vector<double> pos;
if (name.find("+") != string::npos)
pos = centroidPos(name); // super station
else
pos = position(name);
ASSERT(pos.size() == 3);
list[i][0] = pos[0];
list[i][1] = pos[1];
list[i][2] = pos[2];
}
return list;
}
time_t Parset::getTime(const std::string &name, const std::string &defaultValue) const
{
return LOFAR::to_time_t(boost::posix_time::time_from_string(getString(name, defaultValue)));
}
std::string Parset::name() const
{
return itsName;
}
ssize_t ObservationSettings::antennaFieldIndex(const std::string &name) const
{
for (size_t a = 0; a < antennaFields.size(); ++a) {
if (antennaFields[a].name == name)
return a;
}
return -1;
}
std::vector<std::string> ObservationSettings::AntennaFieldName::names( const std::vector<ObservationSettings::AntennaFieldName> &list )
{
vector<string> names(list.size());
for (unsigned af = 0; af < names.size(); ++af)
names[af] = list[af].fullName();
return names;
}
unsigned Parset::nrTabStations() const
{
return settings.correlator.stations.size();
}
std::vector<std::string> Parset::mergedStationNames() const
{
std::vector<string> tabStations;
for (size_t i = 0; i < settings.correlator.stations.size(); ++i)
tabStations.push_back(settings.correlator.stations[i].name);
return tabStations;
}
unsigned Parset::nrMergedStations() const
{
return settings.correlator.stations.size();
}
unsigned Parset::nrBaselines() const
{
size_t stations = settings.correlator.stations.size();
return stations * (stations + 1) / 2;
}
double Parset::sampleDuration() const
{
return 1.0 / settings.subbandWidth();
}
unsigned Parset::dedispersionFFTsize() const
{
return settings.beamFormer.dedispersionFFTsize;
}
unsigned Parset::nrBitsPerSample() const
{
return settings.nrBitsPerSample;
}
unsigned Parset::nrProcessedOutputTypes() const
{
unsigned nr = 0;
if (settings.correlator.enabled) {
nr += 1;
}
if (settings.beamFormer.anyCoherentTABs()) {
nr += 1;
}
if (settings.beamFormer.anyIncoherentTABs()) {
nr += 1;
}
// RSP raw is not a processed output type, so not counted: see caller(s) for impact if changing this
return nr;
}
bool Parset::outputThisType(OutputType outputType) const
{
switch (outputType) {
case CORRELATED_DATA: return settings.correlator.enabled;
case BEAM_FORMED_DATA: return settings.beamFormer.enabled;
case RSP_RAW_DATA: return settings.rspRaw.enabled;
default: THROW(CoInterfaceException, "Unknown output type");
}
}
string Parset::positionType() const
{
return "ITRF";
}
double Parset::channel0Frequency(size_t subband, size_t nrChannels) const
{
const double sbFreq = settings.subbands[subband].centralFrequency;
if (nrChannels == 1)
return sbFreq;
// if the 2nd PPF is used, the subband is shifted half a channel
// downwards, so subtracting half a subband results in the
// center of channel 0 (instead of the bottom).
return sbFreq - 0.5 * settings.subbandWidth();
}
string Parset::PVSS_TempObsName() const
{
return getString("_DPname", "LOFAR_ObsSW_TempObs_0001");
}
} // namespace Cobalt
} // namespace LOFAR