Skip to content
Snippets Groups Projects
Commit 1cfadc6b authored by John Romein's avatar John Romein
Browse files

bug 225:

Start storage from IONProc by using ssh
parent 308d984f
No related branches found
No related tags found
No related merge requests found
......@@ -38,28 +38,21 @@ namespace RTCP {
class InputThread
{
public:
InputThread(const Parset *ps, unsigned subbandNumber, unsigned outputNumber, StreamableData *dataTemplate);
InputThread(const Parset &, unsigned subbandNumber, unsigned outputNumber, const std::string &inputDescription, Queue<StreamableData *> &freeQueue, Queue<StreamableData *> &receiveQueue);
~InputThread();
static const unsigned maxReceiveQueueSize = 3;
// report if fetching an item from the receive queue takes longer than this (seconds)
static const float reportQueueRemoveDelay = 0.05;
// report if reading data takes longer than this (seconds)
static const float reportReadDelay = 1.10;
Queue<StreamableData *> itsFreeQueue, itsReceiveQueue;
private:
void mainLoop();
const Parset *itsPS;
InterruptibleThread *itsThread;
const unsigned itsSubbandNumber;
const unsigned itsOutputNumber;
const std::string itsInputDescription;
const unsigned itsObservationID;
Queue<StreamableData *> &itsFreeQueue, &itsReceiveQueue;
volatile bool itsConnecting;
InterruptibleThread itsThread;
};
} // namespace RTCP
......
......@@ -28,11 +28,11 @@
#include <Interface/StreamableData.h>
#include <Interface/MultiDimArray.h>
#include <Interface/CN_ProcessingPlan.h>
#include <Interface/Queue.h>
#include <Interface/Thread.h>
#include <Interface/Mutex.h>
#include <Stream/Stream.h>
#include <Stream/FileStream.h>
#include <Storage/InputThread.h>
#include <Stream/Stream.h>
#include <Storage/MSWriter.h>
#include <Common/Semaphore.h>
#include <Common/Timer.h>
......@@ -46,25 +46,20 @@ namespace RTCP {
class OutputThread
{
public:
OutputThread(const Parset *ps, unsigned subbandNumber, unsigned outputNumber, InputThread *inputThread, const ProcessingPlan::planlet &outputConfig);
OutputThread(const Parset &, unsigned subbandNumber, unsigned outputNumber, const ProcessingPlan::planlet &outputConfig, Queue<StreamableData *> &freeQueue, Queue<StreamableData *> &receiveQueue);
~OutputThread();
// report any writes that take longer than this (seconds)
static const float reportWriteDelay = 0.05;
private:
void writeLogMessage();
void flushSeqNumbers();
void writeLogMessage(unsigned sequenceNumber);
void flushSequenceNumbers();
void checkForDroppedData(StreamableData *data);
void mainLoop();
const Parset *itsPS;
Thread *itsThread;
InputThread *itsInputThread;
const unsigned itsSubbandNumber;
const unsigned itsOutputNumber;
......@@ -73,9 +68,10 @@ class OutputThread
MSWriter* itsWriter;
unsigned itsNextSequenceNumber;
std::vector<unsigned> itsSequenceNumbers;
FileStream *itsFile;
Queue<StreamableData *> &itsFreeQueue, &itsReceiveQueue;
std::vector<unsigned> itsSequenceNumbers;
FileStream *itsSequenceNumbersFile;
};
} // namespace RTCP
......
......@@ -44,6 +44,7 @@
namespace LOFAR {
namespace RTCP {
#if 0
class SubbandWriter
{
public:
......@@ -65,6 +66,25 @@ class SubbandWriter
GCF::Common::GCFPValueArray itsVArray;
#endif
};
#else
class SubbandWriter
{
public:
SubbandWriter(const char *parset, const char *portname, unsigned subband, unsigned outputType);
~SubbandWriter();
private:
static const unsigned maxReceiveQueueSize = 3;
Parset itsParset;
Queue<StreamableData *> itsReceiveQueue, itsFreeQueue;
InputThread *itsInputThread;
OutputThread *itsOutputThread;
};
#endif
} // namespace RTCP
} // namespace LOFAR
......
......@@ -36,52 +36,41 @@ using boost::format;
namespace LOFAR {
namespace RTCP {
InputThread::InputThread(const Parset *ps, unsigned subbandNumber, unsigned outputNumber, StreamableData *dataTemplate)
InputThread::InputThread(const Parset &parset, unsigned subbandNumber, unsigned outputNumber, const std::string &inputDescription, Queue<StreamableData *> &freeQueue, Queue<StreamableData *> &receiveQueue)
:
itsPS(ps),
itsSubbandNumber(subbandNumber),
itsOutputNumber(outputNumber),
itsObservationID(ps->observationID()),
itsConnecting(true)
itsInputDescription(inputDescription),
itsObservationID(parset.observationID()),
itsFreeQueue(freeQueue),
itsReceiveQueue(receiveQueue),
itsConnecting(true),
itsThread(this, &InputThread::mainLoop)
{
for (unsigned i = 0; i < maxReceiveQueueSize; i ++) {
StreamableData *data = dataTemplate->clone();
data->allocate();
itsFreeQueue.append( data );
}
//thread = new Thread(this, &InputThread::mainLoop, str(format("InputThread (obs %d sb %d output %d)") % ps->observationID() % subbandNumber % outputNumber));
itsThread = new InterruptibleThread(this, &InputThread::mainLoop);
}
InputThread::~InputThread()
{
#if 0 // this does not work yet
#if 0 // TODO: this does not work yet
if (itsConnecting)
itsThread->abort();
itsThread.abort();
#endif
delete itsThread;
while (!itsReceiveQueue.empty())
delete itsReceiveQueue.remove();
while (!itsFreeQueue.empty())
delete itsFreeQueue.remove();
LOG_DEBUG("InputThread::~InputThread()");
}
void InputThread::mainLoop()
{
try {
std::auto_ptr<Stream> streamFromION;
string prefix = "OLAP.OLAP_Conn.IONProc_Storage";
string connectionType = itsPS->getString(prefix + "_Transport");
#if 0
std::string prefix = "OLAP.OLAP_Conn.IONProc_Storage";
std::string connectionType = itsPS->getString(prefix + "_Transport");
bool nullInput = false;
try {
if (connectionType == "NULL") {
LOG_DEBUG_STR("subband " << itsSubbandNumber << " read from null stream");
streamFromION.reset(new NullStream);
......@@ -102,34 +91,34 @@ void InputThread::mainLoop()
itsReceiveQueue.append(0); // no more data
THROW(StorageException, "unsupported ION->Storage stream type: " << connectionType);
}
#endif
LOG_INFO_STR("Creating connection to " << itsInputDescription);
streamFromION.reset(Parset::createStream(itsInputDescription, false));
itsConnecting = false; // FIXME: race condition
LOG_INFO_STR("Created connection to " << itsInputDescription);
// limit reads from NullStream to 10 blocks; otherwise unlimited
unsigned increment = nullInput ? 1 : 0;
std::auto_ptr<StreamableData> data;
bool nullInput = dynamic_cast<NullStream *>(streamFromION.get()) != 0;
unsigned maxCount = nullInput ? 10 : ~0;
for (unsigned count = 0; count < 10; count += increment) {
NSTimer queueTimer("retrieve freeQueue item", false, false);
for (unsigned count = 0; count < maxCount; count ++) {
NSTimer readTimer("read data", false, false);
queueTimer.start();
data.reset(itsFreeQueue.remove());
queueTimer.stop();
if (queueTimer.getElapsed() > reportQueueRemoveDelay)
LOG_WARN_STR("InputThread: ObsID = " << itsObservationID << ", sb = " << itsSubbandNumber << ", output = " << itsOutputNumber << ": " << queueTimer);
std::auto_ptr<StreamableData> data(itsFreeQueue.remove());
readTimer.start();
data->read(streamFromION.get(), true);
readTimer.stop();
if (readTimer.getElapsed() > reportReadDelay)
LOG_WARN_STR("InputThread: ObsID = " << itsObservationID << ", sb = " << itsSubbandNumber << ", output = " << itsOutputNumber << ": " << readTimer);
LOG_INFO_STR("InputThread: ObsID = " << itsObservationID << ", sb = " << itsSubbandNumber << ", output = " << itsOutputNumber << ": " << readTimer);
if (nullInput)
data.get()->sequenceNumber = count;
itsReceiveQueue.append(data.release());
}
} catch (Stream::EndOfStreamException &) {
LOG_INFO("caught Stream::EndOfStreamException (this is normal, and indicates the end of the observation)");
} catch (SystemCallException &ex) {
if (ex.error != EINTR) {
itsReceiveQueue.append(0); // no more data
......@@ -137,6 +126,7 @@ void InputThread::mainLoop()
}
}
LOG_DEBUG("no more data blocks");
itsReceiveQueue.append(0); // no more data
}
......
......@@ -41,45 +41,40 @@ using boost::format;
namespace LOFAR {
namespace RTCP {
OutputThread::OutputThread(const Parset *ps, unsigned subbandNumber, unsigned outputNumber, InputThread *inputThread, const ProcessingPlan::planlet &outputConfig )
OutputThread::OutputThread(const Parset &parset, unsigned subbandNumber, unsigned outputNumber, const ProcessingPlan::planlet &outputConfig, Queue<StreamableData *> &freeQueue, Queue<StreamableData *> &receiveQueue)
:
itsPS(ps),
itsInputThread(inputThread),
itsSubbandNumber(subbandNumber),
itsOutputNumber(outputNumber),
itsObservationID(ps->observationID()),
itsObservationID(parset.observationID()),
itsNextSequenceNumber(0),
itsFile(NULL)
itsFreeQueue(freeQueue),
itsReceiveQueue(receiveQueue),
itsSequenceNumbersFile(0)
{
string filename;
string seqfilename;
#if 0
// null writer
itsWriters[output] = new MSWriterNull();
LOG_DEBUG_STR("subband " << subbandNumber << " written to null");
#else
std::string filename, seqfilename;
if (dynamic_cast<CorrelatedData *>(outputConfig.source)) {
std::stringstream out;
out << itsPS->getMSname(subbandNumber) << "/table.f" << outputNumber << "data";
out << parset.getMSname(subbandNumber) << "/table.f" << outputNumber << "data";
filename = out.str();
if (itsPS->getLofarStManVersion() == 2) {
if (parset.getLofarStManVersion() == 2) {
std::stringstream seq;
seq << itsPS->getMSname(subbandNumber) <<"/table.f" << outputNumber << "seqnr";
seq << parset.getMSname(subbandNumber) <<"/table.f" << outputNumber << "seqnr";
seqfilename = seq.str();
try {
itsFile = new FileStream(seqfilename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
itsSequenceNumbersFile = new FileStream(seqfilename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
} catch (...) {
itsFile = NULL;
LOG_WARN("Could not open sequence numbers file");
itsSequenceNumbersFile = 0;
}
}
} else {
// raw writer
std::stringstream out;
out << itsPS->getMSname(subbandNumber) << outputConfig.filenameSuffix;
out << parset.getMSname(subbandNumber) << outputConfig.filenameSuffix;
filename = out.str();
}
......@@ -89,10 +84,8 @@ OutputThread::OutputThread(const Parset *ps, unsigned subbandNumber, unsigned ou
itsWriter = new MSWriterFile(filename.c_str());
} catch (SystemCallException &ex) {
LOG_ERROR_STR("Cannot open " << filename << ": " << ex);
itsWriter = new MSWriterNull();
}
#endif
//thread = new Thread(this, &OutputThread::mainLoop, str(format("OutputThread (obs %d sb %d output %d)") % ps->observationID() % subbandNumber % outputNumber));
itsThread = new Thread(this, &OutputThread::mainLoop);
......@@ -101,20 +94,16 @@ OutputThread::OutputThread(const Parset *ps, unsigned subbandNumber, unsigned ou
OutputThread::~OutputThread()
{
flushSeqNumbers();
if (itsFile != NULL) delete itsFile;
delete itsThread;
delete itsWriter;
flushSequenceNumbers();
delete itsSequenceNumbersFile;
}
void OutputThread::writeLogMessage()
void OutputThread::writeLogMessage(unsigned sequenceNumber)
{
/*
static int counter = 0;
time_t now = time(0);
char buf[26];
......@@ -122,20 +111,16 @@ void OutputThread::writeLogMessage()
buf[24] = '\0';
LOG_INFO_STR("time = " << buf <<
//", obsID = " << itsObservationID <<
", count = " << counter ++ <<
", timestamp = " << itsStartStamp + ((itsPreviousSequenceNumbers[0] + 1) *
itsPS->nrSubbandSamples() *
itsPS->IONintegrationSteps()));
// itsStartStamp += itsPS->nrSubbandSamples() * itsPS->IONintegrationSteps();
*/
", obsID = " << itsObservationID <<
", seqno = " << sequenceNumber);
}
void OutputThread::flushSeqNumbers() {
if (itsFile != NULL) {
void OutputThread::flushSequenceNumbers()
{
if (itsSequenceNumbersFile != 0) {
LOG_INFO_STR("Flushing sequence numbers");
itsFile->write(itsSequenceNumbers.data(), itsSequenceNumbers.size()*sizeof(unsigned));
itsSequenceNumbersFile->write(itsSequenceNumbers.data(), itsSequenceNumbers.size()*sizeof(unsigned));
itsSequenceNumbers.clear();
}
}
......@@ -149,13 +134,11 @@ void OutputThread::checkForDroppedData(StreamableData *data)
if (droppedBlocks > 0)
LOG_WARN_STR("OutputThread: ObsID = " << itsObservationID << ", subband = " << itsSubbandNumber << ", output = " << itsOutputNumber << ": dropped " << droppedBlocks << (droppedBlocks == 1 ? " block" : " blocks"));
if (itsPS->getLofarStManVersion() == 2 && itsFile != NULL) {
if (itsSequenceNumbersFile != 0) {
itsSequenceNumbers.push_back(data->sequenceNumber);
if (itsSequenceNumbers.size() > 64) {
flushSeqNumbers();
}
if (itsSequenceNumbers.size() > 64)
flushSequenceNumbers();
}
itsNextSequenceNumber = data->sequenceNumber + 1;
......@@ -164,38 +147,26 @@ void OutputThread::checkForDroppedData(StreamableData *data)
void OutputThread::mainLoop()
{
/// allow only a limited number of thread to write at a time
/// TODO: race at creation
static Semaphore semaphore(4);
while (true) {
NSTimer writeTimer("write data", false, false);
std::auto_ptr<StreamableData> data(itsInputThread->itsReceiveQueue.remove());
//NSTimer writeTimer("write data", false, false);
std::auto_ptr<StreamableData> data(itsReceiveQueue.remove());
if (data.get() == 0)
break;
checkForDroppedData(data.get());
writeTimer.start();
semaphore.down();
try {
//writeTimer.start();
itsWriter->write(data.get());
} catch (...) {
semaphore.up();
throw;
}
//writeTimer.stop();
semaphore.up();
writeTimer.stop();
writeLogMessage(data.get()->sequenceNumber);
//LOG_INFO_STR("OutputThread: ObsID = " << itsObservationID << ", subband = " << itsSubbandNumber << ", output = " << itsOutputNumber << ": " << writeTimer);
if (writeTimer.getElapsed() > reportWriteDelay)
LOG_WARN_STR("OutputThread: ObsID = " << itsObservationID << ", subband = " << itsSubbandNumber << ", output = " << itsOutputNumber << ": " << writeTimer);
itsInputThread->itsFreeQueue.append(data.release());
itsFreeQueue.append(data.release());
}
flushSeqNumbers();
flushSequenceNumbers();
}
} // namespace RTCP
......
......@@ -34,6 +34,7 @@ using namespace LOFAR;
using namespace LOFAR::RTCP;
#if 0
class Job
{
public:
......@@ -93,6 +94,7 @@ static void child(int argc, char *argv[], int rank, int size)
exit(1);
}
}
#endif
int main(int argc, char *argv[])
......@@ -115,6 +117,7 @@ int main(int argc, char *argv[])
setLevel("Global",8);
#endif
#if 0
#if defined HAVE_MPI
int rank;
int size;
......@@ -167,6 +170,29 @@ int main(int argc, char *argv[])
MPI_Finalize();
#endif
#else
try {
if (argc != 5)
throw StorageException(std::string("usage: ") + argv[0] + " parset input subband type"); // TODO: more descriptive
char stdoutbuf[1024], stderrbuf[1024];
setvbuf(stdout, stdoutbuf, _IOLBF, sizeof stdoutbuf);
setvbuf(stderr, stdoutbuf, _IOLBF, sizeof stderrbuf);
LOG_INFO_STR("started: " << argv[0] << ' ' << argv[1] << ' ' << argv[2] << ' ' << argv[3] << ' ' << argv[4]);
SubbandWriter(argv[1], argv[2], boost::lexical_cast<unsigned>(argv[3]), boost::lexical_cast<unsigned>(argv[4]));
} catch (Exception &ex) {
LOG_FATAL_STR("caught Exception: " << ex);
exit(1);
} catch (std::exception &ex) {
LOG_FATAL_STR("caught std::exception: " << ex.what());
exit(1);
} catch (...) {
LOG_FATAL_STR("caught non-std::exception: ");
exit(1);
}
#endif
LOG_INFO("Program end");
return 0; // always return 0, otherwise mpirun kills other storage processes
return 0;
}
......@@ -32,15 +32,8 @@
#include <Interface/CN_Configuration.h>
#include <Interface/CN_ProcessingPlan.h>
#ifdef USE_MAC_PI
#include <GCF/GCF_PVDouble.h>
#include <GCF/GCF_PVString.h>
#endif
#include <boost/lexical_cast.hpp>
#include <boost/format.hpp>
using boost::format;
#include <time.h>
#include <sys/stat.h>
......@@ -48,6 +41,7 @@ using boost::format;
namespace LOFAR {
namespace RTCP {
#if 0
SubbandWriter::SubbandWriter(const Parset *ps, unsigned rank, unsigned size)
:
itsPS(ps),
......@@ -168,6 +162,62 @@ SubbandWriter::~SubbandWriter()
itsVArray.clear();
#endif
}
#else
SubbandWriter::SubbandWriter(const char *parset, const char *inputDescription, unsigned subband, unsigned outputType)
:
itsParset(parset)
{
CN_Configuration configuration(itsParset);
CN_ProcessingPlan<> plan(configuration);
plan.removeNonOutputs();
#if defined HAVE_AIPSPP
if (outputType == 0 && itsParset.outputCorrelatedData()) {
MeasurementSetFormat myFormat(&itsParset, 512);
// create root directory of the observation tree
if (mkdir(itsParset.getMSBaseDir().c_str(), 0770) != 0 && errno != EEXIST) {
unsigned savedErrno = errno; // first argument below clears errno
throw SystemCallException(("mkdir " + itsParset.getMSBaseDir()).c_str(), savedErrno, THROW_ARGS);
}
/// Make MeasurementSet filestructures and required tables
myFormat.addSubband(subband);
LOG_INFO_STR("MeasurementSet created");
}
#endif // defined HAVE_AIPSPP
ProcessingPlan::planlet &outputConfig = plan.plan[outputType];
StreamableData *dataTemplate = outputConfig.source;
for (unsigned i = 0; i < maxReceiveQueueSize; i ++) {
StreamableData *data = dataTemplate->clone();
data->allocate();
itsFreeQueue.append(data);
}
itsInputThread = new InputThread(itsParset, subband, outputType, inputDescription, itsFreeQueue, itsReceiveQueue);
itsOutputThread = new OutputThread(itsParset, subband, outputType, outputConfig, itsFreeQueue, itsReceiveQueue);
}
SubbandWriter::~SubbandWriter()
{
delete itsInputThread;
delete itsOutputThread;
while (!itsReceiveQueue.empty())
delete itsReceiveQueue.remove();
while (!itsFreeQueue.empty())
delete itsFreeQueue.remove();
}
#endif
} // namespace RTCP
} // namespace LOFAR
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment