Skip to content
Snippets Groups Projects
Commit fdc2294e authored by Jan David Mol's avatar Jan David Mol
Browse files

Task #4391: Cleanup, and shortened code by defining an operator<< on BlockID

parent 6a2935b6
No related branches found
No related tags found
No related merge requests found
//# BlockID.cc
//# Copyright (C) 2012-2013 ASTRON (Netherlands Institute for Radio Astronomy)
//# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
//#
//# This file is part of the LOFAR software suite.
//# The LOFAR software suite is free software: you can redistribute it and/or
//# modify it under the terms of the GNU General Public License as published
//# by the Free Software Foundation, either version 3 of the License, or
//# (at your option) any later version.
//#
//# The LOFAR software suite is distributed in the hope that it will be useful,
//# but WITHOUT ANY WARRANTY; without even the implied warranty of
//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//# GNU General Public License for more details.
//#
//# You should have received a copy of the GNU General Public License along
//# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
//#
//# $Id$
#include <lofar_config.h>
#include "BlockID.h"
namespace LOFAR
{
namespace Cobalt
{
std::ostream &operator<<(std::ostream &str, const struct BlockID &id) {
str << "block " << id.block << " subband " << id.globalSubbandIdx << " (local index " << id.localSubbandIdx << ")";
return str;
}
}
}
......@@ -21,6 +21,8 @@
#ifndef LOFAR_GPUPROC_BLOCKID_H
#define LOFAR_GPUPROC_BLOCKID_H
#include <iostream>
namespace LOFAR
{
namespace Cobalt
......@@ -36,6 +38,7 @@ namespace LOFAR
size_t localSubbandIdx;
};
std::ostream &operator<<(std::ostream &str, const struct BlockID &id);
} // namespace Cobalt
} // namespace LOFAR
......
......@@ -10,6 +10,7 @@ execute_process(COMMAND ${CMAKE_COMMAND} -E create_symlink
set(_gpuproc_sources
#Package__Version.cc
BandPass.cc
BlockID.cc
FilterBank.cc
global_defines.cc
Storage/SSH.cc
......
......@@ -257,9 +257,11 @@ namespace LOFAR
SmartPtr<WorkQueueInputData> data = queue.inputPool.free.remove();
// Annotate the block
data->blockID.block = block;
data->blockID.globalSubbandIdx = subbandIndices[inputIdx];
data->blockID.localSubbandIdx = inputIdx;
struct BlockID id;
id.block = block;
id.globalSubbandIdx = subbandIndices[inputIdx];
id.localSubbandIdx = inputIdx;
data->blockID = id;
// Incorporate it in the receiver's input set.
for (size_t stat = 0; stat < ps.nrStations(); ++stat) {
......@@ -324,16 +326,15 @@ namespace LOFAR
// Keep fetching input objects until end-of-input
while ((input = workQueue.inputPool.filled.remove()) != NULL) {
size_t block = input->blockID.block;
unsigned globalSubbandIdx = input->blockID.globalSubbandIdx;
const struct BlockID id = input->blockID;
LOG_INFO_STR("[block " << block << ", subband " << globalSubbandIdx << "] Processing start");
LOG_INFO_STR("[" << id << "] Processing start");
// Also fetch an output object to store results
SmartPtr<CorrelatedDataHostBuffer> output = workQueue.outputPool.free.remove();
ASSERT(output != NULL); // Only we signal end-of-data, so we should never receive it
output->blockID = input->blockID;
output->blockID = id;
// Perform calculations
workQueue.timers["CPU - process"]->start();
......@@ -348,7 +349,7 @@ namespace LOFAR
workQueue.inputPool.free.append(input);
ASSERT(!input);
LOG_DEBUG_STR("[block " << block << ", subband " << globalSubbandIdx << "] Forwarded output to post processing");
LOG_DEBUG_STR("[" << id << "] Forwarded output to post processing");
}
}
......@@ -363,23 +364,23 @@ namespace LOFAR
// Keep fetching output objects until end-of-output
while ((output = workQueue.outputPool.filled.remove()) != NULL) {
size_t block = output->blockID.block;
unsigned globalSubbandIdx = output->blockID.globalSubbandIdx;
unsigned localSubbandIdx = output->blockID.localSubbandIdx;
const struct BlockID id = output->blockID;
LOG_INFO_STR("[block " << block << ", subband " << globalSubbandIdx << "] Post processing start");
LOG_INFO_STR("[" << id << "] Post processing start");
workQueue.timers["CPU - postprocess"]->start();
workQueue.postprocessSubband(*output);
workQueue.timers["CPU - postprocess"]->stop();
// Hand off output, force in-order as Storage expects it that way
subbandPool[localSubbandIdx].sync.waitFor(block);
struct Output &pool = subbandPool[id.localSubbandIdx];
pool.sync.waitFor(id.block);
// We do the ordering, so we set the sequence numbers
output->setSequenceNumber(block);
output->setSequenceNumber(id.block);
if (!subbandPool[localSubbandIdx].bequeue->append(output)) {
if (!pool.bequeue->append(output)) {
nrBlocksDropped++;
//LOG_WARN_STR("[block " << block << "] Dropped for subband " << globalSubbandIdx);
......@@ -390,11 +391,11 @@ namespace LOFAR
}
// Allow next block to be written
subbandPool[localSubbandIdx].sync.advanceTo(block + 1);
pool.sync.advanceTo(id.block + 1);
ASSERT(!output);
LOG_DEBUG_STR("[block " << block << ", subband " << globalSubbandIdx << "] Forwarded output to writer");
LOG_DEBUG_STR("[" << id << "] Forwarded output to writer");
if (time(0) != lastLogTime) {
lastLogTime = time(0);
......@@ -411,18 +412,18 @@ namespace LOFAR
// Connect to output stream
try {
if (ps.getHostName(CORRELATED_DATA, subband) == "") {
if (ps.getHostName(CORRELATED_DATA, globalSubbandIdx) == "") {
// an empty host name means 'write to disk directly', to
// make debugging easier for now
outputStream = new FileStream(ps.getFileName(CORRELATED_DATA, subband), 0666);
outputStream = new FileStream(ps.getFileName(CORRELATED_DATA, globalSubbandIdx), 0666);
} else {
// connect to the Storage_main process for this output
const std::string desc = getStreamDescriptorBetweenIONandStorage(ps, CORRELATED_DATA, subband);
const std::string desc = getStreamDescriptorBetweenIONandStorage(ps, CORRELATED_DATA, globalSubbandIdx);
outputStream = createStream(desc, false, 0);
}
} catch(Exception &ex) {
LOG_ERROR_STR("Dropping rest of subband " << subband << ": " << ex);
LOG_ERROR_STR("Dropping rest of subband " << globalSubbandIdx << ": " << ex);
outputStream = new NullStream;
}
......@@ -431,18 +432,18 @@ namespace LOFAR
// Process pool elements until end-of-output
while ((outputData = output.bequeue->remove()) != NULL) {
size_t block = outputData->blockID.block;
ASSERT( globalSubbandIdx == outputData->blockID.globalSubbandIdx );
const struct BlockID id = outputData->blockID;
ASSERT( globalSubbandIdx == id.globalSubbandIdx );
CorrelatorWorkQueue &queue = outputData->queue; // cache queue object, because `output' will be destroyed
LOG_INFO_STR("[block " << block << ", subband " << globalSubbandIdx << "] Writing start");
LOG_INFO_STR("[" << id << "] Writing start");
// Write block to disk
try {
outputData->write(outputStream.get(), true);
} catch(Exception &ex) {
LOG_ERROR_STR("Dropping rest of subband " << globalSubbandIdx << ": " << ex);
LOG_ERROR_STR("Dropping rest of subband " << id.globalSubbandIdx << ": " << ex);
outputStream = new NullStream;
}
......@@ -452,7 +453,7 @@ namespace LOFAR
ASSERT(!outputData);
LOG_INFO_STR("[block " << block << ", subband " << globalSubbandIdx << "] Done");
LOG_INFO_STR("[" << id << "] Done");
}
}
......
......@@ -98,9 +98,9 @@ function parse_logs
export LOFARROOT=$srcdir/.. &&
# run correlator -- without profiling
mpirun -H localhost -np 3 $BINDIR/rtcp $PARSET > performance_normal.txt 2>&1 &&
mpirun -H localhost -np 4 $BINDIR/rtcp $PARSET > performance_normal.txt 2>&1 &&
# run correlator -- with profiling
mpirun -H localhost -np 3 $BINDIR/rtcp -p $PARSET > performance_profiled.txt 2>&1 &&
mpirun -H localhost -np 4 $BINDIR/rtcp -p $PARSET > performance_profiled.txt 2>&1 &&
# compare output
if [ "x" != "x$REFDIR" ]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment