diff --git a/RTCP/Cobalt/CoInterface/src/Parset.cc b/RTCP/Cobalt/CoInterface/src/Parset.cc index 209ad4f378229a65b6c39182e8deb152a4cfb2ba..75b1bef990278e3b8eb437197fa53fe1b82a81c2 100644 --- a/RTCP/Cobalt/CoInterface/src/Parset.cc +++ b/RTCP/Cobalt/CoInterface/src/Parset.cc @@ -494,9 +494,13 @@ namespace LOFAR settings.correlator.nrChannels = getUint32("Cobalt.Correlator.nrChannelsPerSubband", 64); //settings.correlator.nrChannels = getUint32("Observation.channelsPerSubband", 64); settings.correlator.channelWidth = settings.subbandWidth() / settings.correlator.nrChannels; - settings.correlator.nrSamplesPerChannel = settings.blockSize / settings.correlator.nrChannels; + settings.correlator.nrSamplesPerBlock = settings.blockSize / settings.correlator.nrChannels; settings.correlator.nrBlocksPerIntegration = getUint32("Cobalt.Correlator.nrBlocksPerIntegration", 1); - settings.correlator.nrBlocksPerObservation = static_cast<size_t>(floor((settings.stopTime - settings.startTime) / settings.correlator.integrationTime())); + settings.correlator.nrIntegrationsPerBlock = getUint32("Cobalt.Correlator.nrIntegrationsPerBlock", 1); + + // We either have the integration time spanning multiple blocks, or the integration time being a part + // of a block, but never both. + ASSERT(settings.correlator.nrBlocksPerIntegration == 1 || settings.correlator.nrIntegrationsPerBlock == 1); // super-station beam former // @@ -818,7 +822,7 @@ namespace LOFAR } } - settings.beamFormer.dedispersionFFTsize = getUint32("Cobalt.BeamFormer.dedispersionFFTsize", settings.correlator.nrSamplesPerChannel); + settings.beamFormer.dedispersionFFTsize = getUint32("Cobalt.BeamFormer.dedispersionFFTsize", settings.blockSize); } // set output hosts @@ -929,6 +933,10 @@ namespace LOFAR return max_n_FFT_pow2; } + size_t ObservationSettings::nrBlocks() const { + return static_cast<size_t>(floor((stopTime - startTime) * subbandWidth() / blockSize)); + } + double ObservationSettings::subbandWidth() const { return 1.0 * clockHz() / 1024; @@ -962,7 +970,11 @@ namespace LOFAR } double ObservationSettings::Correlator::integrationTime() const { - return 1.0 * nrSamplesPerChannel * nrBlocksPerIntegration / channelWidth; + return 1.0 * nrSamplesPerIntegration() / channelWidth; + } + + size_t ObservationSettings::Correlator::nrSamplesPerIntegration() const { + return nrSamplesPerBlock / nrIntegrationsPerBlock * nrBlocksPerIntegration; } std::vector<struct ObservationSettings::FileLocation> Parset::getFileLocations(const std::string outputType) const { @@ -1207,26 +1219,6 @@ namespace LOFAR return settings.observationID; } - double Parset::startTime() const - { - return settings.startTime; - } - - double Parset::stopTime() const - { - return settings.stopTime; - } - - unsigned Parset::nrCorrelatedBlocks() const - { - return settings.correlator.nrBlocksPerObservation; - } - - unsigned Parset::nrBeamFormedBlocks() const - { - return static_cast<unsigned>(floor( (stopTime() - startTime()) / CNintegrationTime())); - } - ssize_t ObservationSettings::antennaFieldIndex(const std::string &name) const { for (size_t a = 0; a < antennaFields.size(); ++a) { @@ -1281,24 +1273,9 @@ namespace LOFAR return stations * (stations + 1) / 2; } - unsigned Parset::nrCrossPolarisations() const - { - return settings.nrCrossPolarisations(); - } - - unsigned Parset::clockSpeed() const - { - return settings.clockHz(); - } - - double Parset::subbandBandwidth() const - { - return settings.subbandWidth(); - } - double Parset::sampleDuration() const { - return 1.0 / subbandBandwidth(); + return 1.0 / settings.subbandWidth(); } unsigned Parset::dedispersionFFTsize() const @@ -1311,21 +1288,6 @@ namespace LOFAR return settings.nrBitsPerSample; } - unsigned Parset::CNintegrationSteps() const - { - return settings.correlator.nrSamplesPerChannel; - } - - unsigned Parset::IONintegrationSteps() const - { - return settings.correlator.nrBlocksPerIntegration; - } - - unsigned Parset::integrationSteps() const - { - return CNintegrationSteps() * IONintegrationSteps(); - } - bool Parset::outputThisType(OutputType outputType) const { switch (outputType) { @@ -1335,41 +1297,16 @@ namespace LOFAR } } - double Parset::CNintegrationTime() const - { - return nrSamplesPerSubband() / subbandBandwidth(); - } - - double Parset::IONintegrationTime() const - { - return settings.correlator.integrationTime(); - } - unsigned Parset::nrSamplesPerSubband() const { return settings.nrSamplesPerSubband(); } - unsigned Parset::nrSamplesPerChannel() const - { - return settings.correlator.enabled ? settings.correlator.nrSamplesPerChannel : 0; - } - - unsigned Parset::nrChannelsPerSubband() const - { - return settings.correlator.enabled ? settings.correlator.nrChannels : 0; - } - size_t Parset::nrSubbands() const { return settings.subbands.size(); } - double Parset::channelWidth() const - { - return settings.correlator.channelWidth; - } - bool Parset::delayCompensation() const { return settings.delayCompensation.enabled; @@ -1395,7 +1332,7 @@ namespace LOFAR // if the 2nd PPF is used, the subband is shifted half a channel // downwards, so subtracting half a subband results in the // center of channel 0 (instead of the bottom). - return sbFreq - 0.5 * subbandBandwidth(); + return sbFreq - 0.5 * settings.subbandWidth(); } bool Parset::realTime() const diff --git a/RTCP/Cobalt/CoInterface/src/Parset.h b/RTCP/Cobalt/CoInterface/src/Parset.h index 128d323c85d46adcad151d85d9e7d82e9c7245de..8dea9496365303dd151f806adcfddcaedd31ec50 100644 --- a/RTCP/Cobalt/CoInterface/src/Parset.h +++ b/RTCP/Cobalt/CoInterface/src/Parset.h @@ -104,6 +104,8 @@ namespace LOFAR // key: Cobalt.blockSize size_t blockSize; + size_t nrBlocks() const; + // Alias for blockSize size_t nrSamplesPerSubband() const; @@ -394,22 +396,29 @@ namespace LOFAR // The number of samples in one block of one channel. // // key: OLAP.CNProc.integrationSteps - size_t nrSamplesPerChannel; + size_t nrSamplesPerBlock; // The number of blocks to integrate to obtain the final // integration time. // + // If >1, the integration time is longer than the blockSize. + // // key: Cobalt.Correlator.nrBlocksPerIntegration size_t nrBlocksPerIntegration; + // The number of integrations to produce per block. + // + // If >1, the integration time is shorter than the blockSize. + // + // key: Cobalt.Correlator.nrIntegrationsPerBlock + size_t nrIntegrationsPerBlock; + + // The number of samples to integrate over. + size_t nrSamplesPerIntegration() const; + // The total integration time of all blocks, in seconds. double integrationTime() const; - // The number of blocks in this observation. - // - // set to: floor((stopTime - startTime) / integrationTime()) - size_t nrBlocksPerObservation; - struct Station { // The name of this (super)station // @@ -661,36 +670,20 @@ namespace LOFAR void write(Stream *) const; unsigned observationID() const; - double startTime() const; - double stopTime() const; - - unsigned nrCorrelatedBlocks() const; - unsigned nrBeamFormedBlocks() const; unsigned nrStations() const; unsigned nrTabStations() const; unsigned nrMergedStations() const; std::vector<std::string> mergedStationNames() const; unsigned nrBaselines() const; - unsigned nrCrossPolarisations() const; - unsigned clockSpeed() const; // Hz - double subbandBandwidth() const; double sampleDuration() const; unsigned nrBitsPerSample() const; size_t nrBytesPerComplexSample() const; MultiDimArray<double,2> positions() const; std::string positionType() const; unsigned dedispersionFFTsize() const; - unsigned CNintegrationSteps() const; - unsigned IONintegrationSteps() const; - unsigned integrationSteps() const; - double CNintegrationTime() const; - double IONintegrationTime() const; - unsigned nrSamplesPerChannel() const; unsigned nrSamplesPerSubband() const; - unsigned nrChannelsPerSubband() const; - double channelWidth() const; bool delayCompensation() const; bool correctClocks() const; bool correctBandPass() const; diff --git a/RTCP/Cobalt/CoInterface/test/tParset.cc b/RTCP/Cobalt/CoInterface/test/tParset.cc index 64cff7e812d2b6cd83b282ebcfd301bf9eec9958..2fe63414500bad79b34d1349fd17fe29a75a0a02 100644 --- a/RTCP/Cobalt/CoInterface/test/tParset.cc +++ b/RTCP/Cobalt/CoInterface/test/tParset.cc @@ -102,14 +102,12 @@ TEST(startTime) { Parset ps = makeDefaultTestParset("Observation.startTime", "2013-03-17 10:55:08"); CHECK_CLOSE(1363517708.0, ps.settings.startTime, 0.1); - CHECK_CLOSE(1363517708.0, ps.startTime(), 0.1); } TEST(stopTime) { Parset ps = makeDefaultTestParset("Observation.stopTime", "2013-03-17 10:55:08"); CHECK_CLOSE(1363517708.0, ps.settings.stopTime, 0.1); - CHECK_CLOSE(1363517708.0, ps.stopTime(), 0.1); } SUITE(clockMHz) { @@ -117,10 +115,9 @@ SUITE(clockMHz) { Parset ps = makeDefaultTestParset("Observation.sampleClock", "200"); CHECK_EQUAL(200U, ps.settings.clockMHz); - CHECK_EQUAL(200000000U, ps.clockSpeed()); + CHECK_EQUAL(200000000U, ps.settings.clockHz()); CHECK_CLOSE(195312.5, ps.settings.subbandWidth(), 0.001); - CHECK_CLOSE(195312.5, ps.subbandBandwidth(), 0.001); CHECK_CLOSE(1.0/195312.5, ps.sampleDuration(), 0.001); } @@ -128,10 +125,9 @@ SUITE(clockMHz) { Parset ps = makeDefaultTestParset("Observation.sampleClock", "160"); CHECK_EQUAL(160U, ps.settings.clockMHz); - CHECK_EQUAL(160000000U, ps.clockSpeed()); + CHECK_EQUAL(160000000U, ps.settings.clockHz()); CHECK_CLOSE(156250.0, ps.settings.subbandWidth(), 0.001); - CHECK_CLOSE(156250.0, ps.subbandBandwidth(), 0.001); CHECK_CLOSE(1.0/156250.0, ps.sampleDuration(), 0.001); } } @@ -169,7 +165,6 @@ TEST(nrPolarisations) { CHECK_EQUAL(nPol, ps.settings.nrPolarisations); CHECK_EQUAL(nPol * nPol, ps.settings.nrCrossPolarisations()); - CHECK_EQUAL(nPol * nPol, ps.nrCrossPolarisations()); } SUITE(corrections) { @@ -741,7 +736,6 @@ SUITE(correlator) { ps.updateSettings(); CHECK_EQUAL(256U, ps.settings.correlator.nrChannels); - CHECK_EQUAL(256U, ps.nrChannelsPerSubband()); } TEST(channelWidth) { @@ -754,7 +748,6 @@ SUITE(correlator) { ps.updateSettings(); CHECK_CLOSE(ps.settings.subbandWidth() / nrChannels, ps.settings.correlator.channelWidth, 0.00001); - CHECK_CLOSE(ps.settings.subbandWidth() / nrChannels, ps.channelWidth(), 0.00001); } } @@ -768,9 +761,7 @@ SUITE(correlator) { ps.updateSettings(); // verify settings - CHECK_EQUAL(4U, ps.settings.correlator.nrSamplesPerChannel); - CHECK_EQUAL(4U, ps.CNintegrationSteps()); - CHECK_EQUAL(4U, ps.nrSamplesPerChannel()); + CHECK_EQUAL(4U, ps.settings.correlator.nrSamplesPerBlock); } TEST(nrBlocksPerIntegration) { @@ -783,7 +774,6 @@ SUITE(correlator) { // verify settings CHECK_EQUAL(42U, ps.settings.correlator.nrBlocksPerIntegration); - CHECK_EQUAL(42U, ps.IONintegrationSteps()); } /* TODO: test super-station beam former */ @@ -1149,8 +1139,10 @@ SUITE(integration) { CHECK_EQUAL(true, ps.settings.correlator.enabled); CHECK_EQUAL(64U, ps.settings.correlator.nrChannels); CHECK_CLOSE(3051.76, ps.settings.correlator.channelWidth, 0.01); - CHECK_EQUAL(768U, ps.settings.correlator.nrSamplesPerChannel); - CHECK_EQUAL(30U, ps.settings.correlator.nrBlocksPerIntegration); + CHECK_EQUAL(768U, ps.settings.correlator.nrSamplesPerBlock); + CHECK_EQUAL(30U, ps.settings.correlator.nrBlocksPerIntegration); + CHECK_EQUAL(30U * 768U, ps.settings.correlator.nrSamplesPerIntegration()); + CHECK_EQUAL(1U, ps.settings.correlator.nrIntegrationsPerBlock); CHECK_EQUAL(nrStations, ps.settings.correlator.stations.size()); for (unsigned st = 0; st < nrStations; ++st) { CHECK_EQUAL(ps.settings.antennaFields[st].name, ps.settings.correlator.stations[st].name); diff --git a/RTCP/Cobalt/GPUProc/doc/bf-pipeline.txt b/RTCP/Cobalt/GPUProc/doc/bf-pipeline.txt index 0ea6927d2136a233397c15a2e404a85e59c33917..b1d6aca5279a0097290d93c90eeeefd7ead320bc 100644 --- a/RTCP/Cobalt/GPUProc/doc/bf-pipeline.txt +++ b/RTCP/Cobalt/GPUProc/doc/bf-pipeline.txt @@ -23,6 +23,7 @@ For max size, we assume: - 48 stations - 1 subband - 1 second blocks (195312.5 samples) rounded to next multiple of 4096 (= 196608 samples). + - 10 integration periods/block (~0.1s integration time) Note: MiB = 2^20 bytes (= 1048576 bytes). @@ -49,7 +50,7 @@ Delay compensation + Band pass + Transpose {I/O: delays} | [station][channel][sample][pol] [80][64][3072][2] = 240 MiB B V Correlator - | [baseline][channel][pol][pol] [3240][64][2][2] = 6 MiB E + | [subblks][baseline][channel][pol][pol] [10][3240][64][2][2]= 60 MiB E V (output) diff --git a/RTCP/Cobalt/GPUProc/doc/corr-pipeline.txt b/RTCP/Cobalt/GPUProc/doc/corr-pipeline.txt deleted file mode 100644 index c73c97a448f1a46029df32fa486819195b3051ff..0000000000000000000000000000000000000000 --- a/RTCP/Cobalt/GPUProc/doc/corr-pipeline.txt +++ /dev/null @@ -1,53 +0,0 @@ ------------------------- -Corelator Pipeline ------------------------- - -Design decisions: -------------------- - -* The PPF is at the beginning of the pipeline, to introduce the parallellism immediately, and to - allow the subsequent processing kernels to operate in the channel space. -* 1 channel/subband is only used for debugging purposes -- its performance is likely quite bad. - -Pipeline -------------------- - -For array dimensions, we assume: - - 64 channels - -For max size, we assume: - - 80 stations - - 1 subband - - 1 second blocks (195312.5 samples) rounded to next multiple of 4096 (= 196608 samples). - -Note: - MiB = 2^20 bytes (= 1048576 bytes). - -[ A -> E, trashes B ] - -Flow: Data dimensions: Max size (fcomplex): Buffer: -=================================================================================================================== -(input) [station][sample][pol] [80][196608][2] = 120 MiB A - | - V - FIR (if >1ch) {IntToFloat} {I/O: history samples} - | [station][pol][sample] [80][2][196608] = 240 MiB Nch: B - V - FFT (if >1ch) {inplace} - | [station][pol][sample][channel] [80][2][3072][64] = 240 MiB Nch: E - V -Delay compensation + Band pass + Transpose {I/O: delays} - | [station][channel][sample][pol] [80][64][3072][2] = 240 MiB B - V -Correlator - | [baseline][channel][pol][pol] [3240][64][2][2] = 6 MiB E - V -(output) - - -A = 120 MiB -B = 240 MiB -E = 240 MiB - -A and E are allowed to overlap if A is not needed afterwards. - diff --git a/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc b/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc index 6d9ad7c60a64e2cc215eed72bb1469e501a08ed5..ccbf19dd686b177e09da723d160158beed2ec546 100644 --- a/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc +++ b/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc @@ -70,11 +70,11 @@ namespace LOFAR { stationID(StationID::parseFullFieldName(ps.settings.antennaFields.at(stationIdx).name)), logPrefix(str(format("[station %s] ") % stationID.name())), - startTime(ps.startTime() * ps.subbandBandwidth(), ps.clockSpeed()), - stopTime(ps.stopTime() * ps.subbandBandwidth(), ps.clockSpeed()), + startTime(ps.settings.startTime * ps.settings.subbandWidth(), ps.settings.clockHz()), + stopTime(ps.settings.stopTime * ps.settings.subbandWidth(), ps.settings.clockHz()), nrSamples(ps.settings.blockSize), - nrBlocks((stopTime - startTime) / nrSamples), + nrBlocks(ps.settings.nrBlocks()), metaDataPool(str(format("StationMetaData::metaDataPool [station %s]") % stationID.name()), false), @@ -234,8 +234,8 @@ namespace LOFAR { } if (desc == "factory:") { - const TimeStamp from(ps.startTime() * ps.subbandBandwidth(), ps.clockSpeed()); - const TimeStamp to(ps.stopTime() * ps.subbandBandwidth(), ps.clockSpeed()); + const TimeStamp from(ps.settings.startTime * ps.settings.subbandWidth(), ps.settings.clockHz()); + const TimeStamp to(ps.settings.stopTime * ps.settings.subbandWidth(), ps.settings.clockHz()); const struct BoardMode mode(ps.settings.nrBitsPerSample, ps.settings.clockMHz); PacketFactory factory(mode); diff --git a/RTCP/Cobalt/GPUProc/src/cuda/Kernels/CorrelatorKernel.cc b/RTCP/Cobalt/GPUProc/src/cuda/Kernels/CorrelatorKernel.cc index 002120f4e4d94b40998ef2d20a82cdf3d9e7967c..78116d8d5415e0a58dafa29f8086bc76c4c5ff9c 100644 --- a/RTCP/Cobalt/GPUProc/src/cuda/Kernels/CorrelatorKernel.cc +++ b/RTCP/Cobalt/GPUProc/src/cuda/Kernels/CorrelatorKernel.cc @@ -48,10 +48,11 @@ namespace LOFAR nrStations(ps.settings.antennaFields.size()), // For Cobalt (= up to 80 antenna fields), the 2x2 kernel gives the best // performance. - nrStationsPerThread(2), + nrStationsPerThread(1), nrChannels(ps.settings.correlator.nrChannels), - nrSamplesPerChannel(ps.settings.correlator.nrSamplesPerChannel) + nrSamplesPerIntegration(ps.settings.correlator.nrSamplesPerBlock / ps.settings.correlator.nrIntegrationsPerBlock), + nrIntegrationsPerBlock(ps.settings.correlator.nrIntegrationsPerBlock) { dumpBuffers = ps.getBool("Cobalt.Kernels.CorrelatorKernel.dumpOutput", false); @@ -64,17 +65,21 @@ namespace LOFAR return nrStations * (nrStations + 1) / 2; } + size_t CorrelatorKernel::Parameters::nrSamplesPerBlock() const { + return nrSamplesPerIntegration * nrIntegrationsPerBlock; + } + size_t CorrelatorKernel::Parameters::bufferSize(BufferType bufferType) const { switch (bufferType) { case CorrelatorKernel::INPUT_DATA: return - (size_t) nrChannels * nrSamplesPerChannel * nrStations * + (size_t) nrChannels * nrSamplesPerBlock() * nrStations * NR_POLARIZATIONS * sizeof(std::complex<float>); case CorrelatorKernel::OUTPUT_DATA: return - (size_t) nrBaselines() * nrChannels * + (size_t) nrIntegrationsPerBlock * nrBaselines() * nrChannels * NR_POLARIZATIONS * NR_POLARIZATIONS * sizeof(std::complex<float>); default: THROW(GPUProcException, "Invalid bufferType (" << bufferType << ")"); @@ -106,10 +111,6 @@ namespace LOFAR unsigned nrUsableChannels = std::max(params.nrChannels - 1, 1U); setEnqueueWorkSizes( gpu::Grid(nrPasses * nrThreads, nrUsableChannels), gpu::Block(nrThreads, 1) ); - - nrOperations = (size_t) nrUsableChannels * params.nrBaselines() * params.nrSamplesPerChannel * 32; - nrBytesRead = (size_t) nrPasses * params.nrStations * nrUsableChannels * params.nrSamplesPerChannel * NR_POLARIZATIONS * sizeof(std::complex<float>); - nrBytesWritten = (size_t) params.nrBaselines() * nrUsableChannels * NR_POLARIZATIONS * NR_POLARIZATIONS * sizeof(std::complex<float>); } //-------- Template specializations for KernelFactory --------// @@ -125,9 +126,10 @@ namespace LOFAR defs["NR_STATIONS_PER_THREAD"] = lexical_cast<string>(itsParameters.nrStationsPerThread); defs["NR_CHANNELS"] = lexical_cast<string>(itsParameters.nrChannels); - defs["NR_INTEGRATIONS"] = "1"; + defs["NR_INTEGRATIONS"] = + lexical_cast<string>(itsParameters.nrIntegrationsPerBlock); defs["NR_SAMPLES_PER_INTEGRATION"] = - lexical_cast<string>(itsParameters.nrSamplesPerChannel); + lexical_cast<string>(itsParameters.nrSamplesPerIntegration); return defs; } diff --git a/RTCP/Cobalt/GPUProc/src/cuda/Kernels/CorrelatorKernel.h b/RTCP/Cobalt/GPUProc/src/cuda/Kernels/CorrelatorKernel.h index 72c9229eaf2690f377c79f4bcc46b96e8f4a0def..c4343b9d9b9556ac5e20ba657f5b55e3dcc5b61c 100644 --- a/RTCP/Cobalt/GPUProc/src/cuda/Kernels/CorrelatorKernel.h +++ b/RTCP/Cobalt/GPUProc/src/cuda/Kernels/CorrelatorKernel.h @@ -49,7 +49,9 @@ namespace LOFAR unsigned nrBaselines() const; unsigned nrChannels; - unsigned nrSamplesPerChannel; + unsigned nrSamplesPerIntegration; + unsigned nrIntegrationsPerBlock; + size_t nrSamplesPerBlock() const; size_t bufferSize(BufferType bufferType) const; }; diff --git a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc index 40db43270f95b1532c196f2913056a7fa67d4b10..1f4cc853802545ccaa4f094063f7a49f0e4d60c2 100644 --- a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc +++ b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc @@ -703,18 +703,23 @@ namespace LOFAR } // Return outputData back to the subbandProc. - const double maxRetentionTime = 3.0; - using namespace TimeSpec; - if (ps.settings.realTime && TimeSpec::now() - outputQueue.oldest() > maxRetentionTime) { - // Drop - spillQueue.append(data); - correlatorLoss.dropping = true; - correlatorLoss.blocksDropped++; + if (ps.settings.correlator.enabled) { + const double maxRetentionTime = 3.0 + ps.settings.blockDuration(); + using namespace TimeSpec; + if (ps.settings.realTime && TimeSpec::now() - outputQueue.oldest() > maxRetentionTime) { + // Drop + spillQueue.append(data); + correlatorLoss.dropping = true; + correlatorLoss.blocksDropped++; + } else { + // Forward to correlator + outputQueue.append(data); + correlatorLoss.blocksWritten++; + } } else { - // Forward to correlator - outputQueue.append(data); - correlatorLoss.blocksWritten++; + spillQueue.append(data); } + ASSERT(!data); const double blockDuration = ps.settings.blockDuration(); @@ -768,8 +773,6 @@ namespace LOFAR // Process pool elements until end-of-output while ((data = inputQueue.remove()) != NULL) { - CorrelatedData &correlatedData = data->correlatedData; - const struct BlockID id = data->blockID; ASSERT( globalSubbandIdx == id.globalSubbandIdx ); @@ -782,7 +785,8 @@ namespace LOFAR // Write block to outputProc try { writeTimer.start(); - correlatedData.write(outputStream.get(), true); + for (size_t i = 0; i < data->correlatedData.integrations.size(); ++i) + data->correlatedData.integrations[i]->write(outputStream.get(), true); writeTimer.stop(); } catch (Exception &ex) { // No reconnect, as outputProc doesn't yet re-listen when the conn drops. diff --git a/RTCP/Cobalt/GPUProc/src/cuda/SubbandProcs/CorrelatorStep.cc b/RTCP/Cobalt/GPUProc/src/cuda/SubbandProcs/CorrelatorStep.cc index 118e43b7dc29bc761c10dcf1ab9b27169073cffa..4a0c3096492aad36c83588bfe33916d232cef15f 100644 --- a/RTCP/Cobalt/GPUProc/src/cuda/SubbandProcs/CorrelatorStep.cc +++ b/RTCP/Cobalt/GPUProc/src/cuda/SubbandProcs/CorrelatorStep.cc @@ -72,10 +72,10 @@ namespace LOFAR MultiDimArray<LOFAR::SparseSet<unsigned>, 1>const &inputFlags, MultiDimArray<SparseSet<unsigned>, 2>& flagsPerChannel) { - unsigned numberOfChannels = ps.nrChannelsPerSubband(); + unsigned numberOfChannels = ps.settings.correlator.nrChannels; unsigned log2NrChannels = log2(numberOfChannels); //Convert the flags per sample to flags per channel - for (unsigned station = 0; station < ps.nrStations(); station ++) + for (unsigned station = 0; station < ps.settings.correlator.stations.size(); station ++) { // get the flag ranges const SparseSet<unsigned>::Ranges &ranges = inputFlags[station].getRanges(); @@ -88,7 +88,7 @@ namespace LOFAR { // do nothing, just take the ranges as supplied begin_idx = it->begin; - end_idx = std::min(ps.nrSamplesPerChannel(), it->end ); + end_idx = std::min(static_cast<unsigned>(ps.settings.correlator.nrSamplesPerBlock), it->end); } else { @@ -109,7 +109,7 @@ namespace LOFAR // The min is needed, because flagging the last input // samples would cause NR_TAPS subsequent samples to // be flagged, which aren't necessarily part of this block. - end_idx = std::min(ps.nrSamplesPerChannel() + 1, + end_idx = std::min(static_cast<unsigned>(ps.settings.correlator.nrSamplesPerBlock + 1U), ((it->end - 1) >> log2NrChannels) + 1); } @@ -125,7 +125,7 @@ namespace LOFAR void CorrelatorStep::Flagger::propagateFlags( Parset const &parset, MultiDimArray<LOFAR::SparseSet<unsigned>, 1>const &inputFlags, - LOFAR::Cobalt::CorrelatedData &output) + SubbandProcOutputData::CorrelatedData &output) { // Object for storing transformed flags MultiDimArray<SparseSet<unsigned>, 2> flagsPerChannel( @@ -154,40 +154,42 @@ namespace LOFAR template<typename T> void CorrelatorStep::Flagger::calcWeights( Parset const &parset, MultiDimArray<SparseSet<unsigned>, 2>const & flagsPerChannel, - LOFAR::Cobalt::CorrelatedData &output) + SubbandProcOutputData::CorrelatedData &output) { - unsigned nrSamplesPerIntegration = parset.settings.correlator.nrSamplesPerChannel; + // The number of samples per integration within this block. + const unsigned nrSamples = + parset.settings.correlator.nrSamplesPerBlock / + parset.settings.correlator.nrIntegrationsPerBlock; // loop the stations - for (unsigned stat1 = 0; stat1 < parset.nrStations(); stat1 ++) { + for (unsigned stat1 = 0; stat1 < parset.settings.correlator.stations.size(); stat1 ++) { for (unsigned stat2 = 0; stat2 <= stat1; stat2 ++) { - unsigned bl = baseline(stat1, stat2); + const unsigned bl = baseline(stat1, stat2); - // If there is a single channel then the index 0 contains real data - if (parset.settings.correlator.nrChannels == 1) - { + for(unsigned ch = 0; ch < parset.settings.correlator.nrChannels; ch ++) { // The number of invalid (flagged) samples is the union of the // flagged samples in the two stations - unsigned nrValidSamples = nrSamplesPerIntegration - - (flagsPerChannel[0][stat1] | flagsPerChannel[0][stat2]).count(); + const SparseSet<unsigned> flags = + flagsPerChannel[ch][stat1] | flagsPerChannel[ch][stat2]; - // Moet worden toegekend op de correlated dataobject - output.nrValidSamples<T>(bl, 0) = nrValidSamples; - } - else - { - // channel 0 does not contain valid data - output.nrValidSamples<T>(bl, 0) = 0; - - for(unsigned ch = 1; ch < parset.settings.correlator.nrChannels; ch ++) - { - // valid samples is total number of samples minus the union of the - // Two stations. - unsigned nrValidSamples = nrSamplesPerIntegration - - (flagsPerChannel[ch][stat1] | - flagsPerChannel[ch][stat2]).count(); - - output.nrValidSamples<T>(bl, ch) = nrValidSamples; + for (size_t i = 0; i < parset.settings.correlator.nrIntegrationsPerBlock; ++i) { + LOFAR::Cobalt::CorrelatedData &correlatedData = *output.integrations[i]; + + // Channel zero is invalid, unless we have only one channel + if (parset.settings.correlator.nrChannels > 1 && ch == 0) { + correlatedData.nrValidSamples<T>(bl, 0) = 0; + continue; + } + + // Extract the flags for this subblock + const SparseSet<unsigned> subBlockFlags = flags.subset( + i * nrSamples, + + (i+1) * nrSamples); + + const T nrValidSamples = + nrSamples - subBlockFlags.count(); + + correlatedData.nrValidSamples<T>(bl, ch) = nrValidSamples; } } } @@ -198,9 +200,9 @@ namespace LOFAR void CorrelatorStep::Flagger::calcWeights( Parset const &parset, MultiDimArray<SparseSet<unsigned>, 2>const & flagsPerChannel, - LOFAR::Cobalt::CorrelatedData &output) + SubbandProcOutputData::CorrelatedData &output) { - switch (output.itsNrBytesPerNrValidSamples) { + switch (output.integrations[0]->itsNrBytesPerNrValidSamples) { case 4: calcWeights<uint32_t>(parset, flagsPerChannel, output); break; @@ -237,7 +239,7 @@ namespace LOFAR // include it both for 1 and >1 channels/subband. for (unsigned ch = 0; ch < parset.settings.correlator.nrChannels; ch++) { - T nrValidSamples = output.nrValidSamples<T>(bl, ch); + const T nrValidSamples = output.nrValidSamples<T>(bl, ch); // If all samples flagged, weights is zero. // TODO: make a lookup table for the expensive division; measure first @@ -307,9 +309,10 @@ namespace LOFAR // Initialize the output buffers for the long-time integration for (size_t i = 0; i < integratedData.size(); i++) { integratedData[i] = + // Note that we always integrate complete blocks make_pair(0, new LOFAR::Cobalt::CorrelatedData(ps.settings.antennaFields.size(), ps.settings.correlator.nrChannels, - ps.settings.correlator.nrSamplesPerChannel)); + ps.settings.correlator.nrSamplesPerBlock)); } } @@ -350,7 +353,7 @@ namespace LOFAR void CorrelatorStep::readOutput(SubbandProcOutputData &output) { // Read data back from the kernel - queue.readBuffer(output.correlatedData, devE, outputCounter, false); + queue.readBuffer(output.correlatedData.data, devE, outputCounter, false); } @@ -373,23 +376,29 @@ namespace LOFAR bool CorrelatorStep::integrate(SubbandProcOutputData &output) { const size_t idx = output.blockID.subbandProcSubbandIdx; - const size_t nblock = ps.settings.correlator.nrBlocksPerIntegration; + const size_t nblock = ps.settings.correlator.nrBlocksPerIntegration; + const size_t nsubblock = ps.settings.correlator.nrIntegrationsPerBlock; // We don't want to copy the data if we don't need to integrate. if (nblock == 1) { - output.correlatedData.setSequenceNumber(output.blockID.block); + for (size_t i = 0; i < nsubblock; ++i) { + output.correlatedData.integrations[i]->setSequenceNumber(output.blockID.block * nsubblock + i); + } return true; } + // We don't have subblocks if we integrate multiple blocks. + ASSERT( nsubblock == 1 ); + integratedData[idx].first++; if (integratedData[idx].first < nblock) { - *integratedData[idx].second += output.correlatedData; + *integratedData[idx].second += *output.correlatedData.integrations[0]; return false; } else { - output.correlatedData += *integratedData[idx].second; - output.correlatedData.setSequenceNumber(output.blockID.block / nblock); + *output.correlatedData.integrations[0] += *integratedData[idx].second; + output.correlatedData.integrations[0]->setSequenceNumber(output.blockID.block / nblock); integratedData[idx].first = 0; integratedData[idx].second->reset(); return true; @@ -406,7 +415,9 @@ namespace LOFAR // The flags are already copied to the correct location // now the flagged amount should be applied to the visibilities - Flagger::applyWeights(ps, output.correlatedData); + for (size_t i = 0; i < ps.settings.correlator.nrIntegrationsPerBlock; ++i) { + Flagger::applyWeights(ps, *output.correlatedData.integrations[i]); + } return true; } diff --git a/RTCP/Cobalt/GPUProc/src/cuda/SubbandProcs/CorrelatorStep.h b/RTCP/Cobalt/GPUProc/src/cuda/SubbandProcs/CorrelatorStep.h index 9ff8a7f6b78972de3d7ed49e2b3394a8156bd2e0..261c3119f944465635094bf86d2930e727f46ae1 100644 --- a/RTCP/Cobalt/GPUProc/src/cuda/SubbandProcs/CorrelatorStep.h +++ b/RTCP/Cobalt/GPUProc/src/cuda/SubbandProcs/CorrelatorStep.h @@ -93,7 +93,7 @@ namespace LOFAR // samples and save this in output static void propagateFlags(Parset const & parset, MultiDimArray<LOFAR::SparseSet<unsigned>, 1>const &inputFlags, - LOFAR::Cobalt::CorrelatedData &output); + SubbandProcOutputData::CorrelatedData &output); // 1.1 Convert the flags per station to channel flags, change time scale // if nchannel > 1 @@ -111,7 +111,7 @@ namespace LOFAR static void calcWeights(Parset const &parset, MultiDimArray<SparseSet<unsigned>, 2>const &flagsPerChannel, - LOFAR::Cobalt::CorrelatedData &output); + SubbandProcOutputData::CorrelatedData &output); // 2.1 Apply the supplied weight to the complex values in the channel // and baseline @@ -125,7 +125,7 @@ namespace LOFAR static void calcWeights(Parset const &parset, MultiDimArray<SparseSet<unsigned>, 2>const &flagsPerChannel, - LOFAR::Cobalt::CorrelatedData &output); + SubbandProcOutputData::CorrelatedData &output); }; diff --git a/RTCP/Cobalt/GPUProc/src/cuda/SubbandProcs/SubbandProcOutputData.cc b/RTCP/Cobalt/GPUProc/src/cuda/SubbandProcs/SubbandProcOutputData.cc index 228434f83180fce035cfaed2c67bc9d2ab8a754d..94fcb82757603c369aa16b7ad5a1c6cb68774463 100644 --- a/RTCP/Cobalt/GPUProc/src/cuda/SubbandProcs/SubbandProcOutputData.cc +++ b/RTCP/Cobalt/GPUProc/src/cuda/SubbandProcs/SubbandProcOutputData.cc @@ -45,9 +45,10 @@ namespace LOFAR : boost::extents[0][0][0][0], context, 0), - correlatedData(ps.settings.correlator.enabled ? ps.settings.antennaFields.size() : 0, - ps.settings.correlator.enabled ? ps.settings.correlator.nrChannels : 0, - ps.settings.correlator.enabled ? ps.settings.correlator.nrSamplesPerChannel : 0, + correlatedData(ps.settings.correlator.enabled ? ps.settings.correlator.nrIntegrationsPerBlock : 0, + ps.settings.correlator.enabled ? ps.settings.antennaFields.size() : 0, + ps.settings.correlator.enabled ? ps.settings.correlator.nrChannels : 0, + ps.settings.correlator.enabled ? ps.settings.correlator.nrSamplesPerIntegration() : 0, context), emit_correlatedData(false) { @@ -55,21 +56,28 @@ namespace LOFAR SubbandProcOutputData::CorrelatedData::CorrelatedData( + unsigned nrIntegrations, unsigned nrStations, unsigned nrChannels, unsigned maxNrValidSamples, gpu::Context &context) : - MultiDimArrayHostBuffer<fcomplex, 4>( + data( boost::extents + [nrIntegrations] [nrStations * (nrStations + 1) / 2] [nrChannels][NR_POLARIZATIONS] [NR_POLARIZATIONS], context, 0), - LOFAR::Cobalt::CorrelatedData(nrStations, nrChannels, - maxNrValidSamples, this->origin(), - this->num_elements(), heapAllocator, 1) + integrations(nrIntegrations) { + for (size_t i = 0; i < nrIntegrations; ++i) { + const size_t num_elements = data.strides()[0]; + + integrations[i] = new LOFAR::Cobalt::CorrelatedData( + nrStations, nrChannels, maxNrValidSamples, + &data[i][0][0][0][0], num_elements, + heapAllocator, 1); + } } } } - diff --git a/RTCP/Cobalt/GPUProc/src/cuda/SubbandProcs/SubbandProcOutputData.h b/RTCP/Cobalt/GPUProc/src/cuda/SubbandProcs/SubbandProcOutputData.h index 58743cee0c3ceee310a5560a084478b930eac6fc..3d382765ce3e5ec839fd52c791d0062de3525579 100644 --- a/RTCP/Cobalt/GPUProc/src/cuda/SubbandProcs/SubbandProcOutputData.h +++ b/RTCP/Cobalt/GPUProc/src/cuda/SubbandProcs/SubbandProcOutputData.h @@ -24,6 +24,7 @@ #include <CoInterface/BlockID.h> #include <CoInterface/Parset.h> #include <CoInterface/CorrelatedData.h> +#include <CoInterface/SmartPtr.h> #include <GPUProc/gpu_wrapper.h> #include <GPUProc/MultiDimArrayHostBuffer.h> @@ -43,14 +44,16 @@ namespace LOFAR MultiDimArrayHostBuffer<float, 4> coherentData; MultiDimArrayHostBuffer<float, 4> incoherentData; - struct CorrelatedData: - public MultiDimArrayHostBuffer<fcomplex,4>, - public LOFAR::Cobalt::CorrelatedData + struct CorrelatedData { - CorrelatedData(unsigned nrStations, + CorrelatedData(unsigned nrIntegrations, + unsigned nrStations, unsigned nrChannels, unsigned maxNrValidSamples, gpu::Context &context); + + MultiDimArrayHostBuffer<fcomplex, 5> data; + std::vector< SmartPtr<LOFAR::Cobalt::CorrelatedData> > integrations; }; CorrelatedData correlatedData; diff --git a/RTCP/Cobalt/GPUProc/src/rtcp.cc b/RTCP/Cobalt/GPUProc/src/rtcp.cc index c8d25ba6c7171483685a02aa34c0fca9fae62577..a2f3d5d8b653e53620bb0b0313b43175332b4a26 100644 --- a/RTCP/Cobalt/GPUProc/src/rtcp.cc +++ b/RTCP/Cobalt/GPUProc/src/rtcp.cc @@ -243,7 +243,7 @@ int main(int argc, char **argv) // ends. const time_t now = time(0); - const double stopTime = ps.stopTime(); + const double stopTime = ps.settings.stopTime; if (now < stopTime + rtcpTimeout) { size_t maxRunTime = stopTime + rtcpTimeout - now; @@ -435,7 +435,7 @@ int main(int argc, char **argv) mpi.init(argc, argv); // Periodically log system information - SysInfoLogger siLogger(ps.startTime(), ps.stopTime()); + SysInfoLogger siLogger(ps.settings.startTime, ps.settings.stopTime); /* * RUN stage diff --git a/RTCP/Cobalt/GPUProc/test/Kernels/tCorrelatorKernel.cc b/RTCP/Cobalt/GPUProc/test/Kernels/tCorrelatorKernel.cc index 452b3a27148db960d7d5037868ad9ae07c757b55..b2d1c4a0e997973f37966b20ecfbd749981e4f5d 100644 --- a/RTCP/Cobalt/GPUProc/test/Kernels/tCorrelatorKernel.cc +++ b/RTCP/Cobalt/GPUProc/test/Kernels/tCorrelatorKernel.cc @@ -112,12 +112,12 @@ TEST(DataValidity) // Fill input BlockID blockID; - MultiDimArrayHostBuffer<fcomplex, 4> hostInput(boost::extents[NR_STATIONS][ps.nrChannelsPerSubband()][ps.nrSamplesPerChannel()][NR_POLARIZATIONS], context); - MultiDimArrayHostBuffer<fcomplex, 4> hostOutput(boost::extents[ps.nrBaselines()][ps.nrChannelsPerSubband()][NR_POLARIZATIONS][NR_POLARIZATIONS], context); + MultiDimArrayHostBuffer<fcomplex, 4> hostInput(boost::extents[NR_STATIONS][ps.settings.correlator.nrChannels][ps.settings.correlator.nrSamplesPerIntegration()][NR_POLARIZATIONS], context); + MultiDimArrayHostBuffer<fcomplex, 4> hostOutput(boost::extents[ps.nrBaselines()][ps.settings.correlator.nrChannels][NR_POLARIZATIONS][NR_POLARIZATIONS], context); for (size_t st = 0; st < NR_STATIONS; st++) { - for (size_t ch = 0; ch < ps.nrChannelsPerSubband(); ch++) { - for (size_t t = 0; t < ps.nrSamplesPerChannel(); t++) { + for (size_t ch = 0; ch < ps.settings.correlator.nrChannels; ch++) { + for (size_t t = 0; t < ps.settings.correlator.nrSamplesPerIntegration(); t++) { /* * NOTE: Using higher values will result in imprecision, * causing the output to be less predictable. @@ -134,7 +134,7 @@ TEST(DataValidity) stream.readBuffer(hostOutput, devOutput, true); // Verify output - const size_t nrSamples = ps.nrSamplesPerChannel(); + const size_t nrSamples = ps.settings.correlator.nrSamplesPerIntegration(); for (size_t st1 = 0; st1 < NR_STATIONS; st1++) { for (size_t st2 = 0; st2 <= st1; st2++) { const unsigned bl = ::baseline(st1, st2); @@ -142,7 +142,7 @@ TEST(DataValidity) LOG_INFO_STR("Checking baseline " << bl << " = (" << st1 << ", " << st2 << ")"); // Start checking at ch = 1, because channel 0 is skipped - for (size_t ch = 1; ch < ps.nrChannelsPerSubband(); ch++) { + for (size_t ch = 1; ch < ps.settings.correlator.nrChannels; ch++) { LOG_INFO_STR("Checking channel " << ch); // NOTE: XY and YX polarizations have been swapped (see issue #5640) diff --git a/RTCP/Cobalt/GPUProc/test/SubbandProcs/tCorrelatorStep.cc b/RTCP/Cobalt/GPUProc/test/SubbandProcs/tCorrelatorStep.cc index 9a185dafa2cd96a191780f829a89400f10a3e418..6d4c9d51cbfe3752c2ec2dd3ccf0e0cf828123cb 100644 --- a/RTCP/Cobalt/GPUProc/test/SubbandProcs/tCorrelatorStep.cc +++ b/RTCP/Cobalt/GPUProc/test/SubbandProcs/tCorrelatorStep.cc @@ -67,7 +67,7 @@ TEST(convertFlagsToChannelFlags) inputFlags[1].include(100, 600); // E. Second station (10, 150) // The converted channel flags MultiDimArray<LOFAR::SparseSet<unsigned>, 2> flagsPerChanel( - boost::extents[parset.nrChannelsPerSubband()][parset.nrStations()]); + boost::extents[parset.settings.correlator.nrChannels][parset.nrStations()]); // ****** perform the translation CorrelatorStep::Flagger::convertFlagsToChannelFlags(parset, inputFlags, flagsPerChanel); diff --git a/RTCP/Cobalt/GPUProc/test/SubbandProcs/tCorrelatorSubbandProc.cc b/RTCP/Cobalt/GPUProc/test/SubbandProcs/tCorrelatorSubbandProc.cc index e1f7e96ca14e5a195c4d65af4515bcba86e1379e..541104572d42da209ae03b4acd2827628e251092 100644 --- a/RTCP/Cobalt/GPUProc/test/SubbandProcs/tCorrelatorSubbandProc.cc +++ b/RTCP/Cobalt/GPUProc/test/SubbandProcs/tCorrelatorSubbandProc.cc @@ -29,9 +29,15 @@ #include <CoInterface/SparseSet.h> #include <CoInterface/MultiDimArray.h> #include <Common/LofarLogger.h> +#include <Common/LofarTypes.h> #include <complex> +using namespace LOFAR; using namespace LOFAR::Cobalt; +using namespace std; + +// Global variable holding the GPU context to bind objects to +gpu::Context *context = 0; TEST(propagateFlags) { @@ -54,27 +60,29 @@ TEST(propagateFlags) parset.add("Observation.DataProducts.Output_Correlated.locations","[lse011:/data3/L2011_24523/]"); parset.updateSettings(); - unsigned number_of_baselines = (parset.nrStations() * 1.0 * (parset.nrStations() + 1)) / 2; + unsigned number_of_baselines = parset.nrBaselines(); // Input flags: an array of sparseset - MultiDimArray<LOFAR::SparseSet<unsigned>, 1> inputFlags(boost::extents[parset.nrStations()]); + MultiDimArray<LOFAR::SparseSet<unsigned>, 1> inputFlags(boost::extents[parset.settings.correlator.stations.size()]); // Set some flags inputFlags[2].include(100, 101); // A. a single sample flagged in station 3 inputFlags[3].include(500, 501); // B. a single sample flagged is station 4 // The output data to be flagged - CorrelatedData output(parset.nrStations(), - parset.nrChannelsPerSubband(), - parset.integrationSteps()); + SubbandProcOutputData::CorrelatedData output(1, + parset.settings.correlator.stations.size(), + parset.settings.correlator.nrChannels, + parset.settings.correlator.nrSamplesPerIntegration(), + *context); + MultiDimArray<fcomplex, 4> &visibilities = output.integrations[0]->visibilities; // The content ergo the visibilities should be 1, this allows us to validate the weighting //assign all values 1 pol 0 for(unsigned idx_baseline = 0; idx_baseline < number_of_baselines; ++idx_baseline) - for(unsigned idx_channel = 0; idx_channel < parset.nrChannelsPerSubband(); ++idx_channel) + for(unsigned idx_channel = 0; idx_channel < parset.settings.correlator.nrChannels; ++idx_channel) for(unsigned idx_pol1 = 0; idx_pol1 < NR_POLARIZATIONS; ++idx_pol1) for(unsigned idx_pol2 = 0; idx_pol2 < NR_POLARIZATIONS; ++idx_pol2) - output.visibilities[idx_baseline][idx_channel][idx_pol1][idx_pol2] = std::complex<float>(1,0); - + visibilities[idx_baseline][idx_channel][idx_pol1][idx_pol2] = std::complex<float>(1,0); // ********************************************************************************************* @@ -82,7 +90,7 @@ TEST(propagateFlags) CorrelatorStep::Flagger::propagateFlags(parset, inputFlags, output); // now perform weighting of the data based on the number of valid samples - CorrelatorStep::Flagger::applyWeights(parset, output); + CorrelatorStep::Flagger::applyWeights(parset, *output.integrations[0]); // ********************************************************************************************* // Now validate the functionality: @@ -91,7 +99,7 @@ TEST(propagateFlags) for(unsigned idx_channel = 0; idx_channel < 1; ++idx_channel) //validate channel zero for(unsigned idx_pol1 = 0; idx_pol1 < NR_POLARIZATIONS; ++idx_pol1) for(unsigned idx_pol2 = 0; idx_pol2 < NR_POLARIZATIONS; ++idx_pol2) - CHECK_EQUAL(std::complex<float>(0,0), output.visibilities[idx_baseline][idx_channel][idx_pol1][idx_pol2]); + CHECK_EQUAL(std::complex<float>(0,0), visibilities[idx_baseline][idx_channel][idx_pol1][idx_pol2]); // 2. station zero and one have no flags, the baselines for these station should be default float weight_of_unflagged_sample = 1.0f/1024; // default weighting / number of samples @@ -100,7 +108,7 @@ TEST(propagateFlags) for(unsigned idx_pol1 = 0; idx_pol1 < NR_POLARIZATIONS; ++idx_pol1) for(unsigned idx_pol2 = 0; idx_pol2 < NR_POLARIZATIONS; ++idx_pol2) CHECK_CLOSE(weight_of_unflagged_sample, - output.visibilities[idx_baseline][idx_channel][idx_pol1][idx_pol2].real(), + visibilities[idx_baseline][idx_channel][idx_pol1][idx_pol2].real(), 1e-18f); // float compare with this delta // 3. Now check the weights for bl 4 to 6: flagging should be a single flagged sample on the input @@ -111,7 +119,7 @@ TEST(propagateFlags) for(unsigned idx_pol1 = 0; idx_pol1 < NR_POLARIZATIONS; ++idx_pol1) for(unsigned idx_pol2 = 0; idx_pol2 < NR_POLARIZATIONS; ++idx_pol2) CHECK_CLOSE(weight_of_single_sample, - output.visibilities[idx_baseline][idx_channel][idx_pol1][idx_pol2].real(), + visibilities[idx_baseline][idx_channel][idx_pol1][idx_pol2].real(), 1e-18f); // float compare with this delta // 3. Now check the weights for bl 4 to 6: flagging should be a single flagged sample on the input from station 3 @@ -121,7 +129,7 @@ TEST(propagateFlags) for(unsigned idx_pol1 = 0; idx_pol1 < NR_POLARIZATIONS; ++idx_pol1) for(unsigned idx_pol2 = 0; idx_pol2 < NR_POLARIZATIONS; ++idx_pol2) CHECK_CLOSE(weight_of_single_sample, - output.visibilities[idx_baseline][idx_channel][idx_pol1][idx_pol2].real(), + visibilities[idx_baseline][idx_channel][idx_pol1][idx_pol2].real(), 1e-18f); // float compare with this delta // station 2 and 3: two samples @@ -131,7 +139,7 @@ TEST(propagateFlags) for(unsigned idx_pol1 = 0; idx_pol1 < NR_POLARIZATIONS; ++idx_pol1) for(unsigned idx_pol2 = 0; idx_pol2 < NR_POLARIZATIONS; ++idx_pol2) CHECK_CLOSE(weight_of_two_sample, - output.visibilities[idx_baseline][idx_channel][idx_pol1][idx_pol2].real(), + visibilities[idx_baseline][idx_channel][idx_pol1][idx_pol2].real(), 1e-18f); // float compare with this delta // station 3 and auto 3: 1 sample @@ -140,9 +148,8 @@ TEST(propagateFlags) for(unsigned idx_pol1 = 0; idx_pol1 < NR_POLARIZATIONS; ++idx_pol1) for(unsigned idx_pol2 = 0; idx_pol2 < NR_POLARIZATIONS; ++idx_pol2) CHECK_CLOSE(weight_of_single_sample, - output.visibilities[idx_baseline][idx_channel][idx_pol1][idx_pol2].real(), + visibilities[idx_baseline][idx_channel][idx_pol1][idx_pol2].real(), 1e-18f); // float compare with this delta - } @@ -169,31 +176,33 @@ TEST(calcWeights4Channels) parset.updateSettings(); // Input flags: an array of sparseset - MultiDimArray<LOFAR::SparseSet<unsigned>, 2> flagsPerChanel( - boost::extents[parset.nrChannelsPerSubband()][parset.nrStations()]); + MultiDimArray<LOFAR::SparseSet<unsigned>, 2> flagsPerChannel( + boost::extents[parset.settings.correlator.nrChannels][parset.settings.correlator.stations.size()]); // Output object - CorrelatedData output(parset.nrStations(), - parset.nrChannelsPerSubband(), - parset.integrationSteps()); + SubbandProcOutputData::CorrelatedData output(1, + parset.settings.correlator.stations.size(), + parset.settings.correlator.nrChannels, + parset.settings.correlator.nrSamplesPerIntegration(), + *context); //insert same cases - flagsPerChanel[1][0].include(100,111);//A. - flagsPerChanel[1][1].include(111,120);//E. second station flags + flagsPerChannel[1][0].include(100,111);//A. + flagsPerChannel[1][1].include(111,120);//E. second station flags //propageFlags - CorrelatorStep::Flagger::calcWeights(parset, flagsPerChanel, output); + CorrelatorStep::Flagger::calcWeights(parset, flagsPerChannel, output); // Now check that the flags are correctly set in the ouput object - CHECK_EQUAL(256u - 11u, output.getNrValidSamples(0,1)); // 11 flagged in station 1 - CHECK_EQUAL(256u - 20u, output.getNrValidSamples(1,1)); // The union is 11+9 == 20 flagged - CHECK_EQUAL(256u - 9u, output.getNrValidSamples(2,1)); // 9 flagged in station 2 + CHECK_EQUAL(256u - 11u, output.integrations[0]->getNrValidSamples(0,1)); // 11 flagged in station 1 + CHECK_EQUAL(256u - 20u, output.integrations[0]->getNrValidSamples(1,1)); // The union is 11+9 == 20 flagged + CHECK_EQUAL(256u - 9u, output.integrations[0]->getNrValidSamples(2,1)); // 9 flagged in station 2 // Channel zero should always be all flagged - CHECK_EQUAL(0u, output.getNrValidSamples(0,0)); // all flagged in station 2 - CHECK_EQUAL(0u, output.getNrValidSamples(1,0)); // all flagged in station 2 - CHECK_EQUAL(0u, output.getNrValidSamples(2,0)); // all flagged in station 2 + CHECK_EQUAL(0u, output.integrations[0]->getNrValidSamples(0,0)); // all flagged in station 2 + CHECK_EQUAL(0u, output.integrations[0]->getNrValidSamples(1,0)); // all flagged in station 2 + CHECK_EQUAL(0u, output.integrations[0]->getNrValidSamples(2,0)); // all flagged in station 2 } TEST(calcWeights1Channels) @@ -219,26 +228,28 @@ TEST(calcWeights1Channels) parset.updateSettings(); // Input flags: an array of sparseset - MultiDimArray<LOFAR::SparseSet<unsigned>, 2> flagsPerChanel( - boost::extents[parset.nrChannelsPerSubband()][parset.nrStations()]); + MultiDimArray<LOFAR::SparseSet<unsigned>, 2> flagsPerChannel( + boost::extents[parset.settings.correlator.nrChannels][parset.settings.correlator.stations.size()]); // Output object - CorrelatedData output(parset.nrStations(), - parset.nrChannelsPerSubband(), - parset.integrationSteps()); + SubbandProcOutputData::CorrelatedData output(1, + parset.settings.correlator.stations.size(), + parset.settings.correlator.nrChannels, + parset.settings.correlator.nrSamplesPerIntegration(), + *context); //insert same cases - flagsPerChanel[0][0].include(100,111);//A. - flagsPerChanel[0][1].include(111,120);//E. second station flags + flagsPerChannel[0][0].include(100,111);//A. + flagsPerChannel[0][1].include(111,120);//E. second station flags //propageFlags - CorrelatorStep::Flagger::calcWeights(parset, flagsPerChanel, output); + CorrelatorStep::Flagger::calcWeights(parset, flagsPerChannel, output); // Now check that the flags are correctly set in the ouput object // channel is 1 so no time resolution loss!! - CHECK_EQUAL(245u, output.getNrValidSamples(0,0)); // 11 flagged in station 1 - CHECK_EQUAL(236u, output.getNrValidSamples(1,0)); // The union is 11+9 == 20 flagged - CHECK_EQUAL(247u, output.getNrValidSamples(2,0)); // 9 flagged in station 2 + CHECK_EQUAL(245u, output.integrations[0]->getNrValidSamples(0,0)); // 11 flagged in station 1 + CHECK_EQUAL(236u, output.integrations[0]->getNrValidSamples(1,0)); // The union is 11+9 == 20 flagged + CHECK_EQUAL(247u, output.integrations[0]->getNrValidSamples(2,0)); // 9 flagged in station 2 } TEST(applyWeights) @@ -262,40 +273,42 @@ TEST(applyWeights) parset.add("Observation.DataProducts.Output_Correlated.locations","[lse011:/data3/L2011_24523/]"); parset.updateSettings(); // Create correlated data object - CorrelatedData output(parset.nrStations(), - parset.nrChannelsPerSubband(), - parset.integrationSteps()); - + SubbandProcOutputData::CorrelatedData output(1, + parset.settings.correlator.stations.size(), + parset.settings.correlator.nrChannels, + parset.settings.correlator.nrSamplesPerIntegration(), + *context); + MultiDimArray<fcomplex, 4> &visibilities = output.integrations[0]->visibilities; //assign all values 1 pol 0 for(unsigned idx_baseline = 0; idx_baseline < 3; ++idx_baseline) - for(unsigned idx_channel = 0; idx_channel < parset.nrChannelsPerSubband(); ++idx_channel) + for(unsigned idx_channel = 0; idx_channel < parset.settings.correlator.nrChannels; ++idx_channel) for(unsigned idx_pol1 = 0; idx_pol1 < NR_POLARIZATIONS; ++idx_pol1) for(unsigned idx_pol2 = 0; idx_pol2 < NR_POLARIZATIONS; ++idx_pol2) - output.visibilities[idx_baseline][idx_channel][idx_pol1][idx_pol2] = std::complex<float>(1,0); + visibilities[idx_baseline][idx_channel][idx_pol1][idx_pol2] = std::complex<float>(1,0); // set some flagged samples unsigned n_valid_samples = 20; - output.setNrValidSamples(0,1,n_valid_samples); //baseline 0, channel 1 - output.setNrValidSamples(1,1,256); //baseline 1, channel 1 - output.setNrValidSamples(2,1,0); //baseline 0, channel 1 - CorrelatorStep::Flagger::applyWeights(parset, output); + output.integrations[0]->setNrValidSamples(0,1,n_valid_samples); //baseline 0, channel 1 + output.integrations[0]->setNrValidSamples(1,1,256); //baseline 1, channel 1 + output.integrations[0]->setNrValidSamples(2,1,0); //baseline 0, channel 1 + CorrelatorStep::Flagger::applyWeights(parset, *output.integrations[0]); // 4 channels: therefore the chanel zero should be zero - CHECK_EQUAL(std::complex<float>(0,0), output.visibilities[0][0][0][0]); - CHECK_EQUAL(std::complex<float>(0,0), output.visibilities[2][0][1][1]); // check origin and far corner + CHECK_EQUAL(std::complex<float>(0,0), visibilities[0][0][0][0]); + CHECK_EQUAL(std::complex<float>(0,0), visibilities[2][0][1][1]); // check origin and far corner // the weighted values should be divided by the number of samples - CHECK_EQUAL(std::complex<float>(1.0/n_valid_samples,0), output.visibilities[0][1][0][0]); - CHECK_EQUAL(std::complex<float>(1.0/n_valid_samples,0), output.visibilities[0][1][1][1]); + CHECK_EQUAL(std::complex<float>(1.0/n_valid_samples,0), visibilities[0][1][0][0]); + CHECK_EQUAL(std::complex<float>(1.0/n_valid_samples,0), visibilities[0][1][1][1]); // baselines 1 - CHECK_EQUAL(std::complex<float>(1.0/256,0), output.visibilities[1][1][0][0]); - CHECK_EQUAL(std::complex<float>(1.0/256,0), output.visibilities[1][1][1][1]); + CHECK_EQUAL(std::complex<float>(1.0/256,0), visibilities[1][1][0][0]); + CHECK_EQUAL(std::complex<float>(1.0/256,0), visibilities[1][1][1][1]); //baseline 2 no samples so should be zero - CHECK_EQUAL(std::complex<float>(0,0), output.visibilities[2][1][0][0]); - CHECK_EQUAL(std::complex<float>(0,0), output.visibilities[2][1][1][1]); + CHECK_EQUAL(std::complex<float>(0,0), visibilities[2][1][0][0]); + CHECK_EQUAL(std::complex<float>(0,0), visibilities[2][1][1][1]); } TEST(applyWeight) @@ -323,33 +336,47 @@ TEST(applyWeight) parset.updateSettings(); // Output object - CorrelatedData output(parset.nrStations(), - parset.nrChannelsPerSubband(), - parset.integrationSteps()); - + SubbandProcOutputData::CorrelatedData output(1, + parset.settings.correlator.stations.size(), + parset.settings.correlator.nrChannels, + parset.settings.correlator.nrSamplesPerIntegration(), + *context); + MultiDimArray<fcomplex, 4> &visibilities = output.integrations[0]->visibilities; //assign all visibilities values 1 pol 0 for(unsigned idx_baseline = 0; idx_baseline < 3; ++idx_baseline) - for(unsigned idx_channel = 0; idx_channel < parset.nrChannelsPerSubband(); ++idx_channel) + for(unsigned idx_channel = 0; idx_channel < parset.settings.correlator.nrChannels; ++idx_channel) for(unsigned idx_pol1 = 0; idx_pol1 < NR_POLARIZATIONS; ++idx_pol1) for(unsigned idx_pol2 = 0; idx_pol2 < NR_POLARIZATIONS; ++idx_pol2) - output.visibilities[idx_baseline][idx_channel][idx_pol1][idx_pol2] = std::complex<float>(1,0); + visibilities[idx_baseline][idx_channel][idx_pol1][idx_pol2] = std::complex<float>(1,0); // multiply all polarization in sb 0 channel 0 with 0,5 - CorrelatorStep::Flagger::applyWeight(0,0,0.5,output); + CorrelatorStep::Flagger::applyWeight(0,0,0.5, *output.integrations[0]); //sb 0 should be (0.5, 0) - CHECK_EQUAL(std::complex<float>(0.5,0), - output.visibilities[0][0][0][0]); + CHECK_EQUAL(std::complex<float>(0.5,0), visibilities[0][0][0][0]); //still be 1.0 - CHECK_EQUAL(std::complex<float>(1,0), - output.visibilities[1][0][0][0]); + CHECK_EQUAL(std::complex<float>(1,0), visibilities[1][0][0][0]); } int main() { INIT_LOGGER("tCorrelatorSubbandProc"); + + try { + gpu::Platform pf; + LOG_INFO_STR("Detected " << pf.size() << " CUDA devices"); + } catch (gpu::CUDAException& e) { + LOG_FATAL_STR("Caught exception: " << e.what()); + return 3; + } + + gpu::Device device(0); + vector<gpu::Device> devices(1, device); + gpu::Context ctx(device); + context = &ctx; + return UnitTest::RunAllTests() > 0; } diff --git a/RTCP/Cobalt/GPUProc/test/SubbandProcs/tCorrelatorSubbandProcProcessSb.cc b/RTCP/Cobalt/GPUProc/test/SubbandProcs/tCorrelatorSubbandProcProcessSb.cc index fa970874313d5624d4c698d68f3a988c8397074a..93a7b635f98755f62f43b36d9aaca2492813a744 100644 --- a/RTCP/Cobalt/GPUProc/test/SubbandProcs/tCorrelatorSubbandProcProcessSb.cc +++ b/RTCP/Cobalt/GPUProc/test/SubbandProcs/tCorrelatorSubbandProcProcessSb.cc @@ -55,7 +55,7 @@ int main() { const size_t nrStations = ps.nrStations(); const size_t nrPolarisations = ps.settings.nrPolarisations; const size_t maxNrTABsPerSAP = ps.settings.beamFormer.maxNrTABsPerSAP(); - const size_t nrSamplesPerChannel = ps.nrSamplesPerChannel(); + const size_t nrSamplesPerChannel = ps.settings.correlator.nrSamplesPerIntegration(); const size_t nrSamplesPerSubband = ps.nrSamplesPerSubband(); const size_t nrBitsPerSample = ps.settings.nrBitsPerSample; const size_t nrBytesPerComplexSample = ps.nrBytesPerComplexSample(); @@ -68,8 +68,8 @@ int main() { const size_t nrBaselines = nrStations * (nrStations + 1) / 2; const size_t nrBlocksPerIntegration = ps.settings.correlator.nrBlocksPerIntegration; - const size_t nrChannelsPerSubband = ps.nrChannelsPerSubband(); - const size_t integrationSteps = ps.integrationSteps(); + const size_t nrChannelsPerSubband = ps.settings.correlator.nrChannels; + const size_t integrationSteps = ps.settings.correlator.nrSamplesPerIntegration(); const size_t scaleFactor = nrBitsPerSample == 16 ? 1 : 16; // The output is the correlation-product of two inputs (with identical @@ -115,7 +115,7 @@ int main() { "\n scaleFactor = " << scaleFactor << "\n outputValue = " << outputValue << "\n ----------------------------" << - "\n Total bytes = " << out.correlatedData.size()); + "\n Total bytes = " << out.correlatedData.integrations[0]->visibilities.size()); // Initialize synthetic input to all (1, 1). for (size_t st = 0; st < nrStations; st++) @@ -165,9 +165,9 @@ int main() { for (size_t c = 0; c < nrChannelsPerSubband; c++) for (size_t pol0 = 0; pol0 < nrPolarisations; pol0++) for (size_t pol1 = 0; pol1 < nrPolarisations; pol1++) - ASSERTSTR(fpEquals(out.correlatedData[b][c][pol0][pol1], outputValue), + ASSERTSTR(fpEquals(out.correlatedData.integrations[0]->visibilities[b][c][pol0][pol1], outputValue), "out[" << b << "][" << c << "][" << pol0 << - "][" << pol1 << "] = " << out.correlatedData[b][c][pol0][pol1] << + "][" << pol1 << "] = " << out.correlatedData.integrations[0]->visibilities[b][c][pol0][pol1] << "; outputValue = " << outputValue); LOG_INFO("Test OK"); diff --git a/RTCP/Cobalt/InputProc/src/Delays/Delays.cc b/RTCP/Cobalt/InputProc/src/Delays/Delays.cc index f67a1e942306f53168d8b0dce00da477b4f38ecf..ab6928971b00699c5760af0eb871046616813d3d 100644 --- a/RTCP/Cobalt/InputProc/src/Delays/Delays.cc +++ b/RTCP/Cobalt/InputProc/src/Delays/Delays.cc @@ -284,8 +284,8 @@ namespace LOFAR // The coarse delay compensation is based on the average delay // between begin and end. - coarseDelaysSamples[sap] = static_cast<ssize_t>(round(0.5 * (delayAtBegin + delayAfterEnd) * parset.subbandBandwidth())); - coarseDelaysSeconds[sap] = coarseDelaysSamples[sap] / parset.subbandBandwidth(); + coarseDelaysSamples[sap] = static_cast<ssize_t>(round(0.5 * (delayAtBegin + delayAfterEnd) * parset.settings.subbandWidth())); + coarseDelaysSeconds[sap] = coarseDelaysSamples[sap] / parset.settings.subbandWidth(); } // Compute the offsets at which each subband is read diff --git a/RTCP/Cobalt/InputProc/src/Delays/printDelays.cc b/RTCP/Cobalt/InputProc/src/Delays/printDelays.cc index e006797213fa7c7170b336be927288d0e0fc0027..7cbf232f673737f5d7c2e2766a926014e17e25f7 100644 --- a/RTCP/Cobalt/InputProc/src/Delays/printDelays.cc +++ b/RTCP/Cobalt/InputProc/src/Delays/printDelays.cc @@ -60,8 +60,8 @@ int main( int argc, char **argv ) } /* Determine start/stop/blocksize parameters */ - const TimeStamp from(ps.startTime() * ps.subbandBandwidth(), ps.clockSpeed()); - const TimeStamp to(ps.stopTime() * ps.subbandBandwidth(), ps.clockSpeed()); + const TimeStamp from(ps.settings.startTime * ps.settings.subbandWidth(), ps.settings.clockHz()); + const TimeStamp to(ps.settings.stopTime * ps.settings.subbandWidth(), ps.settings.clockHz()); ssize_t block = -1; size_t blockSize = ps.nrSamplesPerSubband(); diff --git a/RTCP/Cobalt/OutputProc/src/GPUProcIO.cc b/RTCP/Cobalt/OutputProc/src/GPUProcIO.cc index 8df8cd6949d3ee27bd4d57288bd9cb7aa5590728..d2f421d83e47eb5827a5f9cbf23637f92bb29e7d 100644 --- a/RTCP/Cobalt/OutputProc/src/GPUProcIO.cc +++ b/RTCP/Cobalt/OutputProc/src/GPUProcIO.cc @@ -153,7 +153,7 @@ bool process(Stream &controlStream, unsigned myRank) // Create a collector for this fileIdx collectors[fileIdx] = new TABTranspose::BlockCollector( - *outputPools[fileIdx], fileIdx, nrSubbands, nrChannels, nrSamples, parset.nrBeamFormedBlocks(), parset.realTime() ? 5 : 0); + *outputPools[fileIdx], fileIdx, nrSubbands, nrChannels, nrSamples, parset.settings.nrBlocks(), parset.settings.realTime ? 5 : 0); string logPrefix = str(format("[obs %u beamformed stream %3u] ") % parset.observationID() % fileIdx); diff --git a/RTCP/Cobalt/OutputProc/src/InputThread.cc b/RTCP/Cobalt/OutputProc/src/InputThread.cc index a53de99a203a74dc81b49804197723e0415068fb..e636374c33e074c9af41d87b7aae29887b670db5 100644 --- a/RTCP/Cobalt/OutputProc/src/InputThread.cc +++ b/RTCP/Cobalt/OutputProc/src/InputThread.cc @@ -40,7 +40,7 @@ namespace LOFAR itsLogPrefix(logPrefix + "[InputThread] "), itsInputDescriptor(getStreamDescriptorBetweenIONandStorage(parset, CORRELATED_DATA, streamNr)), itsOutputPool(outputPool), - itsDeadline(parset.realTime() ? parset.stopTime() : 0) + itsDeadline(parset.settings.realTime ? parset.settings.stopTime : 0) { } diff --git a/RTCP/Cobalt/OutputProc/src/MSWriterCorrelated.cc b/RTCP/Cobalt/OutputProc/src/MSWriterCorrelated.cc index d003d615a0b9cada325d70a590788ac36e5964a9..3914faeab3bc726991a6dc53df982b559a2c746f 100644 --- a/RTCP/Cobalt/OutputProc/src/MSWriterCorrelated.cc +++ b/RTCP/Cobalt/OutputProc/src/MSWriterCorrelated.cc @@ -121,7 +121,7 @@ namespace LOFAR itsNrBlocksWritten++; itsConfiguration.replace("size", str(format("%u") % getDataSize())); - itsConfiguration.replace("duration", str(format("%f") % ((data->sequenceNumber() + 1) * itsParset.IONintegrationTime()))); + itsConfiguration.replace("duration", str(format("%f") % ((data->sequenceNumber() + 1) * itsParset.settings.correlator.integrationTime()))); itsConfiguration.replace("percentageWritten", str(format("%u") % percentageWritten())); } diff --git a/RTCP/Cobalt/OutputProc/src/MSWriterDAL.cc b/RTCP/Cobalt/OutputProc/src/MSWriterDAL.cc index 9ab0db259e7139aba2522bbe0253f883996aa379..75177eaf8445072ba17bcb601372631732902e65 100644 --- a/RTCP/Cobalt/OutputProc/src/MSWriterDAL.cc +++ b/RTCP/Cobalt/OutputProc/src/MSWriterDAL.cc @@ -110,7 +110,7 @@ namespace LOFAR itsNextSeqNr(0), itsFileNr(fileno) { - itsNrExpectedBlocks = itsParset.nrBeamFormedBlocks(); + itsNrExpectedBlocks = itsParset.settings.nrBlocks(); string h5filename = forceextension(string(filename),".h5"); string rawfilename = forceextension(string(filename),".raw"); @@ -145,8 +145,6 @@ namespace LOFAR itsBlockSize = itsNrSamples * itsNrChannels; - unsigned nrBlocks = parset.nrBeamFormedBlocks(); - //******************************* vector<string> stokesVars; @@ -211,7 +209,7 @@ namespace LOFAR file.observationStartMJD().value = toMJD(parset.settings.startTime); // The stop time can be a bit further than the one actually specified, because we process in blocks. - double stopTime = parset.settings.startTime + nrBlocks * parset.settings.blockDuration(); + double stopTime = parset.settings.startTime + itsNrExpectedBlocks * parset.settings.blockDuration(); file.observationEndUTC().value = toUTC(stopTime); file.observationEndMJD().value = toMJD(stopTime); @@ -219,7 +217,7 @@ namespace LOFAR file.observationNofStations().value = parset.nrStations(); // TODO: SS beamformer? file.observationStationsList().value = parset.allStationNames(); // TODO: SS beamformer? - double subbandBandwidth = parset.subbandBandwidth(); + double subbandBandwidth = parset.settings.subbandWidth(); double channelBandwidth = subbandBandwidth / stokesSet.nrChannels; // if PPF is used, the frequencies are shifted down by half a channel @@ -269,7 +267,7 @@ namespace LOFAR file.BFFormat().value = "TAB"; file.BFVersion().value = str(format("Cobalt/OutputProc %s r%s using DAL %s and HDF5 %s") % OutputProcVersion::getVersion() % OutputProcVersion::getRevision() % dal::version().to_string() % dal::version_hdf5().to_string()); - file.totalIntegrationTime().value = nrBlocks * parset.settings.blockDuration(); + file.totalIntegrationTime().value = itsNrExpectedBlocks * parset.settings.blockDuration(); file.totalIntegrationTimeUnit().value = "s"; //file.subArrayPointingDiameter().value = 0.0; @@ -298,7 +296,7 @@ namespace LOFAR // TODO: fix the system to use the parset.beamDuration(sapNr), but OLAP // does not work that way yet (beamDuration is currently unsupported). - sap.totalIntegrationTime().value = nrBlocks * parset.settings.blockDuration(); + sap.totalIntegrationTime().value = itsNrExpectedBlocks * parset.settings.blockDuration(); sap.totalIntegrationTimeUnit().value = "s"; // TODO: non-J2000 pointings. @@ -363,10 +361,10 @@ namespace LOFAR beam.beamDiameterDEC().value = 0; beam.beamDiameterDECUnit().value = "arcmin"; - beam.nofSamples().value = itsNrSamples * nrBlocks; - beam.samplingRate().value = parset.subbandBandwidth() / stokesSet.nrChannels / stokesSet.timeIntegrationFactor; + beam.nofSamples().value = itsNrSamples * itsNrExpectedBlocks; + beam.samplingRate().value = parset.settings.subbandWidth() / stokesSet.nrChannels / stokesSet.timeIntegrationFactor; beam.samplingRateUnit().value = "Hz"; - beam.samplingTime().value = parset.sampleDuration() * stokesSet.nrChannels * stokesSet.timeIntegrationFactor; + beam.samplingTime().value = parset.settings.sampleDuration() * stokesSet.nrChannels * stokesSet.timeIntegrationFactor; beam.samplingTimeUnit().value = "s"; beam.channelsPerSubband().value = stokesSet.nrChannels; @@ -502,7 +500,7 @@ namespace LOFAR vector<ssize_t> dims(2), maxdims(2); - dims[0] = itsNrSamples * nrBlocks; + dims[0] = itsNrSamples * itsNrExpectedBlocks; dims[1] = itsNrChannels; maxdims[0] = -1; diff --git a/RTCP/Cobalt/OutputProc/src/MSWriterNull.cc b/RTCP/Cobalt/OutputProc/src/MSWriterNull.cc index 4c63ca356c725cae4cd766819d65be2c73759b81..4260d4c06a8b8b71e5293d68c5d0ba8b21ba8868 100644 --- a/RTCP/Cobalt/OutputProc/src/MSWriterNull.cc +++ b/RTCP/Cobalt/OutputProc/src/MSWriterNull.cc @@ -57,7 +57,7 @@ namespace LOFAR itsConfiguration.replace("size", str(format("%u") % getDataSize())); itsConfiguration.replace("duration", str(format("%f") % ((data->sequenceNumber() + 1) * - itsParset.IONintegrationTime()))); + itsParset.settings.correlator.integrationTime()))); } } // namespace Cobalt diff --git a/RTCP/Cobalt/OutputProc/src/MeasurementSetFormat.cc b/RTCP/Cobalt/OutputProc/src/MeasurementSetFormat.cc index 504baa316f41796929bea1d6dbd8d1059fe9fc36..e4c0d458b3133c6869d7610654114995c592f58f 100644 --- a/RTCP/Cobalt/OutputProc/src/MeasurementSetFormat.cc +++ b/RTCP/Cobalt/OutputProc/src/MeasurementSetFormat.cc @@ -110,10 +110,10 @@ namespace LOFAR antPos.size() << " == " << itsPS.nrStations()); } - itsStartTime = toMJDs(itsPS.startTime()); + itsStartTime = toMJDs(itsPS.settings.startTime); - itsTimeStep = itsPS.IONintegrationTime(); - itsNrTimes = itsPS.nrCorrelatedBlocks(); + itsTimeStep = itsPS.settings.correlator.integrationTime(); + itsNrTimes = itsPS.settings.correlator.nrIntegrationsPerBlock * itsPS.settings.nrBlocks(); } @@ -349,7 +349,7 @@ namespace LOFAR void MeasurementSetFormat::fillPola() { - const unsigned npolarizations = itsPS.nrCrossPolarisations(); + const unsigned npolarizations = itsPS.settings.nrCrossPolarisations(); MSPolarization mspol = itsMS->polarization(); MSPolarizationColumns mspolCol(mspol); @@ -414,11 +414,11 @@ namespace LOFAR double minFreq = *std::min_element( freqs.begin(), freqs.end() ); double maxFreq = *std::max_element( freqs.begin(), freqs.end() ); - size_t nchan = itsPS.nrChannelsPerSubband(); + const size_t nchan = itsPS.settings.correlator.nrChannels; if( nchan > 1 ) { // 2nd PPF shifts frequencies downwards by half a channel - double width = itsPS.channelWidth(); + const double width = itsPS.settings.correlator.channelWidth; minFreq -= 0.5 * nchan * width; maxFreq -= 0.5 * nchan * width; @@ -469,7 +469,7 @@ namespace LOFAR msobsCol.nofBitsPerSample().put(0, itsPS.nrBitsPerSample()); msobsCol.antennaSet().put(0, itsPS.antennaSet()); msobsCol.filterSelection().put(0, itsPS.bandFilter()); - msobsCol.clockFrequencyQuant().put(0, Quantity(itsPS.clockSpeed(), "Hz")); + msobsCol.clockFrequencyQuant().put(0, Quantity(itsPS.settings.clockHz(), "Hz")); msobsCol.target().put(0, ctargets); msobsCol.systemVersion().put(0, Version::getInfo<OutputProcVersion>("OutputProc", "brief")); @@ -485,8 +485,8 @@ namespace LOFAR void MeasurementSetFormat::fillSpecWindow(unsigned subband) { const double refFreq = itsPS.settings.subbands[subband].centralFrequency; - const size_t nchan = itsPS.nrChannelsPerSubband(); - const double chanWidth = itsPS.channelWidth(); + const size_t nchan = itsPS.settings.correlator.nrChannels; + const double chanWidth = itsPS.settings.correlator.channelWidth; const double totalBW = nchan * chanWidth; const double channel0freq = itsPS.channel0Frequency(subband, nchan); @@ -583,15 +583,18 @@ namespace LOFAR aio.putstart("LofarStMan", LofarStManVersion); aio << ant1 << ant2 << itsStartTime - << itsPS.IONintegrationTime() - << itsPS.nrChannelsPerSubband() - << itsPS.nrCrossPolarisations() - << static_cast<double>(itsPS.CNintegrationSteps() * itsPS.IONintegrationSteps()) + << itsPS.settings.correlator.integrationTime() + << itsPS.settings.correlator.nrChannels + << itsPS.settings.nrCrossPolarisations() + << static_cast<double>(itsPS.settings.correlator.nrSamplesPerIntegration()) << itsAlignment << false; // isBigEndian if (LofarStManVersion > 1) { - uInt itsNrBytesPerNrValidSamples = - itsPS.integrationSteps() < 256 ? 1 : itsPS.integrationSteps() < 65536 ? 2 : 4; + const size_t integrationSteps = itsPS.settings.correlator.nrSamplesPerIntegration(); + const uInt itsNrBytesPerNrValidSamples = + integrationSteps < 256 ? 1 : + integrationSteps < 65536 ? 2 : + 4; aio << itsNrBytesPerNrValidSamples; } aio.close(); diff --git a/RTCP/Cobalt/OutputProc/src/OutputThread.cc b/RTCP/Cobalt/OutputProc/src/OutputThread.cc index 7c9897d8e4b86b5e918d2c19af7c21a228fe3744..5c5743216db3b692ed4b4c174c4535de4499b495 100644 --- a/RTCP/Cobalt/OutputProc/src/OutputThread.cc +++ b/RTCP/Cobalt/OutputProc/src/OutputThread.cc @@ -293,7 +293,7 @@ namespace LOFAR #endif } - itsNrExpectedBlocks = itsParset.nrCorrelatedBlocks(); + itsNrExpectedBlocks = itsParset.settings.nrBlocks() * itsParset.settings.correlator.nrIntegrationsPerBlock; } @@ -366,7 +366,7 @@ namespace LOFAR #endif } - itsNrExpectedBlocks = itsParset.nrBeamFormedBlocks(); + itsNrExpectedBlocks = itsParset.settings.nrBlocks(); } } // namespace Cobalt } // namespace LOFAR diff --git a/RTCP/Cobalt/OutputProc/src/SubbandWriter.cc b/RTCP/Cobalt/OutputProc/src/SubbandWriter.cc index 288d3132776bea49ca0c020e5edc0b7cc8b95a1b..074ad4d5a8228072be1435f4204ebcd408785d6c 100644 --- a/RTCP/Cobalt/OutputProc/src/SubbandWriter.cc +++ b/RTCP/Cobalt/OutputProc/src/SubbandWriter.cc @@ -40,7 +40,7 @@ namespace LOFAR itsOutputThread(parset, streamNr, itsOutputPool, mdLogger, mdKeyPrefix, logPrefix) { for (unsigned i = 0; i < maxReceiveQueueSize; i++) - itsOutputPool.free.append(new CorrelatedData(parset.nrMergedStations(), parset.nrChannelsPerSubband(), parset.integrationSteps(), heapAllocator, 512)); + itsOutputPool.free.append(new CorrelatedData(parset.settings.correlator.stations.size(), parset.settings.correlator.nrChannels, parset.settings.correlator.nrSamplesPerIntegration(), heapAllocator, 512)); } diff --git a/RTCP/Cobalt/OutputProc/src/plotMS.cc b/RTCP/Cobalt/OutputProc/src/plotMS.cc index 28f1a5de1fac04b1a4ec6fb3e3c6826884132598..11215cb8483f96f1e868bcac41af3369b99df7cf 100644 --- a/RTCP/Cobalt/OutputProc/src/plotMS.cc +++ b/RTCP/Cobalt/OutputProc/src/plotMS.cc @@ -131,14 +131,16 @@ int main(int argc, char *argv[]) usage(argv[0], 1); Parset parset(parset_filename); + ASSERT( parset.settings.correlator.enabled ); + FileStream datafile(table_filename); - CorrelatedData *data = new CorrelatedData(parset.nrMergedStations(), parset.nrChannelsPerSubband(), parset.integrationSteps(), heapAllocator, 512); + CorrelatedData *data = new CorrelatedData(parset.nrMergedStations(), parset.settings.correlator.nrChannels, parset.settings.correlator.nrSamplesPerIntegration(), heapAllocator, 512); if (channel == -1) - channel = parset.nrChannelsPerSubband() == 1 ? 0 : 1; // default to first useful channel + channel = parset.settings.correlator.nrChannels == 1 ? 0 : 1; // default to first useful channel ASSERT( data ); - ASSERT( channel >= 0 && (unsigned)channel < parset.nrChannelsPerSubband() ); + ASSERT( channel >= 0 && (unsigned)channel < parset.settings.correlator.nrChannels ); // determine base line from string casa::Block<int32> itsAnt1; diff --git a/RTCP/Cobalt/OutputProc/test/tMSWriterCorrelated.cc b/RTCP/Cobalt/OutputProc/test/tMSWriterCorrelated.cc index 47a703f8156c6eec77db54ba1b31f04e9eab24fe..ffdccf0b1184a37784ab7b91b8305f25fc1d2e9c 100644 --- a/RTCP/Cobalt/OutputProc/test/tMSWriterCorrelated.cc +++ b/RTCP/Cobalt/OutputProc/test/tMSWriterCorrelated.cc @@ -42,7 +42,7 @@ int main() MSWriterCorrelated writer("", "tMSWriterCorrelated.in_1/SB000.MS", parset, 0); // Write some data - CorrelatedData data(parset.nrMergedStations(), parset.nrChannelsPerSubband(), parset.integrationSteps(), heapAllocator, 512); + CorrelatedData data(parset.nrMergedStations(), parset.settings.correlator.nrChannels, parset.settings.correlator.nrSamplesPerIntegration(), heapAllocator, 512); writer.write(&data); diff --git a/RTCP/Cobalt/OutputProc/test/tSubbandWriter.cc b/RTCP/Cobalt/OutputProc/test/tSubbandWriter.cc index 8331c8f30d28babea805819a8f78340fe8ef5fa5..02d5cd431e7757627926383bc6386980a210c2c8 100644 --- a/RTCP/Cobalt/OutputProc/test/tSubbandWriter.cc +++ b/RTCP/Cobalt/OutputProc/test/tSubbandWriter.cc @@ -104,7 +104,7 @@ SUITE(SubbandWriter) SmartPtr<Stream> inputStream = createStream(sendDesc, false, 0); - CorrelatedData data(ps.nrMergedStations(), ps.nrChannelsPerSubband(), ps.integrationSteps(), heapAllocator, 512); + CorrelatedData data(ps.nrMergedStations(), ps.settings.correlator.nrChannels, ps.settings.correlator.nrSamplesPerIntegration(), heapAllocator, 512); for (size_t i = 0; i < data.visibilities.num_elements(); ++i) { *(data.visibilities.origin() + i) = complex<float>(i, 2*i); @@ -118,7 +118,7 @@ SUITE(SubbandWriter) { FileStream f("tWriter.out_raw/table.f0data"); - CorrelatedData data(ps.nrMergedStations(), ps.nrChannelsPerSubband(), ps.integrationSteps(), heapAllocator, 512); + CorrelatedData data(ps.nrMergedStations(), ps.settings.correlator.nrChannels, ps.settings.correlator.nrSamplesPerIntegration(), heapAllocator, 512); data.read(&f, true, 512);