diff --git a/RTCP/IONProc/src/Job.cc b/RTCP/IONProc/src/Job.cc index e14b5930f326f05b12011b89f97242d87467a3dd..186b0ced1aaa47037ee8bc54103d8c33b65adead 100644 --- a/RTCP/IONProc/src/Job.cc +++ b/RTCP/IONProc/src/Job.cc @@ -103,7 +103,7 @@ Job::~Job() { // stop any started Storage processes if (myPsetNumber == 0) - itsStorageProcesses.stop(); + itsStorageProcesses.stop(0); if (LOG_CONDITION) LOG_INFO_STR(itsLogPrefix << "----- Job " << (itsIsRunning ? "finished" : "cancelled") << " successfully"); @@ -322,12 +322,12 @@ void Job::jobThread() jobQueue.itsReevaluate.broadcast(); if (myPsetNumber == 0) { - itsStorageProcesses.forwardFinalMetaData(); + itsStorageProcesses.forwardFinalMetaData(time(0) + 240); // all InputSections and OutputSections have finished their processing, so // Storage should be done any second now. - itsStorageProcesses.stop(); + itsStorageProcesses.stop(time(0) + 300); } // Augment the LTA feedback logging diff --git a/RTCP/IONProc/src/StorageProcesses.cc b/RTCP/IONProc/src/StorageProcesses.cc index 6fd399b709efec684a7023687a7f69b6f909da4a..e85af02ead8c0b98b90923f7dc7881a844e04b4f 100644 --- a/RTCP/IONProc/src/StorageProcesses.cc +++ b/RTCP/IONProc/src/StorageProcesses.cc @@ -1,6 +1,7 @@ #include "lofar_config.h" #include <StorageProcesses.h> #include <sys/time.h> +#include <unistd.h> #include <Common/Thread/Thread.h> #include <Stream/PortBroker.h> #include <SSH.h> @@ -17,8 +18,19 @@ using boost::format; * * hostList = "OLAP.Storage.hosts" * - * for(host in hostList): - * spawnThread("ssh host <storage process>") + * 1. Create a StorageProcesses manager object: + * manager = StorageProcesses(parset, logprefix) + * 2. Spawn all Storage processes: + * manager.start() + * Which will ssh to all hosts in hostList and start Storage_main, + * and establish a control channel to each. + * 3. Let your application connect to whatever is indicated by + * getStreamDescriptorBetweenIONandStorage(parset, outputType, streamNr) + * 4. After the observation, generate and forward the final metadata by calling + * manager.forwardFinalMetaData( deadline ) + * 5. Optionally, wait for the Storage_main processes to finish with a given + * deadline: + * manager.stop( deadline ) */ @@ -42,7 +54,6 @@ StorageProcess::~StorageProcess() void StorageProcess::start() { - // fork (child process will exec) std::string userName = itsParset.getString("OLAP.Storage.userName"); std::string sshKey = itsParset.getString("OLAP.Storage.sshIdentityFile"); std::string executable = itsParset.getString("OLAP.Storage.msWriter"); @@ -78,6 +89,11 @@ void StorageProcess::start() void StorageProcess::stop(struct timespec deadline) { + if (!itsSSHconnection) { + // never started + return; + } + itsSSHconnection->wait(deadline); itsThread->cancel(); @@ -118,6 +134,12 @@ StorageProcesses::StorageProcesses( const Parset &parset, const std::string &log { } +StorageProcesses::~StorageProcesses() +{ + // never let any processes linger + stop(0); +} + void StorageProcesses::start() { @@ -134,12 +156,10 @@ void StorageProcesses::start() } -void StorageProcesses::stop() +void StorageProcesses::stop( time_t deadline ) { LOG_DEBUG_STR(itsLogPrefix << "Stopping storage processes"); - time_t deadline = time(0) + 300; - size_t nrRunning = 0; for (unsigned rank = 0; rank < itsStorageProcesses.size(); rank ++) @@ -171,15 +191,15 @@ void StorageProcesses::stop() } -void StorageProcesses::forwardFinalMetaData() +void StorageProcesses::forwardFinalMetaData( time_t deadline ) { - struct timespec deadline = { time(0) + 240, 0 }; + struct timespec deadline_ts = { deadline, 0 }; Thread thread(this, &StorageProcesses::finalMetaDataThread, itsLogPrefix + "[FinalMetaDataThread] ", 65536); // abort the thread if deadline passes try { - if (!thread.wait(deadline)) { + if (!thread.wait(deadline_ts)) { LOG_WARN_STR(itsLogPrefix << "Cancelling FinalMetaDataThread"); thread.cancel(); diff --git a/RTCP/IONProc/src/StorageProcesses.h b/RTCP/IONProc/src/StorageProcesses.h index cd25037b26a3bc99f0e6185b93c47b4ecb9a62d8..35052cd442529b963cafed508b9996ea064d0070 100644 --- a/RTCP/IONProc/src/StorageProcesses.h +++ b/RTCP/IONProc/src/StorageProcesses.h @@ -59,9 +59,9 @@ public: StorageProcesses( const Parset &parset, const std::string &logPrefix ); void start(); - void stop(); + void stop( time_t deadline ); - void forwardFinalMetaData(); + void forwardFinalMetaData( time_t deadline ); private: const Parset &itsParset;