Skip to content
Snippets Groups Projects
Commit 3e2d99ba authored by Jan David Mol's avatar Jan David Mol
Browse files

Task #2669: Minor polishing

parent 25e2db83
No related branches found
No related tags found
No related merge requests found
......@@ -103,7 +103,7 @@ Job::~Job()
{
// stop any started Storage processes
if (myPsetNumber == 0)
itsStorageProcesses.stop();
itsStorageProcesses.stop(0);
if (LOG_CONDITION)
LOG_INFO_STR(itsLogPrefix << "----- Job " << (itsIsRunning ? "finished" : "cancelled") << " successfully");
......@@ -322,12 +322,12 @@ void Job::jobThread()
jobQueue.itsReevaluate.broadcast();
if (myPsetNumber == 0) {
itsStorageProcesses.forwardFinalMetaData();
itsStorageProcesses.forwardFinalMetaData(time(0) + 240);
// all InputSections and OutputSections have finished their processing, so
// Storage should be done any second now.
itsStorageProcesses.stop();
itsStorageProcesses.stop(time(0) + 300);
}
// Augment the LTA feedback logging
......
#include "lofar_config.h"
#include <StorageProcesses.h>
#include <sys/time.h>
#include <unistd.h>
#include <Common/Thread/Thread.h>
#include <Stream/PortBroker.h>
#include <SSH.h>
......@@ -17,8 +18,19 @@ using boost::format;
*
* hostList = "OLAP.Storage.hosts"
*
* for(host in hostList):
* spawnThread("ssh host <storage process>")
* 1. Create a StorageProcesses manager object:
* manager = StorageProcesses(parset, logprefix)
* 2. Spawn all Storage processes:
* manager.start()
* Which will ssh to all hosts in hostList and start Storage_main,
* and establish a control channel to each.
* 3. Let your application connect to whatever is indicated by
* getStreamDescriptorBetweenIONandStorage(parset, outputType, streamNr)
* 4. After the observation, generate and forward the final metadata by calling
* manager.forwardFinalMetaData( deadline )
* 5. Optionally, wait for the Storage_main processes to finish with a given
* deadline:
* manager.stop( deadline )
*/
......@@ -42,7 +54,6 @@ StorageProcess::~StorageProcess()
void StorageProcess::start()
{
// fork (child process will exec)
std::string userName = itsParset.getString("OLAP.Storage.userName");
std::string sshKey = itsParset.getString("OLAP.Storage.sshIdentityFile");
std::string executable = itsParset.getString("OLAP.Storage.msWriter");
......@@ -78,6 +89,11 @@ void StorageProcess::start()
void StorageProcess::stop(struct timespec deadline)
{
if (!itsSSHconnection) {
// never started
return;
}
itsSSHconnection->wait(deadline);
itsThread->cancel();
......@@ -118,6 +134,12 @@ StorageProcesses::StorageProcesses( const Parset &parset, const std::string &log
{
}
StorageProcesses::~StorageProcesses()
{
// never let any processes linger
stop(0);
}
void StorageProcesses::start()
{
......@@ -134,12 +156,10 @@ void StorageProcesses::start()
}
void StorageProcesses::stop()
void StorageProcesses::stop( time_t deadline )
{
LOG_DEBUG_STR(itsLogPrefix << "Stopping storage processes");
time_t deadline = time(0) + 300;
size_t nrRunning = 0;
for (unsigned rank = 0; rank < itsStorageProcesses.size(); rank ++)
......@@ -171,15 +191,15 @@ void StorageProcesses::stop()
}
void StorageProcesses::forwardFinalMetaData()
void StorageProcesses::forwardFinalMetaData( time_t deadline )
{
struct timespec deadline = { time(0) + 240, 0 };
struct timespec deadline_ts = { deadline, 0 };
Thread thread(this, &StorageProcesses::finalMetaDataThread, itsLogPrefix + "[FinalMetaDataThread] ", 65536);
// abort the thread if deadline passes
try {
if (!thread.wait(deadline)) {
if (!thread.wait(deadline_ts)) {
LOG_WARN_STR(itsLogPrefix << "Cancelling FinalMetaDataThread");
thread.cancel();
......
......@@ -59,9 +59,9 @@ public:
StorageProcesses( const Parset &parset, const std::string &logPrefix );
void start();
void stop();
void stop( time_t deadline );
void forwardFinalMetaData();
void forwardFinalMetaData( time_t deadline );
private:
const Parset &itsParset;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment