Skip to content
Snippets Groups Projects
Commit d641b1c0 authored by Jan David Mol's avatar Jan David Mol
Browse files

Task #5555: Compute delays in caller's thread, simplifying the code. Caller is...

Task #5555: Compute delays in caller's thread, simplifying the code. Caller is a light-weight thread already (GPUProc/Station/StationInput::StationMetaData)
parent 2bdde61e
No related branches found
No related tags found
No related merge requests found
...@@ -48,33 +48,22 @@ namespace LOFAR ...@@ -48,33 +48,22 @@ namespace LOFAR
stationIdx(stationIdx), stationIdx(stationIdx),
from(from), from(from),
increment(increment), increment(increment),
currentTime(from),
stop(false),
// we need an extra entry for the central beam
buffer(bufferSize, AllDelays(parset)),
head(0),
tail(0),
bufferFree(bufferSize),
bufferUsed(0),
delayTimer("delay producer", true, true) delayTimer("delay producer", true, true)
{ {
ASSERTSTR(test(), "Delay compensation engine is broken"); ASSERTSTR(test(), "Delay compensation engine is broken");
init();
} }
void Delays::start() void Delays::start()
{ {
thread = new Thread(this, &Delays::mainLoop, "[DelayCompensation] ");
} }
Delays::~Delays() Delays::~Delays()
{ {
ScopedDelayCancellation dc; // Semaphores provide cancellation points
// trigger mainLoop and force it to stop
stop = true;
bufferFree.up(nrCalcDelays);
} }
...@@ -148,12 +137,6 @@ namespace LOFAR ...@@ -148,12 +137,6 @@ namespace LOFAR
ScopedLock lock(casacoreMutex); ScopedLock lock(casacoreMutex);
ScopedDelayCancellation dc; ScopedDelayCancellation dc;
// We need bufferSize to be a multiple of batchSize to avoid wraparounds in
// the middle of the batch calculations. This makes life a lot easier and there is no
// need to support other cases.
ASSERT(bufferSize % nrCalcDelays == 0);
// Set an initial epoch for the frame // Set an initial epoch for the frame
frame.set(MEpoch(toUTC(from), MEpoch::UTC)); frame.set(MEpoch(toUTC(from), MEpoch::UTC));
...@@ -281,80 +264,12 @@ namespace LOFAR ...@@ -281,80 +264,12 @@ namespace LOFAR
} }
void Delays::mainLoop()
{
LOG_DEBUG("Delay compensation thread running");
init();
// the current time, in samples
TimeStamp currentTime = from;
try {
while (!stop) {
bufferFree.down(nrCalcDelays);
delayTimer.start();
// Calculate nrCalcDelays seconds worth of delays. Technically, we do not have
// to calculate that many at the end of the run, but there is no need to
// prevent the few excess delays from being calculated.
{
for (size_t i = 0; i < nrCalcDelays; i++) {
// Check whether we will store results in a valid place
ASSERTSTR(tail < bufferSize, tail << " < " << bufferSize);
// Calculate the delays and store them in buffer[tail]
calcDelays(currentTime, buffer[tail]);
// Advance time for the next calculation
currentTime += increment;
// Advance to the next result set.
// since bufferSize % nrCalcDelays == 0, wrap
// around can only occur between runs
++tail;
}
}
// check for wrap around for the next run
if (tail >= bufferSize)
tail = 0;
delayTimer.stop();
bufferUsed.up(nrCalcDelays);
}
} catch (Exception &ex) {
// trigger getNextDelays and force it to stop
stop = true;
bufferUsed.up(1);
throw;
}
LOG_DEBUG("Delay compensation thread stopped");
}
void Delays::getNextDelays( AllDelays &result ) void Delays::getNextDelays( AllDelays &result )
{ {
ASSERT(thread); // Calculate the delays and store them in result
calcDelays(currentTime, result);
bufferUsed.down();
if (stop)
THROW(Exception, "Cannot obtain delays -- delay thread stopped running");
// copy the directions at buffer[head]
result = buffer[head];
// increment the head pointer
if (++head == bufferSize)
head = 0;
bufferFree.up(); currentTime += increment;
} }
void Delays::generateMetaData( const AllDelays &delaysAtBegin, const AllDelays &delaysAfterEnd, const vector<size_t> &subbands, vector<SubbandMetaData> &metaDatas, vector<ssize_t> &read_offsets ) void Delays::generateMetaData( const AllDelays &delaysAtBegin, const AllDelays &delaysAfterEnd, const vector<size_t> &subbands, vector<SubbandMetaData> &metaDatas, vector<ssize_t> &read_offsets )
......
...@@ -33,8 +33,6 @@ ...@@ -33,8 +33,6 @@
#include <Common/LofarTypes.h> #include <Common/LofarTypes.h>
#include <Common/Timer.h> #include <Common/Timer.h>
#include <Common/Thread/Thread.h>
#include <Common/Thread/Semaphore.h>
#include <CoInterface/MultiDimArray.h> #include <CoInterface/MultiDimArray.h>
#include <CoInterface/Parset.h> #include <CoInterface/Parset.h>
#include <CoInterface/SubbandMetaData.h> #include <CoInterface/SubbandMetaData.h>
...@@ -159,30 +157,7 @@ namespace LOFAR ...@@ -159,30 +157,7 @@ namespace LOFAR
const size_t stationIdx; const size_t stationIdx;
const TimeStamp from; const TimeStamp from;
const size_t increment; const size_t increment;
TimeStamp currentTime;
// do the delay compensation calculations in a separate thread to allow bulk
// calculations and to avoid blocking other threads
void mainLoop();
volatile bool stop;
// the number of seconds to maintain in the buffer, must be a multiple of
// nrCalcDelays.
static const size_t bufferSize = 128;
// the number of delays to calculate in a single run
static const size_t nrCalcDelays = 16;
// the circular buffer to hold the moving beam directions for every second of data
std::vector<AllDelays> buffer;
size_t head, tail;
// two semaphores are used: one to trigger the producer that free space is available,
// another to trigger the consumer that data is available.
Semaphore bufferFree, bufferUsed;
// Resize the given delay set to the right proportions.
void setAllDelaysSize( AllDelays &result ) const;
// Test whether the conversion engine actually works. // Test whether the conversion engine actually works.
bool test(); bool test();
...@@ -213,8 +188,6 @@ namespace LOFAR ...@@ -213,8 +188,6 @@ namespace LOFAR
#endif #endif
NSTimer delayTimer; NSTimer delayTimer;
SmartPtr<Thread> thread;
}; };
} // namespace Cobalt } // namespace Cobalt
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment