diff --git a/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/BeamletBuffer.h b/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/BeamletBuffer.h index f9c7a9d9b531a5ad6da4e27ee90d91173b848328..81b0fd174e155930e8465d93abe709df2173da25 100644 --- a/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/BeamletBuffer.h +++ b/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/BeamletBuffer.h @@ -36,6 +36,7 @@ #include <CS1_Interface/DH_RSP.h> #include <CS1_Interface/RSPTimeStamp.h> #include <CS1_Interface/SparseSet.h> +#include <boost/thread.hpp> namespace LOFAR { @@ -88,8 +89,12 @@ namespace LOFAR }; } + // checked for skippeddata and flag it in chunks + void checkForSkippedData(TimeStamp writeBegin); + //# Datamembers vector<Beamlet *> itsSBBuffers; + mutex itsFlagsMutex; SparseSet itsFlags; uint itsNSubbands; int itsSize; @@ -101,6 +106,7 @@ namespace LOFAR // These are for statistics uint itsDroppedItems; uint itsDummyItems; + uint itsSkippedItems; NSTimer itsWriteTimer; NSTimer itsReadTimer; diff --git a/Appl/CEP/CS1/CS1_InputSection/src/BeamletBuffer.cc b/Appl/CEP/CS1/CS1_InputSection/src/BeamletBuffer.cc index 5bc06924d6a2f20bffbdf0a01dc6dec492aae57c..a169d7014ac469d9031b5606de12f5171a1fe760 100644 --- a/Appl/CEP/CS1/CS1_InputSection/src/BeamletBuffer.cc +++ b/Appl/CEP/CS1/CS1_InputSection/src/BeamletBuffer.cc @@ -39,18 +39,20 @@ namespace LOFAR { itsLockedRange(bufferSize, readWriteDelay, bufferSize - history, 0), itsDroppedItems(0), itsDummyItems(0), + itsSkippedItems(0), itsWriteTimer("write"), itsReadTimer("read") { for (uint sb = 0; sb < nSubbands; sb ++) { itsSBBuffers.push_back(new Beamlet[bufferSize]); } + mutex::scoped_lock sl(itsFlagsMutex); itsFlags.include(0, bufferSize); } BeamletBuffer::~BeamletBuffer() { - cout<<"BeamletBuffer did not receive "<<itsDummyItems<<" stamps and received "<<itsDroppedItems<<" items too late."<<endl; + cout<<"BeamletBuffer did not receive "<<itsDummyItems<<" stamps and received "<<itsDroppedItems<<" items too late. "<<itsSkippedItems<<" items were skipped (but may be received later)."<<endl; cout<<"BeamletBufferTimers:"<<endl; cout<<itsReadTimer<<endl; cout<<itsWriteTimer<<endl; @@ -62,17 +64,22 @@ namespace LOFAR { } } - uint BeamletBuffer::writeElements(Beamlet* data, TimeStamp begin, uint nElements, uint stride) - { - // if this part start beyond itsHighestWritten, there is a gap in the data in the buffer - // so set that data to zero and invalidate it. - if ((begin > itsHighestWritten) and (itsHighestWritten > TimeStamp())) { - TimeStamp realBegin = itsLockedRange.writeLock(itsHighestWritten, begin); + void BeamletBuffer::checkForSkippedData(TimeStamp writeBegin) { + // flag the data from itsHighestWritten to end + // in debug mode also put zeros there + + while ((writeBegin > itsHighestWritten) and (itsHighestWritten > TimeStamp())) { + // take only the first part, so the buffer won't block + TimeStamp flagEnd = itsHighestWritten + itsSize/4; + if (flagEnd > writeBegin) flagEnd = writeBegin; + TimeStamp realBegin = itsLockedRange.writeLock(itsHighestWritten, flagEnd); itsWriteTimer.start(); + cerr<<"BeamletBuffer: skipping "<<flagEnd<<" - "<<itsHighestWritten<<" ("<<flagEnd-itsHighestWritten<<")"<<endl; + itsSkippedItems += flagEnd - itsHighestWritten; // we skipped a part, so write zeros there uint startI = mapTime2Index(realBegin); - uint endI = mapTime2Index(begin); + uint endI = mapTime2Index(flagEnd); if (endI < startI) { // the data wraps around the allocated memory, so do it in two parts @@ -83,6 +90,7 @@ namespace LOFAR { memset(&(itsSBBuffers[sb])[0], 0, endI * sizeof(Beamlet)); } #endif + mutex::scoped_lock sl(itsFlagsMutex); itsFlags.include(0, endI).include(startI, firstChunk); } else { #if defined USE_DEBUG @@ -90,15 +98,27 @@ namespace LOFAR { memset(&(itsSBBuffers[sb])[startI], 0, (endI - startI) * sizeof(Beamlet)); } #endif + mutex::scoped_lock sl(itsFlagsMutex); itsFlags.include(startI, endI); - } + } itsWriteTimer.stop(); - itsLockedRange.writeUnlock(begin); - } + if (itsHighestWritten < flagEnd) itsHighestWritten = flagEnd; + itsLockedRange.writeUnlock(flagEnd); + } + } + + uint BeamletBuffer::writeElements(Beamlet* data, TimeStamp begin, uint nElements, uint stride) + { + // if this part start beyond itsHighestWritten, there is a gap in the data in the buffer + // so set that data to zero and invalidate it. + //cerr<<"BeamletBuffer checking for skipped data"<<endl; + checkForSkippedData(begin); // Now write the normal data + //cerr<<"BeamletBuffer writing normal data"<<endl; TimeStamp end = begin + nElements; TimeStamp realBegin = itsLockedRange.writeLock(begin, end); + //cerr<<"BeamletBuffer writelock received"<<endl; itsDroppedItems += realBegin - begin; @@ -121,6 +141,7 @@ namespace LOFAR { } data += stride; } + mutex::scoped_lock sl(itsFlagsMutex); itsFlags.exclude(startI, itsSize).exclude(0, endI); } else { for (uint i = startI; i < endI; i ++) { @@ -129,6 +150,7 @@ namespace LOFAR { } data += stride; } + mutex::scoped_lock sl(itsFlagsMutex); itsFlags.exclude(startI, endI); } @@ -166,15 +188,15 @@ namespace LOFAR { //itsDummyItems += blabla; } + mutex::scoped_lock sl(itsFlagsMutex); *flags |= (itsFlags.subset(0, endI) += firstChunk); *flags |= (itsFlags.subset(startI, itsSize) -= startI); } else { // copy in one part for (uint sb = 0; sb < itsNSubbands; sb++) { memcpy(&buffers[sb][0], &itsSBBuffers[sb][startI], (endI - startI) * sizeof(Beamlet)); - //flags[sb]->setFlags(itsFlags[blabla]); - //itsDummyItems += blabla; } + mutex::scoped_lock sl(itsFlagsMutex); *flags |= (itsFlags.subset(startI, endI) -= startI); } @@ -185,6 +207,7 @@ namespace LOFAR { memset(&buffers[sb][0], 0, nInvalid * sizeof(Beamlet)); } #endif + //cout<<"BeamletBuffer: getting elements "<<begin<<" - "<<begin+nElements<<": "<<*flags<<endl; itsReadTimer.stop(); itsLockedRange.readUnlock(end);