Skip to content
Snippets Groups Projects
Commit 1fe66697 authored by Jan David Mol's avatar Jan David Mol
Browse files

Task #4820: Replaced Common/Thread model with OpenMP

parent d6533486
No related branches found
No related tags found
No related merge requests found
...@@ -49,20 +49,7 @@ namespace LOFAR ...@@ -49,20 +49,7 @@ namespace LOFAR
} }
void InputThread::start() void InputThread::process()
{
itsThread = new Thread(this, &InputThread::mainLoop, itsLogPrefix);
}
void InputThread::cancel()
{
if (itsThread)
itsThread->cancel();
}
void InputThread::mainLoop()
{ {
try { try {
LOG_INFO_STR(itsLogPrefix << "Creating connection from " << itsInputDescriptor << "..." ); LOG_INFO_STR(itsLogPrefix << "Creating connection from " << itsInputDescriptor << "..." );
......
...@@ -25,7 +25,6 @@ ...@@ -25,7 +25,6 @@
#include <string> #include <string>
#include <Common/Thread/Thread.h>
#include <Common/Thread/Queue.h> #include <Common/Thread/Queue.h>
#include <CoInterface/OutputTypes.h> #include <CoInterface/OutputTypes.h>
#include <CoInterface/Parset.h> #include <CoInterface/Parset.h>
...@@ -49,15 +48,11 @@ namespace LOFAR ...@@ -49,15 +48,11 @@ namespace LOFAR
Queue<SmartPtr<StreamableData> > &receiveQueue, Queue<SmartPtr<StreamableData> > &receiveQueue,
const std::string &logPrefix); const std::string &logPrefix);
void start(); void process();
void cancel();
private: private:
void mainLoop();
const std::string itsLogPrefix, itsInputDescriptor; const std::string itsLogPrefix, itsInputDescriptor;
Queue<SmartPtr<StreamableData> > &itsFreeQueue, &itsReceiveQueue; Queue<SmartPtr<StreamableData> > &itsFreeQueue, &itsReceiveQueue;
SmartPtr<Thread> itsThread;
const double itsDeadline; const double itsDeadline;
}; };
......
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
#include <unistd.h> #include <unistd.h>
#include <iomanip> #include <iomanip>
#include <boost/format.hpp> #include <boost/format.hpp>
#include <boost/algorithm/string.hpp>
#include <Common/StringUtil.h> #include <Common/StringUtil.h>
#include <Common/SystemCallException.h> #include <Common/SystemCallException.h>
...@@ -88,7 +89,7 @@ namespace LOFAR ...@@ -88,7 +89,7 @@ namespace LOFAR
string curdir; string curdir;
vector<string> splitName; vector<string> splitName;
split(splitName, dirname, is_any_of("/")); boost::split(splitName, dirname, boost::is_any_of("/"));
for (unsigned i = 0; i < splitName.size(); i++) { for (unsigned i = 0; i < splitName.size(); i++) {
curdir += splitName[i] + '/'; curdir += splitName[i] + '/';
...@@ -114,12 +115,6 @@ namespace LOFAR ...@@ -114,12 +115,6 @@ namespace LOFAR
} }
void OutputThread::start()
{
itsThread = new Thread(this, &OutputThread::mainLoop, itsLogPrefix);
}
void OutputThread::createMS() void OutputThread::createMS()
{ {
// even the HDF5 writer accesses casacore, to perform conversions // even the HDF5 writer accesses casacore, to perform conversions
...@@ -259,11 +254,6 @@ namespace LOFAR ...@@ -259,11 +254,6 @@ namespace LOFAR
void OutputThread::augment( const FinalMetaData &finalMetaData ) void OutputThread::augment( const FinalMetaData &finalMetaData )
{ {
// wait for writer thread to finish, so we'll have an itsWriter
ASSERT(itsThread.get());
itsThread = 0;
// augment the data product // augment the data product
ASSERT(itsWriter.get()); ASSERT(itsWriter.get());
...@@ -271,9 +261,9 @@ namespace LOFAR ...@@ -271,9 +261,9 @@ namespace LOFAR
} }
void OutputThread::mainLoop() void OutputThread::process()
{ {
LOG_DEBUG_STR(itsLogPrefix << "OutputThread::mainLoop() entered"); LOG_DEBUG_STR(itsLogPrefix << "OutputThread::process() entered");
try { try {
createMS(); createMS();
......
...@@ -27,7 +27,6 @@ ...@@ -27,7 +27,6 @@
#include <vector> #include <vector>
#include <Common/Thread/Queue.h> #include <Common/Thread/Queue.h>
#include <Common/Thread/Thread.h>
#include <Stream/FileStream.h> #include <Stream/FileStream.h>
#include <CoInterface/OutputTypes.h> #include <CoInterface/OutputTypes.h>
#include <CoInterface/SmartPtr.h> #include <CoInterface/SmartPtr.h>
...@@ -46,7 +45,7 @@ namespace LOFAR ...@@ -46,7 +45,7 @@ namespace LOFAR
public: public:
OutputThread(const Parset &, OutputType, unsigned streamNr, Queue<SmartPtr<StreamableData> > &freeQueue, Queue<SmartPtr<StreamableData> > &receiveQueue, const std::string &logPrefix, const std::string &targetDirectory = ""); OutputThread(const Parset &, OutputType, unsigned streamNr, Queue<SmartPtr<StreamableData> > &freeQueue, Queue<SmartPtr<StreamableData> > &receiveQueue, const std::string &logPrefix, const std::string &targetDirectory = "");
void start(); void process();
// needed in createHeaders.cc // needed in createHeaders.cc
void createMS(); void createMS();
...@@ -59,7 +58,6 @@ namespace LOFAR ...@@ -59,7 +58,6 @@ namespace LOFAR
private: private:
void checkForDroppedData(StreamableData *); void checkForDroppedData(StreamableData *);
void doWork(); void doWork();
void mainLoop();
const Parset &itsParset; const Parset &itsParset;
const OutputType itsOutputType; const OutputType itsOutputType;
...@@ -73,7 +71,6 @@ namespace LOFAR ...@@ -73,7 +71,6 @@ namespace LOFAR
unsigned itsNrExpectedBlocks; unsigned itsNrExpectedBlocks;
unsigned itsNextSequenceNumber; unsigned itsNextSequenceNumber;
SmartPtr<MSWriter> itsWriter; SmartPtr<MSWriter> itsWriter;
SmartPtr<Thread> itsThread;
}; };
......
...@@ -33,22 +33,27 @@ namespace LOFAR ...@@ -33,22 +33,27 @@ namespace LOFAR
SubbandWriter::SubbandWriter(const Parset &parset, OutputType outputType, unsigned streamNr, const std::string &logPrefix) SubbandWriter::SubbandWriter(const Parset &parset, OutputType outputType, unsigned streamNr, const std::string &logPrefix)
{ {
itsInputThread = new InputThread(parset, outputType, streamNr, itsFreeQueue, itsReceiveQueue, logPrefix); itsInputThread = new InputThread(parset, outputType, streamNr, itsFreeQueue, itsReceiveQueue, logPrefix);
itsInputThread->start(); itsOutputThread = new OutputThread(parset, outputType, streamNr, itsFreeQueue, itsReceiveQueue, logPrefix);
try
{
itsOutputThread = new OutputThread(parset, outputType, streamNr, itsFreeQueue, itsReceiveQueue, logPrefix);
itsOutputThread->start();
}
catch (...)
{
itsInputThread->cancel();
throw;
}
for (unsigned i = 0; i < maxReceiveQueueSize; i++) for (unsigned i = 0; i < maxReceiveQueueSize; i++)
itsFreeQueue.append(newStreamableData(parset, outputType, streamNr)); itsFreeQueue.append(newStreamableData(parset, outputType, streamNr));
}
void SubbandWriter::process()
{
# pragma omp parallel sections num_threads(2)
{
# pragma omp section
{
itsInputThread->process();
}
# pragma omp section
{
itsOutputThread->process();
}
}
} }
void SubbandWriter::augment( const FinalMetaData &finalMetaData ) void SubbandWriter::augment( const FinalMetaData &finalMetaData )
......
...@@ -45,6 +45,8 @@ namespace LOFAR ...@@ -45,6 +45,8 @@ namespace LOFAR
unsigned streamNr, unsigned streamNr,
const std::string &logPrefix); const std::string &logPrefix);
void process();
void augment(const FinalMetaData &finalMetaData); void augment(const FinalMetaData &finalMetaData);
ParameterSet feedbackLTA() const; ParameterSet feedbackLTA() const;
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include <cstdlib> #include <cstdlib>
#include <cstdio> #include <cstdio>
#include <cstring> #include <cstring>
#include <omp.h>
#include <string> #include <string>
#include <vector> #include <vector>
...@@ -139,6 +140,10 @@ void process(Stream &controlStream, size_t myRank) ...@@ -139,6 +140,10 @@ void process(Stream &controlStream, size_t myRank)
vector<SmartPtr<SubbandWriter> > subbandWriters; vector<SmartPtr<SubbandWriter> > subbandWriters;
/*
* Construct writers
*/
// Process correlated data // Process correlated data
if (parset.settings.correlator.enabled) { if (parset.settings.correlator.enabled) {
for (size_t fileIdx = 0; fileIdx < parset.settings.correlator.files.size(); ++fileIdx) for (size_t fileIdx = 0; fileIdx < parset.settings.correlator.files.size(); ++fileIdx)
...@@ -169,6 +174,16 @@ void process(Stream &controlStream, size_t myRank) ...@@ -169,6 +174,16 @@ void process(Stream &controlStream, size_t myRank)
} }
} }
/*
* PROCESS
*/
# pragma omp parallel for num_threads(subbandWriters.size())
for (int i = 0; i < (int)subbandWriters.size(); ++i)
{
subbandWriters[i]->process();
}
/* /*
* FINAL META DATA * FINAL META DATA
*/ */
...@@ -194,6 +209,8 @@ int main(int argc, char *argv[]) ...@@ -194,6 +209,8 @@ int main(int argc, char *argv[])
setvbuf(stdout, stdoutbuf, _IOLBF, sizeof stdoutbuf); setvbuf(stdout, stdoutbuf, _IOLBF, sizeof stdoutbuf);
setvbuf(stderr, stderrbuf, _IOLBF, sizeof stderrbuf); setvbuf(stderr, stderrbuf, _IOLBF, sizeof stderrbuf);
omp_set_nested(true);
LOG_DEBUG_STR("Started: " << argv[0] << ' ' << argv[1] << ' ' << argv[2]); LOG_DEBUG_STR("Started: " << argv[0] << ' ' << argv[1] << ' ' << argv[2]);
int observationID = boost::lexical_cast<int>(argv[1]); int observationID = boost::lexical_cast<int>(argv[1]);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment