diff --git a/RTCP/IONProc/src/BeamletBufferToComputeNode.cc b/RTCP/IONProc/src/BeamletBufferToComputeNode.cc index 64b10274a346065cdb92b28104b941611a3c96a4..2c6a72507e63f6e89b8f03137081e63ad00ffa07 100644 --- a/RTCP/IONProc/src/BeamletBufferToComputeNode.cc +++ b/RTCP/IONProc/src/BeamletBufferToComputeNode.cc @@ -386,6 +386,7 @@ template<typename SAMPLE_TYPE> void BeamletBufferToComputeNode<SAMPLE_TYPE>::pro if (itsIsRealTime) { itsCorrelationStartTime = itsCurrentTimeStamp + itsNrSamplesPerSubband + itsMaxNetworkDelay + itsMaximumDelay; + itsWallClock.waitUntil(itsCorrelationStartTime); } diff --git a/RTCP/IONProc/src/ION_main.cc b/RTCP/IONProc/src/ION_main.cc index 87c63b0686bf83203e942e75e318ab53c41e4167..f42f270c7890f6205c9e92b518caa772530538b5 100644 --- a/RTCP/IONProc/src/ION_main.cc +++ b/RTCP/IONProc/src/ION_main.cc @@ -391,7 +391,7 @@ void Job::jobThread() if (itsParset.realTime()) { // claim resources two seconds before observation start WallClockTime wallClock; - time_t closeToStart = itsParset.startTime() - 2; + time_t closeToStart = static_cast<time_t>(itsParset.startTime()) - 2; char buf[26]; ctime_r(&closeToStart, buf); buf[24] = '\0'; @@ -513,15 +513,25 @@ template <typename SAMPLE_TYPE> void Job::toCNthread() void Job::fromCNthread() { + CN_Configuration configuration(itsParset); + const CN_ProcessingPlan<> plan(configuration); + const unsigned nrOutputs = plan.nrOutputs(); + LOG_DEBUG("starting from_CN thread"); - OutputSection outputSection(&itsParset, myPsetNumber, itsCNstreams); - outputSection.preprocess(); + { + std::vector<OutputSection *> outputSections( nrOutputs ); - for (unsigned run = 0; run < itsNrRuns; run ++) - outputSection.process(); + for (unsigned output = 0; output < nrOutputs; output++) { + outputSections[output] = new OutputSection(&itsParset, myPsetNumber, output, itsCNstreams, output == nrOutputs-1); + } + + // destructor of OutputSections will wait for threads to complete + for (unsigned output = 0; output < nrOutputs; output++) { + delete outputSections[output]; + } + } - outputSection.postprocess(); LOG_DEBUG("from_CN thread finished"); } diff --git a/RTCP/IONProc/src/OutputSection.cc b/RTCP/IONProc/src/OutputSection.cc index 2c5aa47284fd7c40dbbb5fc9489f301cc201c96e..838d7dbe75e62653101cea22673252aee736310c 100644 --- a/RTCP/IONProc/src/OutputSection.cc +++ b/RTCP/IONProc/src/OutputSection.cc @@ -42,84 +42,104 @@ #include <fcntl.h> #include <unistd.h> #include <errno.h> +#include <pthread.h> namespace LOFAR { namespace RTCP { -OutputSection::OutputSection(const Parset *ps, unsigned psetNumber, const std::vector<Stream *> &streamsFromCNs) +OutputSection::OutputSection(const Parset *ps, unsigned psetNumber, unsigned outputNumber, const std::vector<Stream *> &streamsFromCNs, bool lastOutput) : + stop(false), itsParset(ps), itsPsetIndex(ps->outputPsetIndex(psetNumber)), + itsOutputNr(outputNumber), itsNrComputeCores(ps->nrCoresPerPset()), itsCurrentComputeCore(0), itsNrSubbands(ps->nrSubbands()), itsNrSubbandsPerPset(ps->nrSubbandsPerPset()), itsRealTime(ps->realTime()), itsPlan(0), - itsStreamsFromCNs(streamsFromCNs) -{ -} - -void OutputSection::preprocess() + itsStreamsFromCNs(streamsFromCNs), + thread(0), + lastOutput(lastOutput) { itsDroppedCount.resize(itsNrSubbandsPerPset); CN_Configuration configuration(*itsParset); // allocate output structures and temporary data holders - itsPlan = new CN_ProcessingPlan<>( configuration, false, true ); + itsPlan = new CN_ProcessingPlan<>(configuration); itsPlan->removeNonOutputs(); - itsPlan->allocateOutputs( hugeMemoryAllocator ); - itsOutputs.resize(itsPlan->nrOutputs()); - - for (unsigned o = 0; o < itsPlan->plan.size(); o++ ) { - struct OutputSection::SingleOutput &output = itsOutputs[o]; - const ProcessingPlan::planlet &p = itsPlan->plan[o]; - output.nrIntegrationSteps = 1; - output.currentIntegrationStep = 0; - output.sequenceNumber = 0; - output.tmpSum = p.source; - } + const ProcessingPlan::planlet &p = itsPlan->plan[itsOutputNr]; // allocate partial sums -- only for those outputs that need it - itsSumPlans.resize(itsNrSubbandsPerPset); - for (unsigned subband = 0; subband < itsNrSubbandsPerPset; subband ++) { - unsigned subbandNumber = itsPsetIndex * itsNrSubbandsPerPset + subband; - - if (subbandNumber < itsNrSubbands) { - itsSumPlans[subband] = new CN_ProcessingPlan<>( configuration, false, true ); - itsSumPlans[subband]->removeNonOutputs(); + if( p.source->isIntegratable() && itsParset->IONintegrationSteps() <= 1 ) { + itsNrIntegrationSteps = itsParset->IONintegrationSteps(); - // create data structures to accumulate data, if needed - for (unsigned o = 0; o < itsSumPlans[subband]->plan.size(); o++ ) { - struct OutputSection::SingleOutput &output = itsOutputs[o]; - const ProcessingPlan::planlet &p = itsSumPlans[subband]->plan[o]; + for (unsigned subband = 0; subband < itsNrSubbandsPerPset; subband ++) { + unsigned subbandNumber = itsPsetIndex * itsNrSubbandsPerPset + subband; - if( !p.source->isIntegratable() ) { - continue; - } + if (subbandNumber < itsNrSubbands) { + StreamableData *clone = p.source->clone(); - if( itsParset->IONintegrationSteps() <= 1 ) { - continue; - } + clone->allocate(); - // set up this output to accumulate data - p.source->allocate( hugeMemoryAllocator ); - output.sums.push_back( p.source ); - output.nrIntegrationSteps = itsParset->IONintegrationSteps(); + itsSums.push_back( clone ); } + } + } else { + // no integration + itsNrIntegrationSteps = 1; + } + + itsCurrentIntegrationStep = 0; + itsSequenceNumber = 0; + itsTmpSum = p.source; + itsTmpSum->allocate(); + + // create an output thread for this subband + for (unsigned subband = 0; subband < itsNrSubbandsPerPset; subband ++) { + unsigned subbandNumber = itsPsetIndex * itsNrSubbandsPerPset + subband; - // create an output thread for this subband - itsOutputThreads.push_back(new OutputThread(subbandNumber, *itsParset)); + if (subbandNumber < itsNrSubbands) { + itsOutputThreads.push_back(new OutputThread(*itsParset, subbandNumber, itsOutputNr)); } } - + + #if defined HAVE_BGP_ION // FIXME: not in preprocess doNotRunOnCore0(); setPriority(2); #endif + + thread = new Thread( this, &OutputSection::mainLoop ); +} + +OutputSection::~OutputSection() +{ + delete thread; + + for (unsigned subband = 0; subband < itsNrSubbandsPerPset; subband ++) { + unsigned subbandNumber = itsPsetIndex * itsNrSubbandsPerPset + subband; + + if (subbandNumber < itsNrSubbands) { + notDroppingData(subband, subbandNumber); // for final warning message + + delete itsOutputThreads[subband]; + } + } + + // itsSums does not always contain data, so take its size as leading + for (unsigned subband = 0; subband < itsSums.size(); subband ++) { + delete itsSums[subband]; + } + + delete itsPlan; + + itsOutputThreads.clear(); + itsDroppedCount.clear(); } @@ -139,84 +159,80 @@ void OutputSection::notDroppingData(unsigned subband, unsigned subbandNumber) } -void OutputSection::process() +void OutputSection::mainLoop() { - for (unsigned subband = 0; subband < itsNrSubbandsPerPset; subband ++) { - // TODO: make sure that there are more free buffers than subbandsPerPset + const unsigned nrRuns = static_cast<unsigned>(ceil((itsParset->stopTime() - itsParset->startTime()) / itsParset->CNintegrationTime())); - unsigned subbandNumber = itsPsetIndex * itsNrSubbandsPerPset + subband; + static pthread_mutex_t mutex[64]; + static pthread_cond_t condition[64]; + static unsigned computeCoreStates[64]; - if (subbandNumber < itsNrSubbands) { - // read all outputs of one node at once, so that it becomes free to process the next set of samples - for (unsigned o = 0; o < itsOutputs.size(); o++ ) { - struct OutputSection::SingleOutput &output = itsOutputs[o]; - struct OutputThread::SingleOutput &outputThread = itsOutputThreads[subband]->itsOutputs[o]; - - bool firstTime = output.currentIntegrationStep == 0; - bool lastTime = output.currentIntegrationStep == output.nrIntegrationSteps - 1; - - if (lastTime) { - if (itsRealTime && outputThread.freeQueue.empty()) { - droppingData(subband, subbandNumber); - output.tmpSum->read(itsStreamsFromCNs[itsCurrentComputeCore], false); - } else { - notDroppingData(subband, subbandNumber); - StreamableData *data = outputThread.freeQueue.remove(); - - data->read(itsStreamsFromCNs[itsCurrentComputeCore], false); - - if (!firstTime) - *data += *output.sums[subband]; - - data->sequenceNumber = output.sequenceNumber; - outputThread.sendQueue.append(data); - - // report that data has been added to a send queue - itsOutputThreads[subband]->itsSendQueueActivity.append(o); - } - } else if (firstTime) { - output.sums[subband]->read(itsStreamsFromCNs[itsCurrentComputeCore], false); - } else { - output.tmpSum->read(itsStreamsFromCNs[itsCurrentComputeCore], false); - *output.sums[subband] += *output.tmpSum; - } - } + if( itsOutputNr == 0 ) { + for( unsigned i = 0; i < 64; i++ ) { + pthread_mutex_init( &mutex[i], 0 ); + pthread_cond_init( &condition[i], 0 ); + computeCoreStates[i] = 0; } - - if (++ itsCurrentComputeCore == itsNrComputeCores) - itsCurrentComputeCore = 0; } - for (unsigned o = 0; o < itsOutputs.size(); o ++) { - struct OutputSection::SingleOutput &output = itsOutputs[o]; - - if (++ output.currentIntegrationStep == output.nrIntegrationSteps) { - output.currentIntegrationStep = 0; - output.sequenceNumber++; - } - } -} + for( unsigned i = 0; i < nrRuns && !stop; i++ ) { + for (unsigned subband = 0; subband < itsNrSubbandsPerPset; subband ++) { + // TODO: make sure that there are more free buffers than subbandsPerPset + unsigned subbandNumber = itsPsetIndex * itsNrSubbandsPerPset + subband; -void OutputSection::postprocess() -{ - for (unsigned subband = 0; subband < itsNrSubbandsPerPset; subband ++) { - unsigned subbandNumber = itsPsetIndex * itsNrSubbandsPerPset + subband; + if (subbandNumber < itsNrSubbands) { + // wait for our turn for this core + pthread_mutex_lock(&mutex[itsCurrentComputeCore]); + while( computeCoreStates[itsCurrentComputeCore] != itsOutputNr ) { + pthread_cond_wait(&condition[itsCurrentComputeCore], &mutex[itsCurrentComputeCore]); + } + pthread_mutex_unlock(&mutex[itsCurrentComputeCore]); + + OutputThread *outputThread = itsOutputThreads[subband]; + + bool firstTime = itsCurrentIntegrationStep == 0; + bool lastTime = itsCurrentIntegrationStep == itsNrIntegrationSteps - 1; + + if (lastTime) { + if (itsRealTime && outputThread->itsFreeQueue.empty()) { + droppingData(subband, subbandNumber); + itsTmpSum->read(itsStreamsFromCNs[itsCurrentComputeCore], false); + } else { + notDroppingData(subband, subbandNumber); + std::auto_ptr<StreamableData> data( outputThread->itsFreeQueue.remove() ); + + data->read(itsStreamsFromCNs[itsCurrentComputeCore], false); + + if (!firstTime) + *data += *itsSums[subband]; + + data->sequenceNumber = itsSequenceNumber; + outputThread->itsSendQueue.append(data.release()); + } + } else if (firstTime) { + itsSums[subband]->read(itsStreamsFromCNs[itsCurrentComputeCore], false); + } else { + itsTmpSum->read(itsStreamsFromCNs[itsCurrentComputeCore], false); + *itsSums[subband] += *itsTmpSum; + } - if (subbandNumber < itsNrSubbands) { - notDroppingData(subband, subbandNumber); // for final warning message + // signal next output that we're done with this one + pthread_mutex_lock(&mutex[itsCurrentComputeCore]); + computeCoreStates[itsCurrentComputeCore] = lastOutput ? 0 : itsOutputNr + 1; + pthread_cond_broadcast(&condition[itsCurrentComputeCore]); + pthread_mutex_unlock(&mutex[itsCurrentComputeCore]); + } - delete itsOutputThreads[subband]; - delete itsSumPlans[subband]; + if (++ itsCurrentComputeCore == itsNrComputeCores) + itsCurrentComputeCore = 0; } - } - - delete itsPlan; - itsSumPlans.resize(0); - itsOutputThreads.clear(); - itsOutputs.clear(); - itsDroppedCount.clear(); + if (++ itsCurrentIntegrationStep == itsNrIntegrationSteps) { + itsCurrentIntegrationStep = 0; + itsSequenceNumber++; + } + } } } diff --git a/RTCP/IONProc/src/OutputSection.h b/RTCP/IONProc/src/OutputSection.h index 66793addfc35b4e5a0a8dae214428106d6c9676c..46a4be19046950417d7233ab2c491edae1bbadda 100644 --- a/RTCP/IONProc/src/OutputSection.h +++ b/RTCP/IONProc/src/OutputSection.h @@ -25,6 +25,8 @@ #include <Interface/CN_ProcessingPlan.h> #include <Stream/Stream.h> #include <IONProc/OutputThread.h> +#include <Interface/Thread.h> +#include <Interface/Mutex.h> #include <vector> @@ -34,14 +36,12 @@ namespace RTCP { class OutputSection { public: - OutputSection(const Parset *ps, unsigned psetNumber, const std::vector<Stream *> &streamsFromCNs); - - void preprocess(); - void process(); - void postprocess(); + OutputSection(const Parset *ps, unsigned psetNumber, unsigned outputNumber, const std::vector<Stream *> &streamsFromCNs, bool lastOutput); + ~OutputSection(); private: - void connectToStorage(); + bool stop; + void mainLoop(); void droppingData(unsigned subband, unsigned subbandNumber); void notDroppingData(unsigned subband, unsigned subbandNumber); @@ -49,20 +49,17 @@ class OutputSection std::vector<unsigned> itsDroppedCount; // [subband] std::vector<OutputThread *> itsOutputThreads; // [subband] - struct SingleOutput { - std::vector<StreamableData *> sums; - StreamableData *tmpSum; - - unsigned nrIntegrationSteps; - unsigned currentIntegrationStep; + std::vector<StreamableData *> itsSums; + StreamableData *itsTmpSum; - unsigned sequenceNumber; - }; + unsigned itsNrIntegrationSteps; + unsigned itsCurrentIntegrationStep; - std::vector<struct SingleOutput> itsOutputs; // [outputs] + unsigned itsSequenceNumber; const Parset *itsParset; const unsigned itsPsetIndex; + const unsigned itsOutputNr; unsigned itsNrComputeCores, itsCurrentComputeCore; const unsigned itsNrSubbands; const unsigned itsNrSubbandsPerPset; @@ -71,10 +68,12 @@ class OutputSection // the main plan, also holds temporary results CN_ProcessingPlan<> *itsPlan; - // plans that hold accumulated results - Vector<CN_ProcessingPlan<> *> itsSumPlans; - const std::vector<Stream *> &itsStreamsFromCNs; + + Thread *thread; + + // hack to syncrhonise multiple outputs per compute core + bool lastOutput; }; diff --git a/RTCP/IONProc/src/OutputThread.cc b/RTCP/IONProc/src/OutputThread.cc index 5a37c5c5eab0a45e04f7628399a0710cd481f14d..76310a6d0fa1ca7b6cd9cbfdd94f9ec2b75e3b3d 100644 --- a/RTCP/IONProc/src/OutputThread.cc +++ b/RTCP/IONProc/src/OutputThread.cc @@ -40,38 +40,27 @@ namespace LOFAR { namespace RTCP { -OutputThread::OutputThread(const unsigned subband, const Parset &ps ) +OutputThread::OutputThread(const Parset &ps, const unsigned subband, const unsigned output ) : - itsOutputs(0), - itsNrOutputs(0), - itsPlans(0), itsParset(ps), itsSubband(subband), + itsOutput(output), thread(0) { CN_Configuration configuration(ps); + CN_ProcessingPlan<> plan(configuration); + plan.removeNonOutputs(); + + const ProcessingPlan::planlet &p = plan.plan[output]; // transpose the data holders: create queues streams for the output streams // itsPlans is the owner of the pointers to sample data structures for (unsigned i = 0; i < maxSendQueueSize; i ++) { - CN_ProcessingPlan<> *plan = new CN_ProcessingPlan<>( configuration, false, true ); - plan->removeNonOutputs(); - plan->allocateOutputs( hugeMemoryAllocator ); - - itsPlans.push_back( plan ); + StreamableData *clone = p.source->clone(); - itsNrOutputs = plan->nrOutputs(); - - // only the first call will actually resize the array - if( !itsOutputs.size() ) { - itsOutputs.resize( itsNrOutputs ); - } - - for (unsigned o = 0; o < plan->plan.size(); o++ ) { - const ProcessingPlan::planlet &p = plan->plan[o]; + clone->allocate(); - itsOutputs[o].freeQueue.append( p.source ); - } + itsFreeQueue.append( clone ); } thread = new Thread(this, &OutputThread::mainLoop, 65536); @@ -80,23 +69,21 @@ OutputThread::OutputThread(const unsigned subband, const Parset &ps ) OutputThread::~OutputThread() { - itsSendQueueActivity.append(-1); // -1 indicates that no more messages will be sent + itsSendQueue.append(0); // 0 indicates that no more messages will be sent delete thread; - while (!itsSendQueueActivity.empty()) - itsSendQueueActivity.remove(); + while (!itsSendQueue.empty()) + delete itsSendQueue.remove(); - for (unsigned i = 0; i < itsPlans.size(); i++ ) { - delete itsPlans[i]; - } + while (!itsFreeQueue.empty()) + delete itsFreeQueue.remove(); } void OutputThread::mainLoop() { std::auto_ptr<Stream> streamToStorage; - int o; #if defined HAVE_BGP_ION doNotRunOnCore0(); @@ -111,7 +98,7 @@ void OutputThread::mainLoop() streamToStorage.reset( new NullStream ); } else if (connectionType == "TCP") { const std::string server = itsParset.storageHostName(prefix + "_ServerHosts", itsSubband); - const unsigned short port = boost::lexical_cast<unsigned short>(itsParset.getPortsOf(prefix)[itsSubband]); + const unsigned short port = itsParset.getStoragePort(prefix, itsSubband, itsOutput); LOG_DEBUG_STR("subband " << itsSubband << " written to tcp:" << server << ':' << port << " connecting.."); streamToStorage.reset( new SocketStream(server.c_str(), port, SocketStream::TCP, SocketStream::Client) ); @@ -131,29 +118,25 @@ void OutputThread::mainLoop() // TODO: race condition on creation // TODO: if a storage node blocks, ionproc can't write anymore // in any thread - static Semaphore semaphore(1); - - while ((o = itsSendQueueActivity.remove()) >= 0) { - struct OutputThread::SingleOutput &output = itsOutputs[o]; - StreamableData *data = output.sendQueue.remove(); // will be freed by itsPlans, so we can freely use it without worry we lose the pointer + static Semaphore semaphore(2); + StreamableData *data; + while ((data = itsSendQueue.remove()) != 0) { // prevent too many concurrent writers by locking this scope semaphore.down(); try { - // write header: nr of output - streamToStorage->write( &o, sizeof o ); - // write data, including serial nr data->write(streamToStorage.get(), true); } catch( ... ) { semaphore.up(); + itsFreeQueue.append( data ); // make sure data will be freed throw; } semaphore.up(); // data can now be reused - output.freeQueue.append( data ); + itsFreeQueue.append( data ); } } diff --git a/RTCP/IONProc/src/OutputThread.h b/RTCP/IONProc/src/OutputThread.h index b83f9302bc7daf3508e4e420db669cd6b2b6b571..66c8afd4b833b9d2ed5602abe46ae872bc726c6d 100644 --- a/RTCP/IONProc/src/OutputThread.h +++ b/RTCP/IONProc/src/OutputThread.h @@ -26,7 +26,6 @@ //# Never #include <config.h> or #include <lofar_config.h> in a header file! #include <Interface/StreamableData.h> -#include <Interface/CN_ProcessingPlan.h> #include <Interface/Queue.h> #include <Interface/Parset.h> #include <Interface/Thread.h> @@ -39,27 +38,19 @@ namespace RTCP { class OutputThread { public: - OutputThread(const unsigned subband, const Parset &ps); + OutputThread(const Parset &ps, const unsigned subband, const unsigned output); ~OutputThread(); static const unsigned maxSendQueueSize = 3; - // each output stream has its own queues. the itsSendQueueActivity contains - // the output stream numbers of the queues to which data has been added - struct SingleOutput { - Queue<StreamableData *> freeQueue, sendQueue; - }; - Vector<struct SingleOutput> itsOutputs; // [itsNrOutputs] - unsigned itsNrOutputs; - std::vector<CN_ProcessingPlan<> *> itsPlans; - - Queue<int> itsSendQueueActivity; + Queue<StreamableData *> itsFreeQueue, itsSendQueue; private: void mainLoop(); const Parset &itsParset; const unsigned itsSubband; + const unsigned itsOutput; Thread *thread; }; diff --git a/RTCP/Run/src/LOFAR/Parset.py b/RTCP/Run/src/LOFAR/Parset.py index cabd465b72ee2d9e0bc1c420b1008c85dd051272..2a13305e9e9e9a109dffb84bc5a2a42aeca9329f 100644 --- a/RTCP/Run/src/LOFAR/Parset.py +++ b/RTCP/Run/src/LOFAR/Parset.py @@ -368,9 +368,11 @@ class Parset(util.Parset.Parset): for i,s in enumerate(subbandMapping): node = storageNodes[s] - portnr = globalPorts[i] - localPorts[node].append(portnr) + for o in xrange(nrOutputs): + portnr = globalPorts[i * len(subbandMapping) + o] + + localPorts[node].append(portnr) return localPorts diff --git a/RTCP/Storage/include/Storage/InputThread.h b/RTCP/Storage/include/Storage/InputThread.h index 5199ed779cb4c42f9676f5d8f95f903722174963..c899a356d825839e29c8a90a044ac201e637e915 100644 --- a/RTCP/Storage/include/Storage/InputThread.h +++ b/RTCP/Storage/include/Storage/InputThread.h @@ -27,7 +27,6 @@ #include <Interface/StreamableData.h> #include <Interface/MultiDimArray.h> -#include <Interface/CN_ProcessingPlan.h> #include <Interface/Queue.h> #include <Interface/Thread.h> #include <Stream/Stream.h> @@ -39,33 +38,25 @@ namespace RTCP { class InputThread { public: - InputThread(const Parset *ps, unsigned subbandNumber); + InputThread(const Parset *ps, unsigned subbandNumber, unsigned outputNumber); ~InputThread(); static const unsigned maxReceiveQueueSize = 3; // report if fetching an item from the receive queue takes longer than this (seconds) static const float reportQueueRemoveDelay = 0.05; // report if reading data takes longer than this (seconds) - static const float reportReadDelay = 0.05; + static const float reportReadDelay = 1.10; - struct SingleInput { - Queue<StreamableData *> freeQueue, receiveQueue; - }; - Vector<struct SingleInput> itsInputs; // [itsNrInputs] - unsigned itsNrInputs; - - Queue<unsigned> itsReceiveQueueActivity; - static Queue<unsigned> itsRcvdQueue; + Queue<StreamableData *> itsFreeQueue, itsReceiveQueue; private: void mainLoop(); const Parset *itsPS; - Stream *itsStreamFromION; Thread *thread; - Vector<CN_ProcessingPlan<> *> itsPlans; // [maxReceiveQueueSize] const unsigned itsSubbandNumber; + const unsigned itsOutputNumber; const unsigned itsObservationID; }; diff --git a/RTCP/Storage/include/Storage/OutputThread.h b/RTCP/Storage/include/Storage/OutputThread.h index 9750fcd23d5fce53e649173f2842e22072919b5a..2d893b3871bb42af1dc61b825d49fdbfd4f178ef 100644 --- a/RTCP/Storage/include/Storage/OutputThread.h +++ b/RTCP/Storage/include/Storage/OutputThread.h @@ -42,7 +42,7 @@ namespace RTCP { class OutputThread { public: - OutputThread(const Parset *ps, unsigned subbandNumber, InputThread *inputThread, unsigned nrOutputs, const CN_ProcessingPlan<> &plan); + OutputThread(const Parset *ps, unsigned subbandNumber, unsigned outputNumber, InputThread *inputThread); ~OutputThread(); // report any writes that take longer than this (seconds) @@ -50,7 +50,7 @@ class OutputThread private: void writeLogMessage(); - void checkForDroppedData(StreamableData *data, unsigned output); + void checkForDroppedData(StreamableData *data); void mainLoop(); const Parset *itsPS; @@ -58,13 +58,13 @@ class OutputThread InputThread *itsInputThread; - const unsigned itsNrOutputs; const unsigned itsSubbandNumber; + const unsigned itsOutputNumber; const unsigned itsObservationID; - Vector<MSWriter*> itsWriters; - std::vector<unsigned> itsNextSequenceNumbers; + MSWriter* itsWriter; + unsigned itsNextSequenceNumber; }; } // namespace RTCP diff --git a/RTCP/Storage/include/Storage/SubbandWriter.h b/RTCP/Storage/include/Storage/SubbandWriter.h index 7ca6d9e4ca65d78616dad6c1daff032713026a2e..a08c71fbdb53085fc15ad954eaa09f2b042850d2 100644 --- a/RTCP/Storage/include/Storage/SubbandWriter.h +++ b/RTCP/Storage/include/Storage/SubbandWriter.h @@ -35,10 +35,6 @@ #include <Common/Timer.h> #include <Common/lofar_vector.h> #include <Interface/Parset.h> -#include <Interface/CN_Configuration.h> -#include <Interface/CN_ProcessingPlan.h> -#include <Interface/StreamableData.h> -#include <Interface/Queue.h> #include <Storage/InputThread.h> #include <Storage/OutputThread.h> #include <Storage/MSWriter.h> @@ -63,9 +59,6 @@ class SubbandWriter unsigned itsRank; unsigned itsSize; unsigned itsObservationID; - CN_Configuration itsConfiguration; - CN_ProcessingPlan<> itsPlan; - unsigned itsNrOutputs; std::vector<InputThread *> itsInputThreads; std::vector<OutputThread *> itsOutputThreads; diff --git a/RTCP/Storage/src/InputThread.cc b/RTCP/Storage/src/InputThread.cc index a91d2a62df60bc965897b1d409cfde88071984e0..78fdbbc0ee35eb32f5b5ad996c0c2885dae6606e 100644 --- a/RTCP/Storage/src/InputThread.cc +++ b/RTCP/Storage/src/InputThread.cc @@ -25,6 +25,7 @@ #include <Storage/InputThread.h> #include <Interface/StreamableData.h> +#include <Interface/CN_ProcessingPlan.h> #include <Stream/FileStream.h> #include <Stream/NullStream.h> #include <Stream/SocketStream.h> @@ -34,34 +35,26 @@ namespace LOFAR { namespace RTCP { - Queue<unsigned> InputThread::itsRcvdQueue; - - InputThread::InputThread(const Parset *ps, unsigned subbandNumber) +InputThread::InputThread(const Parset *ps, unsigned subbandNumber, unsigned outputNumber) : - itsInputs(0), - itsNrInputs(0), itsPS(ps), - itsPlans(maxReceiveQueueSize), itsSubbandNumber(subbandNumber), + itsOutputNumber(outputNumber), itsObservationID(ps->observationID()) { // transpose output stream holders CN_Configuration configuration(*ps); + CN_ProcessingPlan<> plan(configuration); + plan.removeNonOutputs(); + + const ProcessingPlan::planlet &p = plan.plan[outputNumber]; for (unsigned i = 0; i < maxReceiveQueueSize; i ++) { - itsPlans[i] = new CN_ProcessingPlan<>( configuration, false, true ); - itsPlans[i]->removeNonOutputs(); - itsPlans[i]->allocateOutputs( heapAllocator ); - - if( itsNrInputs == 0 ) { - // do this only the first time - itsNrInputs = itsPlans[i]->nrOutputs(); - itsInputs.resize( itsNrInputs ); - } + StreamableData *data = p.source->clone(); - for (unsigned o = 0; o < itsNrInputs; o ++ ) { - itsInputs[o].freeQueue.append( itsPlans[i]->plan[o].source ); - } + data->allocate(); + + itsFreeQueue.append( data ); } thread = new Thread(this, &InputThread::mainLoop); @@ -71,6 +64,12 @@ namespace RTCP { InputThread::~InputThread() { delete thread; + + while (!itsReceiveQueue.empty()) + delete itsReceiveQueue.remove(); + + while (!itsFreeQueue.empty()) + delete itsFreeQueue.remove(); } @@ -81,23 +80,21 @@ void InputThread::mainLoop() string connectionType = itsPS->getString(prefix + "_Transport"); bool nullInput = false; - unsigned subbandNumber = itsSubbandNumber; - if (connectionType == "NULL") { - LOG_DEBUG_STR("subband " << subbandNumber << " read from null stream"); + LOG_DEBUG_STR("subband " << itsSubbandNumber << " read from null stream"); streamFromION.reset( new NullStream ); nullInput = true; } else if (connectionType == "TCP") { - std::string server = itsPS->storageHostName(prefix + "_ServerHosts", subbandNumber); - unsigned short port = boost::lexical_cast<unsigned short>(itsPS->getPortsOf(prefix)[subbandNumber]); + std::string server = itsPS->storageHostName(prefix + "_ServerHosts", itsSubbandNumber); + const unsigned short port = itsPS->getStoragePort(prefix, itsSubbandNumber, itsOutputNumber); - LOG_DEBUG_STR("subband " << subbandNumber << " read from tcp:" << server << ':' << port); + LOG_DEBUG_STR("subband " << itsSubbandNumber << " read from tcp:" << server << ':' << port); streamFromION.reset( new SocketStream(server.c_str(), port, SocketStream::TCP, SocketStream::Server) ); } else if (connectionType == "FILE") { std::string filename = itsPS->getString(prefix + "_BaseFileName") + '.' + - boost::lexical_cast<std::string>(subbandNumber); + boost::lexical_cast<std::string>(itsSubbandNumber); - LOG_DEBUG_STR("subband " << subbandNumber << " read from file:" << filename); + LOG_DEBUG_STR("subband " << itsSubbandNumber << " read from file:" << filename); streamFromION.reset( new FileStream(filename.c_str()) ); } else { THROW(StorageException, "unsupported ION->Storage stream type: " << connectionType); @@ -105,8 +102,7 @@ void InputThread::mainLoop() // limit reads from NullStream to 10 blocks; otherwise unlimited unsigned increment = nullInput ? 1 : 0; - StreamableData *data = 0; - + std::auto_ptr<StreamableData> data; try { for (unsigned count = 0; count < 10; count += increment) { @@ -114,22 +110,13 @@ void InputThread::mainLoop() NSTimer queueTimer("retrieve freeQueue item",false,false); NSTimer readTimer("read data",false,false); - // read header: output number - streamFromION->read( &o, sizeof o ); - -#if !defined WORDS_BIGENDIAN - dataConvert( LittleEndian, &o, 1 ); -#endif - - struct InputThread::SingleInput &input = itsInputs[o]; - // read data queueTimer.start(); - data = input.freeQueue.remove(); + data.reset( itsFreeQueue.remove() ); queueTimer.stop(); if( queueTimer.getElapsed() > reportQueueRemoveDelay ) { - LOG_WARN_STR( "observation " << itsObservationID << " subband " << itsSubbandNumber << " output " << o << " " << queueTimer ); + LOG_WARN_STR( "observation " << itsObservationID << " subband " << itsSubbandNumber << " output " << itsOutputNumber << " " << queueTimer ); } readTimer.start(); @@ -137,22 +124,15 @@ void InputThread::mainLoop() readTimer.stop(); if( readTimer.getElapsed() > reportReadDelay ) { - LOG_WARN_STR( "observation " << itsObservationID << " subband " << itsSubbandNumber << " output " << o << " " << readTimer ); + LOG_WARN_STR( "observation " << itsObservationID << " subband " << itsSubbandNumber << " output " << itsOutputNumber << " " << readTimer ); } - input.receiveQueue.append(data); - - // signal to the subbandwriter that we obtained data - itsReceiveQueueActivity.append(o); + itsReceiveQueue.append(data.release()); } } catch (Stream::EndOfStreamException &) { - itsInputs[0].freeQueue.append(data); // to include data when freeing, so actual queue number does not matter } - for (unsigned o = 0; o < itsNrInputs; o++ ) { - itsInputs[o].receiveQueue.append(0); // no more data - itsReceiveQueueActivity.append(o); - } + itsReceiveQueue.append(0); // no more data } diff --git a/RTCP/Storage/src/OutputThread.cc b/RTCP/Storage/src/OutputThread.cc index 1585c9816954e0295649a5e1d6c8d0cb85ab1330..679ed00ca3a38c95505cddbc19ba450200de6991 100644 --- a/RTCP/Storage/src/OutputThread.cc +++ b/RTCP/Storage/src/OutputThread.cc @@ -28,50 +28,56 @@ #include <Storage/MSWriterFile.h> #include <Storage/MSWriterNull.h> #include <Interface/StreamableData.h> +#include <Interface/CN_ProcessingPlan.h> #include <Common/DataConvert.h> +#include <stdio.h> namespace LOFAR { namespace RTCP { -OutputThread::OutputThread(const Parset *ps, unsigned subbandNumber, InputThread *inputThread, unsigned nrOutputs, const CN_ProcessingPlan<> &plan) +OutputThread::OutputThread(const Parset *ps, unsigned subbandNumber, unsigned outputNumber, InputThread *inputThread) : itsPS(ps), itsInputThread(inputThread), - itsNrOutputs(nrOutputs), itsSubbandNumber(subbandNumber), + itsOutputNumber(outputNumber), itsObservationID(ps->observationID()), - itsNextSequenceNumbers(itsNrOutputs,0) + itsNextSequenceNumber(0) { - itsWriters.resize(itsNrOutputs); - for (unsigned output = 0; output < itsNrOutputs; output++ ) { - string filename; + string filename; + CN_Configuration configuration(*ps); + CN_ProcessingPlan<> plan(configuration); + plan.removeNonOutputs(); + + const ProcessingPlan::planlet &p = plan.plan[outputNumber]; #if 0 - // null writer - itsWriters[output] = new MSWriterNull(); + // null writer + itsWriters[output] = new MSWriterNull(); - LOG_DEBUG_STR("subband " << subbandNumber << " written to null"); + LOG_DEBUG_STR("subband " << subbandNumber << " written to null"); #else - if( dynamic_cast<CorrelatedData*>( plan.plan[output].source ) ) { - std::stringstream out; - out << itsPS->getMSname(subbandNumber) << "/table.f" << output << "data"; - filename = out.str(); - } else { - // raw writer - filename = itsPS->getMSname(subbandNumber) + plan.plan[output].filenameSuffix; - } + if( dynamic_cast<CorrelatedData*>( p.source ) ) { + std::stringstream out; + out << itsPS->getMSname(subbandNumber) << "/table.f" << outputNumber << "data"; + filename = out.str(); + } else { + // raw writer + std::stringstream out; + out << itsPS->getMSname(subbandNumber) << p.filenameSuffix; + filename = out.str(); + } - LOG_DEBUG_STR("subband " << subbandNumber << " written to " << filename); + LOG_DEBUG_STR("subband " << subbandNumber << " output " << outputNumber << " written to " << filename); - try { - itsWriters[output] = new MSWriterFile(filename.c_str()); - } catch( SystemCallException &ex ) { - LOG_ERROR_STR( "Cannot open " << filename << ": " << ex ); + try { + itsWriter = new MSWriterFile(filename.c_str()); + } catch( SystemCallException &ex ) { + LOG_ERROR_STR( "Cannot open " << filename << ": " << ex ); - itsWriters[output] = new MSWriterNull(); - } -#endif + itsWriter = new MSWriterNull(); } +#endif thread = new Thread(this, &OutputThread::mainLoop); } @@ -81,9 +87,7 @@ OutputThread::~OutputThread() { delete thread; - for (unsigned i = 0; i < itsNrOutputs; i++) { - delete itsWriters[i]; - } + delete itsWriter; } @@ -109,16 +113,16 @@ void OutputThread::writeLogMessage() } -void OutputThread::checkForDroppedData(StreamableData *data, unsigned output) +void OutputThread::checkForDroppedData(StreamableData *data) { - unsigned expectedSequenceNumber = itsNextSequenceNumbers[output]; + unsigned expectedSequenceNumber = itsNextSequenceNumber; unsigned droppedBlocks = data->sequenceNumber - expectedSequenceNumber; if (droppedBlocks > 0) { - LOG_WARN_STR("dropped " << droppedBlocks << (droppedBlocks == 1 ? " block for subband " : " blocks for subband ") << itsSubbandNumber << " and output " << output << " of obsID " << itsObservationID); + LOG_WARN_STR("dropped " << droppedBlocks << (droppedBlocks == 1 ? " block for subband " : " blocks for subband ") << itsSubbandNumber << " and output " << itsOutputNumber << " of obsID " << itsObservationID); } - itsNextSequenceNumbers[output] = data->sequenceNumber + 1; + itsNextSequenceNumber = data->sequenceNumber + 1; } @@ -133,31 +137,28 @@ void OutputThread::mainLoop() for(;;) { - unsigned o = itsInputThread->itsReceiveQueueActivity.remove(); - struct InputThread::SingleInput &input = itsInputThread->itsInputs[o]; NSTimer writeTimer("write data",false,false); + std::auto_ptr<StreamableData> data( itsInputThread->itsReceiveQueue.remove() ); - StreamableData *data = input.receiveQueue.remove(); - - if (data == 0) { + if (data.get() == 0) { break; } - checkForDroppedData(data, o); + checkForDroppedData(data.get()); writeTimer.start(); semaphore.down(); - itsWriters[o]->write(data); + itsWriter->write(data.get()); semaphore.up(); writeTimer.stop(); if( writeTimer.getElapsed() > reportWriteDelay ) { - LOG_WARN_STR( "observation " << itsObservationID << " subband " << itsSubbandNumber << " output " << o << " " << writeTimer ); + LOG_WARN_STR( "observation " << itsObservationID << " subband " << itsSubbandNumber << " output " << itsOutputNumber << " " << writeTimer ); } - input.freeQueue.append(data); + itsInputThread->itsFreeQueue.append(data.release()); } } diff --git a/RTCP/Storage/src/SubbandWriter.cc b/RTCP/Storage/src/SubbandWriter.cc index b5d4b245bfce67bb40ff9402026da6f1630a6edd..ad6264843a56afa7240c3fa7dded38d8a24bd5a8 100644 --- a/RTCP/Storage/src/SubbandWriter.cc +++ b/RTCP/Storage/src/SubbandWriter.cc @@ -29,7 +29,8 @@ #include <Storage/MeasurementSetFormat.h> #include <Stream/SystemCallException.h> #include <Interface/Exceptions.h> -#include <Interface/CorrelatedData.h> +#include <Interface/CN_Configuration.h> +#include <Interface/CN_ProcessingPlan.h> #ifdef USE_MAC_PI #include <GCF/GCF_PVDouble.h> @@ -49,16 +50,11 @@ SubbandWriter::SubbandWriter(const Parset *ps, unsigned rank, unsigned size) itsPS(ps), itsRank(rank), itsSize(size), - itsObservationID(ps->observationID()), - itsConfiguration(*ps), - itsPlan(itsConfiguration,false,true), - itsNrOutputs(itsPlan.nrOutputs()) + itsObservationID(ps->observationID()) #ifdef USE_MAC_PI ,itsPropertySet(0) #endif { - itsPlan.removeNonOutputs(); - #ifdef USE_MAC_PI itsWriteToMAC = itsPS.getBool("Storage.WriteToMAC"); #endif @@ -89,6 +85,9 @@ void SubbandWriter::preprocess() unsigned lastSubband = std::min( firstSubband + nrSubbandsPerStorage, nrSubbands ) - 1; unsigned myNrSubbands = lastSubband - firstSubband + 1; + CN_Configuration configuration(*itsPS); + const CN_ProcessingPlan<> plan(configuration); + #if defined HAVE_AIPSPP LOG_TRACE_FLOW("SubbandWriter enabling PropertySet"); #ifdef USE_MAC_PI @@ -116,16 +115,18 @@ void SubbandWriter::preprocess() LOG_INFO_STR("MeasurementSet created"); } - LOG_DEBUG_STR("Subbands per storage = " << nrSubbandsPerStorage << ", I will store " << myNrSubbands << " subbands, nrOutputs = " << itsNrOutputs); + LOG_DEBUG_STR("Subbands per storage = " << nrSubbandsPerStorage << ", I will store " << myNrSubbands << " subbands, nrOutputs = " << plan.nrOutputs()); #endif // defined HAVE_AIPSPP for (unsigned sb = firstSubband; sb <= lastSubband; sb ++) { - InputThread *i = new InputThread(itsPS, sb); - OutputThread *o = new OutputThread(itsPS, sb, i, itsNrOutputs, itsPlan); + for (unsigned output = 0; output < plan.nrOutputs(); output ++) { + InputThread *i = new InputThread(itsPS, sb, output); + OutputThread *o = new OutputThread(itsPS, sb, output, i); - itsInputThreads.push_back(i); - itsOutputThreads.push_back(o); + itsInputThreads.push_back(i); + itsOutputThreads.push_back(o); + } } }