From fdc2294e84a455593e44f38bbf3eac13f2eefd78 Mon Sep 17 00:00:00 2001 From: Jan David Mol <mol@astron.nl> Date: Wed, 5 Jun 2013 08:12:03 +0000 Subject: [PATCH] Task #4391: Cleanup, and shortened code by defining an operator<< on BlockID --- RTCP/Cobalt/GPUProc/src/BlockID.cc | 36 +++++++++++++ RTCP/Cobalt/GPUProc/src/BlockID.h | 3 ++ RTCP/Cobalt/GPUProc/src/CMakeLists.txt | 1 + .../src/cuda/Pipelines/CorrelatorPipeline.cc | 53 ++++++++++--------- RTCP/Cobalt/GPUProc/test/testParset.sh | 4 +- 5 files changed, 69 insertions(+), 28 deletions(-) create mode 100644 RTCP/Cobalt/GPUProc/src/BlockID.cc diff --git a/RTCP/Cobalt/GPUProc/src/BlockID.cc b/RTCP/Cobalt/GPUProc/src/BlockID.cc new file mode 100644 index 00000000000..3eea8c264f4 --- /dev/null +++ b/RTCP/Cobalt/GPUProc/src/BlockID.cc @@ -0,0 +1,36 @@ +//# BlockID.cc +//# Copyright (C) 2012-2013 ASTRON (Netherlands Institute for Radio Astronomy) +//# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands +//# +//# This file is part of the LOFAR software suite. +//# The LOFAR software suite is free software: you can redistribute it and/or +//# modify it under the terms of the GNU General Public License as published +//# by the Free Software Foundation, either version 3 of the License, or +//# (at your option) any later version. +//# +//# The LOFAR software suite is distributed in the hope that it will be useful, +//# but WITHOUT ANY WARRANTY; without even the implied warranty of +//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//# GNU General Public License for more details. +//# +//# You should have received a copy of the GNU General Public License along +//# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. +//# +//# $Id$ +#include <lofar_config.h> + +#include "BlockID.h" + +namespace LOFAR +{ + namespace Cobalt + { + std::ostream &operator<<(std::ostream &str, const struct BlockID &id) { + str << "block " << id.block << " subband " << id.globalSubbandIdx << " (local index " << id.localSubbandIdx << ")"; + + return str; + } + + } +} + diff --git a/RTCP/Cobalt/GPUProc/src/BlockID.h b/RTCP/Cobalt/GPUProc/src/BlockID.h index 77a9e76fab2..30dcb6eca5f 100644 --- a/RTCP/Cobalt/GPUProc/src/BlockID.h +++ b/RTCP/Cobalt/GPUProc/src/BlockID.h @@ -21,6 +21,8 @@ #ifndef LOFAR_GPUPROC_BLOCKID_H #define LOFAR_GPUPROC_BLOCKID_H +#include <iostream> + namespace LOFAR { namespace Cobalt @@ -36,6 +38,7 @@ namespace LOFAR size_t localSubbandIdx; }; + std::ostream &operator<<(std::ostream &str, const struct BlockID &id); } // namespace Cobalt } // namespace LOFAR diff --git a/RTCP/Cobalt/GPUProc/src/CMakeLists.txt b/RTCP/Cobalt/GPUProc/src/CMakeLists.txt index 10f6e55749d..1900979c5e0 100644 --- a/RTCP/Cobalt/GPUProc/src/CMakeLists.txt +++ b/RTCP/Cobalt/GPUProc/src/CMakeLists.txt @@ -10,6 +10,7 @@ execute_process(COMMAND ${CMAKE_COMMAND} -E create_symlink set(_gpuproc_sources #Package__Version.cc BandPass.cc + BlockID.cc FilterBank.cc global_defines.cc Storage/SSH.cc diff --git a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/CorrelatorPipeline.cc b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/CorrelatorPipeline.cc index faa367143cd..bbe693198c2 100644 --- a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/CorrelatorPipeline.cc +++ b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/CorrelatorPipeline.cc @@ -257,9 +257,11 @@ namespace LOFAR SmartPtr<WorkQueueInputData> data = queue.inputPool.free.remove(); // Annotate the block - data->blockID.block = block; - data->blockID.globalSubbandIdx = subbandIndices[inputIdx]; - data->blockID.localSubbandIdx = inputIdx; + struct BlockID id; + id.block = block; + id.globalSubbandIdx = subbandIndices[inputIdx]; + id.localSubbandIdx = inputIdx; + data->blockID = id; // Incorporate it in the receiver's input set. for (size_t stat = 0; stat < ps.nrStations(); ++stat) { @@ -324,16 +326,15 @@ namespace LOFAR // Keep fetching input objects until end-of-input while ((input = workQueue.inputPool.filled.remove()) != NULL) { - size_t block = input->blockID.block; - unsigned globalSubbandIdx = input->blockID.globalSubbandIdx; + const struct BlockID id = input->blockID; - LOG_INFO_STR("[block " << block << ", subband " << globalSubbandIdx << "] Processing start"); + LOG_INFO_STR("[" << id << "] Processing start"); // Also fetch an output object to store results SmartPtr<CorrelatedDataHostBuffer> output = workQueue.outputPool.free.remove(); ASSERT(output != NULL); // Only we signal end-of-data, so we should never receive it - output->blockID = input->blockID; + output->blockID = id; // Perform calculations workQueue.timers["CPU - process"]->start(); @@ -348,7 +349,7 @@ namespace LOFAR workQueue.inputPool.free.append(input); ASSERT(!input); - LOG_DEBUG_STR("[block " << block << ", subband " << globalSubbandIdx << "] Forwarded output to post processing"); + LOG_DEBUG_STR("[" << id << "] Forwarded output to post processing"); } } @@ -363,23 +364,23 @@ namespace LOFAR // Keep fetching output objects until end-of-output while ((output = workQueue.outputPool.filled.remove()) != NULL) { - size_t block = output->blockID.block; - unsigned globalSubbandIdx = output->blockID.globalSubbandIdx; - unsigned localSubbandIdx = output->blockID.localSubbandIdx; + const struct BlockID id = output->blockID; - LOG_INFO_STR("[block " << block << ", subband " << globalSubbandIdx << "] Post processing start"); + LOG_INFO_STR("[" << id << "] Post processing start"); workQueue.timers["CPU - postprocess"]->start(); workQueue.postprocessSubband(*output); workQueue.timers["CPU - postprocess"]->stop(); // Hand off output, force in-order as Storage expects it that way - subbandPool[localSubbandIdx].sync.waitFor(block); + struct Output &pool = subbandPool[id.localSubbandIdx]; + + pool.sync.waitFor(id.block); // We do the ordering, so we set the sequence numbers - output->setSequenceNumber(block); + output->setSequenceNumber(id.block); - if (!subbandPool[localSubbandIdx].bequeue->append(output)) { + if (!pool.bequeue->append(output)) { nrBlocksDropped++; //LOG_WARN_STR("[block " << block << "] Dropped for subband " << globalSubbandIdx); @@ -390,11 +391,11 @@ namespace LOFAR } // Allow next block to be written - subbandPool[localSubbandIdx].sync.advanceTo(block + 1); + pool.sync.advanceTo(id.block + 1); ASSERT(!output); - LOG_DEBUG_STR("[block " << block << ", subband " << globalSubbandIdx << "] Forwarded output to writer"); + LOG_DEBUG_STR("[" << id << "] Forwarded output to writer"); if (time(0) != lastLogTime) { lastLogTime = time(0); @@ -411,18 +412,18 @@ namespace LOFAR // Connect to output stream try { - if (ps.getHostName(CORRELATED_DATA, subband) == "") { + if (ps.getHostName(CORRELATED_DATA, globalSubbandIdx) == "") { // an empty host name means 'write to disk directly', to // make debugging easier for now - outputStream = new FileStream(ps.getFileName(CORRELATED_DATA, subband), 0666); + outputStream = new FileStream(ps.getFileName(CORRELATED_DATA, globalSubbandIdx), 0666); } else { // connect to the Storage_main process for this output - const std::string desc = getStreamDescriptorBetweenIONandStorage(ps, CORRELATED_DATA, subband); + const std::string desc = getStreamDescriptorBetweenIONandStorage(ps, CORRELATED_DATA, globalSubbandIdx); outputStream = createStream(desc, false, 0); } } catch(Exception &ex) { - LOG_ERROR_STR("Dropping rest of subband " << subband << ": " << ex); + LOG_ERROR_STR("Dropping rest of subband " << globalSubbandIdx << ": " << ex); outputStream = new NullStream; } @@ -431,18 +432,18 @@ namespace LOFAR // Process pool elements until end-of-output while ((outputData = output.bequeue->remove()) != NULL) { - size_t block = outputData->blockID.block; - ASSERT( globalSubbandIdx == outputData->blockID.globalSubbandIdx ); + const struct BlockID id = outputData->blockID; + ASSERT( globalSubbandIdx == id.globalSubbandIdx ); CorrelatorWorkQueue &queue = outputData->queue; // cache queue object, because `output' will be destroyed - LOG_INFO_STR("[block " << block << ", subband " << globalSubbandIdx << "] Writing start"); + LOG_INFO_STR("[" << id << "] Writing start"); // Write block to disk try { outputData->write(outputStream.get(), true); } catch(Exception &ex) { - LOG_ERROR_STR("Dropping rest of subband " << globalSubbandIdx << ": " << ex); + LOG_ERROR_STR("Dropping rest of subband " << id.globalSubbandIdx << ": " << ex); outputStream = new NullStream; } @@ -452,7 +453,7 @@ namespace LOFAR ASSERT(!outputData); - LOG_INFO_STR("[block " << block << ", subband " << globalSubbandIdx << "] Done"); + LOG_INFO_STR("[" << id << "] Done"); } } diff --git a/RTCP/Cobalt/GPUProc/test/testParset.sh b/RTCP/Cobalt/GPUProc/test/testParset.sh index 2b565356f22..e047965591d 100755 --- a/RTCP/Cobalt/GPUProc/test/testParset.sh +++ b/RTCP/Cobalt/GPUProc/test/testParset.sh @@ -98,9 +98,9 @@ function parse_logs export LOFARROOT=$srcdir/.. && # run correlator -- without profiling - mpirun -H localhost -np 3 $BINDIR/rtcp $PARSET > performance_normal.txt 2>&1 && + mpirun -H localhost -np 4 $BINDIR/rtcp $PARSET > performance_normal.txt 2>&1 && # run correlator -- with profiling - mpirun -H localhost -np 3 $BINDIR/rtcp -p $PARSET > performance_profiled.txt 2>&1 && + mpirun -H localhost -np 4 $BINDIR/rtcp -p $PARSET > performance_profiled.txt 2>&1 && # compare output if [ "x" != "x$REFDIR" ] -- GitLab