diff --git a/RTCP/IONProc/src/Job.cc b/RTCP/IONProc/src/Job.cc index d0203fe07936c45692d4a2bd5e76dda0985f59e6..063e6b9632131a352730a136754e066104b13304 100644 --- a/RTCP/IONProc/src/Job.cc +++ b/RTCP/IONProc/src/Job.cc @@ -129,6 +129,10 @@ Job::~Job() // a valid Job object to work on delete itsPLCClient.release(); + // stop any started Storage processes + if (myPsetNumber == 0) + stopStorageProcesses(); + if (LOG_CONDITION) LOG_INFO_STR(itsLogPrefix << "----- Job " << (itsIsRunning ? "finished" : "cancelled") << " successfully"); } @@ -313,7 +317,10 @@ void Job::StorageProcess::controlThread() // Send final meta data once it is available itsJob.itsFinalMetaDataAvailable.down(); + + LOG_DEBUG_STR(itsLogPrefix << "[ControlThread] sending final meta data"); itsJob.itsFinalMetaData.write(stream); + LOG_DEBUG_STR(itsLogPrefix << "[ControlThread] sent final meta data"); } @@ -401,31 +408,34 @@ void Job::stopStorageProcesses() LOG_DEBUG_STR(itsLogPrefix << "Stopping storage processes"); time_t deadline = time(0) + 300; - struct timespec immediately = { 0, 0 }; - size_t nrRunning = itsStorageProcesses.size(); + size_t nrRunning = 0; + + for (unsigned rank = 0; rank < itsStorageProcesses.size(); rank ++) + if (itsStorageProcesses[rank].get()) + nrRunning++; + + while(nrRunning > 0) { + for (unsigned rank = 0; rank < itsStorageProcesses.size(); rank ++) { + if (!itsStorageProcesses[rank].get()) + continue; + + if (itsStorageProcesses[rank]->isDone() || time(0) >= deadline) { + struct timespec immediately = { 0, 0 }; - do { - for (unsigned rank = 0; rank < itsStorageProcesses.size(); rank ++) - if (itsStorageProcesses[rank]->isDone()) { itsStorageProcesses[rank]->stop(immediately); + itsStorageProcesses[rank] = 0; nrRunning--; } + } if (nrRunning > 0) sleep(1); - - } while( nrRunning > 0 && time(0) < deadline ); - - LOG_DEBUG_STR(itsLogPrefix << "Killing any remaining storage processes"); - - - for (unsigned rank = 0; rank < itsStorageProcesses.size(); rank ++) { - itsStorageProcesses[rank]->stop(immediately); - itsStorageProcesses[rank] = 0; } + itsStorageProcesses.clear(); + LOG_DEBUG_STR(itsLogPrefix << "Storage processes are stopped"); } diff --git a/RTCP/IONProc/src/SSH.cc b/RTCP/IONProc/src/SSH.cc index 1b7932ed600afdad40035b9dd71d637ff11db7c8..46a3bf4ca0194d21906c716ca45b0223e80285f5 100644 --- a/RTCP/IONProc/src/SSH.cc +++ b/RTCP/IONProc/src/SSH.cc @@ -178,6 +178,11 @@ bool SSHconnection::open_session( FileDescriptorBasedStream &sock ) if (rc) { LOG_ERROR_STR( itsLogPrefix << "Authentication by public key failed: " << rc); + + // unrecoverable errors + if (rc == LIBSSH2_ERROR_FILE) + THROW(SSHException, "Error reading read key file " << itsSSHKey); + return false; } diff --git a/RTCP/IONProc/src/SSH.h b/RTCP/IONProc/src/SSH.h index 12a4bb8021074b9bda153df197d6570a48e8afc6..e9bf02c471de229fd5c669bfa5eca0b3cbf5046a 100644 --- a/RTCP/IONProc/src/SSH.h +++ b/RTCP/IONProc/src/SSH.h @@ -28,6 +28,7 @@ #ifdef HAVE_LIBSSH2 #include <Common/Thread/Thread.h> +#include <Common/Exception.h> #include <Stream/FileDescriptorBasedStream.h> #include <libssh2.h> #include <Interface/SmartPtr.h> @@ -47,6 +48,8 @@ void SSH_Finalize(); class SSHconnection { public: + EXCEPTION_CLASS(SSHException, LOFAR::Exception); + SSHconnection(const string &logPrefix, const string &hostname, const string &commandline, const string &username, const string &sshkey, bool captureStdout = false); ~SSHconnection(); diff --git a/RTCP/Interface/src/FinalMetaData.cc b/RTCP/Interface/src/FinalMetaData.cc index ae28b69dacd8e6197cf84186a64d3d1df93d0182..c081c636fdb27f41dc7a5992c8d5ecddce2bf4d4 100644 --- a/RTCP/Interface/src/FinalMetaData.cc +++ b/RTCP/Interface/src/FinalMetaData.cc @@ -135,8 +135,8 @@ void FinalMetaData::write(Stream &s) void FinalMetaData::read(Stream &s) { - StreamWriter< std::vector<struct BrokenRCU> >::write(s, brokenRCUsAtBegin); - StreamWriter< std::vector<struct BrokenRCU> >::write(s, brokenRCUsDuring); + StreamWriter< std::vector<struct BrokenRCU> >::read(s, brokenRCUsAtBegin); + StreamWriter< std::vector<struct BrokenRCU> >::read(s, brokenRCUsDuring); } } diff --git a/RTCP/MetaDataGatherer/src/FinalMetaDataGatherer.cc b/RTCP/MetaDataGatherer/src/FinalMetaDataGatherer.cc index 93ab66c31f90224e1812e410ab13f631b2cd2f6d..6b8e0295ff9a9863f41581c333997a3525ac4b02 100644 --- a/RTCP/MetaDataGatherer/src/FinalMetaDataGatherer.cc +++ b/RTCP/MetaDataGatherer/src/FinalMetaDataGatherer.cc @@ -122,44 +122,48 @@ void parseBrokenHardware (const vector<OTDBvalue> &hardware, vector<struct Final // A broken antenna element/tile entry must contain .status_state for (size_t i = 0; i < hardware.size(); i++) { - if (hardware[i].name.find(".status_state") != string::npos) { - vector<string> parts = StringUtil::split (hardware[i].name, '.'); + try { + if (hardware[i].name.find(".status_state") != string::npos) { + vector<string> parts = StringUtil::split (hardware[i].name, '.'); - // parts[3] is station name (f.e. CS001) - string station = parts.size() > 3 ? parts[3] : ""; + // parts[3] is station name (f.e. CS001) + string station = parts.size() > 3 ? parts[3] : ""; - // parts[4] is tile name/number (f.e. HBA1 or LBA3) - string tile = parts.size() > 4 ? parts[4] : ""; + // parts[4] is tile name/number (f.e. HBA1 or LBA3) + string tile = parts.size() > 4 ? parts[4] : ""; - // parts[7] is RCU name/number (f.e. RCU20) - string rcu = parts.size() > 7 ? parts[7] : ""; + // parts[7] is RCU name/number (f.e. RCU20) + string rcu = parts.size() > 7 ? parts[7] : ""; - string tiletype = tile.substr(0,3); - string rcutype = rcu.substr(0,3); + string tiletype = tile.substr(0,3); + string rcutype = rcu.substr(0,3); - string type = ""; - int seqnr = 0; + string type = ""; + int seqnr = 0; - if (tiletype == "LBA" || tiletype == "HBA") { - // broken tile - type = tiletype; - seqnr = boost::lexical_cast<int>(tile.substr(3)); - } else if (rcutype == "RCU") { - // broken rcu - type = rcutype; - seqnr = boost::lexical_cast<int>(rcu.substr(3)); - } + if (tiletype == "LBA" || tiletype == "HBA") { + // broken tile + type = tiletype; + seqnr = boost::lexical_cast<int>(tile.substr(3)); + } else if (rcutype == "RCU") { + // broken rcu + type = rcutype; + seqnr = boost::lexical_cast<int>(rcu.substr(3)); + } - if (type != "") { - struct FinalMetaData::BrokenRCU info; + if (type != "") { + struct FinalMetaData::BrokenRCU info; - info.station = station; - info.type = type; - info.seqnr = seqnr; - info.time = to_simple_string(hardware[i].time); + info.station = station; + info.type = type; + info.seqnr = seqnr; + info.time = to_simple_string(hardware[i].time); - brokenrcus.push_back(info); + brokenrcus.push_back(info); + } } + } catch(std::out_of_range &ex) { + LOG_ERROR_STR(logPrefix << "Error parsing name '" << hardware[i].name << "' time '" << hardware[i].time << "': " << ex.what()); } } diff --git a/RTCP/Run/src/locations.sh.in b/RTCP/Run/src/locations.sh.in index 2b71526f8b409cfadf29cacb0aae505087c448fc..fa8ff4c9d2d2bb9022233297c375d0c6326105b7 100644 --- a/RTCP/Run/src/locations.sh.in +++ b/RTCP/Run/src/locations.sh.in @@ -78,7 +78,7 @@ else IONPROC_PARSET="$LOGDIR/L$OBSID.parset" EXTRA_KEYS=" OLAP.Storage.userName = $USER -OLAP.Storage.sshIdentityFile = $HOME/.ssh/id_rsa +OLAP.Storage.sshIdentityFile = $HOME/id_rsa OLAP.Storage.msWriter = $STORAGE OLAP.Storage.AntennaSetsConf = /data/home/lofarsys/production/lofar/etc/AntennaSets.conf OLAP.Storage.AntennaFieldsDir = /data/home/lofarsys/production/lofar/etc/StaticMetaData diff --git a/RTCP/Storage/include/Storage/MSWriterCorrelated.h b/RTCP/Storage/include/Storage/MSWriterCorrelated.h index 99b888f9b21ff344ae7618451cfc85fb42a0356b..77e6d9124d160cbc487dd56c3b95610980581f4c 100644 --- a/RTCP/Storage/include/Storage/MSWriterCorrelated.h +++ b/RTCP/Storage/include/Storage/MSWriterCorrelated.h @@ -57,6 +57,9 @@ class MSWriterCorrelated : public MSWriterFile std::vector<unsigned> itsSequenceNumbers; SmartPtr<FileStream> itsSequenceNumbersFile; + + private: + void makeMeasurementSet(const std::string &logPrefix, const std::string &msName, const Parset &parset, unsigned subbandIndex, bool isBigEndian); }; diff --git a/RTCP/Storage/src/MSWriterCorrelated.cc b/RTCP/Storage/src/MSWriterCorrelated.cc index d9271844d93c8c754036e6ff20c6d204e61ccbe9..c4250d8683db82493ef9a16ad8206d86ed6e8453 100644 --- a/RTCP/Storage/src/MSWriterCorrelated.cc +++ b/RTCP/Storage/src/MSWriterCorrelated.cc @@ -43,11 +43,27 @@ namespace RTCP { MSWriterCorrelated::MSWriterCorrelated (const std::string &logPrefix, const std::string &msName, const Parset &parset, unsigned subbandIndex, bool isBigEndian) : - MSWriterFile(str(format("%s/table.f0data") % msName)), + MSWriterFile( + (makeMeasurementSet(logPrefix, msName, parset, subbandIndex, isBigEndian), + str(format("%s/table.f0data") % msName))), itsLogPrefix(logPrefix), itsMSname(msName), itsParset(parset) { + if (itsParset.getLofarStManVersion() > 1) { + string seqfilename = str(format("%s/table.f0seqnr") % msName); + + try { + itsSequenceNumbersFile = new FileStream(seqfilename, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); + } catch (...) { + LOG_WARN_STR(itsLogPrefix << "Could not open sequence numbers file " << seqfilename); + } + + itsSequenceNumbers.reserve(64); + } + + // derive baseline names + std::vector<std::string> stationNames = parset.mergedStationNames(); std::vector<std::string> baselineNames(parset.nrBaselines()); unsigned nrStations = stationNames.size(); @@ -61,27 +77,6 @@ MSWriterCorrelated::MSWriterCorrelated (const std::string &logPrefix, const std: for(unsigned s2 = 0; s2 <= s1; s2++) baselineNames[bl++] = str(format("%s_%s") % stationNames[s1] % stationNames[s2]); - // Make MeasurementSet filestructures and required tables - -#if defined HAVE_AIPSPP - MeasurementSetFormat myFormat(itsParset, 512); - - myFormat.addSubband(msName, subbandIndex, isBigEndian); - - LOG_INFO_STR(itsLogPrefix << "MeasurementSet created"); -#endif // defined HAVE_AIPSPP - - if (itsParset.getLofarStManVersion() > 1) { - string seqfilename = str(format("%s/table.f0seqnr") % msName); - - try { - itsSequenceNumbersFile = new FileStream(seqfilename, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); - } catch (...) { - LOG_WARN_STR(itsLogPrefix << "Could not open sequence numbers file " << seqfilename); - } - - itsSequenceNumbers.reserve(64); - } } @@ -91,6 +86,18 @@ MSWriterCorrelated::~MSWriterCorrelated() } +void MSWriterCorrelated::makeMeasurementSet(const std::string &logPrefix, const std::string &msName, const Parset &parset, unsigned subbandIndex, bool isBigEndian) +{ +#if defined HAVE_AIPSPP + MeasurementSetFormat myFormat(parset, 512); + + myFormat.addSubband(msName, subbandIndex, isBigEndian); + + LOG_INFO_STR(logPrefix << "MeasurementSet created"); +#endif // defined HAVE_AIPSPP +} + + void MSWriterCorrelated::write(StreamableData *data) { CorrelatedData *cdata = dynamic_cast<CorrelatedData*>(data); diff --git a/RTCP/Storage/src/OutputThread.cc b/RTCP/Storage/src/OutputThread.cc index 4cd169a42cbab2cb7635aba73be9d4a1f3144eec..a59cf47c33b5bce5fd6de08c8462267c7fc489ee 100644 --- a/RTCP/Storage/src/OutputThread.cc +++ b/RTCP/Storage/src/OutputThread.cc @@ -169,7 +169,7 @@ void OutputThread::createMS() itsWriter = new MSWriterFile(path); break; } - } catch (SystemCallException &ex) { + } catch (Exception &ex) { LOG_ERROR_STR(itsLogPrefix << "Cannot open " << path << ": " << ex); itsWriter = new MSWriterNull; } @@ -294,6 +294,12 @@ void OutputThread::cleanUp() void OutputThread::augment( const FinalMetaData &finalMetaData ) { + // wait for writer thread to finish, so we'll have an itsWriter + ASSERT(itsThread.get()); + + itsThread = 0; + + // augment the data product ASSERT(itsWriter.get()); itsWriter->augment(finalMetaData); diff --git a/RTCP/Storage/src/Storage_main.cc b/RTCP/Storage/src/Storage_main.cc index 606bf717e4e90d69c5fbd15d429f30193fa3457e..f282b0db84567f72d5e003b01de293c0e9cbe468 100644 --- a/RTCP/Storage/src/Storage_main.cc +++ b/RTCP/Storage/src/Storage_main.cc @@ -142,15 +142,18 @@ int main(int argc, char *argv[]) // Add final meta data (broken tile information, etc) // that is obtained after the end of an observation. - LOG_INFO_STR(obsLogPrefix << "Reading final meta data"); + LOG_INFO_STR(obsLogPrefix << "Waiting for final meta data"); FinalMetaData finalMetaData; finalMetaData.read(controlStream); LOG_INFO_STR(obsLogPrefix << "Processing final meta data"); for (size_t i = 0; i < subbandWriters.size(); ++i) - subbandWriters[i]->augment(finalMetaData); + try { + subbandWriters[i]->augment(finalMetaData); + } catch (Exception &ex) { + LOG_WARN_STR(obsLogPrefix << "Could not add final meta data: " << ex); + } } - } catch (Exception &ex) { LOG_FATAL_STR("[obs unknown] Caught Exception: " << ex); return 1;