Skip to content
Snippets Groups Projects
Commit add22818 authored by Lofar test build account's avatar Lofar test build account
Browse files

Task #5385: [mol] Fixed race condition causing dead lock (if one thread adds a...

Task #5385: [mol] Fixed race condition causing dead lock (if one thread adds a subband for a block that has to be fetched, it holds a mutex needed by other threads to complete older blocks)
parent 0c28f56c
No related branches found
No related tags found
No related merge requests found
......@@ -50,21 +50,28 @@ void Subband::write(Stream &stream) const {
stream.write(&dim1, sizeof dim1);
stream.write(&dim2, sizeof dim2);
stream.write(data.origin(), data.num_elements() * sizeof *data.origin());
//LOG_DEBUG_STR("Written block");
LOG_DEBUG_STR("Written block " << id);
}
void Subband::read(Stream &stream) {
//LOG_DEBUG_STR("Reading block id");
LOG_DEBUG_STR("Reading block id");
stream.read(&id, sizeof id);
//LOG_DEBUG_STR("Read block id");
LOG_DEBUG_STR("Read block id " << id);
size_t dim1, dim2;
stream.read(&dim1, sizeof dim1);
stream.read(&dim2, sizeof dim2);
data.resize(boost::extents[dim1][dim2]);
stream.read(data.origin(), data.num_elements() * sizeof *data.origin());
//LOG_DEBUG_STR("Read block");
LOG_DEBUG_STR("Read block " << id);
}
std::ostream &operator<<(std::ostream &str, const Subband::BlockID &id)
{
return str << "file " << id.fileIdx << " block " << id.block << " subband " << id.subband;
}
......@@ -74,6 +81,7 @@ Block::Block( size_t nrSubbands, size_t nrSamples, size_t nrChannels )
subbandWritten(nrSubbands, false),
fileIdx(0),
block(0),
nrSubbands(nrSubbands),
nrSubbandsLeft(nrSubbands)
{
}
......@@ -86,16 +94,22 @@ void Block::addSubband( const Subband &subband ) {
ASSERT(subband.id.fileIdx == fileIdx);
ASSERT(subband.id.block == block);
// Subbands should not arrive twice
ASSERT(subbandWritten[subband.id.subband] == false);
memcpy(samples[subband.id.subband].origin(), subband.data.origin(), subband.data.num_elements() * sizeof *subband.data.origin());
subbandWritten[subband.id.subband] = true;
nrSubbandsLeft--;
LOG_DEBUG_STR("Block: added " << subband.id << ", " << nrSubbandsLeft << " subbands left");
}
void Block::zeroRemainingSubbands() {
for (size_t subbandIdx = 0; subbandIdx < subbandWritten.size(); ++subbandIdx) {
if (!subbandWritten[subbandIdx]) {
LOG_DEBUG_STR("File " << fileIdx << " block " << block << ": zeroing subband " << subbandIdx);
memset(samples[subbandIdx].origin(), 0, samples[subbandIdx].size() * sizeof *samples[subbandIdx].origin());
}
}
......@@ -107,11 +121,27 @@ bool Block::complete() const {
}
void Block::reset( size_t fileIdx, size_t blockIdx ) {
// Apply annotation, also for the super class
fileIdx = fileIdx;
block = blockIdx;
setSequenceNumber(blockIdx);
// Mark all subbands as not-written
for (size_t subbandIdx = 0; subbandIdx < subbandWritten.size(); ++subbandIdx) {
subbandWritten[subbandIdx] = false;
}
nrSubbandsLeft = nrSubbands;
}
BlockCollector::BlockCollector( Pool<Block> &outputPool, size_t fileIdx, size_t nrBlocks, size_t maxBlocksInFlight )
:
outputPool(outputPool),
fileIdx(fileIdx),
nrBlocks(nrBlocks),
fetchingNextBlock(false),
maxBlocksInFlight(maxBlocksInFlight),
canDrop(maxBlocksInFlight > 0),
lastEmitted(-1)
......@@ -122,6 +152,8 @@ BlockCollector::BlockCollector( Pool<Block> &outputPool, size_t fileIdx, size_t
void BlockCollector::addSubband( const Subband &subband ) {
ScopedLock sl(mutex);
LOG_DEBUG_STR("BlockCollector: Add " << subband.id);
const size_t &blockIdx = subband.id.block;
ASSERT(nrBlocks == 0 || blockIdx < nrBlocks);
......@@ -130,7 +162,7 @@ void BlockCollector::addSubband( const Subband &subband ) {
if (canDrop) {
if ((ssize_t)blockIdx <= lastEmitted) {
// too late -- discard packet
LOG_DEBUG_STR("TABTranspose::BlockCollector: Dropped subband " << subband.id.subband << " of file " << subband.id.fileIdx);
LOG_DEBUG_STR("BlockCollector: Dropped subband " << subband.id.subband << " of file " << subband.id.fileIdx);
return;
}
} else {
......@@ -139,10 +171,11 @@ void BlockCollector::addSubband( const Subband &subband ) {
ASSERTSTR((ssize_t)blockIdx > lastEmitted, "Received block " << blockIdx << ", but already emitted up to " << lastEmitted << " for file " << subband.id.fileIdx << " subband " << subband.id.subband);
}
// Note: fetch can release the mutex if it has to wait,
// causing any assumptions made earlier to be invalid.
fetch(blockIdx);
}
// augment existing block
SmartPtr<Block> &block = blocks.at(blockIdx);
block->addSubband(subband);
......@@ -156,7 +189,7 @@ void BlockCollector::addSubband( const Subband &subband ) {
if (nrBlocks > 0 && blockIdx == nrBlocks - 1) {
// Received last block -- wrap up
LOG_INFO_STR("TABTranspose::BlockCollector: Received last block of file " << fileIdx);
LOG_INFO_STR("BlockCollector: Received last block of file " << fileIdx);
ASSERT(blocks.empty());
......@@ -209,6 +242,8 @@ void BlockCollector::emit(size_t blockIdx) {
// clear data we didn't receive
SmartPtr<Block> &block = blocks.at(blockIdx);
LOG_DEBUG_STR("BlockCollector: emitting block " << blockIdx << " of file " << block->fileIdx);
block->zeroRemainingSubbands();
// emit to outputPool.filled()
......@@ -234,17 +269,46 @@ bool BlockCollector::have(size_t block) const {
void BlockCollector::fetch(size_t block) {
ASSERT(!have(block));
// Make sure we don't exceed our maximum cache size
if (canDrop && blocks.size() >= maxBlocksInFlight) {
// No more room -- force out oldest block
emit(minBlock());
} else if (!canDrop) {
if (fetchingNextBlock) {
// We know that if some other thread is fetching, that it MUST be the same block.
// That's because we cannot drop data, so the same block cannot be completed without
// before this fetch() call returns.
LOG_DEBUG_STR("BlockCollector: some thread is already fetching block " << block);
fetchSignal.wait(mutex);
ASSERT(have(block));
return;
}
}
blocks[block] = outputPool.free.remove();
LOG_DEBUG_STR("BlockCollector: fetching block " << block);
SmartPtr<Block> newBlock;
fetchingNextBlock = true;
{
// Allow other threads to manipulate older blocks while we're waiting for
// a new free one.
ScopedLock sl(mutex, true);
newBlock = outputPool.free.remove();
}
fetchingNextBlock = false;
LOG_DEBUG_STR("BlockCollector: fetched block " << block);
// Add and annotate
ASSERT(!have(block));
blocks[block] = newBlock;
blocks[block]->reset(fileIdx, block);
// Annotate
blocks[block]->fileIdx = fileIdx;
blocks[block]->block = block;
blocks[block]->setSequenceNumber(block);
// Signal other threads that are waiting for this block
fetchSignal.broadcast();
}
......
......@@ -54,7 +54,7 @@ namespace LOFAR
Subband( size_t nrSamples = 0, size_t nrChannels = 0 );
struct {
struct BlockID {
size_t fileIdx;
size_t subband;
size_t block;
......@@ -64,6 +64,8 @@ namespace LOFAR
void read(Stream &stream);
};
std::ostream &operator<<(std::ostream &str, const Subband::BlockID &id);
/*
* A block of data, representing for one time slice all
* subbands.
......@@ -97,7 +99,16 @@ namespace LOFAR
*/
bool complete() const;
/*
* Reset the meta data for this block. NOTE: The dimensions
* of the data remain unchanged.
*/
void reset( size_t fileIdx, size_t blockIdx );
private:
// The total number of subbands in this block.
const size_t nrSubbands;
// The number of subbands left to receive.
size_t nrSubbandsLeft;
};
......@@ -152,7 +163,10 @@ namespace LOFAR
Pool<Block> &outputPool;
const size_t fileIdx;
const size_t nrBlocks;
Mutex mutex;
Mutex mutex; // protects concurrent access to `blocks'
bool fetchingNextBlock;
Condition fetchSignal;
// upper limit for blocks.size(), or 0 if unlimited
const size_t maxBlocksInFlight;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment