From eddcdefb3a717ea8ad2b66d9acdbf0f320b3445d Mon Sep 17 00:00:00 2001 From: John Romein <romein@astron.nl> Date: Fri, 1 Aug 2008 07:02:36 +0000 Subject: [PATCH] bug 225: First steps toward 4 RSPs/ION. Definitely needs more polishing. --- .../CEP/CS1/CS1_BGLProc/src/BGL_Processing.cc | 15 +- Appl/CEP/CS1/CS1_BGLProc/src/BGL_Processing.h | 4 - Appl/CEP/CS1/CS1_BGLProc/src/InputData.h | 16 +- Appl/CEP/CS1/CS1_BGLProc/src/PPF.cc | 19 +-- Appl/CEP/CS1/CS1_BGLProc/src/PPF.h | 4 +- Appl/CEP/CS1/CS1_BGLProc/src/Transpose.cc | 57 +++---- Appl/CEP/CS1/CS1_BGLProc/src/Transpose.h | 21 +-- Appl/CEP/CS1/CS1_BGLProc/src/TransposedData.h | 11 +- Appl/CEP/CS1/CS1_IONProc/src/BeamletBuffer.cc | 46 +++--- Appl/CEP/CS1/CS1_IONProc/src/BeamletBuffer.h | 6 +- Appl/CEP/CS1/CS1_IONProc/src/CS1_ION_main.cc | 39 +++-- Appl/CEP/CS1/CS1_IONProc/src/InputSection.cc | 141 +++++++++++------- Appl/CEP/CS1/CS1_IONProc/src/InputSection.h | 59 ++++---- Appl/CEP/CS1/CS1_IONProc/src/InputThread.cc | 4 +- Appl/CEP/CS1/CS1_IONProc/src/InputThread.h | 1 + .../include/CS1_Interface/BGL_Configuration.h | 26 ---- .../include/CS1_Interface/CS1_Parset.h | 26 +++- .../include/CS1_Interface/ION_to_CN.h | 105 ------------- .../include/CS1_Interface/Makefile.am | 2 +- .../include/CS1_Interface/SubbandMetaData.h | 69 +++++++++ .../CS1_Interface/src/BGL_Configuration.cc | 14 -- Appl/CEP/CS1/CS1_Interface/src/CS1_Parset.cc | 72 ++++++--- Appl/CEP/CS1/CS1_Run/src/CS1.parset | 73 +++++---- Appl/CEP/CS1/CS1_Run/src/CS1_Hosts.py | 2 +- Appl/CEP/CS1/CS1_Run/src/CS1_Parset.py | 4 +- Appl/CEP/CS1/CS1_Run/src/CS1_Run.py | 6 +- Appl/CEP/CS1/CS1_Run/src/CS1_Sections.py | 4 +- Appl/CEP/CS1/CS1_Run/src/CS1_Stations.py | 2 +- Appl/CEP/CS1/CS1_Run/src/OLAP.parset | 82 +++++----- .../CS1/CS1_Storage/src/WH_SubbandWriter.cc | 2 +- 30 files changed, 481 insertions(+), 451 deletions(-) delete mode 100644 Appl/CEP/CS1/CS1_Interface/include/CS1_Interface/ION_to_CN.h create mode 100644 Appl/CEP/CS1/CS1_Interface/include/CS1_Interface/SubbandMetaData.h diff --git a/Appl/CEP/CS1/CS1_BGLProc/src/BGL_Processing.cc b/Appl/CEP/CS1/CS1_BGLProc/src/BGL_Processing.cc index d28abaa215f..bf853201cd5 100644 --- a/Appl/CEP/CS1/CS1_BGLProc/src/BGL_Processing.cc +++ b/Appl/CEP/CS1/CS1_BGLProc/src/BGL_Processing.cc @@ -266,12 +266,7 @@ void BGL_Processing::preprocess(BGL_Configuration &configuration) itsIsTransposeInput = inputPsetIndex != inputPsets.end(); itsIsTransposeOutput = outputPsetIndex != outputPsets.end(); - unsigned nrStations = configuration.nrStations(); - - itsBeamlet2beams = configuration.beamlet2beams(); - itsSubband2Index = configuration.subband2Index(); - itsNrBeams = configuration.nrBeams(); - + unsigned nrStations = configuration.nrStations(); unsigned nrBaselines = nrStations * (nrStations + 1) / 2; unsigned nrSamplesPerIntegration = configuration.nrSamplesPerIntegration(); unsigned nrSamplesToBGLProc = configuration.nrSamplesToBGLProc(); @@ -324,7 +319,7 @@ void BGL_Processing::preprocess(BGL_Configuration &configuration) #if defined HAVE_MPI if (itsIsTransposeInput || itsIsTransposeOutput) { - itsTranspose = new Transpose(itsIsTransposeInput, itsIsTransposeOutput, myCore, nrStations, itsNrBeams); + itsTranspose = new Transpose(itsIsTransposeInput, itsIsTransposeOutput, myCore); itsTranspose->setupTransposeParams(itsLocationInfo, inputPsets, outputPsets, itsInputData, itsTransposedData); } #endif @@ -344,7 +339,7 @@ void BGL_Processing::process() static NSTimer readTimer("receive timer", true); readTimer.start(); - itsInputData->read(itsStream, itsNrBeams); + itsInputData->read(itsStream); readTimer.stop(); } @@ -361,7 +356,7 @@ MPI_Barrier(itsTransposeGroup); NSTimer transposeTimer("one transpose", LOG_CONDITION); transposeTimer.start(); itsTranspose->transpose(itsInputData, itsTransposedData); - itsTranspose->transposeMetaData(itsInputData, itsTransposedData, itsBeamlet2beams[itsSubband2Index[itsCurrentSubband]] - 1); + itsTranspose->transposeMetaData(itsInputData, itsTransposedData); transposeTimer.stop(); #endif } @@ -374,7 +369,7 @@ MPI_Barrier(itsTransposeGroup); computeTimer.start(); itsPPF->computeFlags(itsTransposedData, itsFilteredData); - itsPPF->filter(itsCenterFrequencies[itsSubband2Index[itsCurrentSubband]], itsTransposedData, itsFilteredData); + itsPPF->filter(itsCenterFrequencies[itsCurrentSubband], itsTransposedData, itsFilteredData); itsCorrelator->computeFlagsAndCentroids(itsFilteredData, itsCorrelatedData); itsCorrelator->correlate(itsFilteredData, itsCorrelatedData); diff --git a/Appl/CEP/CS1/CS1_BGLProc/src/BGL_Processing.h b/Appl/CEP/CS1/CS1_BGLProc/src/BGL_Processing.h index 998bd1fcaa2..3aa1f11e538 100644 --- a/Appl/CEP/CS1/CS1_BGLProc/src/BGL_Processing.h +++ b/Appl/CEP/CS1/CS1_BGLProc/src/BGL_Processing.h @@ -95,10 +95,6 @@ class BGL_Processing { unsigned itsFirstSubband, itsCurrentSubband, itsLastSubband, itsSubbandIncrement; bool itsIsTransposeInput, itsIsTransposeOutput; - std::vector<signed> itsBeamlet2beams; - std::vector<unsigned> itsSubband2Index; - unsigned itsNrBeams; - Arena *itsArenas[2]; InputData *itsInputData; TransposedData *itsTransposedData; diff --git a/Appl/CEP/CS1/CS1_BGLProc/src/InputData.h b/Appl/CEP/CS1/CS1_BGLProc/src/InputData.h index 7958dbd09e5..a047ba6289d 100644 --- a/Appl/CEP/CS1/CS1_BGLProc/src/InputData.h +++ b/Appl/CEP/CS1/CS1_BGLProc/src/InputData.h @@ -4,7 +4,7 @@ #include <Common/lofar_complex.h> #include <Common/DataConvert.h> #include <CS1_Interface/CS1_Config.h> -#include <CS1_Interface/ION_to_CN.h> +#include <CS1_Interface/SubbandMetaData.h> #include <Stream/Stream.h> #include <CS1_Interface/Allocator.h> @@ -12,6 +12,8 @@ #include <boost/multi_array.hpp> +#include <vector> + namespace LOFAR { namespace CS1 { @@ -22,7 +24,7 @@ class InputData InputData(const Arena &, unsigned nrSubbands, unsigned nrSamplesToBGLProc); ~InputData(); - void read(Stream *, const unsigned nrBeams); + void read(Stream *); static size_t requiredSize(unsigned nrSubbands, unsigned nrSamplesToBGLProc); @@ -34,7 +36,7 @@ class InputData public: boost::multi_array_ref<SampleType, 3> samples; //[outputPsets.size()][itsCS1PS->nrSamplesToBGLProc()][NR_POLARIZATIONS] - ION_to_CN metaData; + std::vector<SubbandMetaData> metaData; //[outputPsets.size()] }; @@ -47,7 +49,8 @@ inline size_t InputData::requiredSize(unsigned nrSubbands, unsigned nrSamplesToB inline InputData::InputData(const Arena &arena, unsigned nrSubbands, unsigned nrSamplesToBGLProc) : allocator(arena), - samples(static_cast<SampleType *>(allocator.allocate(requiredSize(nrSubbands, nrSamplesToBGLProc), 32)), boost::extents[nrSubbands][nrSamplesToBGLProc][NR_POLARIZATIONS]) + samples(static_cast<SampleType *>(allocator.allocate(requiredSize(nrSubbands, nrSamplesToBGLProc), 32)), boost::extents[nrSubbands][nrSamplesToBGLProc][NR_POLARIZATIONS]), + metaData(nrSubbands) { } @@ -58,9 +61,10 @@ inline InputData::~InputData() } -inline void InputData::read(Stream *str, const unsigned nrBeams) +inline void InputData::read(Stream *str) { - metaData.read(str, nrBeams); + // read all metadata + str->read(&metaData[0], metaData.size() * sizeof(SubbandMetaData)); // now read all subbands using one recvBlocking call, even though the ION // sends all subbands one at a time diff --git a/Appl/CEP/CS1/CS1_BGLProc/src/PPF.cc b/Appl/CEP/CS1/CS1_BGLProc/src/PPF.cc index 2e11d4f4e11..c191ee27f4e 100644 --- a/Appl/CEP/CS1/CS1_BGLProc/src/PPF.cc +++ b/Appl/CEP/CS1/CS1_BGLProc/src/PPF.cc @@ -153,7 +153,8 @@ void PPF::computeFlags(const TransposedData *transposedData, FilteredData *filte #else for (unsigned stat = 0; stat < itsNrStations; stat ++) { filteredData->flags[stat].reset(); - const SparseSet<unsigned>::Ranges &ranges = transposedData->flags[stat].getRanges(); + SparseSet<unsigned> flags = transposedData->metaData[stat].getFlags(); + const SparseSet<unsigned>::Ranges &ranges = flags.getRanges(); for (SparseSet<unsigned>::const_iterator it = ranges.begin(); it != ranges.end(); it ++) { unsigned begin = std::max(0, (signed) it->begin / NR_SUBBAND_CHANNELS - NR_TAPS + 1); @@ -170,9 +171,9 @@ void PPF::computeFlags(const TransposedData *transposedData, FilteredData *filte #if defined PPF_C_IMPLEMENTATION -fcomplex PPF::phaseShift(unsigned time, unsigned chan, double baseFrequency, const TransposedData::DelayIntervalType &delay) const +fcomplex PPF::phaseShift(unsigned time, unsigned chan, double baseFrequency, double delayAtBegin, double delayAfterEnd) const { - double timeInterpolatedDelay = delay.delayAtBegin + ((double) time / itsNrSamplesPerIntegration) * (delay.delayAfterEnd - delay.delayAtBegin); + double timeInterpolatedDelay = delayAtBegin + ((double) time / itsNrSamplesPerIntegration) * (delayAfterEnd - delayAtBegin); double frequency = baseFrequency + chan * itsChannelBandwidth; double phaseShift = timeInterpolatedDelay * frequency; double phi = -2 * M_PI * phaseShift; @@ -182,10 +183,10 @@ fcomplex PPF::phaseShift(unsigned time, unsigned chan, double baseFrequency, con #else -void PPF::computePhaseShifts(struct phase_shift phaseShifts[/*itsNrSamplesPerIntegration*/], const TransposedData::DelayIntervalType &delay, double baseFrequency) const +void PPF::computePhaseShifts(struct phase_shift phaseShifts[/*itsNrSamplesPerIntegration*/], double delayAtBegin, double delayAfterEnd, double baseFrequency) const { - double phiBegin = -2 * M_PI * delay.delayAtBegin; - double phiEnd = -2 * M_PI * delay.delayAfterEnd; + double phiBegin = -2 * M_PI * delayAtBegin; + double phiEnd = -2 * M_PI * delayAfterEnd; double deltaPhi = (phiEnd - phiBegin) / itsNrSamplesPerIntegration; dcomplex v = cosisin(phiBegin * baseFrequency); dcomplex dv = cosisin(phiBegin * itsChannelBandwidth); @@ -214,7 +215,7 @@ void PPF::filter(double centerFrequency, const TransposedData *transposedData, F #endif for (unsigned stat = 0; stat < itsNrStations; stat ++) { - unsigned alignmentShift = transposedData->alignmentShifts[stat]; + unsigned alignmentShift = transposedData->metaData[stat].alignmentShift; #if 0 std::clog << setprecision(15) << "stat " << stat << ", basefreq " << baseFrequency << ": delay from " << delays[stat].delayAtBegin << " to " << delays[stat].delayAfterEnd << " sec" << std::endl; @@ -259,7 +260,7 @@ void PPF::filter(double centerFrequency, const TransposedData *transposedData, F for (unsigned chan = 0; chan < NR_SUBBAND_CHANNELS; chan ++) { if (itsDelayCompensation) { - fftOutData[chan] *= phaseShift(time, chan, baseFrequency, transposedData->delays[stat]); + fftOutData[chan] *= phaseShift(time, chan, baseFrequency, transposedData->metaData[stat].delayAtBegin, transposedData->metaData[stat].delayAfterEnd); } filteredData->samples[chan][stat][time][pol] = fftOutData[chan]; @@ -298,7 +299,7 @@ void PPF::filter(double centerFrequency, const TransposedData *transposedData, F struct phase_shift phaseShifts[itsNrSamplesPerIntegration]; if (itsDelayCompensation) { - computePhaseShifts(phaseShifts, transposedData->delays[stat], baseFrequency); + computePhaseShifts(phaseShifts, transposedData->metaData[stat].delayAtBegin, transposedData->metaData[stat].delayAfterEnd, baseFrequency); } const SparseSet<unsigned>::Ranges &ranges = filteredData->flags[stat].getRanges(); diff --git a/Appl/CEP/CS1/CS1_BGLProc/src/PPF.h b/Appl/CEP/CS1/CS1_BGLProc/src/PPF.h index c536655bb59..ade78c9c437 100644 --- a/Appl/CEP/CS1/CS1_BGLProc/src/PPF.h +++ b/Appl/CEP/CS1/CS1_BGLProc/src/PPF.h @@ -42,9 +42,9 @@ class PPF void init_fft(), destroy_fft(); #if defined PPF_C_IMPLEMENTATION - fcomplex phaseShift(unsigned time, unsigned chan, double baseFrequency, const TransposedData::DelayIntervalType &delay) const; + fcomplex phaseShift(unsigned time, unsigned chan, double baseFrequency, double delayAtBegin, double delayAfterEnd) const; #else - void computePhaseShifts(struct phase_shift phaseShifts[/*itsNrSamplesPerIntegration*/], const TransposedData::DelayIntervalType &delay, double baseFrequency) const; + void computePhaseShifts(struct phase_shift phaseShifts[/*itsNrSamplesPerIntegration*/], double delayAtBegin, double delayAfterEnd, double baseFrequency) const; #endif unsigned itsNrStations, itsNrSamplesPerIntegration; diff --git a/Appl/CEP/CS1/CS1_BGLProc/src/Transpose.cc b/Appl/CEP/CS1/CS1_BGLProc/src/Transpose.cc index edf1a5c4422..b7ea920cf2b 100644 --- a/Appl/CEP/CS1/CS1_BGLProc/src/Transpose.cc +++ b/Appl/CEP/CS1/CS1_BGLProc/src/Transpose.cc @@ -23,12 +23,10 @@ static NSTimer transposeTimer("transpose()", true); std::vector<MPI_Comm> Transpose::allTransposeGroups; -Transpose::Transpose(bool isTransposeInput, bool isTransposeOutput, unsigned myCore, unsigned nrStations, unsigned nrBeams) +Transpose::Transpose(bool isTransposeInput, bool isTransposeOutput, unsigned myCore) : itsIsTransposeInput(isTransposeInput), itsIsTransposeOutput(isTransposeOutput), - itsNrStations(nrStations), - itsNrBeams(nrBeams), itsTransposeGroup(allTransposeGroups[myCore]) { } @@ -122,25 +120,19 @@ void Transpose::setupTransposeParams(const LocationInfo &locationInfo, const std itsTransposeMetaParams.receive.counts.resize(nrPsetsUsed, 0); itsTransposeMetaParams.receive.displacements.resize(nrPsetsUsed); - itsOutputMetaData.resize(boost::extents[inputPsets.size()][itsNrBeams]); - itsInputMetaData.resize(itsNrBeams); - - if (itsIsTransposeInput) { + if (itsIsTransposeInput) for (unsigned psetIndex = 0; psetIndex < outputPsets.size(); psetIndex ++) { unsigned pset = outputPsets[psetIndex]; unsigned index = psetToGroupIndex[pset]; - if (1 /* FIXME: psetIndex % itsCS1PS->nrRSPboardsPerStation() == 0 */) { - const boost::detail::multi_array::sub_array<InputData::SampleType, 2> &slice = inputData->samples[psetIndex]; + const boost::detail::multi_array::sub_array<InputData::SampleType, 2> &slice = inputData->samples[psetIndex]; - itsTransposeParams.send.counts[index] = slice.num_elements() * sizeof(InputData::SampleType); - itsTransposeParams.send.displacements[index] = reinterpret_cast<const char *>(slice.origin()) - reinterpret_cast<const char *>(inputData->samples.origin()); + itsTransposeParams.send.counts[index] = slice.num_elements() * sizeof(InputData::SampleType); + itsTransposeParams.send.displacements[index] = reinterpret_cast<const char *>(slice.origin()) - reinterpret_cast<const char *>(inputData->samples.origin()); - itsTransposeMetaParams.send.counts[index] = itsNrBeams * sizeof(struct metaData); - itsTransposeMetaParams.send.displacements[index] = 0; - } + itsTransposeMetaParams.send.counts[index] = sizeof(SubbandMetaData); + itsTransposeMetaParams.send.displacements[index] = reinterpret_cast<const char *>(&inputData->metaData[psetIndex]) - reinterpret_cast<const char *>(&inputData->metaData[0]); } - } if (itsIsTransposeOutput) for (unsigned psetIndex = 0; psetIndex < inputPsets.size(); psetIndex ++) { @@ -151,8 +143,8 @@ void Transpose::setupTransposeParams(const LocationInfo &locationInfo, const std itsTransposeParams.receive.counts[index] = slice.num_elements() * sizeof(TransposedData::SampleType); itsTransposeParams.receive.displacements[index] = reinterpret_cast<const char *>(slice.origin()) - reinterpret_cast<const char *>(transposedData->samples.origin()); - itsTransposeMetaParams.receive.counts[index] = itsNrBeams * sizeof(struct metaData); - itsTransposeMetaParams.receive.displacements[index] = psetIndex * itsNrBeams * sizeof(struct metaData); + itsTransposeMetaParams.receive.counts[index] = sizeof(SubbandMetaData); + itsTransposeMetaParams.receive.displacements[index] = reinterpret_cast<const char *>(&transposedData->metaData[psetIndex]) - reinterpret_cast<const char *>(&transposedData->metaData[0]); } #if 0 @@ -216,23 +208,19 @@ void Transpose::transpose(const InputData *inputData, TransposedData *transposed } -void Transpose::transposeMetaData(/*const*/ InputData *inputData, TransposedData *transposedData, const unsigned currentBeam) +void Transpose::transposeMetaData(const InputData *inputData, TransposedData *transposedData) { - if (itsIsTransposeInput) { - for (unsigned beam = 0; beam < itsNrBeams; beam++) { - itsInputMetaData[beam].delayAtBegin = inputData->metaData.delayAtBegin(beam); - itsInputMetaData[beam].delayAfterEnd = inputData->metaData.delayAfterEnd(beam); - itsInputMetaData[beam].alignmentShift = inputData->metaData.alignmentShift(beam); - assert(inputData->metaData.flags(beam).marshall(&itsInputMetaData[beam].flagsBuffer, sizeof itsInputMetaData[beam].flagsBuffer) >= 0); - } - } +#if 0 + // no need to marshall itsInputMetaData; it has not been unmarshalled + // after reading from ION +#endif if (MPI_Alltoallv( - &itsInputMetaData[0], + itsIsTransposeInput ? (void *) &inputData->metaData[0] : 0, &itsTransposeMetaParams.send.counts[0], &itsTransposeMetaParams.send.displacements[0], MPI_BYTE, - &itsOutputMetaData[0][0], + itsIsTransposeOutput ? &transposedData->metaData[0] : 0, &itsTransposeMetaParams.receive.counts[0], &itsTransposeMetaParams.receive.displacements[0], MPI_BYTE, @@ -242,14 +230,11 @@ void Transpose::transposeMetaData(/*const*/ InputData *inputData, TransposedData exit(1); } - if (itsIsTransposeOutput) { - for (unsigned station = 0; station < itsNrStations; station ++) { - transposedData->delays[station].delayAtBegin = itsOutputMetaData[station][currentBeam].delayAtBegin; - transposedData->delays[station].delayAfterEnd = itsOutputMetaData[station][currentBeam].delayAfterEnd; - transposedData->alignmentShifts[station] = itsOutputMetaData[station][currentBeam].alignmentShift; - transposedData->flags[station].unmarshall(itsOutputMetaData[station][currentBeam].flagsBuffer); - } - } +#if 0 + if (itsIsTransposeOutput) + for (unsigned station = 0; station < transposedData->metaData.size(); station ++) + transposedData->metaData[station].unmarshall(); +#endif } #endif diff --git a/Appl/CEP/CS1/CS1_BGLProc/src/Transpose.h b/Appl/CEP/CS1/CS1_BGLProc/src/Transpose.h index ed0f751bc6e..fbd2c95aad5 100644 --- a/Appl/CEP/CS1/CS1_BGLProc/src/Transpose.h +++ b/Appl/CEP/CS1/CS1_BGLProc/src/Transpose.h @@ -5,8 +5,7 @@ #include <InputData.h> #include <LocationInfo.h> #include <TransposedData.h> - -#include <boost/multi_array.hpp> +#include <CS1_Interface/SubbandMetaData.h> #if defined HAVE_MPI #define MPICH_IGNORE_CXX_SEEK @@ -27,7 +26,7 @@ namespace CS1 { class Transpose { public: - Transpose(bool isTransposeInput, bool isTransposeOutput, unsigned myCore, unsigned nrStations,unsigned nrBeams); + Transpose(bool isTransposeInput, bool isTransposeOutput, unsigned myCore); ~Transpose(); void setupTransposeParams(const LocationInfo &, const std::vector<unsigned> &inputPsets, const std::vector<unsigned> &outputPsets, InputData *, TransposedData *); @@ -38,12 +37,10 @@ class Transpose { #endif void transpose(const InputData *, TransposedData *); - void transposeMetaData(/*const*/ InputData *, TransposedData *, const unsigned currentBeam); + void transposeMetaData(const InputData *, TransposedData *); private: - bool itsIsTransposeInput, itsIsTransposeOutput; - unsigned itsNrStations; - unsigned itsNrBeams; + bool itsIsTransposeInput, itsIsTransposeOutput; // All cores at the same position within a pset form a group. The // transpose is done between members of this group. @@ -53,16 +50,6 @@ class Transpose { } send, receive; } itsTransposeParams, itsTransposeMetaParams; - struct metaData { - float delayAtBegin, delayAfterEnd; - unsigned alignmentShift; - char flagsBuffer[132]; // enough for 16 flag ranges - }; - - boost::multi_array<struct metaData, 2> itsOutputMetaData; // [station][beam] - - std::vector<struct metaData> itsInputMetaData; // [beam] - MPI_Comm itsTransposeGroup; static std::vector<MPI_Comm> allTransposeGroups; diff --git a/Appl/CEP/CS1/CS1_BGLProc/src/TransposedData.h b/Appl/CEP/CS1/CS1_BGLProc/src/TransposedData.h index 3a985bb237d..823cb17b102 100644 --- a/Appl/CEP/CS1/CS1_BGLProc/src/TransposedData.h +++ b/Appl/CEP/CS1/CS1_BGLProc/src/TransposedData.h @@ -4,9 +4,10 @@ #include <Common/lofar_complex.h> #include <CS1_Interface/Allocator.h> #include <CS1_Interface/CS1_Config.h> -#include <CS1_Interface/SparseSet.h> +#include <CS1_Interface/SubbandMetaData.h> #include <boost/multi_array.hpp> +#include <vector> namespace LOFAR { @@ -27,7 +28,9 @@ class TransposedData public: boost::multi_array_ref<SampleType, 3> samples; //[itsNrStations][itsCS1PS->nrSamplesToBGLProc()][NR_POLARIZATIONS] + std::vector<SubbandMetaData> metaData; //[itsNrStations] +#if 0 SparseSet<unsigned> *flags; //[itsNrStations] typedef struct { @@ -36,6 +39,7 @@ class TransposedData DelayIntervalType *delays; // [itsNrStations] unsigned *alignmentShifts; // [itsNrStations] +#endif }; @@ -43,9 +47,12 @@ inline TransposedData::TransposedData(const Arena &arena, unsigned nrStations, u : allocator(arena), samples(static_cast<SampleType *>(allocator.allocate(requiredSize(nrStations, nrSamplesToBGLProc), 32)), boost::extents[nrStations][nrSamplesToBGLProc][NR_POLARIZATIONS]), + metaData(nrStations) +#if 0 flags(new SparseSet<unsigned>[nrStations]), delays(new DelayIntervalType[nrStations]), alignmentShifts(new unsigned[nrStations]) +#endif { } @@ -53,9 +60,11 @@ inline TransposedData::TransposedData(const Arena &arena, unsigned nrStations, u inline TransposedData::~TransposedData() { allocator.deallocate(samples.origin()); +#if 0 delete [] flags; delete [] alignmentShifts; delete [] delays; +#endif } diff --git a/Appl/CEP/CS1/CS1_IONProc/src/BeamletBuffer.cc b/Appl/CEP/CS1/CS1_IONProc/src/BeamletBuffer.cc index df0ae9ad0e0..f3c09ccaa6f 100644 --- a/Appl/CEP/CS1/CS1_IONProc/src/BeamletBuffer.cc +++ b/Appl/CEP/CS1/CS1_IONProc/src/BeamletBuffer.cc @@ -62,20 +62,18 @@ BeamletBuffer::~BeamletBuffer() void BeamletBuffer::writeElements(Beamlet *data, const TimeStamp &begin, unsigned nrElements) { - static TimeStamp previous; // cache previous index, to avoid expensive - static unsigned previousI; // mapTime2Index() - TimeStamp end = begin + nrElements; itsWriteTimer.start(); - unsigned startI = (begin == previous) ? previousI : mapTime2Index(begin); + // cache previous index, to avoid expensive mapTime2Index() + unsigned startI = (begin == itsPreviousTimeStamp) ? itsPreviousI : mapTime2Index(begin); unsigned endI = startI + nrElements; if (endI >= itsSize) endI -= itsSize; - previous = end; - previousI = endI; + itsPreviousTimeStamp = end; + itsPreviousI = endI; // in synchronous mode, do not overrun tail of reader itsSynchronizedReaderWriter->startWrite(begin, end); @@ -134,11 +132,12 @@ void BeamletBuffer::startReadTransaction(const std::vector<TimeStamp> &begin, un itsEndI.push_back(mapTime2Index(itsEnd[beam])); } - TimeStamp minBegin = *std::min_element(itsBegin.begin(), itsBegin.end()); - TimeStamp maxEnd = *std::max_element(itsEnd.begin(), itsEnd.end()); - itsMinEnd = *std::min_element(itsEnd.begin(), itsEnd.end()); - itsMinStartI = *std::min_element(itsStartI.begin(), itsStartI.end()); - itsMaxEndI = *std::max_element(itsEndI.begin(), itsEndI.end()); + TimeStamp minBegin = *std::min_element(itsBegin.begin(), itsBegin.end()); + TimeStamp maxEnd = *std::max_element(itsEnd.begin(), itsEnd.end()); + itsMinEnd = *std::min_element(itsEnd.begin(), itsEnd.end()); + itsMinStartI = *std::min_element(itsStartI.begin(), itsStartI.end()); + itsMaxEndI = *std::max_element(itsEndI.begin(), itsEndI.end()); + // in synchronous mode, do not overrun writer itsSynchronizedReaderWriter->startRead(minBegin, maxEnd); // do not read from circular buffer section that is being written @@ -146,12 +145,12 @@ void BeamletBuffer::startReadTransaction(const std::vector<TimeStamp> &begin, un } -void BeamletBuffer::sendSubband(Stream *str, unsigned subband, unsigned currentBeam) const +void BeamletBuffer::sendSubband(Stream *str, unsigned subband, unsigned beam) const { // Align to 32 bytes and make multiple of 32 bytes by prepending/appending // extra data. Always send 32 bytes extra, even if data was already aligned. - unsigned startI = itsStartI[currentBeam] & ~(32 / sizeof(Beamlet) - 1); // round down - unsigned endI = (itsEndI[currentBeam] + 32 / sizeof(Beamlet)) & ~(32 / sizeof(Beamlet) - 1); // round up, possibly adding 32 bytes + unsigned startI = itsStartI[beam] & ~(32 / sizeof(Beamlet) - 1); // round down + unsigned endI = (itsEndI[beam] + 32 / sizeof(Beamlet)) & ~(32 / sizeof(Beamlet) - 1); // round up, possibly adding 32 bytes if (endI < startI) { // the data wraps around the allocated memory, so copy in two parts @@ -165,30 +164,33 @@ void BeamletBuffer::sendSubband(Stream *str, unsigned subband, unsigned currentB } -void BeamletBuffer::sendUnalignedSubband(Stream *str, unsigned subband, unsigned currentBeam) const +void BeamletBuffer::sendUnalignedSubband(Stream *str, unsigned subband, unsigned beam) const { - if (itsEndI[currentBeam] < itsStartI[currentBeam]) { + if (itsEndI[beam] < itsStartI[beam]) { // the data wraps around the allocated memory, so copy in two parts - unsigned firstChunk = itsSize - itsStartI[currentBeam]; + unsigned firstChunk = itsSize - itsStartI[beam]; - str->write(itsSBBuffers[subband][itsStartI[currentBeam]].origin(), sizeof(SampleType[firstChunk][NR_POLARIZATIONS])); - str->write(itsSBBuffers[subband][0].origin(), sizeof(SampleType[itsEndI[currentBeam]][NR_POLARIZATIONS])); + str->write(itsSBBuffers[subband][itsStartI[beam]].origin(), sizeof(SampleType[firstChunk][NR_POLARIZATIONS])); + str->write(itsSBBuffers[subband][0].origin(), sizeof(SampleType[itsEndI[beam]][NR_POLARIZATIONS])); } else { - str->write(itsSBBuffers[subband][itsStartI[currentBeam]].origin(), sizeof(SampleType[itsEndI[currentBeam] - itsStartI[currentBeam]][NR_POLARIZATIONS])); + str->write(itsSBBuffers[subband][itsStartI[beam]].origin(), sizeof(SampleType[itsEndI[beam] - itsStartI[beam]][NR_POLARIZATIONS])); } } -void BeamletBuffer::readFlags(SparseSet<unsigned> &flags, unsigned beam) +SparseSet<unsigned> BeamletBuffer::readFlags(unsigned beam) { pthread_mutex_lock(&itsValidDataMutex); SparseSet<TimeStamp> validTimes = itsValidData.subset(itsBegin[beam], itsEnd[beam]); pthread_mutex_unlock(&itsValidDataMutex); - flags.reset().include(0, static_cast<unsigned>(itsEnd[beam] - itsBegin[beam])); + SparseSet<unsigned> flags; + flags.include(0, static_cast<unsigned>(itsEnd[beam] - itsBegin[beam])); for (SparseSet<TimeStamp>::const_iterator it = validTimes.getRanges().begin(); it != validTimes.getRanges().end(); it ++) flags.exclude(static_cast<unsigned>(it->begin - itsBegin[beam]), static_cast<unsigned>(it->end - itsBegin[beam])); + + return flags; } void BeamletBuffer::stopReadTransaction() diff --git a/Appl/CEP/CS1/CS1_IONProc/src/BeamletBuffer.h b/Appl/CEP/CS1/CS1_IONProc/src/BeamletBuffer.h index 138485f7d0b..5ab3bca81b1 100644 --- a/Appl/CEP/CS1/CS1_IONProc/src/BeamletBuffer.h +++ b/Appl/CEP/CS1/CS1_IONProc/src/BeamletBuffer.h @@ -64,7 +64,7 @@ class BeamletBuffer void sendSubband(Stream *, unsigned subband, unsigned currentBeam) const; void sendUnalignedSubband(Stream *, unsigned subband, unsigned currentBeam) const; unsigned alignmentShift(unsigned beam) const; - void readFlags(SparseSet<unsigned> &flags, unsigned beam); + SparseSet<unsigned> readFlags(unsigned beam); void stopReadTransaction(); static const unsigned MAX_BEAMLETS = 8; @@ -86,6 +86,10 @@ class BeamletBuffer size_t itsMinStartI, itsMaxEndI; TimeStamp itsMinEnd; + // write internals + TimeStamp itsPreviousTimeStamp; + unsigned itsPreviousI; + NSTimer itsReadTimer, itsWriteTimer; }; diff --git a/Appl/CEP/CS1/CS1_IONProc/src/CS1_ION_main.cc b/Appl/CEP/CS1/CS1_IONProc/src/CS1_ION_main.cc index d12d3ebeb45..eda2cbec1b9 100644 --- a/Appl/CEP/CS1/CS1_IONProc/src/CS1_ION_main.cc +++ b/Appl/CEP/CS1/CS1_IONProc/src/CS1_ION_main.cc @@ -34,14 +34,15 @@ //#include <TH_ZoidServer.h> #include <Package__Version.h> -#include <sys/types.h> -#include <sys/stat.h> +#include <boost/lexical_cast.hpp> +#include <cstdlib> +#include <cstring> #include <fcntl.h> +#include <sys/resource.h> +#include <sys/stat.h> +#include <sys/types.h> #include <pthread.h> #include <unistd.h> -#include <cstdlib> -#include <cstring> -#include <boost/lexical_cast.hpp> #if defined HAVE_ZOID extern "C" { @@ -86,7 +87,7 @@ static void checkParset(const CS1_Parset &parset) } -void createClientStreams(unsigned nrClients, const std::string &streamType) +static void createClientStreams(unsigned nrClients, const std::string &streamType) { #if defined HAVE_FCNP && defined __PPC__ if (streamType == "FCNP") { @@ -121,7 +122,7 @@ void createClientStreams(unsigned nrClients, const std::string &streamType) } -void deleteClientStreams() +static void deleteClientStreams() { for (unsigned core = 0; core < clientStreams.size(); core ++) delete clientStreams[core]; @@ -141,7 +142,6 @@ static void configureCNs(const CS1_Parset &parset) BGL_Configuration configuration; configuration.nrStations() = parset.nrStations(); - configuration.nrBeams() = parset.nrBeams(); configuration.nrSamplesPerIntegration() = parset.BGLintegrationSteps(); configuration.nrSamplesToBGLProc() = parset.nrSamplesToBGLProc(); configuration.nrUsedCoresPerPset() = parset.nrCoresPerPset(); @@ -151,8 +151,6 @@ static void configureCNs(const CS1_Parset &parset) configuration.inputPsets() = parset.getUint32Vector("OLAP.BGLProc.inputPsets"); configuration.outputPsets() = parset.getUint32Vector("OLAP.BGLProc.outputPsets"); configuration.refFreqs() = parset.refFreqs(); - configuration.beamlet2beams() = parset.beamlet2beams(); - configuration.subband2Index() = parset.subband2Index(); std::clog << "configuring " << nrCoresPerPset << " cores ..."; std::clog.flush(); @@ -199,7 +197,7 @@ static void stopCNs() } -void *input_thread(void *parset) +static void *input_thread(void *parset) { std::clog << "starting input thread" << std::endl; @@ -225,7 +223,7 @@ void *input_thread(void *parset) } -void *gather_thread(void *argv) +static void *gather_thread(void *argv) { std::clog << "starting gather thread, nrRuns = " << ((char **) argv)[2] << std::endl; @@ -246,6 +244,21 @@ void *gather_thread(void *argv) } +static void enableCoreDumps() +{ + struct rlimit rlimit; + + rlimit.rlim_cur = RLIM_INFINITY; + rlimit.rlim_max = RLIM_INFINITY; + + if (setrlimit(RLIMIT_CORE, &rlimit) < 0) + perror("warning: setrlimit on unlimited core size failed"); + + if (system("echo /tmp/%e.core >/proc/sys/kernel/core_pattern") < 0) + std::cerr << "warning: could not change /proc/sys/kernel/core_pattern" << std::endl; +} + + void *master_thread(void *) { std::string type = "brief"; @@ -253,6 +266,8 @@ void *master_thread(void *) std::clog << "starting master_thread" << std::endl; + enableCoreDumps(); + try { pthread_t input_thread_id = 0, gather_thread_id = 0; diff --git a/Appl/CEP/CS1/CS1_IONProc/src/InputSection.cc b/Appl/CEP/CS1/CS1_IONProc/src/InputSection.cc index 433e31d2ae1..48e26068187 100644 --- a/Appl/CEP/CS1/CS1_IONProc/src/InputSection.cc +++ b/Appl/CEP/CS1/CS1_IONProc/src/InputSection.cc @@ -35,6 +35,7 @@ //#include <TH_ZoidServer.h> #include <CS1_Interface/BGL_Command.h> #include <CS1_Interface/BGL_Mapping.h> +#include <CS1_Interface/SubbandMetaData.h> #include <sys/time.h> @@ -57,8 +58,6 @@ static Stream *rawDataStream; InputSection::InputSection(const std::vector<Stream *> &clientStreams, unsigned psetNumber) : - itsInputThread(0), - itsInputStream(0), itsClientStreams(clientStreams), itsPsetNumber(psetNumber), itsBBuffer(0), @@ -74,7 +73,7 @@ InputSection::~InputSection() } -void InputSection::startThread() +void InputSection::startThreads() { /* start up thread which writes RSP data from ethernet link into cyclic buffers */ @@ -82,15 +81,21 @@ void InputSection::startThread() ThreadArgs args; args.BBuffer = itsBBuffer; - args.stream = itsInputStream; args.ipHeaderSize = itsCS1PS->getInt32("OLAP.IPHeaderSize"); args.frameHeaderSize = itsCS1PS->getInt32("OLAP.EPAHeaderSize"); args.nTimesPerFrame = itsCS1PS->getInt32("OLAP.nrTimesInFrame"); args.nSubbandsPerFrame = itsCS1PS->nrSubbandsPerFrame(); args.frameSize = args.frameHeaderSize + args.nSubbandsPerFrame * args.nTimesPerFrame * sizeof(Beamlet); + args.isRealTime = itsCS1PS->realTime(); args.startTime = itsSyncedStamp; - itsInputThread = new InputThread(args); + itsInputThreads.resize(itsStationsAndRSPboardNumbers.size()); + + for (unsigned thread = 0; thread < 1; thread ++) { // FIXME + args.stream = itsInputStreams[thread]; + + itsInputThreads[thread] = new InputThread(args); + } } @@ -99,14 +104,26 @@ void InputSection::preprocess(const CS1_Parset *ps) itsCS1PS = ps; itsSampleRate = ps->sampleRate(); TimeStamp::setMaxBlockId(itsSampleRate); - itsStationNr = ps->inputPsetIndex(itsPsetNumber); + itsStationsAndRSPboardNumbers = ps->getStationNamesAndRSPboardNumbers(itsPsetNumber); + + itsInputStreams.resize(itsStationsAndRSPboardNumbers.size()); + + std::clog << "input list:" << std::endl; + + for (unsigned i = 0; i < itsStationsAndRSPboardNumbers.size(); i ++) { + string &station = itsStationsAndRSPboardNumbers[i].first; + unsigned rspBoard = itsStationsAndRSPboardNumbers[i].second; + string input = ps->getInputDescription(station, rspBoard); + + std::cout << " " << i << ": station \"" << station << "\", RSP board " << rspBoard << ", reads from \"" << input << '"' << std::endl; - std::string stationName = ps->stationName(itsStationNr); - std::clog << "station " << itsStationNr << " = " << stationName << std::endl; + itsInputStreams[i] = CS1_Parset::createStream(input, true); + } - itsInputStream = Connector::getInputStream(ps, stationName); itsNSubbandsPerPset = ps->nrSubbandsPerPset(); itsNSamplesPerSec = ps->nrSubbandSamples(); + itsNrBeams = ps->nrBeams(); + itsNrPsets = ps->nrPsets(); #if defined DUMP_RAW_DATA itsNHistorySamples = 0; @@ -144,7 +161,7 @@ void InputSection::preprocess(const CS1_Parset *ps) itsDelaysAfterEnd.resize(ps->nrBeams()); itsNrCalcDelaysForEachTimeNrDirections.resize(itsNrCalcDelays * ps->nrBeams()); - itsDelayComp = new WH_DelayCompensation(ps, ps->stationName(itsStationNr)); + itsDelayComp = new WH_DelayCompensation(ps, itsStationsAndRSPboardNumbers[0].first); // FIXME std::vector<double> startTimes(itsNrCalcDelays); @@ -162,10 +179,10 @@ void InputSection::preprocess(const CS1_Parset *ps) } unsigned cyclicBufferSize = ps->nrSamplesToBuffer(); - itsIsSynchronous = ps->getString("OLAP.OLAP_Conn.station_Input_Transport") != "UDP"; + itsIsRealTime = ps->realTime(); itsMaxNetworkDelay = ps->maxNetworkDelay(); - std::clog << "maxNetworkDelay = " << itsMaxNetworkDelay << std::endl; - itsBBuffer = new BeamletBuffer(cyclicBufferSize, subbandsToReadFromFrame, itsNHistorySamples, itsIsSynchronous, itsMaxNetworkDelay); + std::clog << "maxNetworkDelay = " << itsMaxNetworkDelay << " samples" << std::endl; + itsBBuffer = new BeamletBuffer(cyclicBufferSize, subbandsToReadFromFrame, itsNHistorySamples, !itsIsRealTime, itsMaxNetworkDelay); #if defined DUMP_RAW_DATA vector<string> rawDataServers = ps->getStringVector("OLAP.OLAP_Conn.rawDataServers"); @@ -181,7 +198,7 @@ void InputSection::preprocess(const CS1_Parset *ps) std::clog << "raw data stream connected" << std::endl; #endif - startThread(); + startThreads(); } @@ -196,16 +213,10 @@ void InputSection::limitFlagsLength(SparseSet<unsigned> &flags) void InputSection::process() { - BGL_Command command(BGL_Command::PROCESS); + std::vector<TimeStamp> delayedStamps(itsNrBeams, itsSyncedStamp - itsNHistorySamples); + std::vector<int> samplesDelay(itsNrBeams); + std::vector<SubbandMetaData> metaDataPerBeam(itsNrBeams); - std::vector<TimeStamp> delayedStamp(itsCS1PS->nrBeams()); - - for (unsigned beam = 0; beam < itsCS1PS->nrBeams(); beam ++) { - delayedStamp[beam] = itsSyncedStamp - itsNHistorySamples; - } - - std::vector<int> samplesDelay(itsCS1PS->nrBeams()); - // set delay if (itsDelayCompensation) { itsCounter ++; @@ -223,19 +234,17 @@ void InputSection::process() itsCounter = 0; - for (unsigned beam = 0; beam < itsCS1PS->nrBeams(); beam ++) { + for (unsigned beam = 0; beam < itsNrBeams; beam ++) { itsDelaysAfterEnd[beam] = itsNrCalcDelaysForEachTimeNrDirections[beam]; } } else { - unsigned firstBeam = itsCounter * itsCS1PS->nrBeams(); - for (unsigned beam = 0; beam < itsCS1PS->nrBeams(); beam ++) { + unsigned firstBeam = itsCounter * itsNrBeams; + for (unsigned beam = 0; beam < itsNrBeams; beam ++) { itsDelaysAfterEnd[beam] = itsNrCalcDelaysForEachTimeNrDirections[firstBeam++]; } } - std::vector<float> fineDelayAtBegin(itsCS1PS->nrBeams()), fineDelayAfterEnd(itsCS1PS->nrBeams()); - - for (unsigned beam = 0; beam < itsCS1PS->nrBeams(); beam ++) { + for (unsigned beam = 0; beam < itsNrBeams; beam ++) { // The coarse delay will be determined for the center of the current // time interval and will be expressed in \e samples. int coarseDelay = (int) floor(0.5 * (itsDelaysAtBegin[beam] + itsDelaysAfterEnd[beam]) * itsSampleRate + 0.5); @@ -244,35 +253,33 @@ void InputSection::process() // time interval and will be expressed in seconds. double d = coarseDelay * itsSampleDuration; - fineDelayAtBegin[beam] = (float)(itsDelaysAtBegin[beam] - d); - fineDelayAfterEnd[beam]= (float)(itsDelaysAfterEnd[beam] - d); - - delayedStamp[beam] += coarseDelay; + delayedStamps[beam] += coarseDelay; samplesDelay[beam] = +coarseDelay; - itsIONtoCNdata.delayAtBegin(beam) = fineDelayAtBegin[beam]; - itsIONtoCNdata.delayAfterEnd(beam) = fineDelayAfterEnd[beam]; + metaDataPerBeam[beam].delayAtBegin = (float)(itsDelaysAtBegin[beam] - d); + metaDataPerBeam[beam].delayAfterEnd = (float)(itsDelaysAfterEnd[beam] - d); } } else { - for (unsigned beam = 0; beam < itsCS1PS->nrBeams(); beam ++) { - itsIONtoCNdata.delayAtBegin(beam) = 0; - itsIONtoCNdata.delayAfterEnd(beam) = 0; + for (unsigned beam = 0; beam < itsNrBeams; beam ++) { + metaDataPerBeam[beam].delayAtBegin = 0; + metaDataPerBeam[beam].delayAfterEnd = 0; samplesDelay[beam] = 0; } } - itsBBuffer->startReadTransaction(delayedStamp, itsNSamplesPerSec + itsNHistorySamples); + itsBBuffer->startReadTransaction(delayedStamps, itsNSamplesPerSec + itsNHistorySamples); - for (unsigned beam = 0; beam < itsCS1PS->nrBeams(); beam ++) { - itsIONtoCNdata.alignmentShift(beam) = itsBBuffer->alignmentShift(beam); + for (unsigned beam = 0; beam < itsNrBeams; beam ++) { + metaDataPerBeam[beam].alignmentShift = itsBBuffer->alignmentShift(beam); // set flags - itsBBuffer->readFlags(itsIONtoCNdata.flags(beam), beam); - limitFlagsLength(itsIONtoCNdata.flags(beam)); + SparseSet<unsigned> flags = itsBBuffer->readFlags(beam); + limitFlagsLength(flags); + metaDataPerBeam[beam].setFlags(flags); } std::clog << itsSyncedStamp; - if (!itsIsSynchronous) { + if (itsIsRealTime) { struct timeval tv; gettimeofday(&tv, 0); @@ -284,13 +291,13 @@ void InputSection::process() } if (itsDelayCompensation) { - for (unsigned beam = 0; beam < itsCS1PS->nrBeams(); beam ++) + for (unsigned beam = 0; beam < itsNrBeams; beam ++) std::clog << (beam == 0 ? ", delays: [" : ", ") << PrettyTime(itsDelaysAtBegin[beam], 7); std::clog << "]"; } - std::clog << ", flags: " << itsIONtoCNdata.flags(0) << " (" << std::setprecision(3) << (100.0 * itsIONtoCNdata.flags(0).count() / (itsNSamplesPerSec + itsNHistorySamples)) << "%)" << std::endl; // not really correct; beam(0) may be shifted + std::clog << ", flags: " << metaDataPerBeam[0].getFlags() << " (" << std::setprecision(3) << (100.0 * metaDataPerBeam[0].getFlags().count() / (itsNSamplesPerSec + itsNHistorySamples)) << "%)" << std::endl; // not really correct; beam(0) may be shifted NSTimer timer; timer.start(); @@ -323,7 +330,7 @@ void InputSection::process() fileHeader.sampleRate = itsSampleRate; memcpy(fileHeader.subbandFrequencies, &itsCS1PS->refFreqs()[0], 54 * sizeof(double)); - for (unsigned beam = 1; beam < itsCS1PS->nrBeams()+1; beam ++){ + for (unsigned beam = 1; beam < itsNrBeams+1; beam ++){ vector<double> beamDir = itsCS1PS->getBeamDirection(beam); fileHeader.beamDirections[beam][0] = beamDir[0]; @@ -351,8 +358,8 @@ void InputSection::process() } blockHeader; blockHeader.magic = 0x2913D852; - for (unsigned beam = 0; beam < itsCS1PS->nrBeams(); beam ++) { - blockHeader.time[beam] = delayedStamp[beam]; + for (unsigned beam = 0; beam < itsNrBeams; beam ++) { + blockHeader.time[beam] = delayedStamps[beam]; blockHeader.coarseDelayApplied[beam] = samplesDelay[beam]; blockHeader.fineDelayRemainingAtBegin[beam] = itsIONtoCNdata.delayAtBegin(beam); blockHeader.fineDelayRemainingAfterEnd[beam] = itsIONtoCNdata.delayAfterEnd(beam); @@ -366,17 +373,31 @@ void InputSection::process() #else + BGL_Command command(BGL_Command::PROCESS); + for (unsigned subbandBase = 0; subbandBase < itsNSubbandsPerPset; subbandBase ++) { unsigned core = BGL_Mapping::mapCoreOnPset(itsCurrentComputeCore, itsPsetNumber); Stream *str = itsClientStreams[core]; command.write(str); - itsIONtoCNdata.write(str, itsCS1PS->nrBeams()); - for (unsigned pset = 0; pset < itsCS1PS->nrPsets(); pset ++) { + std::vector<SubbandMetaData> metaDataPerComputeNode(itsNrPsets); + + for (unsigned pset = 0; pset < itsNrPsets; pset ++) { unsigned subband = itsNSubbandsPerPset * pset + subbandBase; + unsigned beam = itsBeamlet2beams[itsSubband2Index[subband]] - 1; - itsBBuffer->sendSubband(str, subband, itsBeamlet2beams[itsSubband2Index[subband]] - 1); + metaDataPerComputeNode[pset] = metaDataPerBeam[beam]; + } + + // send all metadata in one "large" message + str->write(&metaDataPerComputeNode[0], metaDataPerComputeNode.size() * sizeof(SubbandMetaData)); + + for (unsigned pset = 0; pset < itsNrPsets; pset ++) { + unsigned subband = itsNSubbandsPerPset * pset + subbandBase; + unsigned beam = itsBeamlet2beams[itsSubband2Index[subband]] - 1; + + itsBBuffer->sendSubband(str, subband, beam); } if (++ itsCurrentComputeCore == itsNrCoresPerPset) @@ -396,10 +417,18 @@ void InputSection::postprocess() { std::clog << "InputSection::postprocess" << std::endl; - delete itsInputThread; itsInputThread = 0; - delete itsInputStream; itsInputStream = 0; - delete itsBBuffer; itsBBuffer = 0; - delete itsDelayComp; itsDelayComp = 0; + for (unsigned i = 0; i < itsInputThreads.size(); i ++) + delete itsInputThreads[i]; + + itsInputThreads.resize(0); + + for (unsigned i = 0; i < itsInputStreams.size(); i ++) + delete itsInputStreams[i]; + + itsInputStreams.resize(0); + + delete itsBBuffer; itsBBuffer = 0; + delete itsDelayComp; itsDelayComp = 0; itsDelayTimer.print(std::clog); } diff --git a/Appl/CEP/CS1/CS1_IONProc/src/InputSection.h b/Appl/CEP/CS1/CS1_IONProc/src/InputSection.h index 986a20e44a3..b9fb73ed0f4 100644 --- a/Appl/CEP/CS1/CS1_IONProc/src/InputSection.h +++ b/Appl/CEP/CS1/CS1_IONProc/src/InputSection.h @@ -31,7 +31,6 @@ //# Includes #include <CS1_Interface/CS1_Parset.h> #include <CS1_Interface/RSPTimeStamp.h> -#include <CS1_Interface/ION_to_CN.h> #include <Stream/Stream.h> #include <BeamletBuffer.h> #include <WH_DelayCompensation.h> @@ -44,11 +43,9 @@ namespace LOFAR { namespace CS1 { -// This class is the workholder that receives data from the RSP boards -// and distributes it per subband to the Blue Gene/L class InputSection { public: - InputSection(const std::vector<Stream *> &, unsigned psetNumber); + InputSection(const std::vector<Stream *> &clientStreams, unsigned psetNumber); ~InputSection(); void preprocess(const CS1_Parset *ps); @@ -56,49 +53,45 @@ class InputSection { void postprocess(); private: - void limitFlagsLength(SparseSet<unsigned> &flags); + static void limitFlagsLength(SparseSet<unsigned> &flags); + void startThreads(); - bool itsDelayCompensation, itsIsSynchronous; - std::vector<int32> itsBeamlet2beams; - std::vector<uint32> itsSubband2Index; + bool itsDelayCompensation, itsIsRealTime; + std::vector<int32> itsBeamlet2beams; + std::vector<uint32> itsSubband2Index; - ION_to_CN itsIONtoCNdata; + std::vector<InputThread *> itsInputThreads; - // writer thread - InputThread *itsInputThread; - - Stream *itsInputStream; + std::vector<Stream *> itsInputStreams; const std::vector<Stream *> &itsClientStreams; - unsigned itsStationNr; + std::vector<std::pair<std::string, unsigned> > itsStationsAndRSPboardNumbers; - const CS1_Parset *itsCS1PS; + const CS1_Parset *itsCS1PS; - // synced stamp - TimeStamp itsSyncedStamp; + TimeStamp itsSyncedStamp; - double itsSampleDuration; - std::vector<double> itsDelaysAtBegin; - std::vector<double> itsDelaysAfterEnd; - std::vector<double> itsNrCalcDelaysForEachTimeNrDirections; - //std::vector<double> itsNrCalcIntTimes; - unsigned itsNrCalcDelays; - unsigned itsCounter; + std::vector<double> itsDelaysAtBegin; + std::vector<double> itsDelaysAfterEnd; + std::vector<double> itsNrCalcDelaysForEachTimeNrDirections; + unsigned itsNrCalcDelays; + unsigned itsCounter; + unsigned itsNrPsets; - unsigned itsMaxNetworkDelay; - unsigned itsNSubbandsPerPset; - unsigned itsNSamplesPerSec; - unsigned itsNHistorySamples; + unsigned itsMaxNetworkDelay; // in samples + unsigned itsNSubbandsPerPset; + unsigned itsNSamplesPerSec; + unsigned itsNHistorySamples; + unsigned itsNrBeams; - unsigned itsCurrentComputeCore, itsNrCoresPerPset; - unsigned itsPsetNumber; + unsigned itsCurrentComputeCore, itsNrCoresPerPset; + unsigned itsPsetNumber; BeamletBuffer *itsBBuffer; WH_DelayCompensation *itsDelayComp; - double itsSampleRate; + double itsSampleRate, itsSampleDuration; - NSTimer itsDelayTimer; + NSTimer itsDelayTimer; - void startThread(); }; } // namespace CS1 diff --git a/Appl/CEP/CS1/CS1_IONProc/src/InputThread.cc b/Appl/CEP/CS1/CS1_IONProc/src/InputThread.cc index 9768d0898ba..ee6151627a2 100644 --- a/Appl/CEP/CS1/CS1_IONProc/src/InputThread.cc +++ b/Appl/CEP/CS1/CS1_IONProc/src/InputThread.cc @@ -219,7 +219,9 @@ void InputThread::mainLoop() actualstamp.setStamp(seqid, blockid); } else { actualstamp += itsArgs.nTimesPerFrame; - wallClockTime.waitUntil(actualstamp); + + if (itsArgs.isRealTime) + wallClockTime.waitUntil(actualstamp); } // expected packet received so write data into corresponding buffer diff --git a/Appl/CEP/CS1/CS1_IONProc/src/InputThread.h b/Appl/CEP/CS1/CS1_IONProc/src/InputThread.h index c3825564988..ac191a7818a 100644 --- a/Appl/CEP/CS1/CS1_IONProc/src/InputThread.h +++ b/Appl/CEP/CS1/CS1_IONProc/src/InputThread.h @@ -55,6 +55,7 @@ namespace LOFAR int frameHeaderSize; int nTimesPerFrame; int nSubbandsPerFrame; + bool isRealTime; TimeStamp startTime; }; diff --git a/Appl/CEP/CS1/CS1_Interface/include/CS1_Interface/BGL_Configuration.h b/Appl/CEP/CS1/CS1_Interface/include/CS1_Interface/BGL_Configuration.h index 707ce491ffa..3afaba59b5d 100644 --- a/Appl/CEP/CS1/CS1_Interface/include/CS1_Interface/BGL_Configuration.h +++ b/Appl/CEP/CS1/CS1_Interface/include/CS1_Interface/BGL_Configuration.h @@ -35,7 +35,6 @@ class BGL_Configuration { public: unsigned &nrStations(); - unsigned &nrBeams(); unsigned &nrSamplesPerIntegration(); unsigned &nrSamplesToBGLProc(); unsigned &nrUsedCoresPerPset(); @@ -44,8 +43,6 @@ class BGL_Configuration double &sampleRate(); std::vector<unsigned> &inputPsets(), &outputPsets(); std::vector<double> &refFreqs(); - std::vector<signed> &beamlet2beams(); - std::vector<unsigned> &subband2Index(); void read(Stream *); void write(Stream *); @@ -56,14 +53,10 @@ class BGL_Configuration private: std::vector<unsigned> itsInputPsets, itsOutputPsets; std::vector<double> itsRefFreqs; - std::vector<signed> itsBeamlet2beams; - std::vector<unsigned> itsSubband2Index; struct MarshalledData { unsigned itsNrStations; - unsigned itsNrBeams; - unsigned itsNrSamplesPerIntegration; unsigned itsNrSamplesToBGLProc; unsigned itsNrUsedCoresPerPset; @@ -72,12 +65,8 @@ class BGL_Configuration double itsSampleRate; unsigned itsInputPsetsSize, itsOutputPsetsSize; unsigned itsRefFreqsSize; - unsigned itsBeamlet2beamsSize; - unsigned itsSubband2IndexSize; unsigned itsInputPsets[MAX_PSETS], itsOutputPsets[MAX_PSETS]; double itsRefFreqs[MAX_SUBBANDS]; - signed itsBeamlet2beams[MAX_SUBBANDS]; // to which beam each beamlet belongs - unsigned itsSubband2Index[MAX_SUBBANDS]; // to which beam each beamlet belongs } itsMarshalledData; }; @@ -87,11 +76,6 @@ inline unsigned &BGL_Configuration::nrStations() return itsMarshalledData.itsNrStations; } -inline unsigned &BGL_Configuration::nrBeams() -{ - return itsMarshalledData.itsNrBeams; -} - inline unsigned &BGL_Configuration::nrSamplesPerIntegration() { return itsMarshalledData.itsNrSamplesPerIntegration; @@ -137,16 +121,6 @@ inline std::vector<double> & BGL_Configuration::refFreqs() return itsRefFreqs; } -inline std::vector<signed> & BGL_Configuration::beamlet2beams() -{ - return itsBeamlet2beams; -} - -inline std::vector<unsigned> & BGL_Configuration::subband2Index() -{ - return itsSubband2Index; -} - } // namespace CS1 } // namespace LOFAR diff --git a/Appl/CEP/CS1/CS1_Interface/include/CS1_Interface/CS1_Parset.h b/Appl/CEP/CS1/CS1_Interface/include/CS1_Interface/CS1_Parset.h index 7d3f3c96ef8..589351d630d 100644 --- a/Appl/CEP/CS1/CS1_Interface/include/CS1_Interface/CS1_Parset.h +++ b/Appl/CEP/CS1/CS1_Interface/include/CS1_Interface/CS1_Parset.h @@ -37,6 +37,7 @@ #include <Common/LofarLogger.h> #include <CS1_Interface/CS1_Config.h> #include <ApplCommon/Observation.h> +#include <Stream/Stream.h> #include <boost/date_time/c_local_time_adjustor.hpp> @@ -90,7 +91,7 @@ public: uint32 nrSubbands() const; uint32 nrPsets() const; uint32 nrCoresPerPset() const; - vector<double> refFreqs(uint32 rspid=0) const; + vector<double> refFreqs() const; double chanWidth() const; vector<string> getPortsOf(const string& aKey) const; string inputPortnr(const string& aKey) const; @@ -115,10 +116,16 @@ public: vector<int32> beamlet2subbands(uint32 rspid=0) const; vector<uint32> subband2Index(uint32 rspid=0) const; int32 nrSubbandsPerFrame() const; + string partitionName() const; + bool realTime() const; vector<double> getBeamDirection(const unsigned currentBeam) const; string getBeamDirectionType(const unsigned currentBeam) const; + vector<pair<string, unsigned> > getStationNamesAndRSPboardNumbers(unsigned psetNumber) const; + string getInputDescription(const string &stationName, unsigned rspBoardNumber) const; + static Stream *createStream(const string &description, bool asReader); + //# Datamembers string name; vector<double> itsStPositions; @@ -268,6 +275,11 @@ inline uint32 CS1_Parset::nrCoresPerPset() const return getUint32("OLAP.BGLProc.coresPerPset"); } +inline vector<double> CS1_Parset::refFreqs() const +{ + return getDoubleVector("Observation.RefFreqs"); +} + inline double CS1_Parset::chanWidth() const { return sampleRate() / nrChannelsPerSubband(); @@ -328,11 +340,21 @@ inline uint32 CS1_Parset::nrBeams() const return getUint32("Observation.nrBeams"); } -inline int32 CS1_Parset::nrSubbandsPerFrame() const +inline int32 CS1_Parset::nrSubbandsPerFrame() const { return getInt32("OLAP.nrSubbandsPerFrame"); } +inline string CS1_Parset::partitionName() const +{ + return getString("OLAP.BGLProc.partition"); +} + +inline bool CS1_Parset::realTime() const +{ + return getBool("OLAP.realTime"); +} + } // namespace CS1 } // namespace LOFAR diff --git a/Appl/CEP/CS1/CS1_Interface/include/CS1_Interface/ION_to_CN.h b/Appl/CEP/CS1/CS1_Interface/include/CS1_Interface/ION_to_CN.h deleted file mode 100644 index 0fb7ebe4434..00000000000 --- a/Appl/CEP/CS1/CS1_Interface/include/CS1_Interface/ION_to_CN.h +++ /dev/null @@ -1,105 +0,0 @@ -//# ION_to_CN.h: -//# -//# Copyright (C) 2007 -//# ASTRON (Netherlands Foundation for Research in Astronomy) -//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl -//# -//# This program is free software; you can redistribute it and/or modify -//# it under the terms of the GNU General Public License as published by -//# the Free Software Foundation; either version 2 of the License, or -//# (at your option) any later version. -//# -//# This program is distributed in the hope that it will be useful, -//# but WITHOUT ANY WARRANTY; without even the implied warranty of -//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -//# GNU General Public License for more details. -//# -//# You should have received a copy of the GNU General Public License -//# along with this program; if not, write to the Free Software -//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -//# -//# $Id$ - -#ifndef LOFAR_CS1_INTERFACE_ION_TO_CN_H -#define LOFAR_CS1_INTERFACE_ION_TO_CN_H - -#include <CS1_Interface/SparseSet.h> -#include <Stream/Stream.h> - -#include <cassert> - - -namespace LOFAR { -namespace CS1 { - -class ION_to_CN -{ - public: - SparseSet<unsigned> &flags(const unsigned beam); - float &delayAtBegin(const unsigned beam), &delayAfterEnd(const unsigned beam); - unsigned &alignmentShift(const unsigned beam); - - void read(Stream *, const unsigned nrBeams); - void write(Stream *, const unsigned nrBeams); - - static const unsigned MAX_BEAMLETS = 8; - - private: - std::vector<SparseSet<unsigned> > itsFlags; - - struct MarshalledData - { - float delayAtBegin, delayAfterEnd; - unsigned alignmentShift; - unsigned char flagsBuffer[132]; - }; - - std::vector<struct MarshalledData> itsMarshalledData; - public: - ION_to_CN(){itsMarshalledData.resize(MAX_BEAMLETS); - itsFlags.resize(MAX_BEAMLETS);}; - }; - -inline SparseSet<unsigned> &ION_to_CN::flags(const unsigned beam) -{ - return itsFlags[beam]; -} - -inline float &ION_to_CN::delayAtBegin(const unsigned beam) -{ - return itsMarshalledData[beam].delayAtBegin; -} - -inline float &ION_to_CN::delayAfterEnd(const unsigned beam) -{ - return itsMarshalledData[beam].delayAfterEnd; -} - -inline unsigned &ION_to_CN::alignmentShift(const unsigned beam) -{ - return itsMarshalledData[beam].alignmentShift; -} - -inline void ION_to_CN::read(Stream *str, const unsigned nrBeams) -{ - str->read(&itsMarshalledData[0], sizeof(struct MarshalledData) * nrBeams); - - for (unsigned beam = 0; beam < nrBeams; beam ++) - itsFlags[beam].unmarshall(itsMarshalledData[beam].flagsBuffer); -} - -inline void ION_to_CN::write(Stream *str, const unsigned nrBeams) -{ - for (unsigned beam = 0; beam < nrBeams; beam ++) { - ssize_t size = itsFlags[beam].marshall(&itsMarshalledData[beam].flagsBuffer, sizeof itsMarshalledData[beam].flagsBuffer); - - assert(size >= 0); - } - - str->write(&itsMarshalledData[0], sizeof(struct MarshalledData) * nrBeams); -} - -} // namespace CS1 -} // namespace LOFAR - -#endif diff --git a/Appl/CEP/CS1/CS1_Interface/include/CS1_Interface/Makefile.am b/Appl/CEP/CS1/CS1_Interface/include/CS1_Interface/Makefile.am index 39221130b3f..05e0f7ad9d7 100644 --- a/Appl/CEP/CS1/CS1_Interface/include/CS1_Interface/Makefile.am +++ b/Appl/CEP/CS1/CS1_Interface/include/CS1_Interface/Makefile.am @@ -6,7 +6,7 @@ pkginclude_HEADERS = Package__Version.h \ CS1_Config.h \ CS1_Parset.h \ DH_Visibilities.h \ - ION_to_CN.h \ + SubbandMetaData.h \ PrintVector.h \ RSPTimeStamp.h \ SparseSet.h \ diff --git a/Appl/CEP/CS1/CS1_Interface/include/CS1_Interface/SubbandMetaData.h b/Appl/CEP/CS1/CS1_Interface/include/CS1_Interface/SubbandMetaData.h new file mode 100644 index 00000000000..a37faccec95 --- /dev/null +++ b/Appl/CEP/CS1/CS1_Interface/include/CS1_Interface/SubbandMetaData.h @@ -0,0 +1,69 @@ +//# SubbandMetaData.h: +//# +//# Copyright (C) 2007 +//# ASTRON (Netherlands Foundation for Research in Astronomy) +//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl +//# +//# This program is free software; you can redistribute it and/or modify +//# it under the terms of the GNU General Public License as published by +//# the Free Software Foundation; either version 2 of the License, or +//# (at your option) any later version. +//# +//# This program is distributed in the hope that it will be useful, +//# but WITHOUT ANY WARRANTY; without even the implied warranty of +//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//# GNU General Public License for more details. +//# +//# You should have received a copy of the GNU General Public License +//# along with this program; if not, write to the Free Software +//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +//# +//# $Id$ + +#ifndef LOFAR_CS1_INTERFACE_SUBBAND_META_DATA_H +#define LOFAR_CS1_INTERFACE_SUBBAND_META_DATA_H + +#include <CS1_Interface/SparseSet.h> +#include <Stream/Stream.h> + +#include <cassert> + + +namespace LOFAR { +namespace CS1 { + +struct SubbandMetaData +{ + public: + SparseSet<unsigned> getFlags() const; + void setFlags(const SparseSet<unsigned> &); + + float delayAtBegin, delayAfterEnd; + unsigned alignmentShift; + + private: + unsigned char flagsBuffer[132]; +}; + + +inline SparseSet<unsigned> SubbandMetaData::getFlags() const +{ + SparseSet<unsigned> flags; + + flags.unmarshall(flagsBuffer); + return flags; +} + + +inline void SubbandMetaData::setFlags(const SparseSet<unsigned> &flags) +{ + ssize_t size = flags.marshall(&flagsBuffer, sizeof flagsBuffer); + + assert(size >= 0); +} + + +} // namespace CS1 +} // namespace LOFAR + +#endif diff --git a/Appl/CEP/CS1/CS1_Interface/src/BGL_Configuration.cc b/Appl/CEP/CS1/CS1_Interface/src/BGL_Configuration.cc index 435b04caaaa..77d28cff96d 100644 --- a/Appl/CEP/CS1/CS1_Interface/src/BGL_Configuration.cc +++ b/Appl/CEP/CS1/CS1_Interface/src/BGL_Configuration.cc @@ -41,12 +41,6 @@ void BGL_Configuration::read(Stream *str) itsRefFreqs.resize(itsMarshalledData.itsRefFreqsSize); memcpy(&itsRefFreqs[0], itsMarshalledData.itsRefFreqs, itsMarshalledData.itsRefFreqsSize * sizeof(double)); - - itsBeamlet2beams.resize(itsMarshalledData.itsBeamlet2beamsSize); - memcpy(&itsBeamlet2beams[0], itsMarshalledData.itsBeamlet2beams, itsMarshalledData.itsBeamlet2beamsSize * sizeof(signed)); - - itsSubband2Index.resize(itsMarshalledData.itsSubband2IndexSize); - memcpy(&itsSubband2Index[0], itsMarshalledData.itsSubband2Index, itsMarshalledData.itsSubband2IndexSize * sizeof(unsigned)); } @@ -64,14 +58,6 @@ void BGL_Configuration::write(Stream *str) assert(itsMarshalledData.itsRefFreqsSize <= MAX_SUBBANDS); memcpy(itsMarshalledData.itsRefFreqs, &itsRefFreqs[0], itsMarshalledData.itsRefFreqsSize * sizeof(double)); - itsMarshalledData.itsBeamlet2beamsSize = itsBeamlet2beams.size(); - assert(itsMarshalledData.itsBeamlet2beamsSize <= MAX_SUBBANDS); - memcpy(itsMarshalledData.itsBeamlet2beams, &itsBeamlet2beams[0], itsMarshalledData.itsBeamlet2beamsSize * sizeof(signed)); - - itsMarshalledData.itsSubband2IndexSize = itsSubband2Index.size(); - assert(itsMarshalledData.itsSubband2IndexSize <= MAX_SUBBANDS); - memcpy(itsMarshalledData.itsSubband2Index, &itsSubband2Index[0], itsMarshalledData.itsSubband2IndexSize * sizeof(unsigned)); - str->write(&itsMarshalledData, sizeof itsMarshalledData); } diff --git a/Appl/CEP/CS1/CS1_Interface/src/CS1_Parset.cc b/Appl/CEP/CS1/CS1_Interface/src/CS1_Parset.cc index ef35bc78bde..d78c13ca28a 100644 --- a/Appl/CEP/CS1/CS1_Interface/src/CS1_Parset.cc +++ b/Appl/CEP/CS1/CS1_Interface/src/CS1_Parset.cc @@ -31,10 +31,15 @@ //#include <APL/APLCommon/APLUtilities.h> #include <CS1_Interface/CS1_Parset.h> +#include <Stream/FileStream.h> +#include <Stream/NullStream.h> +#include <Stream/SocketStream.h> + #include <boost/algorithm/string.hpp> #include <boost/algorithm/string/classification.hpp> #include <boost/algorithm/string/split.hpp> #include <boost/format.hpp> +#include <boost/lexical_cast.hpp> #include <algorithm> namespace LOFAR { @@ -79,6 +84,55 @@ void CS1_Parset::IONodeRSPDestPorts(uint32 pset, vector<pair<string, string> > & } } + +vector<pair<string, unsigned> > CS1_Parset::getStationNamesAndRSPboardNumbers(unsigned psetNumber) const +{ + vector<string> inputs = getStringVector(string("PIC.Core.IONProc.") + partitionName() + '[' + boost::lexical_cast<string>(psetNumber) + "].inputs"); + vector<pair<string, unsigned> > stationsAndRSPs(inputs.size()); + + for (unsigned i = 0; i < inputs.size(); i ++) { + vector<string> split = StringUtil::split(inputs[i], '/'); + + if (split.size() != 2 || split[1].substr(0, 3) != "RSP") + throw std::runtime_error(string("expected stationname/RSPn pair in \"") + inputs[i] + '"'); + + string &stationName = split[0]; + unsigned rspBoardNumber = boost::lexical_cast<unsigned>(split[1].substr(3)); + + stationsAndRSPs[i] = pair<string, unsigned>(stationName, rspBoardNumber); + } + + return stationsAndRSPs; +} + + +string CS1_Parset::getInputDescription(const string &stationName, unsigned rspBoardNumber) const +{ + return getStringVector(string("PIC.Core.Station.") + stationName + ".RSP.ports")[rspBoardNumber]; +} + + +Stream *CS1_Parset::createStream(const string &description, bool asReader) +{ + vector<string> split = StringUtil::split(description, ':'); + + if (description == "null:") + return new NullStream; + else if (split.size() == 3 && split[0] == "udp") + return new SocketStream(split[1].c_str(), boost::lexical_cast<short>(split[2]), SocketStream::UDP, asReader ? SocketStream::Server : SocketStream::Client); + else if (split.size() == 3 && split[0] == "tcp") + return new SocketStream(split[1].c_str(), boost::lexical_cast<short>(split[2]), SocketStream::TCP, asReader ? SocketStream::Server : SocketStream::Client); + else if (split.size() == 2 && split[0] == "file") + return asReader ? new FileStream(split[1].c_str()) : new FileStream(split[1].c_str(), 0666); + else if (split.size() == 2) + return new SocketStream(split[0].c_str(), boost::lexical_cast<short>(split[1]), SocketStream::UDP, asReader ? SocketStream::Server : SocketStream::Client); + else if (split.size() == 1) + return asReader ? new FileStream(split[0].c_str()) : new FileStream(split[0].c_str(), 0666); + else + throw std::runtime_error(string("unrecognized connector format: \"" + description + '"')); +} + + uint32 CS1_Parset::nrSubbands() const { uint32 nrSubbands(0); @@ -248,24 +302,6 @@ vector<double> CS1_Parset::getPhaseCentresOf(const string& name) const return list; } -vector<double> CS1_Parset::refFreqs(uint32 rspid) const -{ - vector<uint32> sb2Index = subband2Index(rspid); - vector<int32> subbandIDs = beamlet2subbands(rspid); - vector<double> refFreqs; - - refFreqs.resize(subbandIDs.size(), -1); - - for (uint i = 0; i < subbandIDs.size(); i++) - { - if (subbandIDs[i] != -1) - refFreqs[i] = ((itsObservation.nyquistZone - 1 )*(getUint32("Observation.sampleClock")*1000000/2) + - sampleRate()*subbandIDs[sb2Index[i]]); - } - - return refFreqs; -} - string CS1_Parset::getMSname(unsigned sb) const { using namespace boost; diff --git a/Appl/CEP/CS1/CS1_Run/src/CS1.parset b/Appl/CEP/CS1/CS1_Run/src/CS1.parset index 9825c8d80cc..f67ea15068e 100644 --- a/Appl/CEP/CS1/CS1_Run/src/CS1.parset +++ b/Appl/CEP/CS1/CS1_Run/src/CS1.parset @@ -13,7 +13,7 @@ OLAP.OLAP_Conn.BGLProc_Storage_Ports = [8300..8363] # should be one of NULL(inputFromMemory), FILE, TCP, UDP, ETHERNET -OLAP.OLAP_Conn.station_Input_Transport = UDP +OLAP.OLAP_Conn.station_Input_Transport = NULL # should be one of ZOID, FCNP, TCP, NULL # BGLProc doesn't open the parset file! @@ -23,8 +23,9 @@ OLAP.OLAP_Conn.rawDataServers = ["10.181.0.4"] OLAP.OLAP_Conn.rawDataPorts = [4000] # Variables for Storage -OLAP.subbandsPerPset = 6 #6 -OLAP.psetsPerStorage = 2 #3 +OLAP.subbandsPerPset = 3 +OLAP.psetsPerStorage = 8 +OLAP.realTime = T OLAP.BGLProc.integrationSteps = 608 #768 at 200MHz OLAP.BGLProc.nrPPFTaps=16 OLAP.BGLProc.coresPerPset = 64 @@ -44,54 +45,62 @@ OLAP.DelayComp.nrCalcDelays = 16 OLAP.IPHeaderSize = 32 OLAP.EPAHeaderSize = 16 OLAP.nrTimesInFrame = 16 -OLAP.nrSubbandsPerFrame = 18 +OLAP.nrSubbandsPerFrame = 48 OLAP.nrBitsPerSample=16 -OLAP.nrSecondsOfBuffer = 6 -OLAP.maxNetworkDelay = 2.0 # 1 second extra added to compensate for timestamp bug +OLAP.nrSecondsOfBuffer = 4 +OLAP.maxNetworkDelay = 0.5 # 1 second extra added to compensate for timestamp bug OLAP.delayCompensation = F Observation.sampleClock = 160 -# Nyquist zone: -Observation.bandFilter = LBL_10_80 -#Observation.bandFilter = LBL_30_80 +#Observation.bandFilter = LBL_10_80 +Observation.bandFilter = LBL_30_80 #Observation.bandFilter = LBH_10_80 #Observation.bandFilter = LBH_30_80 #Observation.bandFilter = HB_100_190 #Observation.bandFilter = HB_170_230 #Observation.bandFilter = HB_210_240 -Observation.Beam[1].angle1 = 0 # NCP -Observation.Beam[1].angle2 = 1.570796327 -Observation.Beam[2].angle1 = 5.2336866848083394 # Cygnus -Observation.Beam[2].angle2 = 0.71094251447010637 -Observation.Beam[3].angle1 = 6.1234876806221052 # Cas A -Observation.Beam[3].angle2 = 1.0265153995604648 -Observation.Beam[4].angle1 = 0 # NCP -Observation.Beam[4].angle2 = 1.570796327 -Observation.Beam[5].angle1 = 0.9293405574 # pulsar -Observation.Beam[5].angle2 = 0.9525774347 -Observation.Beam[6].angle1 = 4.5192832066722115 # Jupiter -Observation.Beam[6].angle2 = 5.893698795 -Observation.Beam[7].angle1 = 1.4596748494230258 # Taurus -Observation.Beam[7].angle2 = 0.38422502336661052 -Observation.Beam[8].angle1 = 0 # Test -Observation.Beam[8].angle2 = 0 +#Observation.Beam[1].angle1 = 0 # NCP +#Observation.Beam[1].angle2 = 1.570796327 +Observation.Beam[1].angle1 = 5.2336866848083394 # Cygnus +Observation.Beam[1].angle2 = 0.71094251447010637 +#Observation.Beam[1].angle1 = 6.1234876806221052 # Cas A +#Observation.Beam[1].angle2 = 1.0265153995604648 +#Observation.Beam[1].angle1 = 0 # NCP +#Observation.Beam[1].angle2 = 1.570796327 +#Observation.Beam[1].angle1 = 0.9293405574 # pulsar +#Observation.Beam[1].angle2 = 0.9525774347 +#Observation.Beam[1].angle1 = 4.5192832066722115 # Jupiter +#Observation.Beam[1].angle2 = 5.893698795 +#Observation.Beam[1].angle1 = 1.4596748494230258 # Taurus +#Observation.Beam[1].angle2 = 0.38422502336661052 +#Observation.Beam[8].angle1 = 0 # Test +#Observation.Beam[8].angle2 = 0 Observation.nrBeams = 1 - Observation.Beam[1].directionTypes = J2000 -Observation.Beam[1].subbandList = [57,73,89,121,138,154,170,187,203,219,236,252,268,285,301,317,334,350,366,383,399,415,432,448] -Observation.Beam[1].beamletList = [0..23] +Observation.Beam[1].subbandList = [300..347] +Observation.Beam[1].beamletList = [0..47] +#Observation.Beam[1].subbandList = [257] +#Observation.Beam[1].beamletList = [0] # example 2 beams: #Observation.nrBeams = 2 #Observation.Beam[1].directionTypes = J2000 -#Observation.Beam[1].subbandList = [0..3, 0..3, 0..3, 0..3] -#Observation.Beam[1].beamletList = [0..3,54..57,108..111,162..165] +#Observation.Beam[1].subbandList = [300..303] +#Observation.Beam[1].beamletList = [10..13] +#Observation.Beam[2].directionTypes = J2000 +#Observation.Beam[2].subbandList = [304..307] +#Observation.Beam[2].beamletList = [14..17] + +#Observation.nrBeams = 2 +#Observation.Beam[1].directionTypes = J2000 +#Observation.Beam[1].subbandList = [300..303, 0..3, 0..3, 0..3] +#Observation.Beam[1].beamletList = [10..13,54..57,108..111,162..165] #Observation.Beam[2].directionTypes = J2000 -#Observation.Beam[2].subbandList = [4..7, 4..7, 4..7, 4..7] -#Observation.Beam[2].beamletList = [4..7,58..61,112..115,166..169] +#Observation.Beam[2].subbandList = [304..307, 4..7, 4..7, 4..7] +#Observation.Beam[2].beamletList = [14..17,58..61,112..115,166..169] Observation.channelsPerSubband = 256 Observation.nrPolarisations = 2 diff --git a/Appl/CEP/CS1/CS1_Run/src/CS1_Hosts.py b/Appl/CEP/CS1/CS1_Run/src/CS1_Hosts.py index dfecaabb0d0..7348f4e02d5 100644 --- a/Appl/CEP/CS1/CS1_Run/src/CS1_Hosts.py +++ b/Appl/CEP/CS1/CS1_Run/src/CS1_Hosts.py @@ -4,7 +4,7 @@ import sys listfen = ClusterFEN(name = 'listfen' , address = '129.125.99.50') -listfen.setSlavesByPattern('list%03d', '10.181.0.%d', [4]) +listfen.setSlavesByPattern('list%03d', '10.181.0.%d', [3,4]) #listfen.slaves.append(ClusterSlave('lifs001', '10.182.0.1')) #listfen.slaves.append(ClusterSlave('lifs002', '10.182.0.2')) #listfen.slaves.append(ClusterSlave('lifs003', '10.182.0.3')) diff --git a/Appl/CEP/CS1/CS1_Run/src/CS1_Parset.py b/Appl/CEP/CS1/CS1_Run/src/CS1_Parset.py index 0145eca9192..fb2777be719 100644 --- a/Appl/CEP/CS1/CS1_Run/src/CS1_Parset.py +++ b/Appl/CEP/CS1/CS1_Run/src/CS1_Parset.py @@ -30,7 +30,8 @@ class CS1_Parset(LOFAR_Parset.Parset): elif self.clock == '200MHz': self['Observation.sampleClock'] = 200 self['OLAP.BGLProc.integrationSteps'] = 768 - #self['OLAP.BGLProc.integrationSteps'] = 16 + #self['OLAP.BGLProc.integrationSteps'] = 384 + #self['OLAP.BGLProc.integrationSteps'] = 192 self.updateSBValues() def getClockString(self): @@ -43,6 +44,7 @@ class CS1_Parset(LOFAR_Parset.Parset): def setPartition(self, partition): self.partition = partition + self['OLAP.BGLProc.partition'] = partition def getPartition(self): return self.partition diff --git a/Appl/CEP/CS1/CS1_Run/src/CS1_Run.py b/Appl/CEP/CS1/CS1_Run/src/CS1_Run.py index cbf9db440cd..7d7aee6258c 100755 --- a/Appl/CEP/CS1/CS1_Run/src/CS1_Run.py +++ b/Appl/CEP/CS1/CS1_Run/src/CS1_Run.py @@ -44,7 +44,7 @@ def doObservation(obsID, parset): sectionTable = dict({\ 'IONProcSection': IONProcSection(parset, userId.getHost(), options.partition), 'BGLProcSection': BGLProcSection(parset, userId.getHost(), options.partition), - 'StorageSection': StorageSection(parset, listfen) + #'StorageSection': StorageSection(parset, listfen) #Flagger(parset, listfen) }) @@ -131,9 +131,9 @@ if __name__ == '__main__': # do not use the callback actions of the OptionParser, because we must make sure we read the parset before adding any variables parser.add_option('--parset' , dest='parset' , default='CS1.parset', type='string', help='name of the parameterset [%default]') parser.add_option('--partition' , dest='partition' , default='R000_128_0T',type='string', help='name of the BGL partion [%default]') - parser.add_option('--clock' , dest='clock' , default='160MHz' , type='string', help='clock frequency (either 160MHz or 200MHz) [%default]') + parser.add_option('--clock' , dest='clock' , default='200MHz' , type='string', help='clock frequency (either 160MHz or 200MHz) [%default]') parser.add_option('--runtime' , dest='runtime' , default='600' , type='int' , help='length of measurement in seconds [%default]') - parser.add_option('--starttime' , dest='starttime', default=int(time.time() + 90), type='int', help='start of measurement in UTC seconds [now + 90s]') + parser.add_option('--starttime' , dest='starttime', default=int(time.time() + 30), type='int', help='start of measurement in UTC seconds [now + 30]') parser.add_option('--integrationtime', dest='integrationtime', default='60' , type='int' , help='length of integration interval in seconds [%default]') parser.add_option('--msname' , dest='msname' , type='string', help='name of the measurement set') parser.add_option('--stationlist' , dest='stationlist' , default='CS010_4dipoles0_4_8_12', type='string', help='name of the station or stationconfiguration (see CS1_Stations.py) [%default]') diff --git a/Appl/CEP/CS1/CS1_Run/src/CS1_Sections.py b/Appl/CEP/CS1/CS1_Run/src/CS1_Sections.py index 26bed4952a0..84e358413b3 100644 --- a/Appl/CEP/CS1/CS1_Run/src/CS1_Sections.py +++ b/Appl/CEP/CS1/CS1_Run/src/CS1_Sections.py @@ -171,8 +171,8 @@ class BGLProcSection(Section): coresPerPset = self.parset.getInt32('OLAP.BGLProc.coresPerPset') subbandsPerPset = self.parset.getInt32('OLAP.subbandsPerPset') actualRuns = int(noRuns * subbandsPerPset / coresPerPset) - if not actualRuns * coresPerPset == noRuns * subbandsPerPset: - raise Exception('illegal number of runs') + #if not actualRuns * coresPerPset == noRuns * subbandsPerPset: + #raise Exception('illegal number of runs') Section.run(self, runlog, actualRuns, runCmd) class GeneratorSection(Section): diff --git a/Appl/CEP/CS1/CS1_Run/src/CS1_Stations.py b/Appl/CEP/CS1/CS1_Run/src/CS1_Stations.py index 1066de93874..1d15a37f365 100644 --- a/Appl/CEP/CS1/CS1_Run/src/CS1_Stations.py +++ b/Appl/CEP/CS1/CS1_Run/src/CS1_Stations.py @@ -212,7 +212,7 @@ CS016 = [Station('CS016')] # CS0XX_4dipoles_2 = CS0XX_dipole2 + CSXX_dipole6 + CS0XX_dipole10 + CS0XX_dipole14 AllStations = CS001 + CS010 + CS016 -AllMicroStations = CS010_4us + CS001_4us + CS008_4us + CS016_4us +AllMicroStations = CS010_4us + CS001_4us + CS016_4us AllDipoles = CS010_4dipoles_1 + CS001_4dipoles_1 + CS008_4dipoles_1 + CS016_4dipoles_1 AllDipolesMixed = CS010_4dipoles_1 + CS001_4dipoles_2 + CS008_4dipoles_2 + CS016_4dipoles_2 AllHBAs = CS010_4HBAs + CS001_4HBAs + CS008_4HBAs + CS016_4HBAs diff --git a/Appl/CEP/CS1/CS1_Run/src/OLAP.parset b/Appl/CEP/CS1/CS1_Run/src/OLAP.parset index 60bdc454605..b08cb189c4f 100644 --- a/Appl/CEP/CS1/CS1_Run/src/OLAP.parset +++ b/Appl/CEP/CS1/CS1_Run/src/OLAP.parset @@ -484,37 +484,51 @@ PIC.Core.B03_RSP.dest.ports = [10.170.0.13:4346,10.170.0.14:4347] PIC.Core.B03_0.RSP = 0 PIC.Core.B03_1.RSP = 1 -PIC.Core.T0.phaseCenter = [0.119884530715, 0.920263520535, 6364621.19236] -PIC.Core.T0_0.position = [0.119884530715, 0.920263520535, 6364621.19236] -PIC.Core.T0_1.position = [0.119884530715, 0.920263520535, 6364621.19236] -PIC.Core.T0_2.position = [0.119884530715, 0.920263520535, 6364621.19236] -PIC.Core.T0_3.position = [0.119884530715, 0.920263520535, 6364621.19236] -PIC.Core.T0_4.position = [0.119884530715, 0.920263520535, 6364621.19236] -PIC.Core.T0_5.position = [0.119884530715, 0.920263520535, 6364621.19236] -PIC.Core.T0_6.position = [0.119884530715, 0.920263520535, 6364621.19236] -PIC.Core.T0_7.position = [0.119884530715, 0.920263520535, 6364621.19236] -PIC.Core.T0_8.position = [0.119884530715, 0.920263520535, 6364621.19236] -PIC.Core.T0_9.position = [0.119884530715, 0.920263520535, 6364621.19236] -PIC.Core.T0_10.position = [0.119884530715, 0.920263520535, 6364621.19236] -PIC.Core.T0_11.position = [0.119884530715, 0.920263520535, 6364621.19236] -PIC.Core.T0_12.position = [0.119884530715, 0.920263520535, 6364621.19236] -PIC.Core.T0_13.position = [0.119884530715, 0.920263520535, 6364621.19236] -PIC.Core.T0_14.position = [0.119884530715, 0.920263520535, 6364621.19236] -PIC.Core.T0_15.position = [0.119884530715, 0.920263520535, 6364621.19236] - -PIC.Core.T0_0.port = 10.170.0.2:4345 -PIC.Core.T0_1.port = 10.170.0.1:4345 -PIC.Core.T0_2.port = 10.170.0.4:4345 -PIC.Core.T0_3.port = 10.170.0.3:4345 -PIC.Core.T0_4.port = 10.170.0.10:4345 -PIC.Core.T0_5.port = 10.170.0.9:4345 -PIC.Core.T0_6.port = 10.170.0.12:4345 -PIC.Core.T0_7.port = 10.170.0.11:4345 -PIC.Core.T0_8.port = 10.170.0.18:4345 -PIC.Core.T0_9.port = 10.170.0.17:4345 -PIC.Core.T0_10.port = 10.170.0.20:4345 -PIC.Core.T0_11.port = 10.170.0.19:4345 -PIC.Core.T0_12.port = 10.170.0.26:4345 -PIC.Core.T0_13.port = 10.170.0.25:4345 -PIC.Core.T0_14.port = 10.170.0.28:4345 -PIC.Core.T0_15.port = 10.170.0.27:4345 +PIC.Core.Station.B00_0.RSP.ports = [10.170.0.1:4346,10.170.0.1:4347,10.170.0.1:4348,10.170.0.1:4349] +PIC.Core.Station.B00_1.RSP.ports = [10.170.0.2:4346,10.170.0.2:4347,10.170.0.2:4348,10.170.0.2:4349] +PIC.Core.IONProc.R000-B00[0].inputs = [B00_0/RSP0,B00_0/RSP1,B00_0/RSP2,B00_0/RSP3] +PIC.Core.IONProc.R000-B00[1].inputs = [B00_1/RSP0,B00_1/RSP1,B00_1/RSP2,B00_1/RSP3] + +PIC.Core.Station.B01_0.RSP.ports = [10.170.0.5:4346,10.170.0.5:4347,10.170.0.5:4348,10.170.0.5:4349] +PIC.Core.Station.B01_1.RSP.ports = [10.170.0.6:4346,10.170.0.6:4347,10.170.0.6:4348,10.170.0.6:4349] +PIC.Core.IONProc.R000-B01[0].inputs = [B01_0/RSP0,B01_0/RSP1,B01_0/RSP2,B01_0/RSP3] +PIC.Core.IONProc.R000-B01[1].inputs = [B01_1/RSP0,B01_1/RSP1,B01_1/RSP2,B01_1/RSP3] + +PIC.Core.Station.B02_0.RSP.ports = [10.170.0.9:4346,10.170.0.9:4347,10.170.0.9:4348,10.170.0.9:4349] +PIC.Core.Station.B02_1.RSP.ports = [10.170.0.10:4346,10.170.0.10:4347,10.170.0.10:4348,10.170.0.10:4349] +PIC.Core.IONProc.R000-B02[0].inputs = [B02_0/RSP0,B02_0/RSP1,B02_0/RSP2,B02_0/RSP3] +PIC.Core.IONProc.R000-B02[1].inputs = [B02_1/RSP0,B02_1/RSP1,B02_1/RSP2,B02_1/RSP3] + +PIC.Core.Station.B03_0.RSP.ports = [10.170.0.13:4346,10.170.0.13:4347,10.170.0.13:4348,10.170.0.13:4349] +PIC.Core.Station.B03_1.RSP.ports = [10.170.0.14:4346,10.170.0.14:4347,10.170.0.14:4348,10.170.0.14:4349] +PIC.Core.IONProc.R000-B03[0].inputs = [B03_0/RSP0,B03_0/RSP1,B03_0/RSP2,B03_0/RSP3] +PIC.Core.IONProc.R000-B03[1].inputs = [B03_1/RSP0,B03_1/RSP1,B03_1/RSP2,B03_1/RSP3] + +PIC.Core.Station.CS001_us0.RSP.ports = [null:] #[10.170.0.33:4346] +PIC.Core.Station.CS001_us1.RSP.ports = [null:] #[10.170.0.34:4347] +PIC.Core.Station.CS001_us2.RSP.ports = [null:] #[10.170.0.37:4348] +PIC.Core.Station.CS001_us3.RSP.ports = [null:] #[10.170.0.38:4349] +PIC.Core.Station.CS010_us0.RSP.ports = [null:] #[10.170.0.41:4346] +PIC.Core.Station.CS010_us1.RSP.ports = [null:] #[10.170.0.42:4347] +PIC.Core.Station.CS010_us2.RSP.ports = [null:] #[10.170.0.45:4348] +PIC.Core.Station.CS010_us3.RSP.ports = [null:] #[10.170.0.46:4349] +PIC.Core.Station.CS016_us0.RSP.ports = [null:] #[10.170.0.49:4346] +PIC.Core.Station.CS016_us1.RSP.ports = [null:] #[10.170.0.50:4347] +PIC.Core.Station.CS016_us2.RSP.ports = [null:] #[10.170.0.53:4348] +PIC.Core.Station.CS016_us3.RSP.ports = [null:] #[10.170.0.54:4349] +PIC.Core.IONProc.R000_256_1[0].inputs = [CS001_us0/RSP0] +PIC.Core.IONProc.R000_256_1[1].inputs = [CS001_us1/RSP0] +PIC.Core.IONProc.R000_256_1[2].inputs = [CS001_us2/RSP0] +PIC.Core.IONProc.R000_256_1[3].inputs = [CS001_us3/RSP0] +PIC.Core.IONProc.R000_256_1[4].inputs = [CS010_us0/RSP0] +PIC.Core.IONProc.R000_256_1[5].inputs = [CS010_us1/RSP0] +PIC.Core.IONProc.R000_256_1[6].inputs = [CS010_us2/RSP0] +PIC.Core.IONProc.R000_256_1[7].inputs = [CS010_us3/RSP0] +PIC.Core.IONProc.R000_256_1[8].inputs = [CS016_us0/RSP0] +PIC.Core.IONProc.R000_256_1[9].inputs = [CS016_us1/RSP0] +PIC.Core.IONProc.R000_256_1[10].inputs = [CS016_us2/RSP0] +PIC.Core.IONProc.R000_256_1[11].inputs = [CS016_us3/RSP0] +PIC.Core.IONProc.R000_256_1[12].inputs = [] +PIC.Core.IONProc.R000_256_1[13].inputs = [] +PIC.Core.IONProc.R000_256_1[14].inputs = [] +PIC.Core.IONProc.R000_256_1[15].inputs = [] diff --git a/Appl/CEP/CS1/CS1_Storage/src/WH_SubbandWriter.cc b/Appl/CEP/CS1/CS1_Storage/src/WH_SubbandWriter.cc index c62417f9b3a..be18d8f3304 100644 --- a/Appl/CEP/CS1/CS1_Storage/src/WH_SubbandWriter.cc +++ b/Appl/CEP/CS1/CS1_Storage/src/WH_SubbandWriter.cc @@ -180,7 +180,7 @@ namespace LOFAR for (uint sb = 0; sb < itsNrSubbandsPerStorage; ++sb) { // compensate for the half-channel shift introduced by the PPF - double refFreq = refFreqs[sb2Index[itsSubbandIDs[sb]]] - chanWidth / 2; + double refFreq = refFreqs[itsSubbandIDs[sb]] - chanWidth / 2; itsBandIDs[sb] = itsWriters[sb]->addBand(itsNPolSquared, itsNChannels, refFreq, chanWidth); } -- GitLab