diff --git a/RTCP/Cobalt/GPUProc/src/CMakeLists.txt b/RTCP/Cobalt/GPUProc/src/CMakeLists.txt index 03094df4b14299a6478b67672b10383c382b5557..33fb73b240310e3201e3d041b9eadf2b1c358c09 100644 --- a/RTCP/Cobalt/GPUProc/src/CMakeLists.txt +++ b/RTCP/Cobalt/GPUProc/src/CMakeLists.txt @@ -54,8 +54,8 @@ list(APPEND _gpuproc_sources #Kernels/UHEP_InvFIR_Kernel.cc #Kernels/UHEP_TransposeKernel.cc #Kernels/UHEP_TriggerKernel.cc - cuda/Pipelines/Pipeline.cc -# cuda/Pipelines/UHEP_Pipeline.cc + Pipelines/Pipeline.cc +# Pipelines/UHEP_Pipeline.cc cuda/SubbandProcs/SubbandProc.cc cuda/SubbandProcs/SubbandProcInputData.cc cuda/SubbandProcs/SubbandProcOutputData.cc diff --git a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc b/RTCP/Cobalt/GPUProc/src/Pipelines/Pipeline.cc similarity index 100% rename from RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc rename to RTCP/Cobalt/GPUProc/src/Pipelines/Pipeline.cc diff --git a/RTCP/Cobalt/GPUProc/src/Pipelines/Pipeline.h b/RTCP/Cobalt/GPUProc/src/Pipelines/Pipeline.h index 0718dcdac67368e0034c519ca6aa9232ba7f4d7f..b09516353d02eed582d95e36bb683953b9fe1ba1 100644 --- a/RTCP/Cobalt/GPUProc/src/Pipelines/Pipeline.h +++ b/RTCP/Cobalt/GPUProc/src/Pipelines/Pipeline.h @@ -1,6 +1,5 @@ //# Pipeline.h -//# -//# Copyright (C) 2013 ASTRON (Netherlands Institute for Radio Astronomy) +//# Copyright (C) 2012-2013 ASTRON (Netherlands Institute for Radio Astronomy) //# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands //# //# This file is part of the LOFAR software suite. @@ -19,23 +18,130 @@ //# //# $Id$ -// \file -// Support for the beamformer pipeline. +#ifndef LOFAR_GPUPROC_CUDA_PIPELINE_H +#define LOFAR_GPUPROC_CUDA_PIPELINE_H -#ifndef LOFAR_GPUPROC_PIPELINE_H -#define LOFAR_GPUPROC_PIPELINE_H +#include <string> +#include <vector> -#if defined (USE_CUDA) && defined (USE_OPENCL) -# error "Either CUDA or OpenCL must be enabled, not both" -#endif +#include <Common/LofarTypes.h> +#include <MACIO/RTmetadata.h> +#include <CoInterface/Parset.h> +#include <CoInterface/SmartPtr.h> +#include <CoInterface/Pool.h> +#include <CoInterface/OMPThread.h> +#include <CoInterface/TABTranspose.h> -#if defined (USE_CUDA) -# include <GPUProc/cuda/Pipelines/Pipeline.h> -#elif defined (USE_OPENCL) -# include <GPUProc/opencl/Pipelines/Pipeline.h> -#else -# error "Either CUDA or OpenCL must be enabled, not neither" -#endif +#include <GPUProc/gpu_wrapper.h> +#include <GPUProc/PerformanceCounter.h> +#include <GPUProc/SubbandProcs/SubbandProc.h> +#include <GPUProc/SubbandProcs/KernelFactories.h> + +#include <GPUProc/MPIReceiver.h> + +namespace LOFAR +{ + namespace Cobalt + { + using MACIO::RTmetadata; + + class Pipeline + { + public: + Pipeline(const Parset &, const std::vector<size_t> &subbandIndices, + const std::vector<gpu::Device> &devices, + Pool<struct MPIRecvData> &pool, + RTmetadata &mdLogger, const std::string &mdKeyPrefix, + unsigned hostID = 0); + + ~Pipeline(); + + // allocate resources, such as GPU buffers. + // + // Pipeline deploys delayed construction, because the resources may still + // be occupied by a previous observation at the time of construction. + // + // An alternative to delayed allocation could be to retry the GPU malloc + // with a timeout, but that could potentially dead-lock two concurrent + // observations. + void allocateResources(); + + // for each subband get data from input stream, sync, start the kernels to process all data, write output in parallel + void processObservation(); + + struct Output + { + // output data queue + SmartPtr< Queue< SmartPtr<SubbandProcOutputData> > > queue; + }; + + std::vector< SmartPtr<SubbandProc> > subbandProcs; + + protected: + const Parset &ps; + const std::vector<gpu::Device> devices; + + const std::vector<size_t> subbandIndices; // [localSubbandIdx] + + // Whether we're the pipeline that processes the first subband. + // If true, we log our progress at INFO. Otherwise, at DEBUG. + const bool processingSubband0; + + const size_t nrSubbandsPerSubbandProc; + + RTmetadata &itsMdLogger; // non-const to be able to use its log() + const std::string itsMdKeyPrefix; + + // Threads that write to outputProc, and need to + // be killed when they stall at observation end. + OMPThreadSet outputThreads; + + Pool<struct MPIRecvData> &mpiPool; + + std::vector<struct Output> writePool; // [localSubbandIdx] + + KernelFactories factories; + + // For each block, transpose all subbands from all stations, and divide the + // work over the subbandProcs + void transposeInput(); + template<typename SampleT> void transposeInput(); + + // preprocess subbands on the CPU + void preprocessSubbands(SubbandProc &subbandProc); + + // process subbands on the GPU + void processSubbands(SubbandProc &subbandProc); + + // Post-process subbands on the CPU + void postprocessSubbands(SubbandProc &subbandProc); + + void writeBeamformedOutput( + unsigned globalSubbandIdx, + Queue< SmartPtr<SubbandProcOutputData> > &inputQueue, + Queue< SmartPtr<SubbandProcOutputData> > &outputQueue, + Queue< SmartPtr<SubbandProcOutputData> > &spillQueue ); + + void writeCorrelatedOutput( + unsigned globalSubbandIdx, + Queue< SmartPtr<SubbandProcOutputData> > &inputQueue, + Queue< SmartPtr<SubbandProcOutputData> > &outputQueue ); + + public: + // Send subbands to Storage + void writeOutput( + unsigned globalSubbandIdx, + Queue< SmartPtr<SubbandProcOutputData> > &inputQueue, + Queue< SmartPtr<SubbandProcOutputData> > &outputQueue ); + + // Output send engine, takes care of the host connections and the multiplexing. + TABTranspose::MultiSender multiSender; + + // MPI rank for this node + const unsigned hostID; + }; + } +} #endif diff --git a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/UHEP_Pipeline.cc b/RTCP/Cobalt/GPUProc/src/Pipelines/UHEP_Pipeline.cc similarity index 100% rename from RTCP/Cobalt/GPUProc/src/cuda/Pipelines/UHEP_Pipeline.cc rename to RTCP/Cobalt/GPUProc/src/Pipelines/UHEP_Pipeline.cc diff --git a/RTCP/Cobalt/GPUProc/src/Pipelines/UHEP_Pipeline.h b/RTCP/Cobalt/GPUProc/src/Pipelines/UHEP_Pipeline.h index 8fdcc994b2974170d7418a7057e5702bbed0abbd..617d168b42be3e1324d90a882967b7e382608684 100644 --- a/RTCP/Cobalt/GPUProc/src/Pipelines/UHEP_Pipeline.h +++ b/RTCP/Cobalt/GPUProc/src/Pipelines/UHEP_Pipeline.h @@ -1,6 +1,5 @@ //# UHEP_Pipeline.h -//# -//# Copyright (C) 2013 ASTRON (Netherlands Institute for Radio Astronomy) +//# Copyright (C) 2012-2013 ASTRON (Netherlands Institute for Radio Astronomy) //# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands //# //# This file is part of the LOFAR software suite. @@ -19,23 +18,35 @@ //# //# $Id$ -// \file -// Include the right GPU API include with our options. +#ifndef LOFAR_GPUPROC_CUDA_UHEP_PIPELINE_H +#define LOFAR_GPUPROC_CUDA_UHEP_PIPELINE_H -#ifndef LOFAR_GPUPROC_UHEP_PIPELINE_H -#define LOFAR_GPUPROC_UHEP_PIPELINE_H +#include <CoInterface/Parset.h> -#if defined (USE_CUDA) && defined (USE_OPENCL) -# error "Either CUDA or OpenCL must be enabled, not both" -#endif +#include <GPUProc/gpu_wrapper.h> +#include "Pipeline.h" +#include <GPUProc/PerformanceCounter.h> -#if defined (USE_CUDA) -# include <GPUProc/cuda/Pipelines/UHEP_Pipeline.h> -#elif defined (USE_OPENCL) -# include <GPUProc/opencl/Pipelines/UHEP_Pipeline.h> -#else -# error "Either CUDA or OpenCL must be enabled, not neither" -#endif +namespace LOFAR +{ + namespace Cobalt + { + + class UHEP_Pipeline : public Pipeline + { + public: + UHEP_Pipeline(const Parset &, Pool<struct MPIRecvData> &pool); + + void doWork(); + + gpu::Module beamFormerProgram, transposeProgram, invFFTprogram, invFIRfilterProgram, triggerProgram; + + PerformanceCounter beamFormerCounter, transposeCounter, invFFTcounter, invFIRfilterCounter, triggerCounter; + PerformanceCounter beamFormerWeightsCounter, samplesCounter; + }; + + } +} #endif diff --git a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.h b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.h deleted file mode 100644 index b09516353d02eed582d95e36bb683953b9fe1ba1..0000000000000000000000000000000000000000 --- a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.h +++ /dev/null @@ -1,147 +0,0 @@ -//# Pipeline.h -//# Copyright (C) 2012-2013 ASTRON (Netherlands Institute for Radio Astronomy) -//# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands -//# -//# This file is part of the LOFAR software suite. -//# The LOFAR software suite is free software: you can redistribute it and/or -//# modify it under the terms of the GNU General Public License as published -//# by the Free Software Foundation, either version 3 of the License, or -//# (at your option) any later version. -//# -//# The LOFAR software suite is distributed in the hope that it will be useful, -//# but WITHOUT ANY WARRANTY; without even the implied warranty of -//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -//# GNU General Public License for more details. -//# -//# You should have received a copy of the GNU General Public License along -//# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. -//# -//# $Id$ - -#ifndef LOFAR_GPUPROC_CUDA_PIPELINE_H -#define LOFAR_GPUPROC_CUDA_PIPELINE_H - -#include <string> -#include <vector> - -#include <Common/LofarTypes.h> -#include <MACIO/RTmetadata.h> -#include <CoInterface/Parset.h> -#include <CoInterface/SmartPtr.h> -#include <CoInterface/Pool.h> -#include <CoInterface/OMPThread.h> -#include <CoInterface/TABTranspose.h> - -#include <GPUProc/gpu_wrapper.h> -#include <GPUProc/PerformanceCounter.h> -#include <GPUProc/SubbandProcs/SubbandProc.h> -#include <GPUProc/SubbandProcs/KernelFactories.h> - -#include <GPUProc/MPIReceiver.h> - -namespace LOFAR -{ - namespace Cobalt - { - using MACIO::RTmetadata; - - class Pipeline - { - public: - Pipeline(const Parset &, const std::vector<size_t> &subbandIndices, - const std::vector<gpu::Device> &devices, - Pool<struct MPIRecvData> &pool, - RTmetadata &mdLogger, const std::string &mdKeyPrefix, - unsigned hostID = 0); - - ~Pipeline(); - - // allocate resources, such as GPU buffers. - // - // Pipeline deploys delayed construction, because the resources may still - // be occupied by a previous observation at the time of construction. - // - // An alternative to delayed allocation could be to retry the GPU malloc - // with a timeout, but that could potentially dead-lock two concurrent - // observations. - void allocateResources(); - - // for each subband get data from input stream, sync, start the kernels to process all data, write output in parallel - void processObservation(); - - struct Output - { - // output data queue - SmartPtr< Queue< SmartPtr<SubbandProcOutputData> > > queue; - }; - - std::vector< SmartPtr<SubbandProc> > subbandProcs; - - protected: - const Parset &ps; - const std::vector<gpu::Device> devices; - - const std::vector<size_t> subbandIndices; // [localSubbandIdx] - - // Whether we're the pipeline that processes the first subband. - // If true, we log our progress at INFO. Otherwise, at DEBUG. - const bool processingSubband0; - - const size_t nrSubbandsPerSubbandProc; - - RTmetadata &itsMdLogger; // non-const to be able to use its log() - const std::string itsMdKeyPrefix; - - // Threads that write to outputProc, and need to - // be killed when they stall at observation end. - OMPThreadSet outputThreads; - - Pool<struct MPIRecvData> &mpiPool; - - std::vector<struct Output> writePool; // [localSubbandIdx] - - KernelFactories factories; - - // For each block, transpose all subbands from all stations, and divide the - // work over the subbandProcs - void transposeInput(); - template<typename SampleT> void transposeInput(); - - // preprocess subbands on the CPU - void preprocessSubbands(SubbandProc &subbandProc); - - // process subbands on the GPU - void processSubbands(SubbandProc &subbandProc); - - // Post-process subbands on the CPU - void postprocessSubbands(SubbandProc &subbandProc); - - void writeBeamformedOutput( - unsigned globalSubbandIdx, - Queue< SmartPtr<SubbandProcOutputData> > &inputQueue, - Queue< SmartPtr<SubbandProcOutputData> > &outputQueue, - Queue< SmartPtr<SubbandProcOutputData> > &spillQueue ); - - void writeCorrelatedOutput( - unsigned globalSubbandIdx, - Queue< SmartPtr<SubbandProcOutputData> > &inputQueue, - Queue< SmartPtr<SubbandProcOutputData> > &outputQueue ); - - public: - // Send subbands to Storage - void writeOutput( - unsigned globalSubbandIdx, - Queue< SmartPtr<SubbandProcOutputData> > &inputQueue, - Queue< SmartPtr<SubbandProcOutputData> > &outputQueue ); - - // Output send engine, takes care of the host connections and the multiplexing. - TABTranspose::MultiSender multiSender; - - // MPI rank for this node - const unsigned hostID; - }; - } -} - -#endif - diff --git a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/UHEP_Pipeline.h b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/UHEP_Pipeline.h deleted file mode 100644 index 617d168b42be3e1324d90a882967b7e382608684..0000000000000000000000000000000000000000 --- a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/UHEP_Pipeline.h +++ /dev/null @@ -1,52 +0,0 @@ -//# UHEP_Pipeline.h -//# Copyright (C) 2012-2013 ASTRON (Netherlands Institute for Radio Astronomy) -//# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands -//# -//# This file is part of the LOFAR software suite. -//# The LOFAR software suite is free software: you can redistribute it and/or -//# modify it under the terms of the GNU General Public License as published -//# by the Free Software Foundation, either version 3 of the License, or -//# (at your option) any later version. -//# -//# The LOFAR software suite is distributed in the hope that it will be useful, -//# but WITHOUT ANY WARRANTY; without even the implied warranty of -//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -//# GNU General Public License for more details. -//# -//# You should have received a copy of the GNU General Public License along -//# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. -//# -//# $Id$ - -#ifndef LOFAR_GPUPROC_CUDA_UHEP_PIPELINE_H -#define LOFAR_GPUPROC_CUDA_UHEP_PIPELINE_H - -#include <CoInterface/Parset.h> - -#include <GPUProc/gpu_wrapper.h> -#include "Pipeline.h" -#include <GPUProc/PerformanceCounter.h> - -namespace LOFAR -{ - namespace Cobalt - { - - class UHEP_Pipeline : public Pipeline - { - public: - UHEP_Pipeline(const Parset &, Pool<struct MPIRecvData> &pool); - - void doWork(); - - gpu::Module beamFormerProgram, transposeProgram, invFFTprogram, invFIRfilterProgram, triggerProgram; - - PerformanceCounter beamFormerCounter, transposeCounter, invFFTcounter, invFIRfilterCounter, triggerCounter; - PerformanceCounter beamFormerWeightsCounter, samplesCounter; - }; - - } -} - -#endif -