From add22818bb2f7d89ae8c00af5a07ce6bb34a9c65 Mon Sep 17 00:00:00 2001
From: Lofar test build account <klijn@astron.nl>
Date: Thu, 16 Jan 2014 14:46:04 +0000
Subject: [PATCH] Task #5385: [mol] Fixed race condition causing dead lock (if
 one thread adds a subband for a block that has to be fetched, it holds a
 mutex needed by other threads to complete older blocks)

---
 RTCP/Cobalt/CoInterface/src/TABTranspose.cc | 88 ++++++++++++++++++---
 RTCP/Cobalt/CoInterface/src/TABTranspose.h  | 18 ++++-
 2 files changed, 92 insertions(+), 14 deletions(-)

diff --git a/RTCP/Cobalt/CoInterface/src/TABTranspose.cc b/RTCP/Cobalt/CoInterface/src/TABTranspose.cc
index 8a6d51f2dc7..b1b9417c7da 100644
--- a/RTCP/Cobalt/CoInterface/src/TABTranspose.cc
+++ b/RTCP/Cobalt/CoInterface/src/TABTranspose.cc
@@ -50,21 +50,28 @@ void Subband::write(Stream &stream) const {
   stream.write(&dim1, sizeof dim1);
   stream.write(&dim2, sizeof dim2);
   stream.write(data.origin(), data.num_elements() * sizeof *data.origin());
-  //LOG_DEBUG_STR("Written block");
+
+  LOG_DEBUG_STR("Written block "  << id);
 }
 
 
 void Subband::read(Stream &stream) {
-  //LOG_DEBUG_STR("Reading block id");
+  LOG_DEBUG_STR("Reading block id");
   stream.read(&id, sizeof id);
-  //LOG_DEBUG_STR("Read block id");
+  LOG_DEBUG_STR("Read block id " << id);
 
   size_t dim1, dim2;
   stream.read(&dim1, sizeof dim1);
   stream.read(&dim2, sizeof dim2);
   data.resize(boost::extents[dim1][dim2]);
   stream.read(data.origin(), data.num_elements() * sizeof *data.origin());
-  //LOG_DEBUG_STR("Read block");
+  LOG_DEBUG_STR("Read block " << id);
+}
+
+
+std::ostream &operator<<(std::ostream &str, const Subband::BlockID &id)
+{
+  return str << "file " << id.fileIdx << " block " << id.block << " subband " << id.subband;
 }
 
 
@@ -74,6 +81,7 @@ Block::Block( size_t nrSubbands, size_t nrSamples, size_t nrChannels )
   subbandWritten(nrSubbands, false),
   fileIdx(0),
   block(0),
+  nrSubbands(nrSubbands),
   nrSubbandsLeft(nrSubbands)
 {
 }
