-
Jan David Mol authoredJan David Mol authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
CEPlogProcessor.cc 30.22 KiB
//# CEPlogProcessor.cc: Moves the operator info from the logfiles to PVSS
//#
//# Copyright (C) 2009
//# ASTRON (Netherlands Foundation for Research in Astronomy)
//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
//#
//# This program is free software; you can redistribute it and/or modify
//# it under the terms of the GNU General Public License as published by
//# the Free Software Foundation; either version 2 of the License, or
//# (at your option) any later version.
//#
//# This program is distributed in the hope that it will be useful,
//# but WITHOUT ANY WARRANTY; without even the implied warranty of
//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//# GNU General Public License for more details.
//#
//# You should have received a copy of the GNU General Public License
//# along with this program; if not, write to the Free Software
//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//#
//# $Id$
#include <lofar_config.h>
#include <Common/LofarLogger.h>
#include <Common/LofarConstants.h>
#include <Common/LofarLocators.h>
#include <Common/StringUtil.h>
#include <Common/ParameterSet.h>
#include <GCF/PVSS/GCF_PVTypes.h>
#include <MACIO/MACServiceInfo.h>
#include <APL/APLCommon/ControllerDefines.h>
//#include <APL/RTDBCommon/RTDButilities.h>
//#include <APL/APLCommon/StationInfo.h>
#include <GCF/RTDB/DP_Protocol.ph>
#include <signal.h>
#include <stdlib.h>
#include <unistd.h> // usleep
#include "CEPlogProcessor.h"
#include "PVSSDatapointDefs.h"
namespace LOFAR {
using namespace APLCommon;
// using namespace APL::RTDBCommon;
using namespace GCF::TM;
using namespace GCF::PVSS;
using namespace GCF::RTDB;
namespace APL {
// static pointer to this object for signal handler
static CEPlogProcessor* thisLogProcessor = 0;
#define MPIProcs 16
//
// CEPlogProcessor()
//
CEPlogProcessor::CEPlogProcessor(const string& cntlrName) :
GCFTask ((State)&CEPlogProcessor::initial_state,cntlrName),
itsListener (0),
itsOwnPropertySet (0),
itsTimerPort (0),
itsNrInputBuffers (0),
itsNrAdders (0),
itsNrStorage (0),
itsBufferSize (0)
{
LOG_TRACE_OBJ_STR (cntlrName << " construction");
// need port for timers.
itsTimerPort = new GCFTimerPort(*this, "TimerPort");
// prepare TCP port to accept connections on
itsListener = new GCFTCPPort (*this, "BGPlogger:v1_0", GCFPortInterface::MSPP, 0);
// itsListener = new GCFTCPPort (*this, MAC_SVCMASK_CEPPROCMONITOR, GCFPortInterface::MSPP, 0); // TODO
ASSERTSTR(itsListener, "Cannot allocate listener port");
itsListener->setPortNumber(globalParameterSet()->getInt("CEPlogProcessor.portNr"));
itsBufferSize = globalParameterSet()->getInt("CEPlogProcessor.bufferSize", 1024);
itsNrInputBuffers = globalParameterSet()->getInt("CEPlogProcessor.nrInputBuffers", 64);
itsNrAdders = globalParameterSet()->getInt("CEPlogProcessor.nrAdders", 64);
itsNrStorage = globalParameterSet()->getInt("CEPlogProcessor.nrStorage", 100);
registerProtocol(DP_PROTOCOL, DP_PROTOCOL_STRINGS);
thisLogProcessor = this;
}
//
// ~CEPlogProcessor()
//
CEPlogProcessor::~CEPlogProcessor()
{
LOG_TRACE_OBJ_STR (getName() << " destruction");
// database should be ready by ts, check if allocation was succesfull
for (int inputBuf = itsNrInputBuffers - 1; inputBuf >= 0; inputBuf--) {
delete itsInputBuffers[inputBuf];
}
for (int adder = itsNrAdders - 1; adder >= 0; adder--) {
delete itsAdders[adder];
}
for (int storage = itsNrStorage - 1; storage >= 0; storage--) {
delete itsStorage[storage];
}
// close all streams
while( !itsLogStreams.empty() )
_deleteStream( *((*itsLogStreams.begin()).first) );
// and reap the port objects immediately
collectGarbage();
if (itsListener) {
itsListener->close();
delete itsListener;
}
delete itsTimerPort;
delete itsOwnPropertySet;
}
//
// signalHandler(signr)
//
void CEPlogProcessor::signalHandler(int signum)
{
LOG_DEBUG_STR("SIGNAL " << signum << " detected");
if (thisLogProcessor) {
thisLogProcessor->finish();
}
}
//
// finish()
//
void CEPlogProcessor::finish()
{
TRAN(CEPlogProcessor::finish_state);
}
//
// initial_state(event, port)
//
// Setup connection with PVSS
//
GCFEvent::TResult CEPlogProcessor::initial_state(GCFEvent& event,
GCFPortInterface& port)
{
LOG_DEBUG_STR ("initial:" << eventName(event) << "@" << port.getName());
GCFEvent::TResult status = GCFEvent::HANDLED;
switch (event.signal) {
case F_INIT:
break;
case F_ENTRY: {
// Get access to my own propertyset.
LOG_DEBUG_STR ("Activating PropertySet " << PSN_LOG_PROCESSOR);
itsTimerPort->setTimer(2.0);
itsOwnPropertySet = new RTDBPropertySet(PSN_LOG_PROCESSOR,
PST_LOG_PROCESSOR,
PSAT_WO,
this);
}
break;
case DP_CREATED: {
// NOTE: this function may be called DURING the construction of the PropertySet.
// Always exit this event in a way that GCF can end the construction.
DPCreatedEvent dpEvent(event);
LOG_DEBUG_STR("Result of creating " << dpEvent.DPname << " = " << dpEvent.result);
itsTimerPort->cancelAllTimers();
itsTimerPort->setTimer(0.0);
}
break;
case F_TIMER: {
// update PVSS.
LOG_TRACE_FLOW ("Updateing state to PVSS");
itsOwnPropertySet->setValue(PN_FSM_CURRENT_ACTION, GCFPVString("initial"));
LOG_DEBUG_STR("Going to create the datapoints in PVSS");
TRAN (CEPlogProcessor::createPropertySets);
}
case DP_SET:
break;
case F_QUIT:
TRAN (CEPlogProcessor::finish_state);
break;
default:
LOG_DEBUG_STR ("initial, DEFAULT");
break;
}
return (status);
}
//
// createPropertySets(event, port)
//
// Create PropertySets for all processes.
//
GCFEvent::TResult CEPlogProcessor::createPropertySets(GCFEvent& event,
GCFPortInterface& port)
{
LOG_DEBUG_STR ("createPropertySets:" << eventName(event) << "@" << port.getName());
GCFEvent::TResult status = GCFEvent::HANDLED;
switch (event.signal) {
case F_ENTRY: {
itsOwnPropertySet->setValue(PN_FSM_CURRENT_ACTION,GCFPVString("create PropertySets"));
// create propSets for the inputbuffer processes
itsInputBuffers.resize(itsNrInputBuffers, 0);
string inputBufferNameMask (createPropertySetName(PSN_INPUT_BUFFER, getName()));
for (unsigned inputBuffer = 0; inputBuffer < itsNrInputBuffers; inputBuffer++) {
if (!itsInputBuffers[inputBuffer]) {
string PSname(formatString(inputBufferNameMask.c_str(), inputBuffer));
itsInputBuffers[inputBuffer] = new RTDBPropertySet(PSname, PST_INPUT_BUFFER, PSAT_WO | PSAT_CW, this);
}
usleep (2000); // wait 2 ms in order not to overload the system
}
// create propSets for the adder processes
itsAdders.resize (itsNrAdders, 0);
string adderNameMask(createPropertySetName(PSN_ADDER, getName()));
for (unsigned adder = 0; adder < itsNrAdders; adder++) {
if (!itsAdders[adder]) {
string PSname(formatString(adderNameMask.c_str(), adder));
itsAdders[adder] = new RTDBPropertySet(PSname, PST_ADDER, PSAT_WO | PSAT_CW, this);
}
usleep (2000); // wait 2 ms in order not to overload the system
}
itsDroppingCount.resize (itsNrAdders, 0);
// create propSets for the storage processes
itsStorage.resize (itsNrStorage, 0);
string storageNameMask(createPropertySetName(PSN_STORAGE, getName()));
for (unsigned storage = 0; storage < itsNrStorage; storage++) {
if (!itsStorage[storage]) {
string PSname(formatString(storageNameMask.c_str(), storage));
itsStorage[storage] = new RTDBPropertySet(PSname, PST_STORAGE, PSAT_WO | PSAT_CW, this);
}
usleep (2000); // wait 2 ms in order not to overload the system
}
itsStorageBuf.resize (itsNrStorage);
for (unsigned i = 0; i < itsNrStorage; i++) {
//set array sizes
itsStorageBuf[i].timeStr.resize(MPIProcs);
itsStorageBuf[i].count.resize(MPIProcs,0);
itsStorageBuf[i].dropped.resize(MPIProcs);
}
LOG_INFO("Giving PVSS 5 seconds to process the requests");
itsTimerPort->setTimer(5.0); // give database some time to finish the job
}
break;
case F_TIMER: {
// database should be ready by ts, check if allocation was succesfull
for (unsigned inputBuffer = 0; inputBuffer < itsNrInputBuffers; inputBuffer++) {
ASSERTSTR(itsInputBuffers[inputBuffer], "Allocation of PS for inputBuffer " << inputBuffer << " failed.");
}
for (unsigned adder = 0; adder < itsNrAdders; adder++) {
ASSERTSTR(itsAdders[adder], "Allocation of PS for adder " << adder << " failed.");
}
for (unsigned storage = 0; storage < itsNrStorage; storage++) {
ASSERTSTR(itsStorage[storage], "Allocation of PS for storage " << storage << " failed.");
}
LOG_DEBUG_STR("Allocation of all propertySets successfull, going to open the listener");
TRAN(CEPlogProcessor::startListener);
}
break;
case DP_SET:
break;
case F_QUIT:
TRAN (CEPlogProcessor::finish_state);
break;
default:
LOG_DEBUG_STR ("createPropertySets, DEFAULT");
break;
}
return (status);
}
//
// startListener(event, port)
//
GCFEvent::TResult CEPlogProcessor::startListener(GCFEvent& event, GCFPortInterface& port)
{
LOG_DEBUG_STR("startListener:" << eventName(event) << "@" << port.getName());
switch (event.signal) {
case F_ENTRY:
itsListener->autoOpen(0, 10, 2); // report within 10 seconds.
break;
case F_CONNECTED:
LOG_DEBUG("Listener is started, going to operational mode");
TRAN (CEPlogProcessor::operational);
break;
case F_DISCONNECTED:
LOG_FATAL_STR("Cannot open the listener on port " << itsListener->getPortNumber() << ". Quiting!");
GCFScheduler::instance()->stop();
break;
}
return (GCFEvent::HANDLED);
}
void CEPlogProcessor::collectGarbage()
{
LOG_DEBUG("Cleaning up garbage");
for (unsigned i = 0; i < itsLogStreamsGarbage.size(); i++)
delete itsLogStreamsGarbage[i];
itsLogStreamsGarbage.clear();
}
//
// operational(event, port)
//
GCFEvent::TResult CEPlogProcessor::operational(GCFEvent& event, GCFPortInterface& port)
{
LOG_DEBUG_STR("operational:" << eventName(event) << "@" << port.getName());
switch (event.signal) {
case F_ENTRY:
itsTimerPort->setTimer(1.0,1.0);
break;
case F_TIMER:
LOG_DEBUG("Timer event, preparing PVSS arrays");
for (unsigned j = 0; j < itsNrStorage; j++) {
GCFPValueArray timeArray;
GCFPValueArray countArray;
GCFPValueArray droppedArray;
timeArray.resize(MPIProcs,0);
countArray.resize(MPIProcs,0);
droppedArray.resize(MPIProcs,0);
for (unsigned i = 0; i<MPIProcs;i++) {
timeArray[i] = new GCFPVString(itsStorageBuf[j].timeStr[i]);
countArray[i] = new GCFPVInteger(itsStorageBuf[j].count[i]);
droppedArray[i] = new GCFPVString(itsStorageBuf[j].dropped[i]);
}
itsStorage[j]->setValue(PN_STR_TIME, GCFPVDynArr(LPT_DYNSTRING, timeArray));
itsStorage[j]->setValue(PN_STR_COUNT, GCFPVDynArr(LPT_DYNINTEGER, countArray));
itsStorage[j]->setValue(PN_STR_DROPPED, GCFPVDynArr(LPT_DYNSTRING, droppedArray));
for (unsigned i = 0; i<MPIProcs;i++) {
delete timeArray[i];
delete countArray[i];
delete droppedArray[i];
}
}
collectGarbage();
break;
case F_ACCEPT_REQ:
_handleConnectionRequest();
break;
case F_CONNECTED:
break;
case F_DISCONNECTED: {
_deleteStream(port);
break;
}
case F_DATAIN:
_handleDataStream(&port);
break;
}
return (GCFEvent::HANDLED);
}
//
// _deleteStream(GCFPortInterface& port)
//
void CEPlogProcessor::_deleteStream(GCFPortInterface& port)
{
LOG_DEBUG_STR("_deleteStream");
port.close();
map<GCFPortInterface*, streamBuffer_t>::iterator theStream = itsLogStreams.find(&port);
if (theStream != itsLogStreams.end()) {
streamBuffer_t &sb = theStream->second;
delete sb.buffer;
itsLogStreams.erase(theStream);
}
// schedule to delete, since the parent may still be referring to
// port and require info from it
itsLogStreamsGarbage.push_back(&port);
}
//
// _handleConnectionRequest()
//
void CEPlogProcessor::_handleConnectionRequest()
{
GCFTCPPort* pNewClient = new GCFTCPPort();
ASSERT(pNewClient);
pNewClient->init(*this, "newClient", GCFPortInterface::SPP, 0, true);
if (!itsListener->accept(*pNewClient)) {
LOG_WARN("Connection with new client went wrong");
return;
}
// give stream its own buffer.
streamBuffer_t stream;
stream.socket = pNewClient;
stream.buffer = new CircularBuffer(itsBufferSize);
itsLogStreams[pNewClient] = stream;
LOG_INFO_STR("Added new client to my admin");
}
//
// _handleDataStream(sid)
//
void CEPlogProcessor::_handleDataStream(GCFPortInterface* port)
{
// read in the new bytes
streamBuffer_t &stream = itsLogStreams[port];
int newBytes = stream.socket->recv( stream.buffer->tail, stream.buffer->tailFreeSpace() );
if (newBytes < 0) {
LOG_DEBUG_STR("Closing connection.");
_deleteStream(*port);
return;
}
LOG_DEBUG_STR("Read " << newBytes << " bytes.");
stream.buffer->incTail( newBytes );
char lineBuf[1024];
while (stream.buffer->getLine( lineBuf, sizeof lineBuf )) {
LOG_DEBUG_STR("Read log line '" << lineBuf << "'" );
_processLogLine(lineBuf);
}
}
// Convert "23-02-11" and "01:02:58.687" into a time_t timestamp
time_t CEPlogProcessor::_parseDateTime(const char *datestr, const char *timestr) const
{
struct tm tm;
time_t ts;
bool validtime = true;
if (sscanf(datestr, "%u-%u-%u",
&tm.tm_mday, &tm.tm_mon, &tm.tm_year) != 3) {
validtime = false;
} else {
// tm_year starts counting from 1900
if (tm.tm_year > 1900) {
// YYYY
tm.tm_year -= 1900;
} else {
// YY -- we won't see loglines pre 2000.
tm.tm_year += 110;
}
}
if (sscanf(timestr, "%u:%u:%u", // ignore milliseconds
&tm.tm_hour, &tm.tm_min, &tm.tm_sec) != 3) {
validtime = false;
}
if (validtime) {
tm.tm_isdst = 0; // UTC knows no daylight saving
ts = mktime(&tm);
if (ts <= 0)
validtime = false;
}
if (!validtime) {
LOG_WARN_STR("Invalid timestamp: " << datestr << " " << timestr << "; using now()");
ts = time(0L);
}
return ts;
}
//
// _processLogLine(char*)
//
//
void CEPlogProcessor::_processLogLine(const char *cString)
{
if (*cString == 0) {
return;
}
// debug hack
if (!strcmp(cString,"quit")) {
finish();
return;
}
// example log line:
// Storage@locus001 09-12-10 11:33:13.240 DEBUG [obs 21855 output 1 subband 223] InputThread::~InputThread()
// ^^^^^^^ ^^^^^^^^ ^^^^^^^^ ^^^^^^^^^^^^ ^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^
// | | date time | target msg
// | | loglevel
// | processHost
// processName
unsigned bufsize = strlen(cString) + 1;
vector<char> processName(bufsize), processHost(bufsize), date(bufsize), time(bufsize), loglevel(bufsize), msg(bufsize);
vector<char> target(bufsize), tail(bufsize);
// TODO: support both exe@nr (IONProc@00) and exe@host (Storage_main@locus002)
if (sscanf(cString, "%[^@]@%s %s %s %s %[^\n]",
&processName[0],
&processHost[0],
&date[0],
&time[0],
&loglevel[0],
&msg[0]) != 6) {
// this will include:
// * mpi/bgp errors
// * casacore messages
// * ssh/login messages
// * log4cplus/cxx messages
// * mangled messages (happens occasionally)
// * backtraces
// * C++/libc errors
LOG_DEBUG_STR("Unparsable log line: " << cString);
return;
}
LOG_DEBUG_STR("Process: " << &processName[0] << " Host: " << &processHost[0] << " Date: " << &date[0] << " Time: " << &time[0] << " Loglevel: " << &loglevel[0] << " Message: " << &msg[0]);
struct logline logline;
logline.process = &processName[0];
logline.host = &processHost[0];
logline.date = &date[0];
logline.time = &time[0];
logline.loglevel = &loglevel[0];
if (sscanf(&msg[0], "[%[^]]] %[^\n]", &target[0], &tail[0]) == 2) {
logline.target = &target[0];
logline.msg = &tail[0];
} else {
logline.target = "";
logline.msg = &msg[0];
}
logline.timestamp = _parseDateTime(logline.date, logline.time);
logline.obsID = _getParam(logline.target, "obs ");
string tempObsName = logline.obsID >= 0 ? getTempObsName(logline.obsID, logline.msg) : "";
logline.tempobsname = tempObsName.c_str();
if (!strcmp(logline.process,"IONProc")) {
_processIONProcLine(logline);
} else if (!strcmp(logline.process,"CNProc")) {
_processCNProcLine(logline);
} else if (!strcmp(logline.process,"Storage")) {
_processStorageLine(logline);
} else {
LOG_DEBUG_STR("Unknown process: " << logline.process);
}
}
int CEPlogProcessor::_getParam(const char *msg,const char *param) const
{
const char *result = strstr(msg, param);
int value;
if (!result)
return -1;
if (sscanf(result + strlen(param), "%d", &value) != 1)
return -1;
return value;
}
string CEPlogProcessor::getTempObsName(int obsID, const char *msg)
{
vector<char> tempObsName(strlen(msg)+1);
// register the tempObsName if this line announces it
if (sscanf(msg,"PVSS name: %[^\n]", &tempObsName[0]) == 1) {
LOG_DEBUG_STR("obs " << obsID << " is mapped to " << &tempObsName[0]);
itsTempObsMapping.set( obsID, string(&tempObsName[0]) );
}
if (!strcmp(msg,"----- Job finished succesfully")
|| !strcmp(msg,"----- Job cancelled succesfully")) {
LOG_DEBUG_STR("obs " << obsID << " ended");
itsTempObsMapping.erase(obsID);
}
// lookup the obsID in our list
if (!itsTempObsMapping.exists(obsID)) {
LOG_ERROR_STR("Observation ID " << obsID << " not mapped onto a temporary observation in PVSS. Cannot process log line.");
return "";
}
return itsTempObsMapping.lookup(obsID);
}
//
// _processIONProcLine(cstring)
//
void CEPlogProcessor::_processIONProcLine(const struct logline &logline)
{
unsigned processNr;
if (sscanf(logline.host, "%u", &processNr) != 1) {
LOG_WARN_STR("Could not extract host number from name: " << logline.host );
return;
}
if (processNr >= itsNrInputBuffers) {
LOG_WARN_STR("Inputbuffer range = 0.." << itsNrInputBuffers << ". Index " << processNr << " is invalid");
return;
}
char* result;
// IONProc@00 31-03-11 00:17:22.438 INFO [obs 24811] ----- Creating new job
// IONProc@00 31-03-11 00:17:22.550 INFO [obs 24811] Waiting for job to start: sleeping until Thu Mar 31 00:18:50 2011
// IONProc@00 31-03-11 00:18:50.008 INFO Storage writer on lse012: starting as rank 0
// IONProc@00 31-03-11 00:18:50.031 INFO [obs 24811] ----- Observation start
unsigned bufsize = strlen(logline.msg) + 1;
if (!strcmp(logline.msg,"----- Creating new job")) {
LOG_DEBUG_STR("obs " << logline.obsID << " created");
}
if (strstr(logline.msg,"Waiting for job to start")) {
LOG_DEBUG_STR("obs " << logline.obsID << " waiting to start");
}
{
vector<char> host(bufsize);
int rank;
if (sscanf(logline.msg,"Storage writer on %[^:]: starting as rank %d", &host[0], &rank) == 2) {
LOG_DEBUG_STR("obs " << logline.obsID << " starts storage writer " << rank << " on host " << &host[0]);
}
}
if (!strcmp(logline.msg,"----- Observation start")) {
LOG_DEBUG_STR("obs " << logline.obsID << " run()");
}
//
// InputBuffer
//
// IONProc@01 23-02-11 01:02:58.687 INFO [obs 23603 station CS005HBA1] [1298422977s, 80863], late: 17.6 ms, delays: [8.657333 us], flags 0: (0%), flags 1: (0%), flags 2: (0%), flags 3: (0%)
// IONProc@05 07-01-11 20:57:56.765 INFO [obs 1002069 station S10] [1294433876s, 0], late: 8.85 ms, delays: [-616.3421 ns], flags 0: [0..52992> (100%), flags 1: [0..52992> (100%), flags 2: [0..52992> (100%), flags 3: [0..52992> (100%)
if ((result = strstr(logline.msg, " late: "))) {
float late;
if (sscanf(result, " late: %f ", &late) == 1 ) {
LOG_DEBUG_STR("[" << processNr << "] Late: " << late);
itsInputBuffers[processNr]->setValue(PN_IPB_LATE, GCFPVDouble(late), logline.timestamp);
}
// 0% flags look like : flags 0: (0%)
// filled% flags look like : flags 0: [nr..nr> (10.5%)
if ((result = strstr(logline.msg, "flags 0:"))) {
float flags0, flags1, flags2, flags3;
if (sscanf(result, "flags 0:%*[^(](%f%%), flags 1:%*[^(](%f%%), flags 2:%*[^(](%f%%), flags 3:%*[^(](%f%%)",
&flags0, &flags1, &flags2, &flags3) == 4) {
LOG_DEBUG(formatString("[%d] %%bad: %.2f, %.2f, %.2f, %.2f", processNr, flags0, flags1, flags2, flags3));
itsInputBuffers[processNr]->setValue(PN_IPB_STREAM0_PERC_BAD, GCFPVDouble(flags0), logline.timestamp, false);
itsInputBuffers[processNr]->setValue(PN_IPB_STREAM1_PERC_BAD, GCFPVDouble(flags1), logline.timestamp, false);
itsInputBuffers[processNr]->setValue(PN_IPB_STREAM2_PERC_BAD, GCFPVDouble(flags2), logline.timestamp, false);
itsInputBuffers[processNr]->setValue(PN_IPB_STREAM3_PERC_BAD, GCFPVDouble(flags3), logline.timestamp, false);
itsInputBuffers[processNr]->flush();
}
}
return;
}
// IONProc@36 23-02-11 00:59:59.151 DEBUG [obs 23603 station CS003HBA0] ION->CN: 483 ms
if ((result = strstr(logline.msg, "ION->CN:"))) {
float ioTime;
if (sscanf(result, "ION->CN:%f", &ioTime) == 1) {
LOG_DEBUG_STR("[" << processNr << "] ioTime: " << ioTime);
itsInputBuffers[processNr]->setValue(PN_IPB_IO_TIME, GCFPVDouble(ioTime), logline.timestamp);
return;
}
}
// IONProc@36 23-02-11 00:59:59.673 INFO [station CS003HBA0] received packets = [12329,12328,12292,12329], us/sy/in/id(0): [21/20/10/51(25)]
if ((result = strstr(logline.msg, "received packets = ["))) {
int received[4] = {0,0,0,0};
int badsize[4] = {0,0,0,0};
int badtimestamp[4] = {0,0,0,0};
if (sscanf(result, "received packets = [%d,%d,%d,%d]", &received[0], &received[1], &received[2], &received[3]) == 4) {
LOG_DEBUG(formatString("[%d] blocks: %d, %d, %d, %d", processNr, received[0], received[1], received[2], received[3]));
itsInputBuffers[processNr]->setValue(PN_IPB_STREAM0_BLOCKS_IN, GCFPVInteger(received[0]), logline.timestamp, false);
itsInputBuffers[processNr]->setValue(PN_IPB_STREAM1_BLOCKS_IN, GCFPVInteger(received[1]), logline.timestamp, false);
itsInputBuffers[processNr]->setValue(PN_IPB_STREAM2_BLOCKS_IN, GCFPVInteger(received[2]), logline.timestamp, false);
itsInputBuffers[processNr]->setValue(PN_IPB_STREAM3_BLOCKS_IN, GCFPVInteger(received[3]), logline.timestamp, false);
itsInputBuffers[processNr]->flush();
}
// if rejected was found in same line this means that a certain amount of blocks was rejected,
// set this into the database. If no rejected was found, it means 0 blocks were rejected, so DB can be reset to 0
if ((result = strstr(logline.msg, " bad size = ["))) {
if (sscanf(result, " bad size = [%d,%d,%d,%d]", &badsize[0], &badsize[1], &badsize[2], &badsize[3]) == 4) {
LOG_DEBUG(formatString("[%d] rejected: bad size blocks: %d, %d, %d, %d", processNr, badsize[0], badsize[1], badsize[2], badsize[3]));
} else {
badsize[0] = 0;
badsize[1] = 0;
badsize[2] = 0;
badsize[3] = 0;
}
}
if ((result = strstr(logline.msg, " bad timestamps = ["))) {
if (sscanf(result, " bad timestamps = [%d,%d,%d,%d]", &badtimestamp[0], &badtimestamp[1], &badtimestamp[2], &badtimestamp[3]) == 4) {
LOG_DEBUG(formatString("[%d] rejected: bad timestamp blocks: %d, %d, %d, %d", processNr, badtimestamp[0], badtimestamp[1], badtimestamp[2], badtimestamp[3]));
} else {
badtimestamp[0] = 0;
badtimestamp[1] = 0;
badtimestamp[2] = 0;
badtimestamp[3] = 0;
}
}
itsInputBuffers[processNr]->setValue(PN_IPB_STREAM0_REJECTED, GCFPVInteger(badsize[0] + badtimestamp[0]), logline.timestamp, false);
itsInputBuffers[processNr]->setValue(PN_IPB_STREAM1_REJECTED, GCFPVInteger(badsize[1] + badtimestamp[1]), logline.timestamp, false);
itsInputBuffers[processNr]->setValue(PN_IPB_STREAM2_REJECTED, GCFPVInteger(badsize[2] + badtimestamp[2]), logline.timestamp, false);
itsInputBuffers[processNr]->setValue(PN_IPB_STREAM3_REJECTED, GCFPVInteger(badsize[3] + badtimestamp[3]), logline.timestamp, false);
itsInputBuffers[processNr]->flush();
return;
}
//
// Adder
//
// IONProc@17 07-01-11 20:59:00.981 WARN [obs 1002069 output 6 index L1002069_B102_S0_P000_bf.raw] Dropping data
if ((result = strstr(logline.msg, "Dropping data"))) {
LOG_DEBUG(formatString("[%d] Dropping data started ",processNr));
itsAdders[processNr]->setValue(PN_ADD_DROPPING, GCFPVBool(true), logline.timestamp);
itsAdders[processNr]->setValue(PN_ADD_LOG_LINE,GCFPVString(result), logline.timestamp);
itsDroppingCount[processNr]++;
LOG_DEBUG(formatString("Dropping count[%d] : %d", processNr,itsDroppingCount[processNr]));
return;
}
// IONProc@23 07-01-11 20:58:27.848 WARN [obs 1002069 output 6 index L1002069_B139_S0_P000_bf.raw] Dropped 9 blocks
if ((result = strstr(logline.msg, "Dropped "))) {
int dropped(0);
if (sscanf(result, "Dropped %d ", &dropped) == 1) {
LOG_DEBUG(formatString("[%d] Dropped %d ",processNr,dropped));
itsAdders[processNr]->setValue(PN_ADD_NR_BLOCKS_DROPPED, GCFPVInteger(dropped), logline.timestamp);
}
itsAdders[processNr]->setValue(PN_ADD_LOG_LINE,GCFPVString(result), logline.timestamp);
itsDroppingCount[processNr]--;
LOG_DEBUG(formatString("Dropping count[%d] : %d", processNr,itsDroppingCount[processNr]));
// if dropcount = 0 again, if so reset dropping flag
if (itsDroppingCount[processNr] <= 0) {
LOG_DEBUG(formatString("[%d] Dropping data ended ",processNr));
itsAdders[processNr]->setValue(PN_ADD_DROPPING, GCFPVBool(false), logline.timestamp);
}
return;
}
}
void CEPlogProcessor::_processCNProcLine(const struct logline &logline)
{
(void)logline;
}
void CEPlogProcessor::_processStorageLine(const struct logline &logline)
{
(void)logline;
return;
unsigned hostNr;
if (sscanf(logline.host, "%u", &hostNr) == 1) {
// Storage@00 will yield 00, the index of the first storage node, which is output by Log4Cout
LOG_FATAL_STR("Need a host name, not a number, for Storage (don't use Log4Cout?): " << logline.host );
return;
} else if (sscanf(logline.host, "%*[^0-9]%u", &hostNr) != 1) {
LOG_WARN_STR("Could not extract host number from name: " << logline.host );
return;
}
if (hostNr >= itsNrStorage) {
LOG_WARN_STR("Storage range = 0.." << itsNrStorage << ". Index " << hostNr << " is invalid");
return;
}
#if 0
char* result;
if ((result = strstr(msg, "time ="))) {
int rank(0), count(0);
char tim[24];
LOG_DEBUG_STR("_processStorageLine(" << processNr << "," << result << ")");
if (sscanf(result, "time = %[^,], rank = %d, count = %d", tim, &rank, &count)== 3)
{
LOG_DEBUG(formatString("[%d] time: %s, rank: %d, count: %d", processNr, tim, rank, count));
itsStorageBuf[processNr].timeStr[rank] = tim;
itsStorageBuf[processNr].count[rank] = count;
}
return;
}
#endif
#if 0
// IONProc already reports dropped blocks, and knows more (for example, blocks dropped at the end of an obs)
// Storage_main@locus001 25-05-11 19:36:38.862 WARN [obs 27304 output 1 index 224] OutputThread dropped 3 blocks
{
int blocks, index, output;
if (sscanf(result, "[obs %*d output %d index %d] OutputThread dropped %d blocks", &output, &index, &blocks) == 3) {
{
LOG_DEBUG(formatString("Dropped %d blocks: %d, subband: %d, output: %d", blocks, subband, output));
// dropped has no rank in yet
// itsStorageBuf[processNr].dropped[rank] = result;
}
return;
}
#endif
}
//
// finish_state(event, port)
//
// Write controller state to PVSS
//
GCFEvent::TResult CEPlogProcessor::finish_state(GCFEvent& event, GCFPortInterface& port)
{
LOG_DEBUG_STR ("finish_state:" << eventName(event) << "@" << port.getName());
GCFEvent::TResult status = GCFEvent::HANDLED;
switch (event.signal) {
case F_INIT:
break;
case F_ENTRY: {
// update PVSS
itsOwnPropertySet->setValue(string(PN_FSM_CURRENT_ACTION),GCFPVString("finished"));
itsTimerPort->cancelAllTimers();
break;
}
case DP_SET:
break;
default:
LOG_DEBUG("finishing_state, DEFAULT");
status = GCFEvent::NOT_HANDLED;
break;
}
return (status);
}
}; // StationCU
}; // LOFAR