Skip to content
Snippets Groups Projects
Commit 4c26e736 authored by Alexander van Amesfoort's avatar Alexander van Amesfoort
Browse files

Task #6639: cherry-pick merge from...

Task #6639: cherry-pick merge from LOFAR-Release-2_5_0-PVSS-event-overload-Task6639. All can go to the trunk except for the .log_prop to DEBUG changes (r30126)
parent 0d27ea74
No related branches found
No related tags found
No related merge requests found
Showing
with 37 additions and 34 deletions
......@@ -12,8 +12,8 @@
# - Select the appropriate log-level for the rootLogger (DEBUG or INFO)
# - Leave the TRC logger on DEBUG,DUMP
# - Comment out the rootLogger and the TRC logger in the TEST section of this file
#log4cplus.rootLogger=DEBUG, DAILYFILE
log4cplus.rootLogger=INFO, DAILYFILE, MACCLP
log4cplus.rootLogger=DEBUG, DAILYFILE
#log4cplus.rootLogger=INFO, DAILYFILE, MACCLP
log4cplus.logger.TRC=DEBUG, DUMP
# For TESTING:
......
......@@ -86,7 +86,6 @@ public:
template <typename T>
inline void log(const string& key, const T& value)
{
LOG_DEBUG_STR("[RTmetadata " << itsRegisterName << " -> " << itsHostName << "] " << key << " = " << value);
log(KVpair(key, value));
}
......
......@@ -46,6 +46,9 @@ RTmetadata::RTmetadata(uint32 observationID,
itsKVTport (NULL),
itsNrEventsDropped (0)
{
// use negative seqnr to avoid ack messages
itsLogEvents.seqnr = -1;
itsLogEvents.kvps.reserve(MAX_QUEUED_EVENTS);
itsQueuedEvents.reserve(MAX_QUEUED_EVENTS);
}
......@@ -69,7 +72,7 @@ RTmetadata::~RTmetadata()
}
if (itsNrEventsDropped > 0) {
LOG_WARN_STR("RTmetadata object dropped " << itsNrEventsDropped << " event(s) for PVSS");
LOG_WARN_STR("[RTmetadata " << itsRegisterName << "] dropped " << itsNrEventsDropped << " PVSS events");
}
}
......@@ -81,13 +84,13 @@ void RTmetadata::start()
// Some tests clear the supplied hostname (don't use PVSSGatewayStub).
// Code under test may still log(), but that will be lost as intended.
if (itsHostName.empty()) {
LOG_WARN("Empty hostname, so logged PVSS data points will be dropped.");
LOG_WARN_STR("[RTmetadata (PVSS) " << itsRegisterName << "] Empty PVSSGateway hostname; dropping all events");
return;
}
ScopedLock lock(itsQueuedEventsMutex);
if (!itsThread) {
itsThread.reset(new Thread(this, &RTmetadata::rtmLoop, "RTMetadata (PVSS) thread: "));
itsThread.reset(new Thread(this, &RTmetadata::rtmLoop, "[RTMetadata (PVSS) send thread] "));
}
}
......@@ -96,6 +99,8 @@ void RTmetadata::start()
//
void RTmetadata::log(const KVpair& pair)
{
LOG_DEBUG_STR("[RTmetadata " << itsRegisterName << "] log() " << pair);
ScopedLock lock(itsQueuedEventsMutex);
// Limit the queue size, possibly losing events.
......@@ -130,9 +135,9 @@ void RTmetadata::log(const vector<KVpair>& pairs)
if (nfree == MAX_QUEUED_EVENTS) {
itsQueuedEventsCond.signal();
}
} else {
itsNrEventsDropped += pairs.size() - nfree;
}
itsNrEventsDropped += pairs.size() - count;
}
......@@ -154,12 +159,11 @@ void RTmetadata::rtmLoop()
sendEventsLoop();
// not reached
} catch (LOFAR::AssertError& exc) {
LOG_WARN_STR("Connection failure to PVSS Gateway: " << exc.what() << ". Will attempt to reconnect in a moment.");
itsLogEvents.kvps.clear(); // trash possibly half-sent events
LOG_WARN_STR("[RTmetadata " << itsRegisterName << "] Connection failure to PVSS Gateway: " << exc.what() << ". Will attempt to reconnect in " << sleepTime << " us.");
delete itsKVTport;
itsKVTport = 0;
} catch (...) {
LOG_DEBUG("Caught cancellation (or unknown) exception. Stopping...");
LOG_DEBUG_STR("[RTmetadata " << itsRegisterName << "] Caught cancellation (or unknown) exception. Stopping...");
delete itsKVTport;
itsKVTport = 0;
throw; // cancellation exc must be re-thrown
......@@ -180,19 +184,20 @@ void RTmetadata::setupConnection()
{
// Use synchronous socket (last arg), since we already have a thread
// to provide full async (and thread-safety on log()).
//
// Note: the EventPort connect()s in the constructor
LOG_DEBUG_STR("[RTmetadata " << itsRegisterName << "] Creating EventPort for host " << itsHostName);
// Note: the EventPort connect()s in the constructor
itsKVTport = new EventPort(MAC_SVCMASK_PVSSGATEWAY, false, KVT_PROTOCOL,
itsHostName, true); // may throw AssertError exc
LOG_DEBUG("Registering at PVSSGateway");
LOG_DEBUG_STR("[RTmetadata " << itsRegisterName << "] Registering at PVSSGateway");
KVTRegisterEvent regEvent;
regEvent.obsID = itsObsID;
regEvent.name = itsRegisterName;
ASSERTSTR(itsKVTport->send(&regEvent),
"failed to send registration to PVSSGateway"); // send() may throw AssertError exc
LOG_DEBUG("Waiting for PVSSGateway register acknowledgement");
LOG_DEBUG_STR("[RTmetadata " << itsRegisterName << "] Waiting for PVSSGateway register acknowledgement");
GCFEvent* ackPtr;
ASSERTSTR((ackPtr = itsKVTport->receive()) != NULL,
"bad registration ack from PVSSGateway"); // receive may throw AssertError exc
......@@ -200,8 +205,7 @@ void RTmetadata::setupConnection()
ASSERTSTR(ack.obsID == itsObsID && ack.name == itsRegisterName,
"PVSSGateway identity error");
itsLogEvents.seqnr = 0;
LOG_DEBUG("Connected to and registered at the PVSSGateway");
LOG_DEBUG_STR("[RTmetadata " << itsRegisterName << "] Connected to and registered at the PVSSGateway");
}
//
......@@ -210,6 +214,8 @@ void RTmetadata::setupConnection()
void RTmetadata::sendEventsLoop()
{
while (true) {
itsLogEvents.kvps.clear();
{
ScopedLock lock(itsQueuedEventsMutex);
......@@ -219,11 +225,10 @@ void RTmetadata::sendEventsLoop()
itsQueuedEvents.swap(itsLogEvents.kvps);
}
// use negative seqnrs to avoid ack messages
itsLogEvents.seqnr -= 1;
LOG_DEBUG_STR("[RTmetadata " << itsRegisterName << "] sending " << itsLogEvents.kvps.size() << " PVSS DPs; 1st: " <<
itsLogEvents.kvps[0] << " last: " << itsLogEvents.kvps[itsLogEvents.kvps.size() - 1]);
itsKVTport->send(&itsLogEvents); // may throw AssertError exc
LOG_DEBUG_STR("Sent " << itsLogEvents.kvps.size() << " PVSS data point events");
itsLogEvents.kvps.clear();
LOG_DEBUG_STR("[RTmetadata " << itsRegisterName << "] sent " << itsLogEvents.kvps.size() << " PVSS DPs");
}
}
......
......@@ -12,7 +12,4 @@ Cobalt.Feedback.remotePath = /opt/lofar/var/run
# If empty, data points are never sent.
# One can also start a PVSSGateway_Stub
# on localhost, which writes to a file.
#Cobalt.PVSSGateway.host = ccu001
# Disable PVSSGateway until ticket #5843 is fixed.
Cobalt.PVSSGateway.host =
Cobalt.PVSSGateway.host = ccu001
......@@ -2,7 +2,7 @@
# output on stdout.
# Configure the loggers
log4cplus.rootLogger=INFO, STDERR
log4cplus.rootLogger=DEBUG, STDERR
log4cplus.logger.TRC=INFO
# prevent debug messages: accept >=WARN only, and don't forward messages to the rootLogger
......
......@@ -2,7 +2,7 @@
# output on stdout.
# Configure the loggers
log4cplus.rootLogger=INFO, STDERR
log4cplus.rootLogger=DEBUG, STDERR
log4cplus.logger.TRC=INFO
# prevent debug messages: accept >=WARN only, and don't forward messages to the rootLogger
......
......@@ -2,7 +2,7 @@
# Configure the loggers
# logging to the MACCLP socket appender.
log4cplus.rootLogger=INFO, STDERR
log4cplus.rootLogger=DEBUG, STDERR
log4cplus.logger.TRC=INFO
# prevent debug messages: accept >=WARN only, and don't forward messages to the rootLogger
......
......@@ -190,7 +190,8 @@ int main(int argc, char **argv)
// Create mdLogger for monitoring (PVSS). We can already log(), but start() the event send thread
// much later, after the pipeline creation (post-fork()), so we don't crash.
const string mdRegisterName = PST_COBALTGPU_PROC;
const string mdRegisterName = PST_COBALTGPU_PROC + boost::lexical_cast<string>(cpuNr) + ":" +
boost::lexical_cast<string>(ps.observationID()) + "@" + hostName;
const string mdHostName = ps.getString("Cobalt.PVSSGateway.host", "");
// Don't connect to PVSS for non-real-time observations -- they have no proper flow control
......
......@@ -3,7 +3,7 @@
# TODO: rtcp currently crashes randomly with a segmentation fault when
# logging to the MACCLP socket appender.
#log4cplus.rootLogger=INFO, STDOUT, MACCLP
log4cplus.rootLogger=INFO, STDOUT
log4cplus.rootLogger=DEBUG, STDOUT
log4cplus.logger.TRC=INFO
# prevent debug messages: accept >=WARN only, and don't forward messages to the rootLogger
......
......@@ -2,7 +2,7 @@
# output on stdout.
# Configure the loggers
log4cplus.rootLogger=INFO, STDERR
log4cplus.rootLogger=DEBUG, STDERR
log4cplus.logger.TRC=INFO
# prevent debug messages: accept >=WARN only, and don't forward messages to the rootLogger
......
......@@ -2,7 +2,7 @@
# output on stdout.
# Configure the loggers
log4cplus.rootLogger=INFO, STDERR
log4cplus.rootLogger=DEBUG, STDERR
log4cplus.logger.TRC=INFO
# prevent debug messages: accept >=WARN only, and don't forward messages to the rootLogger
......
......@@ -83,7 +83,8 @@ bool process(Stream &controlStream, unsigned myRank)
LOG_INFO_STR("MACProcessScope: " << mdKeyPrefix);
mdKeyPrefix.push_back('.'); // keys look like: "keyPrefix.subKeyName[x]"
const string mdRegisterName = PST_COBALT_OUTPUT_PROC;
const string mdRegisterName = string(PST_COBALT_OUTPUT_PROC) + ":" +
lexical_cast<string>(parset.observationID()) + "@" + myHostName;
const string mdHostName = parset.getString("Cobalt.PVSSGateway.host", "");
MACIO::RTmetadata mdLogger(parset.observationID(), mdRegisterName, mdHostName);
mdLogger.start();
......
......@@ -3,7 +3,7 @@
# TODO: outputProc currently crashes randomly with a segmentation fault when
# logging to the MACCLP socket appender.
#log4cplus.rootLogger=INFO, STDOUT, MACCLP
log4cplus.rootLogger=INFO, STDOUT
log4cplus.rootLogger=DEBUG, STDOUT
log4cplus.logger.TRC=INFO
# prevent debug messages: accept >=WARN only, and don't forward messages to the rootLogger
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment