diff --git a/RTCP/Cobalt/InputProc/src/Delays/Delays.cc b/RTCP/Cobalt/InputProc/src/Delays/Delays.cc index ff243eba0edcb547536233dcb1b0dde721f918eb..71637d69e2407b1abefe8a74ab13fb94bb851e17 100644 --- a/RTCP/Cobalt/InputProc/src/Delays/Delays.cc +++ b/RTCP/Cobalt/InputProc/src/Delays/Delays.cc @@ -48,33 +48,22 @@ namespace LOFAR stationIdx(stationIdx), from(from), increment(increment), - - stop(false), - // we need an extra entry for the central beam - buffer(bufferSize, AllDelays(parset)), - head(0), - tail(0), - bufferFree(bufferSize), - bufferUsed(0), + currentTime(from), delayTimer("delay producer", true, true) { ASSERTSTR(test(), "Delay compensation engine is broken"); + + init(); } void Delays::start() { - thread = new Thread(this, &Delays::mainLoop, "[DelayCompensation] "); } Delays::~Delays() { - ScopedDelayCancellation dc; // Semaphores provide cancellation points - - // trigger mainLoop and force it to stop - stop = true; - bufferFree.up(nrCalcDelays); } @@ -148,12 +137,6 @@ namespace LOFAR ScopedLock lock(casacoreMutex); ScopedDelayCancellation dc; - // We need bufferSize to be a multiple of batchSize to avoid wraparounds in - // the middle of the batch calculations. This makes life a lot easier and there is no - // need to support other cases. - - ASSERT(bufferSize % nrCalcDelays == 0); - // Set an initial epoch for the frame frame.set(MEpoch(toUTC(from), MEpoch::UTC)); @@ -281,80 +264,12 @@ namespace LOFAR } - void Delays::mainLoop() - { - LOG_DEBUG("Delay compensation thread running"); - - init(); - - // the current time, in samples - TimeStamp currentTime = from; - - try { - while (!stop) { - bufferFree.down(nrCalcDelays); - - delayTimer.start(); - - // Calculate nrCalcDelays seconds worth of delays. Technically, we do not have - // to calculate that many at the end of the run, but there is no need to - // prevent the few excess delays from being calculated. - - { - for (size_t i = 0; i < nrCalcDelays; i++) { - // Check whether we will store results in a valid place - ASSERTSTR(tail < bufferSize, tail << " < " << bufferSize); - - // Calculate the delays and store them in buffer[tail] - calcDelays(currentTime, buffer[tail]); - - // Advance time for the next calculation - currentTime += increment; - - // Advance to the next result set. - // since bufferSize % nrCalcDelays == 0, wrap - // around can only occur between runs - ++tail; - } - } - - // check for wrap around for the next run - if (tail >= bufferSize) - tail = 0; - - delayTimer.stop(); - - bufferUsed.up(nrCalcDelays); - } - } catch (Exception &ex) { - // trigger getNextDelays and force it to stop - stop = true; - bufferUsed.up(1); - - throw; - } - - LOG_DEBUG("Delay compensation thread stopped"); - } - - void Delays::getNextDelays( AllDelays &result ) { - ASSERT(thread); - - bufferUsed.down(); - - if (stop) - THROW(Exception, "Cannot obtain delays -- delay thread stopped running"); - - // copy the directions at buffer[head] - result = buffer[head]; - - // increment the head pointer - if (++head == bufferSize) - head = 0; + // Calculate the delays and store them in result + calcDelays(currentTime, result); - bufferFree.up(); + currentTime += increment; } void Delays::generateMetaData( const AllDelays &delaysAtBegin, const AllDelays &delaysAfterEnd, const vector<size_t> &subbands, vector<SubbandMetaData> &metaDatas, vector<ssize_t> &read_offsets ) diff --git a/RTCP/Cobalt/InputProc/src/Delays/Delays.h b/RTCP/Cobalt/InputProc/src/Delays/Delays.h index 7ac4773b277dbc5344a9e1edb413f5604c69ef76..7ebbb5f566fd8eb95ba520589b19c3fb2990aa07 100644 --- a/RTCP/Cobalt/InputProc/src/Delays/Delays.h +++ b/RTCP/Cobalt/InputProc/src/Delays/Delays.h @@ -33,8 +33,6 @@ #include <Common/LofarTypes.h> #include <Common/Timer.h> -#include <Common/Thread/Thread.h> -#include <Common/Thread/Semaphore.h> #include <CoInterface/MultiDimArray.h> #include <CoInterface/Parset.h> #include <CoInterface/SubbandMetaData.h> @@ -159,30 +157,7 @@ namespace LOFAR const size_t stationIdx; const TimeStamp from; const size_t increment; - - // do the delay compensation calculations in a separate thread to allow bulk - // calculations and to avoid blocking other threads - void mainLoop(); - - volatile bool stop; - - // the number of seconds to maintain in the buffer, must be a multiple of - // nrCalcDelays. - static const size_t bufferSize = 128; - - // the number of delays to calculate in a single run - static const size_t nrCalcDelays = 16; - - // the circular buffer to hold the moving beam directions for every second of data - std::vector<AllDelays> buffer; - size_t head, tail; - - // two semaphores are used: one to trigger the producer that free space is available, - // another to trigger the consumer that data is available. - Semaphore bufferFree, bufferUsed; - - // Resize the given delay set to the right proportions. - void setAllDelaysSize( AllDelays &result ) const; + TimeStamp currentTime; // Test whether the conversion engine actually works. bool test(); @@ -213,8 +188,6 @@ namespace LOFAR #endif NSTimer delayTimer; - - SmartPtr<Thread> thread; }; } // namespace Cobalt