diff --git a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/CorrelatorPipeline.cc b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/CorrelatorPipeline.cc index b75a7a0f437c9087a7de87dfec871f66b79dfe38..add368335d64ed5beef422a8dce5ab03992ae7d9 100644 --- a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/CorrelatorPipeline.cc +++ b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/CorrelatorPipeline.cc @@ -444,7 +444,8 @@ namespace LOFAR size_t block = output->block; unsigned subband = output->subband; - CorrelatorWorkQueue &queue = output->queue; // cache queue object, because `output' will be destroyed + // Cache workQueue reference, because `output' will be destroyed. + CorrelatorWorkQueue &workQueue = output->workQueue; if (subband == 0 || subband == ps.nrSubbands() - 1) { LOG_INFO_STR("[block " << block << ", subband " << subband << "] Writing start"); @@ -460,7 +461,7 @@ namespace LOFAR } // Hand the object back to the workQueue it originally came from - queue.outputPool.free.append(output); + workQueue.outputPool.free.append(output); ASSERT(!output); diff --git a/RTCP/Cobalt/GPUProc/src/cuda/WorkQueues/CorrelatorWorkQueue.cc b/RTCP/Cobalt/GPUProc/src/cuda/WorkQueues/CorrelatorWorkQueue.cc index 4f217501aa56bb2634f1e70af0b87f32b998a753..4b5b752a4d827886db0e0fe56fec238a710bbf54 100644 --- a/RTCP/Cobalt/GPUProc/src/cuda/WorkQueues/CorrelatorWorkQueue.cc +++ b/RTCP/Cobalt/GPUProc/src/cuda/WorkQueues/CorrelatorWorkQueue.cc @@ -100,22 +100,23 @@ namespace LOFAR // // At least 3 items are needed for a smooth Pool operation. size_t nrInputDatas = std::max(3UL, ps.nrSubbands()); - for(size_t i = 0; i < nrInputDatas; ++i) { + for (size_t i = 0; i < nrInputDatas; ++i) { inputPool.free.append(new WorkQueueInputData( ps.nrBeams(), ps.nrStations(), NR_POLARIZATIONS, ps.nrHistorySamples() + ps.nrSamplesPerSubband(), ps.nrBytesPerComplexSample(), - devInput)); + context)); } // put enough objects in the outputPool to operate - for(size_t i = 0; i < 3; ++i) { + for (size_t i = 0; i < 3; ++i) { outputPool.free.append(new CorrelatedDataHostBuffer( ps.nrStations(), ps.nrChannelsPerSubband(), ps.integrationSteps(), + context, *this)); } @@ -152,13 +153,13 @@ namespace LOFAR // first verify that the device platform still allows workqueue overlap. size_t firWeightsSize = filterBank.getWeights().num_elements() * sizeof(float); gpu::HostMemory firWeights(context, firWeightsSize); - std::memcpy(firWeights.get<void>(), filterBank.getWeights().origin(), fbBytes); + std::memcpy(firWeights.get<void>(), filterBank.getWeights().origin(), firWeightsSize); queue.writeBuffer(devFIRweights, firWeights, true); if (ps.correctBandPass()) { gpu::HostMemory bpWeights(context, ps.nrChannelsPerSubband() * sizeof(float)); - BandPass::computeCorrectionFactors(bpWeights.origin(), + BandPass::computeCorrectionFactors(bpWeights.get<float>(), ps.nrChannelsPerSubband()); queue.writeBuffer(devBandPassCorrectionWeights, bpWeights, true); } @@ -339,17 +340,18 @@ namespace LOFAR template<typename T> void CorrelatorWorkQueue::flagFunctions::applyFractionOfFlaggedSamplesOnVisibilities(Parset const &parset, CorrelatedData &output) { - for (unsigned bl = 0; bl < output.itsNrBaselines; ++bl) { + for (unsigned bl = 0; bl < output.itsNrBaselines; ++bl) + { // Calculate the weights for the channels // // Channel 0 is already flagged according to specs, so we can simply // include it both for 1 and >1 channels/subband. - for(unsigned ch = 0; ch < parset.nrChannelsPerSubband(); ch ++) + for (unsigned ch = 0; ch < parset.nrChannelsPerSubband(); ch++) { T nrValidSamples = output.nrValidSamples<T>(bl, ch); - // If all samples flagged weights is zero - // TODO: make a lookup table for the expensive division + // If all samples flagged, weights is zero. + // TODO: make a lookup table for the expensive division; measure first float weight = nrValidSamples ? 1e-6f / nrValidSamples : 0; applyWeightingToAllPolarizations(bl, ch, weight, output); @@ -379,7 +381,7 @@ namespace LOFAR #if defined USE_B7015 OMP_ScopedLock scopedLock(pipeline.hostToDeviceLock[gpu / 2]); #endif - input.inputSamples.hostToDevice(true); + queue.writeBuffer(devInput.inputSamples, input.inputSamples, true); // counters["input - samples"]->doOperation(input.inputSamples.deviceBuffer.event, 0, 0, input.inputSamples.bytesize()); timers["GPU - input"]->stop(); @@ -394,9 +396,9 @@ namespace LOFAR // Only upload delays if they changed w.r.t. the previous subband if ((int)SAP != prevSAP || (ssize_t)block != prevBlock) { - input.delaysAtBegin.hostToDevice(false); - input.delaysAfterEnd.hostToDevice(false); - input.phaseOffsets.hostToDevice(false); + queue.writeBuffer(devInput.delaysAtBegin, input.delaysAtBegin, false); + queue.writeBuffer(devInput.delaysAfterEnd, input.delaysAfterEnd, false); + queue.writeBuffer(devInput.phaseOffsets, input.phaseOffsets, false); prevSAP = SAP; prevBlock = block; @@ -436,8 +438,8 @@ namespace LOFAR #ifdef USE_B7015 OMP_ScopedLock scopedLock(pipeline.deviceToHostLock[gpu / 2]); #endif - output.deviceToHost(true); - // now perform weighting of the data based on the number of valid samples + queue.readBuffer(output, devFilteredData, true); + // now perform weighting of the data based on the number of valid samples; TODO??? // counters["output - visibilities"]->doOperation(output.deviceBuffer.event, 0, output.bytesize(), 0); diff --git a/RTCP/Cobalt/GPUProc/src/cuda/WorkQueues/CorrelatorWorkQueue.h b/RTCP/Cobalt/GPUProc/src/cuda/WorkQueues/CorrelatorWorkQueue.h index af86ee52a3916239135529d5ae6dae36cb59d241..78b37bff6c57a5211d0fb63c3eecd072dc4f94bd 100644 --- a/RTCP/Cobalt/GPUProc/src/cuda/WorkQueues/CorrelatorWorkQueue.h +++ b/RTCP/Cobalt/GPUProc/src/cuda/WorkQueues/CorrelatorWorkQueue.h @@ -106,13 +106,14 @@ namespace LOFAR { public: CorrelatedDataHostBuffer(unsigned nrStations, unsigned nrChannels, - unsigned maxNrValidSamples, CorrelatorWorkQueue &workQueue) + unsigned maxNrValidSamples, gpu::Context &context, + CorrelatorWorkQueue &workQueue) : CorrelatedData(nrStations, nrChannels, maxNrValidSamples, this->origin(), this->num_elements(), heapAllocator, 1), MultiDimArrayHostBuffer<fcomplex, 4>(boost::extents[nrStations * (nrStations + 1) / 2] [nrChannels][NR_POLARIZATIONS] - [NR_POLARIZATIONS], 0), + [NR_POLARIZATIONS], context, 0), workQueue(workQueue) { } @@ -187,12 +188,16 @@ namespace LOFAR // Create the inputData object we need shared host/device memory on the supplied devicequeue WorkQueueInputData(size_t n_beams, size_t n_stations, size_t n_polarizations, size_t n_samples, size_t bytes_per_complex_sample, - unsigned int hostBufferFlags = 0) + gpu::Context &context, unsigned int hostBufferFlags = 0) : - delaysAtBegin(boost::extents[n_beams][n_stations][n_polarizations], hostBufferFlags), - delaysAfterEnd(boost::extents[n_beams][n_stations][n_polarizations], hostBufferFlags), - phaseOffsets(boost::extents[n_stations][n_polarizations], hostBufferFlags), - inputSamples(boost::extents[n_stations][n_samples][n_polarizations][bytes_per_complex_sample], hostBufferFlags), // TODO: The size of the buffer is NOT validated + delaysAtBegin(boost::extents[n_beams][n_stations][n_polarizations], + context, hostBufferFlags), + delaysAfterEnd(boost::extents[n_beams][n_stations][n_polarizations], + context, hostBufferFlags), + phaseOffsets(boost::extents[n_stations][n_polarizations], + context, hostBufferFlags), + inputSamples(boost::extents[n_stations][n_samples][n_polarizations][bytes_per_complex_sample], + context, hostBufferFlags), // TODO: The size of the buffer is NOT validated inputFlags(boost::extents[n_stations]) { }