@@ -86,16 +94,22 @@ void Block::addSubband( const Subband &subband ) {
   ASSERT(subband.id.fileIdx == fileIdx);
   ASSERT(subband.id.block   == block);
 
+  // Subbands should not arrive twice
+  ASSERT(subbandWritten[subband.id.subband] == false);
+
   memcpy(samples[subband.id.subband].origin(), subband.data.origin(), subband.data.num_elements() * sizeof *subband.data.origin());
   subbandWritten[subband.id.subband] = true;
 
   nrSubbandsLeft--;
+
+  LOG_DEBUG_STR("Block: added " << subband.id << ", " << nrSubbandsLeft << " subbands left");
 }
 
 
 void Block::zeroRemainingSubbands() {
   for (size_t subbandIdx = 0; subbandIdx < subbandWritten.size(); ++subbandIdx) {
     if (!subbandWritten[subbandIdx]) {
+      LOG_DEBUG_STR("File " << fileIdx << " block " << block << ": zeroing subband " << subbandIdx);
       memset(samples[subbandIdx].origin(), 0, samples[subbandIdx].size() * sizeof *samples[subbandIdx].origin());
     }
   }
@@ -107,11 +121,27 @@ bool Block::complete() const {
 }
 
 
+void Block::reset( size_t fileIdx, size_t blockIdx ) {
+  // Apply annotation, also for the super class
+  fileIdx = fileIdx;
+  block = blockIdx;
+
+  setSequenceNumber(blockIdx);
+
+  // Mark all subbands as not-written
+  for (size_t subbandIdx = 0; subbandIdx < subbandWritten.size(); ++subbandIdx) {
+    subbandWritten[subbandIdx] = false;
+  }
+  nrSubbandsLeft = nrSubbands;
+}
+
+
 BlockCollector::BlockCollector( Pool<Block> &outputPool, size_t fileIdx, size_t nrBlocks, size_t maxBlocksInFlight )
 :
   outputPool(outputPool),
   fileIdx(fileIdx),
   nrBlocks(nrBlocks),
+  fetchingNextBlock(false),
   maxBlocksInFlight(maxBlocksInFlight),
   canDrop(maxBlocksInFlight > 0),
   lastEmitted(-1)
@@ -122,6 +152,8 @@ BlockCollector::BlockCollector( Pool<Block> &outputPool, size_t fileIdx, size_t
 void BlockCollector::addSubband( const Subband &subband ) {
   ScopedLock sl(mutex);
 
+  LOG_DEBUG_STR("BlockCollector: Add " << subband.id);
+
   const size_t &blockIdx = subband.id.block;
 
   ASSERT(nrBlocks == 0 || blockIdx < nrBlocks);
@@ -130,7 +162,7 @@ void BlockCollector::addSubband( const Subband &subband ) {
     if (canDrop) {
       if ((ssize_t)blockIdx <= lastEmitted) {
 	      // too late -- discard packet
-        LOG_DEBUG_STR("TABTranspose::BlockCollector: Dropped subband " << subband.id.subband  << " of file " << subband.id.fileIdx);
+        LOG_DEBUG_STR("BlockCollector: Dropped subband " << subband.id.subband  << " of file " << subband.id.fileIdx);
 	      return;
       }
     } else {
@@ -139,10 +171,11 @@ void BlockCollector::addSubband( const Subband &subband ) {
       ASSERTSTR((ssize_t)blockIdx > lastEmitted, "Received block " << blockIdx << ", but already emitted up to " << lastEmitted << " for file " << subband.id.fileIdx << " subband " << subband.id.subband);
     }
 
+    // Note: fetch can release the mutex if it has to wait,
+    //       causing any assumptions made earlier to be invalid.
     fetch(blockIdx);
   }
 
-  // augment existing block
   SmartPtr<Block> &block = blocks.at(blockIdx);
 
   block->addSubband(subband);
@@ -156,7 +189,7 @@ void BlockCollector::addSubband( const Subband &subband ) {
 
     if (nrBlocks > 0 && blockIdx == nrBlocks - 1) {
       // Received last block -- wrap up
-      LOG_INFO_STR("TABTranspose::BlockCollector: Received last block of file " << fileIdx);
+      LOG_INFO_STR("BlockCollector: Received last block of file " << fileIdx);
 
       ASSERT(blocks.empty());
 
@@ -209,6 +242,8 @@ void BlockCollector::emit(size_t blockIdx) {
   // clear data we didn't receive
   SmartPtr<Block> &block = blocks.at(blockIdx);
 
+  LOG_DEBUG_STR("BlockCollector: emitting block " << blockIdx << " of file " << block->fileIdx);
+
   block->zeroRemainingSubbands();
   
   // emit to outputPool.filled()
@@ -234,17 +269,46 @@ bool BlockCollector::have(size_t block) const {
 void BlockCollector::fetch(size_t block) {
   ASSERT(!have(block));
 
+  // Make sure we don't exceed our maximum cache size
   if (canDrop && blocks.size() >= maxBlocksInFlight) {
     // No more room -- force out oldest block
     emit(minBlock());
+  } else if (!canDrop) {
+    if (fetchingNextBlock) {
+      // We know that if some other thread is fetching, that it MUST be the same block.
+      // That's because we cannot drop data, so the same block cannot be completed without
+      // before this fetch() call returns.
+      LOG_DEBUG_STR("BlockCollector: some thread is already fetching block " << block);
+
+      fetchSignal.wait(mutex);
+
+      ASSERT(have(block));
+      return;
+    }
   }
 
-  blocks[block] = outputPool.free.remove();
+  LOG_DEBUG_STR("BlockCollector: fetching block " << block);
+
+  SmartPtr<Block> newBlock;
+
+  fetchingNextBlock = true;
+  {
+    // Allow other threads to manipulate older blocks while we're waiting for
+    // a new free one.
+    ScopedLock sl(mutex, true);
+    newBlock = outputPool.free.remove();
+  }
+  fetchingNextBlock = false;
+
+  LOG_DEBUG_STR("BlockCollector: fetched block " << block);
+
+  // Add and annotate
+  ASSERT(!have(block));
+  blocks[block] = newBlock;
+  blocks[block]->reset(fileIdx, block);
 
-  // Annotate
-  blocks[block]->fileIdx = fileIdx;
-  blocks[block]->block = block;
-  blocks[block]->setSequenceNumber(block);
+  // Signal other threads that are waiting for this block
+  fetchSignal.broadcast();
 }
 
 
diff --git a/RTCP/Cobalt/CoInterface/src/TABTranspose.h b/RTCP/Cobalt/CoInterface/src/TABTranspose.h
index 2b5940aec53..1b6130d3f8e 100644
--- a/RTCP/Cobalt/CoInterface/src/TABTranspose.h
+++ b/RTCP/Cobalt/CoInterface/src/TABTranspose.h
@@ -54,7 +54,7 @@ namespace LOFAR
 
         Subband( size_t nrSamples = 0, size_t nrChannels = 0 );
 
-        struct {
+        struct BlockID {
           size_t fileIdx;
           size_t subband;
           size_t block;
@@ -64,6 +64,8 @@ namespace LOFAR
         void read(Stream &stream);
       };
 
+      std::ostream &operator<<(std::ostream &str, const Subband::BlockID &id);
+
       /*
        * A block of data, representing for one time slice all
        * subbands.
@@ -97,7 +99,16 @@ namespace LOFAR
          */
         bool complete() const;
 
+        /*
+         * Reset the meta data for this block. NOTE: The dimensions
+         * of the data remain unchanged.
+         */
+        void reset( size_t fileIdx, size_t blockIdx );
+
       private:
+        // The total number of subbands in this block.
+        const size_t nrSubbands;
+
         // The number of subbands left to receive.
         size_t nrSubbandsLeft;
       };
@@ -152,7 +163,10 @@ namespace LOFAR
         Pool<Block> &outputPool;
         const size_t fileIdx;
         const size_t nrBlocks;
-        Mutex mutex;
+        Mutex mutex; // protects concurrent access to `blocks'
+
+        bool fetchingNextBlock;
+        Condition fetchSignal;
 
         // upper limit for blocks.size(), or 0 if unlimited
         const size_t maxBlocksInFlight;
-- 
GitLab