Skip to content
Snippets Groups Projects
Commit 186bb7b9 authored by zwart's avatar zwart
Browse files

BugID: 692

Added statistics for skipped data
Added lock around flags. This could have given a problem in the past (two threads accessing the flags)
parent 104bfb44
No related branches found
No related tags found
No related merge requests found
......@@ -36,6 +36,7 @@
#include <CS1_Interface/DH_RSP.h>
#include <CS1_Interface/RSPTimeStamp.h>
#include <CS1_Interface/SparseSet.h>
#include <boost/thread.hpp>
namespace LOFAR
{
......@@ -88,8 +89,12 @@ namespace LOFAR
};
}
// checked for skippeddata and flag it in chunks
void checkForSkippedData(TimeStamp writeBegin);
//# Datamembers
vector<Beamlet *> itsSBBuffers;
mutex itsFlagsMutex;
SparseSet itsFlags;
uint itsNSubbands;
int itsSize;
......@@ -101,6 +106,7 @@ namespace LOFAR
// These are for statistics
uint itsDroppedItems;
uint itsDummyItems;
uint itsSkippedItems;
NSTimer itsWriteTimer;
NSTimer itsReadTimer;
......
......@@ -39,18 +39,20 @@ namespace LOFAR {
itsLockedRange(bufferSize, readWriteDelay, bufferSize - history, 0),
itsDroppedItems(0),
itsDummyItems(0),
itsSkippedItems(0),
itsWriteTimer("write"),
itsReadTimer("read")
{
for (uint sb = 0; sb < nSubbands; sb ++) {
itsSBBuffers.push_back(new Beamlet[bufferSize]);
}
mutex::scoped_lock sl(itsFlagsMutex);
itsFlags.include(0, bufferSize);
}
BeamletBuffer::~BeamletBuffer()
{
cout<<"BeamletBuffer did not receive "<<itsDummyItems<<" stamps and received "<<itsDroppedItems<<" items too late."<<endl;
cout<<"BeamletBuffer did not receive "<<itsDummyItems<<" stamps and received "<<itsDroppedItems<<" items too late. "<<itsSkippedItems<<" items were skipped (but may be received later)."<<endl;
cout<<"BeamletBufferTimers:"<<endl;
cout<<itsReadTimer<<endl;
cout<<itsWriteTimer<<endl;
......@@ -62,17 +64,22 @@ namespace LOFAR {
}
}
uint BeamletBuffer::writeElements(Beamlet* data, TimeStamp begin, uint nElements, uint stride)
{
// if this part start beyond itsHighestWritten, there is a gap in the data in the buffer
// so set that data to zero and invalidate it.
if ((begin > itsHighestWritten) and (itsHighestWritten > TimeStamp())) {
TimeStamp realBegin = itsLockedRange.writeLock(itsHighestWritten, begin);
void BeamletBuffer::checkForSkippedData(TimeStamp writeBegin) {
// flag the data from itsHighestWritten to end
// in debug mode also put zeros there
while ((writeBegin > itsHighestWritten) and (itsHighestWritten > TimeStamp())) {
// take only the first part, so the buffer won't block
TimeStamp flagEnd = itsHighestWritten + itsSize/4;
if (flagEnd > writeBegin) flagEnd = writeBegin;
TimeStamp realBegin = itsLockedRange.writeLock(itsHighestWritten, flagEnd);
itsWriteTimer.start();
cerr<<"BeamletBuffer: skipping "<<flagEnd<<" - "<<itsHighestWritten<<" ("<<flagEnd-itsHighestWritten<<")"<<endl;
itsSkippedItems += flagEnd - itsHighestWritten;
// we skipped a part, so write zeros there
uint startI = mapTime2Index(realBegin);
uint endI = mapTime2Index(begin);
uint endI = mapTime2Index(flagEnd);
if (endI < startI) {
// the data wraps around the allocated memory, so do it in two parts
......@@ -83,6 +90,7 @@ namespace LOFAR {
memset(&(itsSBBuffers[sb])[0], 0, endI * sizeof(Beamlet));
}
#endif
mutex::scoped_lock sl(itsFlagsMutex);
itsFlags.include(0, endI).include(startI, firstChunk);
} else {
#if defined USE_DEBUG
......@@ -90,15 +98,27 @@ namespace LOFAR {
memset(&(itsSBBuffers[sb])[startI], 0, (endI - startI) * sizeof(Beamlet));
}
#endif
mutex::scoped_lock sl(itsFlagsMutex);
itsFlags.include(startI, endI);
}
}
itsWriteTimer.stop();
itsLockedRange.writeUnlock(begin);
}
if (itsHighestWritten < flagEnd) itsHighestWritten = flagEnd;
itsLockedRange.writeUnlock(flagEnd);
}
}
uint BeamletBuffer::writeElements(Beamlet* data, TimeStamp begin, uint nElements, uint stride)
{
// if this part start beyond itsHighestWritten, there is a gap in the data in the buffer
// so set that data to zero and invalidate it.
//cerr<<"BeamletBuffer checking for skipped data"<<endl;
checkForSkippedData(begin);
// Now write the normal data
//cerr<<"BeamletBuffer writing normal data"<<endl;
TimeStamp end = begin + nElements;
TimeStamp realBegin = itsLockedRange.writeLock(begin, end);
//cerr<<"BeamletBuffer writelock received"<<endl;
itsDroppedItems += realBegin - begin;
......@@ -121,6 +141,7 @@ namespace LOFAR {
}
data += stride;
}
mutex::scoped_lock sl(itsFlagsMutex);
itsFlags.exclude(startI, itsSize).exclude(0, endI);
} else {
for (uint i = startI; i < endI; i ++) {
......@@ -129,6 +150,7 @@ namespace LOFAR {
}
data += stride;
}
mutex::scoped_lock sl(itsFlagsMutex);
itsFlags.exclude(startI, endI);
}
......@@ -166,15 +188,15 @@ namespace LOFAR {
//itsDummyItems += blabla;
}
mutex::scoped_lock sl(itsFlagsMutex);
*flags |= (itsFlags.subset(0, endI) += firstChunk);
*flags |= (itsFlags.subset(startI, itsSize) -= startI);
} else {
// copy in one part
for (uint sb = 0; sb < itsNSubbands; sb++) {
memcpy(&buffers[sb][0], &itsSBBuffers[sb][startI], (endI - startI) * sizeof(Beamlet));
//flags[sb]->setFlags(itsFlags[blabla]);
//itsDummyItems += blabla;
}
mutex::scoped_lock sl(itsFlagsMutex);
*flags |= (itsFlags.subset(startI, endI) -= startI);
}
......@@ -185,6 +207,7 @@ namespace LOFAR {
memset(&buffers[sb][0], 0, nInvalid * sizeof(Beamlet));
}
#endif
//cout<<"BeamletBuffer: getting elements "<<begin<<" - "<<begin+nElements<<": "<<*flags<<endl;
itsReadTimer.stop();
itsLockedRange.readUnlock(end);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment