diff --git a/RTCP/Cobalt/OutputProc/src/InputThread.cc b/RTCP/Cobalt/OutputProc/src/InputThread.cc index 564b076cad2546442d3652fa417f3d11494c3c9e..cb0458763b4280894db16f33fa3b7ee16b9033a8 100644 --- a/RTCP/Cobalt/OutputProc/src/InputThread.cc +++ b/RTCP/Cobalt/OutputProc/src/InputThread.cc @@ -49,20 +49,7 @@ namespace LOFAR } - void InputThread::start() - { - itsThread = new Thread(this, &InputThread::mainLoop, itsLogPrefix); - } - - - void InputThread::cancel() - { - if (itsThread) - itsThread->cancel(); - } - - - void InputThread::mainLoop() + void InputThread::process() { try { LOG_INFO_STR(itsLogPrefix << "Creating connection from " << itsInputDescriptor << "..." ); diff --git a/RTCP/Cobalt/OutputProc/src/InputThread.h b/RTCP/Cobalt/OutputProc/src/InputThread.h index 028d97e5668fb3f59fae8851d775812a5a508183..013c6904b809a74c3b1138d9528bb2f023e87a94 100644 --- a/RTCP/Cobalt/OutputProc/src/InputThread.h +++ b/RTCP/Cobalt/OutputProc/src/InputThread.h @@ -25,7 +25,6 @@ #include <string> -#include <Common/Thread/Thread.h> #include <Common/Thread/Queue.h> #include <CoInterface/OutputTypes.h> #include <CoInterface/Parset.h> @@ -49,15 +48,11 @@ namespace LOFAR Queue<SmartPtr<StreamableData> > &receiveQueue, const std::string &logPrefix); - void start(); - void cancel(); + void process(); private: - void mainLoop(); - const std::string itsLogPrefix, itsInputDescriptor; Queue<SmartPtr<StreamableData> > &itsFreeQueue, &itsReceiveQueue; - SmartPtr<Thread> itsThread; const double itsDeadline; }; diff --git a/RTCP/Cobalt/OutputProc/src/OutputThread.cc b/RTCP/Cobalt/OutputProc/src/OutputThread.cc index 6440f6d628784aa32b2f9de64fe2b81491a28675..8a318ccb8d4c71440666d7b0f3033ef0213a6406 100644 --- a/RTCP/Cobalt/OutputProc/src/OutputThread.cc +++ b/RTCP/Cobalt/OutputProc/src/OutputThread.cc @@ -30,6 +30,7 @@ #include <unistd.h> #include <iomanip> #include <boost/format.hpp> +#include <boost/algorithm/string.hpp> #include <Common/StringUtil.h> #include <Common/SystemCallException.h> @@ -88,7 +89,7 @@ namespace LOFAR string curdir; vector<string> splitName; - split(splitName, dirname, is_any_of("/")); + boost::split(splitName, dirname, boost::is_any_of("/")); for (unsigned i = 0; i < splitName.size(); i++) { curdir += splitName[i] + '/'; @@ -114,12 +115,6 @@ namespace LOFAR } - void OutputThread::start() - { - itsThread = new Thread(this, &OutputThread::mainLoop, itsLogPrefix); - } - - void OutputThread::createMS() { // even the HDF5 writer accesses casacore, to perform conversions @@ -259,11 +254,6 @@ namespace LOFAR void OutputThread::augment( const FinalMetaData &finalMetaData ) { - // wait for writer thread to finish, so we'll have an itsWriter - ASSERT(itsThread.get()); - - itsThread = 0; - // augment the data product ASSERT(itsWriter.get()); @@ -271,9 +261,9 @@ namespace LOFAR } - void OutputThread::mainLoop() + void OutputThread::process() { - LOG_DEBUG_STR(itsLogPrefix << "OutputThread::mainLoop() entered"); + LOG_DEBUG_STR(itsLogPrefix << "OutputThread::process() entered"); try { createMS(); diff --git a/RTCP/Cobalt/OutputProc/src/OutputThread.h b/RTCP/Cobalt/OutputProc/src/OutputThread.h index 99193ed2bae9d8fdfc2c5b8c20af7e8caaae2118..01258015ce051861f3a667d8545d7eb698c43125 100644 --- a/RTCP/Cobalt/OutputProc/src/OutputThread.h +++ b/RTCP/Cobalt/OutputProc/src/OutputThread.h @@ -27,7 +27,6 @@ #include <vector> #include <Common/Thread/Queue.h> -#include <Common/Thread/Thread.h> #include <Stream/FileStream.h> #include <CoInterface/OutputTypes.h> #include <CoInterface/SmartPtr.h> @@ -46,7 +45,7 @@ namespace LOFAR public: OutputThread(const Parset &, OutputType, unsigned streamNr, Queue<SmartPtr<StreamableData> > &freeQueue, Queue<SmartPtr<StreamableData> > &receiveQueue, const std::string &logPrefix, const std::string &targetDirectory = ""); - void start(); + void process(); // needed in createHeaders.cc void createMS(); @@ -59,7 +58,6 @@ namespace LOFAR private: void checkForDroppedData(StreamableData *); void doWork(); - void mainLoop(); const Parset &itsParset; const OutputType itsOutputType; @@ -73,7 +71,6 @@ namespace LOFAR unsigned itsNrExpectedBlocks; unsigned itsNextSequenceNumber; SmartPtr<MSWriter> itsWriter; - SmartPtr<Thread> itsThread; }; diff --git a/RTCP/Cobalt/OutputProc/src/SubbandWriter.cc b/RTCP/Cobalt/OutputProc/src/SubbandWriter.cc index 84aeff06be4c170a51e106a716b13a9cd74664cd..82843937b986aaba448d6bc824922de01c871c1d 100644 --- a/RTCP/Cobalt/OutputProc/src/SubbandWriter.cc +++ b/RTCP/Cobalt/OutputProc/src/SubbandWriter.cc @@ -33,22 +33,27 @@ namespace LOFAR SubbandWriter::SubbandWriter(const Parset &parset, OutputType outputType, unsigned streamNr, const std::string &logPrefix) { itsInputThread = new InputThread(parset, outputType, streamNr, itsFreeQueue, itsReceiveQueue, logPrefix); - itsInputThread->start(); - - try - { - itsOutputThread = new OutputThread(parset, outputType, streamNr, itsFreeQueue, itsReceiveQueue, logPrefix); - itsOutputThread->start(); - } - catch (...) - { - itsInputThread->cancel(); - throw; - } + itsOutputThread = new OutputThread(parset, outputType, streamNr, itsFreeQueue, itsReceiveQueue, logPrefix); for (unsigned i = 0; i < maxReceiveQueueSize; i++) itsFreeQueue.append(newStreamableData(parset, outputType, streamNr)); + } + + void SubbandWriter::process() + { +# pragma omp parallel sections num_threads(2) + { +# pragma omp section + { + itsInputThread->process(); + } + +# pragma omp section + { + itsOutputThread->process(); + } + } } void SubbandWriter::augment( const FinalMetaData &finalMetaData ) diff --git a/RTCP/Cobalt/OutputProc/src/SubbandWriter.h b/RTCP/Cobalt/OutputProc/src/SubbandWriter.h index b5f59cd5c83c5bd32181cbfacdf6ec6deef3c8f5..2accd4db5ecaa01bb6de6e6b9e60a3179928f3a3 100644 --- a/RTCP/Cobalt/OutputProc/src/SubbandWriter.h +++ b/RTCP/Cobalt/OutputProc/src/SubbandWriter.h @@ -45,6 +45,8 @@ namespace LOFAR unsigned streamNr, const std::string &logPrefix); + void process(); + void augment(const FinalMetaData &finalMetaData); ParameterSet feedbackLTA() const; diff --git a/RTCP/Cobalt/OutputProc/src/outputProc.cc b/RTCP/Cobalt/OutputProc/src/outputProc.cc index 8d3b73780d04a0983523f6db6fb8931852a2ea1c..21af732014ffa2199105191a9e3d0d212364674e 100644 --- a/RTCP/Cobalt/OutputProc/src/outputProc.cc +++ b/RTCP/Cobalt/OutputProc/src/outputProc.cc @@ -24,6 +24,7 @@ #include <cstdlib> #include <cstdio> #include <cstring> +#include <omp.h> #include <string> #include <vector> @@ -139,6 +140,10 @@ void process(Stream &controlStream, size_t myRank) vector<SmartPtr<SubbandWriter> > subbandWriters; + /* + * Construct writers + */ + // Process correlated data if (parset.settings.correlator.enabled) { for (size_t fileIdx = 0; fileIdx < parset.settings.correlator.files.size(); ++fileIdx) @@ -169,6 +174,16 @@ void process(Stream &controlStream, size_t myRank) } } + /* + * PROCESS + */ + +# pragma omp parallel for num_threads(subbandWriters.size()) + for (int i = 0; i < (int)subbandWriters.size(); ++i) + { + subbandWriters[i]->process(); + } + /* * FINAL META DATA */ @@ -194,6 +209,8 @@ int main(int argc, char *argv[]) setvbuf(stdout, stdoutbuf, _IOLBF, sizeof stdoutbuf); setvbuf(stderr, stderrbuf, _IOLBF, sizeof stderrbuf); + omp_set_nested(true); + LOG_DEBUG_STR("Started: " << argv[0] << ' ' << argv[1] << ' ' << argv[2]); int observationID = boost::lexical_cast<int>(argv[1]);