Skip to content
Snippets Groups Projects
Commit 337503b2 authored by Jan David Mol's avatar Jan David Mol
Browse files

Task #3634: Forward broken tile information plus some minor fixes

parent ffe7d01c
No related branches found
No related tags found
No related merge requests found
......@@ -129,6 +129,10 @@ Job::~Job()
// a valid Job object to work on
delete itsPLCClient.release();
// stop any started Storage processes
if (myPsetNumber == 0)
stopStorageProcesses();
if (LOG_CONDITION)
LOG_INFO_STR(itsLogPrefix << "----- Job " << (itsIsRunning ? "finished" : "cancelled") << " successfully");
}
......@@ -313,7 +317,10 @@ void Job::StorageProcess::controlThread()
// Send final meta data once it is available
itsJob.itsFinalMetaDataAvailable.down();
LOG_DEBUG_STR(itsLogPrefix << "[ControlThread] sending final meta data");
itsJob.itsFinalMetaData.write(stream);
LOG_DEBUG_STR(itsLogPrefix << "[ControlThread] sent final meta data");
}
......@@ -401,31 +408,34 @@ void Job::stopStorageProcesses()
LOG_DEBUG_STR(itsLogPrefix << "Stopping storage processes");
time_t deadline = time(0) + 300;
struct timespec immediately = { 0, 0 };
size_t nrRunning = itsStorageProcesses.size();
size_t nrRunning = 0;
for (unsigned rank = 0; rank < itsStorageProcesses.size(); rank ++)
if (itsStorageProcesses[rank].get())
nrRunning++;
while(nrRunning > 0) {
for (unsigned rank = 0; rank < itsStorageProcesses.size(); rank ++) {
if (!itsStorageProcesses[rank].get())
continue;
if (itsStorageProcesses[rank]->isDone() || time(0) >= deadline) {
struct timespec immediately = { 0, 0 };
do {
for (unsigned rank = 0; rank < itsStorageProcesses.size(); rank ++)
if (itsStorageProcesses[rank]->isDone()) {
itsStorageProcesses[rank]->stop(immediately);
itsStorageProcesses[rank] = 0;
nrRunning--;
}
}
if (nrRunning > 0)
sleep(1);
} while( nrRunning > 0 && time(0) < deadline );
LOG_DEBUG_STR(itsLogPrefix << "Killing any remaining storage processes");
for (unsigned rank = 0; rank < itsStorageProcesses.size(); rank ++) {
itsStorageProcesses[rank]->stop(immediately);
itsStorageProcesses[rank] = 0;
}
itsStorageProcesses.clear();
LOG_DEBUG_STR(itsLogPrefix << "Storage processes are stopped");
}
......
......@@ -178,6 +178,11 @@ bool SSHconnection::open_session( FileDescriptorBasedStream &sock )
if (rc) {
LOG_ERROR_STR( itsLogPrefix << "Authentication by public key failed: " << rc);
// unrecoverable errors
if (rc == LIBSSH2_ERROR_FILE)
THROW(SSHException, "Error reading read key file " << itsSSHKey);
return false;
}
......
......@@ -28,6 +28,7 @@
#ifdef HAVE_LIBSSH2
#include <Common/Thread/Thread.h>
#include <Common/Exception.h>
#include <Stream/FileDescriptorBasedStream.h>
#include <libssh2.h>
#include <Interface/SmartPtr.h>
......@@ -47,6 +48,8 @@ void SSH_Finalize();
class SSHconnection {
public:
EXCEPTION_CLASS(SSHException, LOFAR::Exception);
SSHconnection(const string &logPrefix, const string &hostname, const string &commandline, const string &username, const string &sshkey, bool captureStdout = false);
~SSHconnection();
......
......@@ -135,8 +135,8 @@ void FinalMetaData::write(Stream &s)
void FinalMetaData::read(Stream &s)
{
StreamWriter< std::vector<struct BrokenRCU> >::write(s, brokenRCUsAtBegin);
StreamWriter< std::vector<struct BrokenRCU> >::write(s, brokenRCUsDuring);
StreamWriter< std::vector<struct BrokenRCU> >::read(s, brokenRCUsAtBegin);
StreamWriter< std::vector<struct BrokenRCU> >::read(s, brokenRCUsDuring);
}
}
......
......@@ -122,44 +122,48 @@ void parseBrokenHardware (const vector<OTDBvalue> &hardware, vector<struct Final
// A broken antenna element/tile entry must contain .status_state
for (size_t i = 0; i < hardware.size(); i++) {
if (hardware[i].name.find(".status_state") != string::npos) {
vector<string> parts = StringUtil::split (hardware[i].name, '.');
try {
if (hardware[i].name.find(".status_state") != string::npos) {
vector<string> parts = StringUtil::split (hardware[i].name, '.');
// parts[3] is station name (f.e. CS001)
string station = parts.size() > 3 ? parts[3] : "";
// parts[3] is station name (f.e. CS001)
string station = parts.size() > 3 ? parts[3] : "";
// parts[4] is tile name/number (f.e. HBA1 or LBA3)
string tile = parts.size() > 4 ? parts[4] : "";
// parts[4] is tile name/number (f.e. HBA1 or LBA3)
string tile = parts.size() > 4 ? parts[4] : "";
// parts[7] is RCU name/number (f.e. RCU20)
string rcu = parts.size() > 7 ? parts[7] : "";
// parts[7] is RCU name/number (f.e. RCU20)
string rcu = parts.size() > 7 ? parts[7] : "";
string tiletype = tile.substr(0,3);
string rcutype = rcu.substr(0,3);
string tiletype = tile.substr(0,3);
string rcutype = rcu.substr(0,3);
string type = "";
int seqnr = 0;
string type = "";
int seqnr = 0;
if (tiletype == "LBA" || tiletype == "HBA") {
// broken tile
type = tiletype;
seqnr = boost::lexical_cast<int>(tile.substr(3));
} else if (rcutype == "RCU") {
// broken rcu
type = rcutype;
seqnr = boost::lexical_cast<int>(rcu.substr(3));
}
if (tiletype == "LBA" || tiletype == "HBA") {
// broken tile
type = tiletype;
seqnr = boost::lexical_cast<int>(tile.substr(3));
} else if (rcutype == "RCU") {
// broken rcu
type = rcutype;
seqnr = boost::lexical_cast<int>(rcu.substr(3));
}
if (type != "") {
struct FinalMetaData::BrokenRCU info;
if (type != "") {
struct FinalMetaData::BrokenRCU info;
info.station = station;
info.type = type;
info.seqnr = seqnr;
info.time = to_simple_string(hardware[i].time);
info.station = station;
info.type = type;
info.seqnr = seqnr;
info.time = to_simple_string(hardware[i].time);
brokenrcus.push_back(info);
brokenrcus.push_back(info);
}
}
} catch(std::out_of_range &ex) {
LOG_ERROR_STR(logPrefix << "Error parsing name '" << hardware[i].name << "' time '" << hardware[i].time << "': " << ex.what());
}
}
......
......@@ -78,7 +78,7 @@ else
IONPROC_PARSET="$LOGDIR/L$OBSID.parset"
EXTRA_KEYS="
OLAP.Storage.userName = $USER
OLAP.Storage.sshIdentityFile = $HOME/.ssh/id_rsa
OLAP.Storage.sshIdentityFile = $HOME/id_rsa
OLAP.Storage.msWriter = $STORAGE
OLAP.Storage.AntennaSetsConf = /data/home/lofarsys/production/lofar/etc/AntennaSets.conf
OLAP.Storage.AntennaFieldsDir = /data/home/lofarsys/production/lofar/etc/StaticMetaData
......
......@@ -57,6 +57,9 @@ class MSWriterCorrelated : public MSWriterFile
std::vector<unsigned> itsSequenceNumbers;
SmartPtr<FileStream> itsSequenceNumbersFile;
private:
void makeMeasurementSet(const std::string &logPrefix, const std::string &msName, const Parset &parset, unsigned subbandIndex, bool isBigEndian);
};
......
......@@ -43,11 +43,27 @@ namespace RTCP {
MSWriterCorrelated::MSWriterCorrelated (const std::string &logPrefix, const std::string &msName, const Parset &parset, unsigned subbandIndex, bool isBigEndian)
:
MSWriterFile(str(format("%s/table.f0data") % msName)),
MSWriterFile(
(makeMeasurementSet(logPrefix, msName, parset, subbandIndex, isBigEndian),
str(format("%s/table.f0data") % msName))),
itsLogPrefix(logPrefix),
itsMSname(msName),
itsParset(parset)
{
if (itsParset.getLofarStManVersion() > 1) {
string seqfilename = str(format("%s/table.f0seqnr") % msName);
try {
itsSequenceNumbersFile = new FileStream(seqfilename, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
} catch (...) {
LOG_WARN_STR(itsLogPrefix << "Could not open sequence numbers file " << seqfilename);
}
itsSequenceNumbers.reserve(64);
}
// derive baseline names
std::vector<std::string> stationNames = parset.mergedStationNames();
std::vector<std::string> baselineNames(parset.nrBaselines());
unsigned nrStations = stationNames.size();
......@@ -61,27 +77,6 @@ MSWriterCorrelated::MSWriterCorrelated (const std::string &logPrefix, const std:
for(unsigned s2 = 0; s2 <= s1; s2++)
baselineNames[bl++] = str(format("%s_%s") % stationNames[s1] % stationNames[s2]);
// Make MeasurementSet filestructures and required tables
#if defined HAVE_AIPSPP
MeasurementSetFormat myFormat(itsParset, 512);
myFormat.addSubband(msName, subbandIndex, isBigEndian);
LOG_INFO_STR(itsLogPrefix << "MeasurementSet created");
#endif // defined HAVE_AIPSPP
if (itsParset.getLofarStManVersion() > 1) {
string seqfilename = str(format("%s/table.f0seqnr") % msName);
try {
itsSequenceNumbersFile = new FileStream(seqfilename, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
} catch (...) {
LOG_WARN_STR(itsLogPrefix << "Could not open sequence numbers file " << seqfilename);
}
itsSequenceNumbers.reserve(64);
}
}
......@@ -91,6 +86,18 @@ MSWriterCorrelated::~MSWriterCorrelated()
}
void MSWriterCorrelated::makeMeasurementSet(const std::string &logPrefix, const std::string &msName, const Parset &parset, unsigned subbandIndex, bool isBigEndian)
{
#if defined HAVE_AIPSPP
MeasurementSetFormat myFormat(parset, 512);
myFormat.addSubband(msName, subbandIndex, isBigEndian);
LOG_INFO_STR(logPrefix << "MeasurementSet created");
#endif // defined HAVE_AIPSPP
}
void MSWriterCorrelated::write(StreamableData *data)
{
CorrelatedData *cdata = dynamic_cast<CorrelatedData*>(data);
......
......@@ -169,7 +169,7 @@ void OutputThread::createMS()
itsWriter = new MSWriterFile(path);
break;
}
} catch (SystemCallException &ex) {
} catch (Exception &ex) {
LOG_ERROR_STR(itsLogPrefix << "Cannot open " << path << ": " << ex);
itsWriter = new MSWriterNull;
}
......@@ -294,6 +294,12 @@ void OutputThread::cleanUp()
void OutputThread::augment( const FinalMetaData &finalMetaData )
{
// wait for writer thread to finish, so we'll have an itsWriter
ASSERT(itsThread.get());
itsThread = 0;
// augment the data product
ASSERT(itsWriter.get());
itsWriter->augment(finalMetaData);
......
......@@ -142,15 +142,18 @@ int main(int argc, char *argv[])
// Add final meta data (broken tile information, etc)
// that is obtained after the end of an observation.
LOG_INFO_STR(obsLogPrefix << "Reading final meta data");
LOG_INFO_STR(obsLogPrefix << "Waiting for final meta data");
FinalMetaData finalMetaData;
finalMetaData.read(controlStream);
LOG_INFO_STR(obsLogPrefix << "Processing final meta data");
for (size_t i = 0; i < subbandWriters.size(); ++i)
subbandWriters[i]->augment(finalMetaData);
try {
subbandWriters[i]->augment(finalMetaData);
} catch (Exception &ex) {
LOG_WARN_STR(obsLogPrefix << "Could not add final meta data: " << ex);
}
}
} catch (Exception &ex) {
LOG_FATAL_STR("[obs unknown] Caught Exception: " << ex);
return 1;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment