Skip to content
Snippets Groups Projects
Commit 0bc4b733 authored by Alexander van Amesfoort's avatar Alexander van Amesfoort
Browse files

Task #4804: more build fixes. Move metadata init string back into GPUProcIO, as we need the parset.

parent e3a26b3e
No related branches found
No related tags found
No related merge requests found
...@@ -21,7 +21,8 @@ ...@@ -21,7 +21,8 @@
//# Always #include <lofar_config.h> first! //# Always #include <lofar_config.h> first!
#include <lofar_config.h> #include <lofar_config.h>
#include <string> #include "GPUProcIO.h"
#include <vector> #include <vector>
#include <omp.h> #include <omp.h>
#include <boost/format.hpp> #include <boost/format.hpp>
...@@ -30,9 +31,14 @@ ...@@ -30,9 +31,14 @@
#include <Common/StringUtil.h> #include <Common/StringUtil.h>
#include <Common/Exceptions.h> #include <Common/Exceptions.h>
#include <Stream/PortBroker.h> #include <Stream/PortBroker.h>
#include <ApplCommon/PVSSDatapointDefs.h>
#include <ApplCommon/StationInfo.h>
#include <MACIO/RTmetadata.h>
#include <CoInterface/Exceptions.h> #include <CoInterface/Exceptions.h>
#include <CoInterface/Parset.h> #include <CoInterface/Parset.h>
#include <CoInterface/FinalMetaData.h>
#include <CoInterface/Stream.h> #include <CoInterface/Stream.h>
#include <CoInterface/SmartPtr.h>
#include "SubbandWriter.h" #include "SubbandWriter.h"
#include "OutputThread.h" #include "OutputThread.h"
...@@ -44,10 +50,23 @@ using boost::format; ...@@ -44,10 +50,23 @@ using boost::format;
namespace LOFAR { namespace LOFAR {
namespace Cobalt { namespace Cobalt {
bool process(Stream &controlStream, const string& myHostName) bool process(Stream &controlStream, unsigned myRank)
{ {
bool success(true); bool success(true);
Parset parset(&controlStream); Parset parset(&controlStream);
// Send identification string to the MAC Log Processor
string fmtStr(createPropertySetName(PSN_COBALT_OUTPUT_PROC, "",
parset.getString("_DPname")));
format prFmt;
prFmt.exceptions(boost::io::no_error_bits); // avoid throw
prFmt.parse(fmtStr);
LOG_INFO_STR("MACProcessScope: " << str(prFmt % myRank));
const vector<string> &hostnames = parset.settings.outputProcHosts;
ASSERT(myRank < hostnames.size());
string myHostName = hostnames[myRank];
{ {
// make sure "parset" stays in scope for the lifetime of the SubbandWriters // make sure "parset" stays in scope for the lifetime of the SubbandWriters
...@@ -65,7 +84,7 @@ bool process(Stream &controlStream, const string& myHostName) ...@@ -65,7 +84,7 @@ bool process(Stream &controlStream, const string& myHostName)
if (parset.settings.correlator.files[fileIdx].location.host != myHostName) if (parset.settings.correlator.files[fileIdx].location.host != myHostName)
continue; continue;
string logPrefix = boost::str(boost::format("[obs %u correlated stream %3u] ") string logPrefix = str(format("[obs %u correlated stream %3u] ")
% parset.observationID() % fileIdx); % parset.observationID() % fileIdx);
SubbandWriter *writer = new SubbandWriter(parset, fileIdx, logPrefix); SubbandWriter *writer = new SubbandWriter(parset, fileIdx, logPrefix);
...@@ -107,7 +126,7 @@ bool process(Stream &controlStream, const string& myHostName) ...@@ -107,7 +126,7 @@ bool process(Stream &controlStream, const string& myHostName)
collectors[fileIdx] = new TABTranspose::BlockCollector( collectors[fileIdx] = new TABTranspose::BlockCollector(
*outputPools[fileIdx], fileIdx, nrSubbands, nrChannels, nrSamples, parset.nrBeamFormedBlocks(), parset.realTime() ? 5 : 0); *outputPools[fileIdx], fileIdx, nrSubbands, nrChannels, nrSamples, parset.nrBeamFormedBlocks(), parset.realTime() ? 5 : 0);
string logPrefix = boost::str(boost::format("[obs %u beamformed stream %3u] ") string logPrefix = str(format("[obs %u beamformed stream %3u] ")
% parset.observationID() % fileIdx); % parset.observationID() % fileIdx);
TABOutputThread *writer = new TABOutputThread(parset, fileIdx, *outputPools[fileIdx], logPrefix); TABOutputThread *writer = new TABOutputThread(parset, fileIdx, *outputPools[fileIdx], logPrefix);
......
...@@ -24,11 +24,7 @@ ...@@ -24,11 +24,7 @@
//# Never #include <config.h> or #include <lofar_config.h> in a header file! //# Never #include <config.h> or #include <lofar_config.h> in a header file!
#include <string> #include <string>
#include <vector>
#include <Stream/Stream.h> #include <Stream/Stream.h>
#include <CoInterface/SmartPtr.h>
#include <CoInterface/FinalMetaData.h>
namespace LOFAR namespace LOFAR
{ {
...@@ -44,7 +40,7 @@ namespace LOFAR ...@@ -44,7 +40,7 @@ namespace LOFAR
// * Call writeFeedbackLTA to obtain the LTA feedback from all writers, // * Call writeFeedbackLTA to obtain the LTA feedback from all writers,
// and write it to GPUProc. // and write it to GPUProc.
// \return \c true upon success, \c false upon failure. // \return \c true upon success, \c false upon failure.
bool process(Stream &controlStream, const std::string& myHostName); bool process(Stream &controlStream, unsigned myRank);
} // namespace Cobalt } // namespace Cobalt
} // namespace LOFAR } // namespace LOFAR
......
...@@ -28,14 +28,10 @@ ...@@ -28,14 +28,10 @@
#include <omp.h> #include <omp.h>
#include <boost/format.hpp> #include <boost/format.hpp>
#include <boost/lexical_cast.hpp> #include <boost/lexical_cast.hpp>
#include <Common/LofarLogger.h> #include <Common/LofarLogger.h>
#include <Common/CasaLogSink.h> #include <Common/CasaLogSink.h>
#include <Common/Exceptions.h> #include <Common/Exceptions.h>
#include <Common/NewHandler.h> #include <Common/NewHandler.h>
#include <ApplCommon/PVSSDatapointDefs.h>
#include <ApplCommon/StationInfo.h>
#include <MACIO/RTmetadata.h>
#include <Stream/PortBroker.h> #include <Stream/PortBroker.h>
#include <CoInterface/Exceptions.h> #include <CoInterface/Exceptions.h>
#include <CoInterface/Parset.h> #include <CoInterface/Parset.h>
...@@ -46,12 +42,13 @@ ...@@ -46,12 +42,13 @@
#define STDLOG_BUFFER_SIZE 1024 #define STDLOG_BUFFER_SIZE 1024
// install a new handler to produce backtraces for bad_alloc
LOFAR::NewHandler h(LOFAR::BadAllocException::newHandler);
using namespace LOFAR; using namespace LOFAR;
using namespace LOFAR::Cobalt; using namespace LOFAR::Cobalt;
using namespace std; using namespace std;
using boost::format;
// install a new handler to produce backtraces for bad_alloc
LOFAR::NewHandler h(LOFAR::BadAllocException::newHandler);
// Use a terminate handler that can produce a backtrace. // Use a terminate handler that can produce a backtrace.
Exception::TerminateHandler t(Exception::terminate); Exception::TerminateHandler t(Exception::terminate);
...@@ -76,46 +73,36 @@ static void usage(const char *argv0) ...@@ -76,46 +73,36 @@ static void usage(const char *argv0)
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
INIT_LOGGER("outputProc"); // also attaches to CasaLogSink setvbuf(stdout, stdoutbuf, _IOLBF, sizeof stdoutbuf);
setvbuf(stderr, stderrbuf, _IOLBF, sizeof stderrbuf);
// Send identification string to the MAC Log Processor before other logging
string fmtStr(createPropertySetName(PSN_COBALT_OUTPUT_PROC, "",
parset.getString("_DPname")));
boost::format prFmt;
prFmt.exceptions(boost::io::no_error_bits); // avoid throw
prFmt.parse(fmtStr);
LOG_INFO_STR("MACProcessScope: " << str(prFmt % myRank));
LOG_INFO_STR("OutputProc version " << OutputProcVersion::getVersion() << " r" << OutputProcVersion::getRevision());
int opt; int opt;
while ((opt = getopt(argc, argv, "h")) != -1) { while ((opt = getopt(argc, argv, "h")) != -1) {
switch (opt) { switch (opt) {
case 'h': case 'h':
usage(argv[0]); usage(argv[0]);
exit(0); exit(EXIT_SUCCESS);
default: /* '?' */ default: /* '?' */
usage(argv[0]); usage(argv[0]);
exit(1); exit(EXIT_FAILURE);
} }
} }
if (argc != 3) if (argc != 3) {
{
usage(argv[0]); usage(argv[0]);
return EXIT_FAILURE; return EXIT_FAILURE;
} }
setvbuf(stdout, stdoutbuf, _IOLBF, sizeof stdoutbuf); INIT_LOGGER("outputProc"); // also attaches to CasaLogSink
setvbuf(stderr, stderrbuf, _IOLBF, sizeof stderrbuf);
omp_set_nested(true);
LOG_DEBUG_STR("Started: " << argv[0] << ' ' << argv[1] << ' ' << argv[2]); LOG_DEBUG_STR("Started: " << argv[0] << ' ' << argv[1] << ' ' << argv[2]);
LOG_INFO_STR("OutputProc version " << OutputProcVersion::getVersion() << " r" << OutputProcVersion::getRevision());
int observationID = boost::lexical_cast<int>(argv[1]); int observationID = boost::lexical_cast<int>(argv[1]);
size_t myRank = boost::lexical_cast<size_t>(argv[2]); unsigned myRank = boost::lexical_cast<unsigned>(argv[2]);
omp_set_nested(true);
setIOpriority(); setIOpriority();
setRTpriority(); setRTpriority();
...@@ -123,15 +110,11 @@ int main(int argc, char *argv[]) ...@@ -123,15 +110,11 @@ int main(int argc, char *argv[])
PortBroker::createInstance(storageBrokerPort(observationID)); PortBroker::createInstance(storageBrokerPort(observationID));
// retrieve the parset // retrieve control stream to receive the parset and report back
string resource = getStorageControlDescription(observationID, myRank); string resource = getStorageControlDescription(observationID, myRank);
PortBroker::ServerStream controlStream(resource); PortBroker::ServerStream controlStream(resource);
const vector<string> &hostnames = parset.settings.outputProcHosts; if (process(controlStream, myRank)) {
ASSERT(myRank < hostnames.size());
string myHostName = hostnames[myRank];
if (process(controlStream, myHostName)) {
LOG_INFO("Program terminated succesfully"); LOG_INFO("Program terminated succesfully");
return EXIT_SUCCESS; return EXIT_SUCCESS;
} else { } else {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment