diff --git a/RTCP/IONProc/src/BeamletBuffer.cc b/RTCP/IONProc/src/BeamletBuffer.cc index 45feced2faffa279c829841781c896e1d5a3df3d..4c9e0e4b492b0148b9515c06860cf76f62dfa64a 100644 --- a/RTCP/IONProc/src/BeamletBuffer.cc +++ b/RTCP/IONProc/src/BeamletBuffer.cc @@ -31,6 +31,7 @@ #include <RSP.h> #include <boost/lexical_cast.hpp> +#include <cstring> #include <stdexcept> @@ -50,6 +51,8 @@ template<typename SAMPLE_TYPE> BeamletBuffer<SAMPLE_TYPE>::BeamletBuffer(const P itsPacketSize(sizeof(struct RSP::header) + itsNrTimesPerPacket * itsNrSubbands * NR_POLARIZATIONS * sizeof(SAMPLE_TYPE)), itsSize(align(ps->inputBufferSize(), itsNrTimesPerPacket)), itsHistorySize(ps->nrHistorySamples()), + itsIsRealTime(ps->realTime()), + itsSynchronizedReaderWriter(itsIsRealTime ? 0 : new SynchronizedReaderAndWriter(itsSize)), // FIXME: does not work for multiple observations itsSBBuffers(boost::extents[itsNrSubbands][itsSize][NR_POLARIZATIONS], 128, hugeMemoryAllocator), itsOffset(0), #if defined HAVE_BGP @@ -63,14 +66,16 @@ template<typename SAMPLE_TYPE> BeamletBuffer<SAMPLE_TYPE>::BeamletBuffer(const P if (ps->getUint32("OLAP.nrTimesInFrame") != itsNrTimesPerPacket) THROW(IONProcException, "OLAP.nrTimesInFrame should be " << boost::lexical_cast<std::string>(itsNrTimesPerPacket)); +#if 0 if (ps->realTime()) itsSynchronizedReaderWriter = new TimeSynchronizedReader(ps->maxNetworkDelay()); else itsSynchronizedReaderWriter = new SynchronizedReaderAndWriter(itsSize); +#endif - itsEnd.resize(ps->nrBeams()); - itsStartI.resize(ps->nrBeams()); - itsEndI.resize(ps->nrBeams()); +#if defined HAVE_VALGRIND + memset(itsSBBuffers.origin(), 0, itsSBBuffers.num_elements() * sizeof(SAMPLE_TYPE)); +#endif LOG_DEBUG_STR("Circular buffer at " << itsSBBuffers.origin() << "; contains " << itsSize << " samples"); } @@ -142,7 +147,9 @@ template<typename SAMPLE_TYPE> void BeamletBuffer<SAMPLE_TYPE>::writeConsecutive SAMPLE_TYPE *dst = itsSBBuffers[0][startI].origin(); // in synchronous mode, do not overrun tail of reader - itsSynchronizedReaderWriter->startWrite(begin, end); + if (!itsIsRealTime) + itsSynchronizedReaderWriter->startWrite(begin, end); + // do not write in circular buffer section that is being read itsLockedRanges.lock(startI, endI, itsSize); @@ -161,7 +168,9 @@ template<typename SAMPLE_TYPE> void BeamletBuffer<SAMPLE_TYPE>::writeConsecutive updateValidData(begin, end); itsLockedRanges.unlock(startI, endI, itsSize); - itsSynchronizedReaderWriter->finishedWrite(end); + + if (!itsIsRealTime) + itsSynchronizedReaderWriter->finishedWrite(end); } @@ -255,7 +264,9 @@ template<typename SAMPLE_TYPE> void BeamletBuffer<SAMPLE_TYPE>::writePacketData( itsPreviousI = endI; // in synchronous mode, do not overrun tail of reader - itsSynchronizedReaderWriter->startWrite(begin, end); + if (!itsIsRealTime) + itsSynchronizedReaderWriter->startWrite(begin, end); + // do not write in circular buffer section that is being read itsLockedRanges.lock(startI, endI, itsSize); @@ -275,21 +286,32 @@ template<typename SAMPLE_TYPE> void BeamletBuffer<SAMPLE_TYPE>::writePacketData( itsValidDataMutex.unlock(); itsLockedRanges.unlock(startI, endI, itsSize); - itsSynchronizedReaderWriter->finishedWrite(end); + + if (!itsIsRealTime) + itsSynchronizedReaderWriter->finishedWrite(end); + itsWriteTimer.stop(); } template<typename SAMPLE_TYPE> void BeamletBuffer<SAMPLE_TYPE>::startReadTransaction(const std::vector<TimeStamp> &begin, unsigned nrElements) { - TimeStamp minBegin = *std::min_element(begin.begin(), begin.end()); - TimeStamp maxEnd = *std::max_element(begin.begin(), begin.end()) + nrElements; // in synchronous mode, do not overrun writer - itsSynchronizedReaderWriter->startRead(minBegin, maxEnd); + if (!itsIsRealTime) { + TimeStamp minBegin = *std::min_element(begin.begin(), begin.end()); + TimeStamp maxEnd = *std::max_element(begin.begin(), begin.end()) + nrElements; + itsSynchronizedReaderWriter->startRead(minBegin, maxEnd); + } itsReadMutex.lock(); // only one reader per BeamletBuffer allowed itsReadTimer.start(); + unsigned nrBeams = begin.size(); + + itsEnd.resize(nrBeams); + itsStartI.resize(nrBeams); + itsEndI.resize(nrBeams); + itsBegin = begin; for (unsigned beam = 0; beam < begin.size(); beam ++) { @@ -358,9 +380,11 @@ template<typename SAMPLE_TYPE> SparseSet<unsigned> BeamletBuffer<SAMPLE_TYPE>::r template<typename SAMPLE_TYPE> void BeamletBuffer<SAMPLE_TYPE>::stopReadTransaction() { itsLockedRanges.unlock(itsMinStartI, itsMaxEndI, itsMaxEndI - itsMinStartI); - itsSynchronizedReaderWriter->finishedRead(itsMinEnd - (itsHistorySize + 16)); - // subtract 16 extra; due to alignment restrictions and the changing delays, - // it is hard to predict where the next read will begin. + + if (!itsIsRealTime) + itsSynchronizedReaderWriter->finishedRead(itsMinEnd - (itsHistorySize + 16)); + // subtract 16 extra; due to alignment restrictions and the changing delays, + // it is hard to predict where the next read will begin. itsReadTimer.stop(); itsReadMutex.unlock(); diff --git a/RTCP/IONProc/src/BeamletBuffer.h b/RTCP/IONProc/src/BeamletBuffer.h index d3362f5244dc7181afaba0f73d63117c1542ed3e..e5e15f00250a201b8f9738679089b924e911bd08 100644 --- a/RTCP/IONProc/src/BeamletBuffer.h +++ b/RTCP/IONProc/src/BeamletBuffer.h @@ -76,7 +76,8 @@ template<typename SAMPLE_TYPE> class BeamletBuffer unsigned itsNrSubbands; size_t itsPacketSize; unsigned itsSize, itsHistorySize; - ReaderAndWriterSynchronization *itsSynchronizedReaderWriter; + bool itsIsRealTime; + SynchronizedReaderAndWriter *itsSynchronizedReaderWriter; LockedRanges itsLockedRanges; Cube<SAMPLE_TYPE> itsSBBuffers; int itsOffset; diff --git a/RTCP/IONProc/src/BeamletBufferToComputeNode.cc b/RTCP/IONProc/src/BeamletBufferToComputeNode.cc index 1bc94cd2aafa344cc512d324166f2f2e2e12f827..8ced9fc70d27de4034be232577d88a7bb05ca3c6 100644 --- a/RTCP/IONProc/src/BeamletBufferToComputeNode.cc +++ b/RTCP/IONProc/src/BeamletBufferToComputeNode.cc @@ -49,6 +49,9 @@ namespace LOFAR { namespace RTCP { +template<typename SAMPLE_TYPE> const unsigned BeamletBufferToComputeNode<SAMPLE_TYPE>::itsMaximumDelay; + + template<typename SAMPLE_TYPE> BeamletBufferToComputeNode<SAMPLE_TYPE>::BeamletBufferToComputeNode(const std::vector<Stream *> &cnStreams, const std::vector<BeamletBuffer<SAMPLE_TYPE> *> &beamletBuffers, unsigned psetNumber) : itsRawDataStream(0), @@ -75,7 +78,7 @@ template<typename SAMPLE_TYPE> void BeamletBufferToComputeNode<SAMPLE_TYPE>::pre itsSampleRate = ps->sampleRate(); itsNrSubbands = ps->nrSubbands(); itsNrSubbandsPerPset = ps->nrSubbandsPerPset(); - itsNrSamplesPerSec = ps->nrSubbandSamples(); + itsNrSamplesPerSubband = ps->nrSubbandSamples(); itsNrBeams = ps->nrBeams(); itsNrPencilBeams = ps->nrPencilBeams(); itsNrOutputPsets = ps->outputPsets().size(); @@ -198,7 +201,7 @@ template<typename SAMPLE_TYPE> void BeamletBufferToComputeNode<SAMPLE_TYPE>::com template<typename SAMPLE_TYPE> void BeamletBufferToComputeNode<SAMPLE_TYPE>::startTransaction() { for (unsigned rsp = 0; rsp < itsNrInputs; rsp ++) { - itsBeamletBuffers[rsp]->startReadTransaction(itsDelayedStamps, itsNrSamplesPerSec + itsNrHistorySamples); + itsBeamletBuffers[rsp]->startReadTransaction(itsDelayedStamps, itsNrSamplesPerSubband + itsNrHistorySamples); for (unsigned beam = 0; beam < itsNrBeams; beam ++) /*if (itsMustComputeFlags[rsp][beam])*/ { // TODO @@ -220,7 +223,7 @@ template<typename SAMPLE_TYPE> void BeamletBufferToComputeNode<SAMPLE_TYPE>::wri gettimeofday(&tv, 0); double currentTime = tv.tv_sec + tv.tv_usec / 1e6; - double expectedTime = (itsCurrentTimeStamp + itsNrSamplesPerSec + itsMaxNetworkDelay) * itsSampleDuration; + double expectedTime = itsCorrelationStartTime * itsSampleDuration; logStr << ", late: " << PrettyTime(currentTime - expectedTime); } @@ -234,7 +237,7 @@ template<typename SAMPLE_TYPE> void BeamletBufferToComputeNode<SAMPLE_TYPE>::wri } for (unsigned rsp = 0; rsp < itsNrInputs; rsp ++) - logStr << ", flags " << rsp << ": " << itsFlags[rsp][0] << '(' << std::setprecision(3) << (100.0 * itsFlags[rsp][0].count() / (itsNrSamplesPerSec + itsNrHistorySamples)) << "%)"; // not really correct; beam(0) may be shifted + logStr << ", flags " << rsp << ": " << itsFlags[rsp][0] << '(' << std::setprecision(3) << (100.0 * itsFlags[rsp][0].count() / (itsNrSamplesPerSubband + itsNrHistorySamples)) << "%)"; // not really correct; beam(0) may be shifted LOG_INFO(logStr.str()); } @@ -327,7 +330,7 @@ template<typename SAMPLE_TYPE> void BeamletBufferToComputeNode<SAMPLE_TYPE>::dum bfraw_data.header.bitsPerSample = 16; bfraw_data.header.nrPolarizations = 2; bfraw_data.header.nrSubbands = nrSubbands; - bfraw_data.header.nrSamplesPerSubband = itsNrSamplesPerSec; + bfraw_data.header.nrSamplesPerSubband = itsNrSamplesPerSubband; bfraw_data.header.sampleRate = itsSampleRate; strncpy(bfraw_data.header.station, itsPS->getStationNamesAndRSPboardNumbers(itsPsetNumber)[0].station.c_str(), sizeof bfraw_data.header.station); @@ -379,6 +382,12 @@ template<typename SAMPLE_TYPE> void BeamletBufferToComputeNode<SAMPLE_TYPE>::pro itsDelayedStamps[beam] = itsCurrentTimeStamp - itsNrHistorySamples; computeDelays(); + + if (itsIsRealTime) { + itsCorrelationStartTime = itsCurrentTimeStamp + itsNrSamplesPerSubband + itsMaxNetworkDelay + itsMaximumDelay; + itsWallClock.waitUntil(itsCorrelationStartTime); + } + startTransaction(); writeLogMessage(); } @@ -393,7 +402,7 @@ template<typename SAMPLE_TYPE> void BeamletBufferToComputeNode<SAMPLE_TYPE>::pro if (itsNrInputs > 0) { stopTransaction(); - itsCurrentTimeStamp += itsNrSamplesPerSec; + itsCurrentTimeStamp += itsNrSamplesPerSubband; } timer.stop(); diff --git a/RTCP/IONProc/src/BeamletBufferToComputeNode.h b/RTCP/IONProc/src/BeamletBufferToComputeNode.h index ec2d81581eb01287909652e8f5a26b67d6101048..29ffed35b1f9188c15e624351bdcdc1867a0d7c8 100644 --- a/RTCP/IONProc/src/BeamletBufferToComputeNode.h +++ b/RTCP/IONProc/src/BeamletBufferToComputeNode.h @@ -87,7 +87,7 @@ template <typename SAMPLE_TYPE> class BeamletBufferToComputeNode { unsigned itsMaxNetworkDelay; // in samples unsigned itsNrSubbands; unsigned itsNrSubbandsPerPset; - unsigned itsNrSamplesPerSec; + unsigned itsNrSamplesPerSubband; unsigned itsNrHistorySamples; unsigned itsNrInputs; unsigned itsNrBeams; @@ -106,6 +106,10 @@ template <typename SAMPLE_TYPE> class BeamletBufferToComputeNode { Matrix<float> itsFineDelaysAtBegin, itsFineDelaysAfterEnd; + static const unsigned itsMaximumDelay = 1000; // samples; roughly 1500 km + TimeStamp itsCorrelationStartTime; + WallClockTime itsWallClock; + NSTimer itsDelayTimer; }; diff --git a/RTCP/IONProc/src/ION_main.cc b/RTCP/IONProc/src/ION_main.cc index 5f3dfe906ae0cf3eb732bf4db56d17db91520a76..7cc2e5e925d9a97ffe83fab8348a126d7c6bfe8a 100644 --- a/RTCP/IONProc/src/ION_main.cc +++ b/RTCP/IONProc/src/ION_main.cc @@ -126,7 +126,7 @@ void terminate_with_backtrace() static unsigned myPsetNumber; -static std::vector<Stream *> allClientStreams; +static std::vector<Stream *> allCNstreams; static const unsigned nrCNcoresInPset = 64; // TODO: how to figure out the number of CN cores? static std::vector<Mutex> sharedWriteToCNmutexes(nrCNcoresInPset); static std::vector<Mutex> sharedReadFromCNmutexes(nrCNcoresInPset); @@ -137,7 +137,7 @@ static bool fcnp_inited; -static void createAllClientStreams(const std::string &streamType) +static void createAllCNstreams(const std::string &streamType) { #if defined HAVE_FCNP && defined __PPC__ if (streamType == "FCNP") { @@ -146,29 +146,29 @@ static void createAllClientStreams(const std::string &streamType) } #endif - allClientStreams.resize(nrCNcoresInPset); + allCNstreams.resize(nrCNcoresInPset); for (unsigned core = 0; core < nrCNcoresInPset; core ++) { #if defined HAVE_FCNP && defined __PPC__ if (streamType == "FCNP") - allClientStreams[core] = new FCNP_ServerStream(core); + allCNstreams[core] = new FCNP_ServerStream(core); else #endif if (streamType == "NULL") - allClientStreams[core] = new NullStream; + allCNstreams[core] = new NullStream; else if (streamType == "TCP") - allClientStreams[core] = new SocketStream("127.0.0.1", 5000 + core, SocketStream::TCP, SocketStream::Server); + allCNstreams[core] = new SocketStream("127.0.0.1", 5000 + core, SocketStream::TCP, SocketStream::Server); else THROW(IONProcException, "unknown Stream type between ION and CN"); } } -static void deleteAllClientStreams() +static void deleteAllCNstreams() { for (unsigned core = 0; core < nrCNcoresInPset; core ++) - delete allClientStreams[core]; + delete allCNstreams[core]; #if defined HAVE_FCNP && defined __PPC__ if (fcnp_inited) { @@ -186,7 +186,7 @@ static void stopCNs() CN_Command command(CN_Command::STOP); for (unsigned core = 0; core < nrCNcoresInPset; core ++) - command.write(allClientStreams[core]); + command.write(allCNstreams[core]); LOG_DEBUG_STR("stopping " << nrCNcoresInPset << " cores done"); } @@ -274,7 +274,7 @@ template <typename SAMPLE_TYPE> class Job : public JobParent virtual ~Job(); private: - void createClientStreams(); + void createCNstreams(); void configureCNs(), unconfigureCNs(); void toCNthread(); @@ -321,7 +321,7 @@ template <typename SAMPLE_TYPE> Job<SAMPLE_TYPE>::Job(const Parset *parset) itsHasOutputSection = parset->outputPsetIndex(myPsetNumber) >= 0; if (itsHasInputSection || itsHasOutputSection) { - createClientStreams(); + createCNstreams(); itsNrRuns = static_cast<unsigned>(ceil((parset->stopTime() - parset->startTime()) / parset->CNintegrationTime())); LOG_DEBUG_STR("itsNrRuns = " << itsNrRuns); @@ -369,14 +369,14 @@ template <typename SAMPLE_TYPE> Job<SAMPLE_TYPE>::~Job() } -template <typename SAMPLE_TYPE> void Job<SAMPLE_TYPE>::createClientStreams() +template <typename SAMPLE_TYPE> void Job<SAMPLE_TYPE>::createCNstreams() { std::vector<unsigned> usedCoresInPset = itsParset->usedCoresInPset(); itsCNstreams.resize(usedCoresInPset.size()); for (unsigned core = 0; core < usedCoresInPset.size(); core ++) - itsCNstreams[core] = allClientStreams[CN_Mapping::mapCoreOnPset(usedCoresInPset[core], myPsetNumber)]; + itsCNstreams[core] = allCNstreams[CN_Mapping::mapCoreOnPset(usedCoresInPset[core], myPsetNumber)]; } @@ -544,10 +544,10 @@ template <typename SAMPLE_TYPE> void *Job<SAMPLE_TYPE>::fromCNthreadStub(void *j } -static void checkParset(const Parset &parset) +static void checkParset(const Parset *parset) { - if (parset.nrCoresPerPset() > nrCNcoresInPset) { - LOG_ERROR_STR("nrCoresPerPset (" << parset.nrCoresPerPset() << ") cannot exceed " << nrCNcoresInPset); + if (parset->nrCoresPerPset() > nrCNcoresInPset) { + LOG_ERROR_STR("nrCoresPerPset (" << parset->nrCoresPerPset() << ") cannot exceed " << nrCNcoresInPset); exit(1); } } @@ -580,8 +580,12 @@ void master_thread(int argc, char **argv) exit(1); } - //createAllClientStreams(parset->getTransportType("OLAP.OLAP_Conn.IONProc_CNProc")); - createAllClientStreams("FCNP"); // FIXME + //createAllCNstreams(parset->getTransportType("OLAP.OLAP_Conn.IONProc_CNProc")); +#if defined HAVE_VALGRIND // FIXME + createAllCNstreams("TCP"); +#else + createAllCNstreams("FCNP"); +#endif std::vector<JobParent *> jobs; @@ -599,7 +603,7 @@ void master_thread(int argc, char **argv) } #endif - checkParset(*parset); + checkParset(parset); JobParent *job; @@ -631,7 +635,7 @@ void master_thread(int argc, char **argv) unmapFlatMemory(); #endif - deleteAllClientStreams(); + deleteAllCNstreams(); #if defined CATCH_EXCEPTIONS } catch (Exception &ex) { diff --git a/RTCP/IONProc/src/OutputThread.cc b/RTCP/IONProc/src/OutputThread.cc index bce6c48cea0949c52c5044c6acb78c42c3c46ec3..17c4000a5d3775e612c9c385b9f595c1c4afa757 100644 --- a/RTCP/IONProc/src/OutputThread.cc +++ b/RTCP/IONProc/src/OutputThread.cc @@ -73,8 +73,6 @@ OutputThread::OutputThread(Stream *streamToStorage, const Parset &ps ) OutputThread::~OutputThread() { - LOG_DEBUG("OutputThread::~OutputThread"); - itsSendQueueActivity.append(-1); // -1 indicates that no more messages will be sent if (pthread_join(thread, 0) != 0)