diff --git a/RTCP/Storage/include/Storage/InputThread.h b/RTCP/Storage/include/Storage/InputThread.h index d1961eca82a2dea8664ebbd7d0ceaf2e286587b4..f8470695639fdf832d2321e35960a0a0f9d798a8 100644 --- a/RTCP/Storage/include/Storage/InputThread.h +++ b/RTCP/Storage/include/Storage/InputThread.h @@ -38,28 +38,21 @@ namespace RTCP { class InputThread { public: - InputThread(const Parset *ps, unsigned subbandNumber, unsigned outputNumber, StreamableData *dataTemplate); + InputThread(const Parset &, unsigned subbandNumber, unsigned outputNumber, const std::string &inputDescription, Queue<StreamableData *> &freeQueue, Queue<StreamableData *> &receiveQueue); ~InputThread(); - static const unsigned maxReceiveQueueSize = 3; - // report if fetching an item from the receive queue takes longer than this (seconds) - static const float reportQueueRemoveDelay = 0.05; - // report if reading data takes longer than this (seconds) - static const float reportReadDelay = 1.10; - - Queue<StreamableData *> itsFreeQueue, itsReceiveQueue; - private: void mainLoop(); - const Parset *itsPS; - InterruptibleThread *itsThread; - const unsigned itsSubbandNumber; const unsigned itsOutputNumber; + const std::string itsInputDescription; const unsigned itsObservationID; + Queue<StreamableData *> &itsFreeQueue, &itsReceiveQueue; + volatile bool itsConnecting; + InterruptibleThread itsThread; }; } // namespace RTCP diff --git a/RTCP/Storage/include/Storage/OutputThread.h b/RTCP/Storage/include/Storage/OutputThread.h index 7aa71a3143e2128a8bb46c809230403da5a7205b..748512990a7ab07578130b12fe506f20942f9476 100644 --- a/RTCP/Storage/include/Storage/OutputThread.h +++ b/RTCP/Storage/include/Storage/OutputThread.h @@ -28,11 +28,11 @@ #include <Interface/StreamableData.h> #include <Interface/MultiDimArray.h> #include <Interface/CN_ProcessingPlan.h> +#include <Interface/Queue.h> #include <Interface/Thread.h> #include <Interface/Mutex.h> -#include <Stream/Stream.h> #include <Stream/FileStream.h> -#include <Storage/InputThread.h> +#include <Stream/Stream.h> #include <Storage/MSWriter.h> #include <Common/Semaphore.h> #include <Common/Timer.h> @@ -46,25 +46,20 @@ namespace RTCP { class OutputThread { public: - OutputThread(const Parset *ps, unsigned subbandNumber, unsigned outputNumber, InputThread *inputThread, const ProcessingPlan::planlet &outputConfig); - ~OutputThread(); - - + OutputThread(const Parset &, unsigned subbandNumber, unsigned outputNumber, const ProcessingPlan::planlet &outputConfig, Queue<StreamableData *> &freeQueue, Queue<StreamableData *> &receiveQueue); + ~OutputThread(); // report any writes that take longer than this (seconds) static const float reportWriteDelay = 0.05; private: - void writeLogMessage(); - void flushSeqNumbers(); + void writeLogMessage(unsigned sequenceNumber); + void flushSequenceNumbers(); void checkForDroppedData(StreamableData *data); void mainLoop(); - const Parset *itsPS; Thread *itsThread; - InputThread *itsInputThread; - const unsigned itsSubbandNumber; const unsigned itsOutputNumber; @@ -73,9 +68,10 @@ class OutputThread MSWriter* itsWriter; unsigned itsNextSequenceNumber; - std::vector<unsigned> itsSequenceNumbers; - FileStream *itsFile; + Queue<StreamableData *> &itsFreeQueue, &itsReceiveQueue; + std::vector<unsigned> itsSequenceNumbers; + FileStream *itsSequenceNumbersFile; }; } // namespace RTCP diff --git a/RTCP/Storage/include/Storage/SubbandWriter.h b/RTCP/Storage/include/Storage/SubbandWriter.h index 1bf79d7d024852a78893ba9339bf684655e5969a..e98af3533a249429694253e60d6fbac58f552c51 100644 --- a/RTCP/Storage/include/Storage/SubbandWriter.h +++ b/RTCP/Storage/include/Storage/SubbandWriter.h @@ -44,6 +44,7 @@ namespace LOFAR { namespace RTCP { +#if 0 class SubbandWriter { public: @@ -65,6 +66,25 @@ class SubbandWriter GCF::Common::GCFPValueArray itsVArray; #endif }; +#else + +class SubbandWriter +{ + public: + SubbandWriter(const char *parset, const char *portname, unsigned subband, unsigned outputType); + ~SubbandWriter(); + + private: + static const unsigned maxReceiveQueueSize = 3; + Parset itsParset; + + Queue<StreamableData *> itsReceiveQueue, itsFreeQueue; + + InputThread *itsInputThread; + OutputThread *itsOutputThread; +}; + +#endif } // namespace RTCP } // namespace LOFAR diff --git a/RTCP/Storage/src/InputThread.cc b/RTCP/Storage/src/InputThread.cc index 55e30b4f79d55b60479cc61261f2cd5a22489470..8c71fd99cabe1eac115e35a8dbad72325ae561f8 100644 --- a/RTCP/Storage/src/InputThread.cc +++ b/RTCP/Storage/src/InputThread.cc @@ -36,52 +36,41 @@ using boost::format; namespace LOFAR { namespace RTCP { -InputThread::InputThread(const Parset *ps, unsigned subbandNumber, unsigned outputNumber, StreamableData *dataTemplate) +InputThread::InputThread(const Parset &parset, unsigned subbandNumber, unsigned outputNumber, const std::string &inputDescription, Queue<StreamableData *> &freeQueue, Queue<StreamableData *> &receiveQueue) : - itsPS(ps), itsSubbandNumber(subbandNumber), itsOutputNumber(outputNumber), - itsObservationID(ps->observationID()), - itsConnecting(true) + itsInputDescription(inputDescription), + itsObservationID(parset.observationID()), + itsFreeQueue(freeQueue), + itsReceiveQueue(receiveQueue), + itsConnecting(true), + itsThread(this, &InputThread::mainLoop) { - for (unsigned i = 0; i < maxReceiveQueueSize; i ++) { - StreamableData *data = dataTemplate->clone(); - - data->allocate(); - - itsFreeQueue.append( data ); - } - //thread = new Thread(this, &InputThread::mainLoop, str(format("InputThread (obs %d sb %d output %d)") % ps->observationID() % subbandNumber % outputNumber)); - itsThread = new InterruptibleThread(this, &InputThread::mainLoop); } InputThread::~InputThread() { -#if 0 // this does not work yet +#if 0 // TODO: this does not work yet if (itsConnecting) - itsThread->abort(); + itsThread.abort(); #endif - - delete itsThread; - - while (!itsReceiveQueue.empty()) - delete itsReceiveQueue.remove(); - - while (!itsFreeQueue.empty()) - delete itsFreeQueue.remove(); + LOG_DEBUG("InputThread::~InputThread()"); } void InputThread::mainLoop() { - std::auto_ptr<Stream> streamFromION; - string prefix = "OLAP.OLAP_Conn.IONProc_Storage"; - string connectionType = itsPS->getString(prefix + "_Transport"); - bool nullInput = false; - try { + std::auto_ptr<Stream> streamFromION; +#if 0 + std::string prefix = "OLAP.OLAP_Conn.IONProc_Storage"; + std::string connectionType = itsPS->getString(prefix + "_Transport"); + + bool nullInput = false; + if (connectionType == "NULL") { LOG_DEBUG_STR("subband " << itsSubbandNumber << " read from null stream"); streamFromION.reset(new NullStream); @@ -102,34 +91,34 @@ void InputThread::mainLoop() itsReceiveQueue.append(0); // no more data THROW(StorageException, "unsupported ION->Storage stream type: " << connectionType); } +#endif + LOG_INFO_STR("Creating connection to " << itsInputDescription); + streamFromION.reset(Parset::createStream(itsInputDescription, false)); itsConnecting = false; // FIXME: race condition + LOG_INFO_STR("Created connection to " << itsInputDescription); // limit reads from NullStream to 10 blocks; otherwise unlimited - unsigned increment = nullInput ? 1 : 0; - std::auto_ptr<StreamableData> data; + bool nullInput = dynamic_cast<NullStream *>(streamFromION.get()) != 0; + unsigned maxCount = nullInput ? 10 : ~0; - for (unsigned count = 0; count < 10; count += increment) { - NSTimer queueTimer("retrieve freeQueue item", false, false); - NSTimer readTimer("read data", false, false); - - queueTimer.start(); - data.reset(itsFreeQueue.remove()); - queueTimer.stop(); - - if (queueTimer.getElapsed() > reportQueueRemoveDelay) - LOG_WARN_STR("InputThread: ObsID = " << itsObservationID << ", sb = " << itsSubbandNumber << ", output = " << itsOutputNumber << ": " << queueTimer); + for (unsigned count = 0; count < maxCount; count ++) { + NSTimer readTimer("read data", false, false); + std::auto_ptr<StreamableData> data(itsFreeQueue.remove()); readTimer.start(); data->read(streamFromION.get(), true); readTimer.stop(); - if (readTimer.getElapsed() > reportReadDelay) - LOG_WARN_STR("InputThread: ObsID = " << itsObservationID << ", sb = " << itsSubbandNumber << ", output = " << itsOutputNumber << ": " << readTimer); + LOG_INFO_STR("InputThread: ObsID = " << itsObservationID << ", sb = " << itsSubbandNumber << ", output = " << itsOutputNumber << ": " << readTimer); + + if (nullInput) + data.get()->sequenceNumber = count; itsReceiveQueue.append(data.release()); } } catch (Stream::EndOfStreamException &) { + LOG_INFO("caught Stream::EndOfStreamException (this is normal, and indicates the end of the observation)"); } catch (SystemCallException &ex) { if (ex.error != EINTR) { itsReceiveQueue.append(0); // no more data @@ -137,6 +126,7 @@ void InputThread::mainLoop() } } + LOG_DEBUG("no more data blocks"); itsReceiveQueue.append(0); // no more data } diff --git a/RTCP/Storage/src/OutputThread.cc b/RTCP/Storage/src/OutputThread.cc index ffd6d6b7a66a3ba0c070023782ad0e639ed50736..6668e77f98dc4e803ad8253a0317371e2e42fc16 100644 --- a/RTCP/Storage/src/OutputThread.cc +++ b/RTCP/Storage/src/OutputThread.cc @@ -41,45 +41,40 @@ using boost::format; namespace LOFAR { namespace RTCP { -OutputThread::OutputThread(const Parset *ps, unsigned subbandNumber, unsigned outputNumber, InputThread *inputThread, const ProcessingPlan::planlet &outputConfig ) +OutputThread::OutputThread(const Parset &parset, unsigned subbandNumber, unsigned outputNumber, const ProcessingPlan::planlet &outputConfig, Queue<StreamableData *> &freeQueue, Queue<StreamableData *> &receiveQueue) : - itsPS(ps), - itsInputThread(inputThread), itsSubbandNumber(subbandNumber), itsOutputNumber(outputNumber), - itsObservationID(ps->observationID()), + itsObservationID(parset.observationID()), itsNextSequenceNumber(0), - itsFile(NULL) + itsFreeQueue(freeQueue), + itsReceiveQueue(receiveQueue), + itsSequenceNumbersFile(0) { - string filename; - string seqfilename; -#if 0 - // null writer - itsWriters[output] = new MSWriterNull(); - - LOG_DEBUG_STR("subband " << subbandNumber << " written to null"); -#else - if( dynamic_cast<CorrelatedData*>( outputConfig.source ) ) { + std::string filename, seqfilename; + + if (dynamic_cast<CorrelatedData *>(outputConfig.source)) { std::stringstream out; - out << itsPS->getMSname(subbandNumber) << "/table.f" << outputNumber << "data"; + out << parset.getMSname(subbandNumber) << "/table.f" << outputNumber << "data"; filename = out.str(); - if (itsPS->getLofarStManVersion() == 2) { + if (parset.getLofarStManVersion() == 2) { std::stringstream seq; - seq << itsPS->getMSname(subbandNumber) <<"/table.f" << outputNumber << "seqnr"; + seq << parset.getMSname(subbandNumber) <<"/table.f" << outputNumber << "seqnr"; seqfilename = seq.str(); try { - itsFile = new FileStream(seqfilename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); + itsSequenceNumbersFile = new FileStream(seqfilename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); } catch (...) { - itsFile = NULL; + LOG_WARN("Could not open sequence numbers file"); + itsSequenceNumbersFile = 0; } } } else { // raw writer std::stringstream out; - out << itsPS->getMSname(subbandNumber) << outputConfig.filenameSuffix; + out << parset.getMSname(subbandNumber) << outputConfig.filenameSuffix; filename = out.str(); } @@ -87,12 +82,10 @@ OutputThread::OutputThread(const Parset *ps, unsigned subbandNumber, unsigned ou try { itsWriter = new MSWriterFile(filename.c_str()); - } catch( SystemCallException &ex ) { - LOG_ERROR_STR( "Cannot open " << filename << ": " << ex ); - + } catch (SystemCallException &ex) { + LOG_ERROR_STR("Cannot open " << filename << ": " << ex); itsWriter = new MSWriterNull(); } -#endif //thread = new Thread(this, &OutputThread::mainLoop, str(format("OutputThread (obs %d sb %d output %d)") % ps->observationID() % subbandNumber % outputNumber)); itsThread = new Thread(this, &OutputThread::mainLoop); @@ -101,41 +94,33 @@ OutputThread::OutputThread(const Parset *ps, unsigned subbandNumber, unsigned ou OutputThread::~OutputThread() { - flushSeqNumbers(); - - if (itsFile != NULL) delete itsFile; - delete itsThread; - delete itsWriter; + + flushSequenceNumbers(); + delete itsSequenceNumbersFile; } -void OutputThread::writeLogMessage() +void OutputThread::writeLogMessage(unsigned sequenceNumber) { -/* - static int counter = 0; - time_t now = time(0); - char buf[26]; + time_t now = time(0); + char buf[26]; ctime_r(&now, buf); buf[24] = '\0'; LOG_INFO_STR("time = " << buf << - //", obsID = " << itsObservationID << - ", count = " << counter ++ << - ", timestamp = " << itsStartStamp + ((itsPreviousSequenceNumbers[0] + 1) * - itsPS->nrSubbandSamples() * - itsPS->IONintegrationSteps())); - - // itsStartStamp += itsPS->nrSubbandSamples() * itsPS->IONintegrationSteps(); -*/ + ", obsID = " << itsObservationID << + ", seqno = " << sequenceNumber); } -void OutputThread::flushSeqNumbers() { - if (itsFile != NULL) { + +void OutputThread::flushSequenceNumbers() +{ + if (itsSequenceNumbersFile != 0) { LOG_INFO_STR("Flushing sequence numbers"); - itsFile->write(itsSequenceNumbers.data(), itsSequenceNumbers.size()*sizeof(unsigned)); + itsSequenceNumbersFile->write(itsSequenceNumbers.data(), itsSequenceNumbers.size()*sizeof(unsigned)); itsSequenceNumbers.clear(); } } @@ -144,18 +129,16 @@ void OutputThread::flushSeqNumbers() { void OutputThread::checkForDroppedData(StreamableData *data) { unsigned expectedSequenceNumber = itsNextSequenceNumber; - unsigned droppedBlocks = data->sequenceNumber - expectedSequenceNumber; + unsigned droppedBlocks = data->sequenceNumber - expectedSequenceNumber; if (droppedBlocks > 0) LOG_WARN_STR("OutputThread: ObsID = " << itsObservationID << ", subband = " << itsSubbandNumber << ", output = " << itsOutputNumber << ": dropped " << droppedBlocks << (droppedBlocks == 1 ? " block" : " blocks")); - if (itsPS->getLofarStManVersion() == 2 && itsFile != NULL) { - + if (itsSequenceNumbersFile != 0) { itsSequenceNumbers.push_back(data->sequenceNumber); - if (itsSequenceNumbers.size() > 64) { - flushSeqNumbers(); - } + if (itsSequenceNumbers.size() > 64) + flushSequenceNumbers(); } itsNextSequenceNumber = data->sequenceNumber + 1; @@ -164,38 +147,26 @@ void OutputThread::checkForDroppedData(StreamableData *data) void OutputThread::mainLoop() { - /// allow only a limited number of thread to write at a time - /// TODO: race at creation - static Semaphore semaphore(4); - while (true) { - NSTimer writeTimer("write data", false, false); - std::auto_ptr<StreamableData> data(itsInputThread->itsReceiveQueue.remove()); + //NSTimer writeTimer("write data", false, false); + std::auto_ptr<StreamableData> data(itsReceiveQueue.remove()); if (data.get() == 0) break; checkForDroppedData(data.get()); - writeTimer.start(); - semaphore.down(); + //writeTimer.start(); + itsWriter->write(data.get()); + //writeTimer.stop(); - try { - itsWriter->write(data.get()); - } catch (...) { - semaphore.up(); - throw; - } + writeLogMessage(data.get()->sequenceNumber); + //LOG_INFO_STR("OutputThread: ObsID = " << itsObservationID << ", subband = " << itsSubbandNumber << ", output = " << itsOutputNumber << ": " << writeTimer); - semaphore.up(); - writeTimer.stop(); - - if (writeTimer.getElapsed() > reportWriteDelay) - LOG_WARN_STR("OutputThread: ObsID = " << itsObservationID << ", subband = " << itsSubbandNumber << ", output = " << itsOutputNumber << ": " << writeTimer); - - itsInputThread->itsFreeQueue.append(data.release()); + itsFreeQueue.append(data.release()); } - flushSeqNumbers(); + + flushSequenceNumbers(); } } // namespace RTCP diff --git a/RTCP/Storage/src/Storage_main.cc b/RTCP/Storage/src/Storage_main.cc index f3dc760f3eb03ba28070f80169a401abdc68fc6d..b2b17385793d46fd5f2a8f4fa363cf78fe27f2fb 100644 --- a/RTCP/Storage/src/Storage_main.cc +++ b/RTCP/Storage/src/Storage_main.cc @@ -34,6 +34,7 @@ using namespace LOFAR; using namespace LOFAR::RTCP; +#if 0 class Job { public: @@ -93,6 +94,7 @@ static void child(int argc, char *argv[], int rank, int size) exit(1); } } +#endif int main(int argc, char *argv[]) @@ -115,6 +117,7 @@ int main(int argc, char *argv[]) setLevel("Global",8); #endif +#if 0 #if defined HAVE_MPI int rank; int size; @@ -167,6 +170,29 @@ int main(int argc, char *argv[]) MPI_Finalize(); #endif +#else + try { + if (argc != 5) + throw StorageException(std::string("usage: ") + argv[0] + " parset input subband type"); // TODO: more descriptive + + char stdoutbuf[1024], stderrbuf[1024]; + setvbuf(stdout, stdoutbuf, _IOLBF, sizeof stdoutbuf); + setvbuf(stderr, stdoutbuf, _IOLBF, sizeof stderrbuf); + + LOG_INFO_STR("started: " << argv[0] << ' ' << argv[1] << ' ' << argv[2] << ' ' << argv[3] << ' ' << argv[4]); + SubbandWriter(argv[1], argv[2], boost::lexical_cast<unsigned>(argv[3]), boost::lexical_cast<unsigned>(argv[4])); + } catch (Exception &ex) { + LOG_FATAL_STR("caught Exception: " << ex); + exit(1); + } catch (std::exception &ex) { + LOG_FATAL_STR("caught std::exception: " << ex.what()); + exit(1); + } catch (...) { + LOG_FATAL_STR("caught non-std::exception: "); + exit(1); + } +#endif + LOG_INFO("Program end"); - return 0; // always return 0, otherwise mpirun kills other storage processes + return 0; } diff --git a/RTCP/Storage/src/SubbandWriter.cc b/RTCP/Storage/src/SubbandWriter.cc index 1dcc69910b5a3562e9e64c713cbf6bbff212888b..68cbf8f432fd311acd4b14664ad862aa6403113f 100644 --- a/RTCP/Storage/src/SubbandWriter.cc +++ b/RTCP/Storage/src/SubbandWriter.cc @@ -32,15 +32,8 @@ #include <Interface/CN_Configuration.h> #include <Interface/CN_ProcessingPlan.h> -#ifdef USE_MAC_PI -#include <GCF/GCF_PVDouble.h> -#include <GCF/GCF_PVString.h> -#endif - #include <boost/lexical_cast.hpp> -#include <boost/format.hpp> -using boost::format; #include <time.h> #include <sys/stat.h> @@ -48,6 +41,7 @@ using boost::format; namespace LOFAR { namespace RTCP { +#if 0 SubbandWriter::SubbandWriter(const Parset *ps, unsigned rank, unsigned size) : itsPS(ps), @@ -168,6 +162,62 @@ SubbandWriter::~SubbandWriter() itsVArray.clear(); #endif } +#else + + +SubbandWriter::SubbandWriter(const char *parset, const char *inputDescription, unsigned subband, unsigned outputType) +: + itsParset(parset) +{ + CN_Configuration configuration(itsParset); + CN_ProcessingPlan<> plan(configuration); + plan.removeNonOutputs(); + +#if defined HAVE_AIPSPP + if (outputType == 0 && itsParset.outputCorrelatedData()) { + MeasurementSetFormat myFormat(&itsParset, 512); + + // create root directory of the observation tree + if (mkdir(itsParset.getMSBaseDir().c_str(), 0770) != 0 && errno != EEXIST) { + unsigned savedErrno = errno; // first argument below clears errno + throw SystemCallException(("mkdir " + itsParset.getMSBaseDir()).c_str(), savedErrno, THROW_ARGS); + } + + /// Make MeasurementSet filestructures and required tables + myFormat.addSubband(subband); + + LOG_INFO_STR("MeasurementSet created"); + } +#endif // defined HAVE_AIPSPP + + ProcessingPlan::planlet &outputConfig = plan.plan[outputType]; + StreamableData *dataTemplate = outputConfig.source; + + for (unsigned i = 0; i < maxReceiveQueueSize; i ++) { + StreamableData *data = dataTemplate->clone(); + + data->allocate(); + itsFreeQueue.append(data); + } + + itsInputThread = new InputThread(itsParset, subband, outputType, inputDescription, itsFreeQueue, itsReceiveQueue); + itsOutputThread = new OutputThread(itsParset, subband, outputType, outputConfig, itsFreeQueue, itsReceiveQueue); +} + + +SubbandWriter::~SubbandWriter() +{ + delete itsInputThread; + delete itsOutputThread; + + while (!itsReceiveQueue.empty()) + delete itsReceiveQueue.remove(); + + while (!itsFreeQueue.empty()) + delete itsFreeQueue.remove(); +} + +#endif } // namespace RTCP } // namespace LOFAR