Skip to content
Snippets Groups Projects
Commit 6de505c4 authored by Jan David Mol's avatar Jan David Mol
Browse files

bug 1362: fixed memory leaks, and made progress towards proper parsing

parent d6fad163
No related branches found
No related tags found
No related merge requests found
......@@ -84,6 +84,7 @@ CEPlogProcessor::CEPlogProcessor(const string& cntlrName) :
registerProtocol(DP_PROTOCOL, DP_PROTOCOL_STRINGS);
thisLogProcessor = this;
}
......@@ -105,12 +106,21 @@ CEPlogProcessor::~CEPlogProcessor()
delete itsStorage[storage];
}
// close all streams
while( !itsLogStreams.empty() )
_deleteStream( *((*itsLogStreams.begin()).first) );
// and reap the port objects immediately
collectGarbage();
if (itsListener) {
itsListener->close();
delete itsListener;
}
delete itsTimerPort;
delete itsOwnPropertySet;
}
//
......@@ -219,22 +229,24 @@ GCFEvent::TResult CEPlogProcessor::createPropertySets(GCFEvent& event,
// create propSets for the inputbuffer processes
itsInputBuffers.resize(itsNrInputBuffers, 0);
string inputBufferNameMask (createPropertySetName(PSN_INPUT_BUFFER, getName()));
for (int32 inputBuffer = 0; inputBuffer < itsNrInputBuffers; inputBuffer++) {
string PSname(formatString(inputBufferNameMask.c_str(), inputBuffer));
for (unsigned inputBuffer = 0; inputBuffer < itsNrInputBuffers; inputBuffer++) {
if (!itsInputBuffers[inputBuffer]) {
string PSname(formatString(inputBufferNameMask.c_str(), inputBuffer));
itsInputBuffers[inputBuffer] = new RTDBPropertySet(PSname, PST_INPUT_BUFFER, PSAT_WO | PSAT_CW, this);
}
usleep (2000); // wait 2 ms in order not to overload the system
}
// create propSets for the adder processes
itsAdders.resize (itsNrAdders, 0);
string adderNameMask(createPropertySetName(PSN_ADDER, getName()));
for (int32 adder = 0; adder < itsNrAdders; adder++) {
string PSname(formatString(adderNameMask.c_str(), adder));
for (unsigned adder = 0; adder < itsNrAdders; adder++) {
if (!itsAdders[adder]) {
string PSname(formatString(adderNameMask.c_str(), adder));
itsAdders[adder] = new RTDBPropertySet(PSname, PST_ADDER, PSAT_WO | PSAT_CW, this);
}
usleep (2000); // wait 2 ms in order not to overload the system
}
itsDroppingCount.resize (itsNrAdders, 0);
......@@ -243,23 +255,23 @@ GCFEvent::TResult CEPlogProcessor::createPropertySets(GCFEvent& event,
// create propSets for the storage processes
itsStorage.resize (itsNrStorage, 0);
string storageNameMask(createPropertySetName(PSN_STORAGE, getName()));
for (int32 storage = 0; storage < itsNrStorage; storage++) {
string PSname(formatString(storageNameMask.c_str(), storage));
for (unsigned storage = 0; storage < itsNrStorage; storage++) {
if (!itsStorage[storage]) {
string PSname(formatString(storageNameMask.c_str(), storage));
itsStorage[storage] = new RTDBPropertySet(PSname, PST_STORAGE, PSAT_WO | PSAT_CW, this);
}
usleep (2000); // wait 2 ms in order not to overload the system
}
itsStorageBuf.resize(itsNrStorage);
for (int i=0; i < itsNrStorage; i++) {
itsStorageBuf.resize (itsNrStorage);
for (unsigned i = 0; i < itsNrStorage; i++) {
//set array sizes
itsStorageBuf[i].timeStr.resize(MPIProcs);
itsStorageBuf[i].count.resize(MPIProcs,0);
itsStorageBuf[i].dropped.resize(MPIProcs);
}
LOG_INFO("Giving PVSS 5 seconds to process the requests");
itsTimerPort->setTimer(5.0); // give database some time to finish the job
}
......@@ -267,13 +279,13 @@ GCFEvent::TResult CEPlogProcessor::createPropertySets(GCFEvent& event,
case F_TIMER: {
// database should be ready by ts, check if allocation was succesfull
for (int32 inputBuffer = 0; inputBuffer < itsNrInputBuffers; inputBuffer++) {
for (unsigned inputBuffer = 0; inputBuffer < itsNrInputBuffers; inputBuffer++) {
ASSERTSTR(itsInputBuffers[inputBuffer], "Allocation of PS for inputBuffer " << inputBuffer << " failed.");
}
for (int32 adder = 0; adder < itsNrAdders; adder++) {
for (unsigned adder = 0; adder < itsNrAdders; adder++) {
ASSERTSTR(itsAdders[adder], "Allocation of PS for adder " << adder << " failed.");
}
for (int32 storage = 0; storage < itsNrStorage; storage++) {
for (unsigned storage = 0; storage < itsNrStorage; storage++) {
ASSERTSTR(itsStorage[storage], "Allocation of PS for storage " << storage << " failed.");
}
LOG_DEBUG_STR("Allocation of all propertySets successfull, going to open the listener");
......@@ -323,6 +335,15 @@ GCFEvent::TResult CEPlogProcessor::startListener(GCFEvent& event, GCFPortInterf
return (GCFEvent::HANDLED);
}
void CEPlogProcessor::collectGarbage()
{
LOG_DEBUG("Cleaning up garbage");
for (unsigned i = 0; i < itsLogStreamsGarbage.size(); i++)
delete itsLogStreamsGarbage[i];
itsLogStreamsGarbage.clear();
}
//
// operational(event, port)
......@@ -338,21 +359,34 @@ GCFEvent::TResult CEPlogProcessor::operational(GCFEvent& event, GCFPortInterface
case F_TIMER:
LOG_DEBUG("Timer event, preparing PVSS arrays");
for (int j=0; j < itsNrStorage; j++) {
for (unsigned j = 0; j < itsNrStorage; j++) {
GCFPValueArray timeArray;
GCFPValueArray countArray;
GCFPValueArray droppedArray;
for (int i = 0; i<MPIProcs;i++) {
timeArray.push_back(new GCFPVString(itsStorageBuf[j].timeStr[i]));
countArray.push_back(new GCFPVInteger(itsStorageBuf[j].count[i]));
droppedArray.push_back(new GCFPVString(itsStorageBuf[j].dropped[i]));
timeArray.resize(MPIProcs,0);
countArray.resize(MPIProcs,0);
droppedArray.resize(MPIProcs,0);
for (unsigned i = 0; i<MPIProcs;i++) {
timeArray[i] = new GCFPVString(itsStorageBuf[j].timeStr[i]);
countArray[i] = new GCFPVInteger(itsStorageBuf[j].count[i]);
droppedArray[i] = new GCFPVString(itsStorageBuf[j].dropped[i]);
}
itsStorage[j]->setValue(PN_STR_TIME, GCFPVDynArr(LPT_DYNSTRING, timeArray));
itsStorage[j]->setValue(PN_STR_COUNT, GCFPVDynArr(LPT_DYNINTEGER, countArray));
itsStorage[j]->setValue(PN_STR_DROPPED, GCFPVDynArr(LPT_DYNSTRING, droppedArray));
for (unsigned i = 0; i<MPIProcs;i++) {
delete timeArray[i];
delete countArray[i];
delete droppedArray[i];
}
}
collectGarbage();
break;
case F_ACCEPT_REQ:
_handleConnectionRequest();
......@@ -362,7 +396,6 @@ GCFEvent::TResult CEPlogProcessor::operational(GCFEvent& event, GCFPortInterface
break;
case F_DISCONNECTED: {
port.close();
_deleteStream(port);
break;
}
......@@ -380,12 +413,19 @@ GCFEvent::TResult CEPlogProcessor::operational(GCFEvent& event, GCFPortInterface
void CEPlogProcessor::_deleteStream(GCFPortInterface& port)
{
LOG_DEBUG_STR("_deleteStream");
port.close();
map<GCFPortInterface*, streamBuffer_t>::iterator theStream = itsLogStreams.find(&port);
if (theStream != itsLogStreams.end()) {
delete theStream->second.buffer;
theStream->second.buffer = 0;
streamBuffer_t &sb = theStream->second;
delete sb.buffer;
itsLogStreams.erase(theStream);
}
// schedule to delete, since the parent may still be referring to
// port and require info from it
itsLogStreamsGarbage.push_back(&port);
}
//
......@@ -420,7 +460,6 @@ void CEPlogProcessor::_handleDataStream(GCFPortInterface* port)
int newBytes = stream.socket->recv( stream.buffer->tail, stream.buffer->tailFreeSpace() );
if (newBytes < 0) {
LOG_DEBUG_STR("Closing connection.");
port->close();
_deleteStream(*port);
return;
}
......@@ -430,7 +469,7 @@ void CEPlogProcessor::_handleDataStream(GCFPortInterface* port)
char lineBuf[1024];
while (stream.buffer->getLine( lineBuf, sizeof lineBuf )) {
LOG_DEBUG_STR("Read log line " << lineBuf );
LOG_DEBUG_STR("Read log line '" << lineBuf << "'" );
_processLogLine(lineBuf);
}
}
......@@ -491,17 +530,22 @@ void CEPlogProcessor::_processLogLine(const char *cString)
return;
}
// debug hack
if (!strcmp(cString,"quit")) {
finish();
return;
}
// example log line:
// Storage@00 09-12-10 11:33:13.240 DEBUG [obs 21855 output 1 subband 223] InputThread::~InputThread()
unsigned bufsize = strlen( cString ) + 1;
vector<char> processName(bufsize), date(bufsize), time(bufsize), loglevel(bufsize), msg(bufsize);
int processNr;
vector<char> processName(bufsize), processHost(bufsize), date(bufsize), time(bufsize), loglevel(bufsize), msg(bufsize);
// TODO: support both exe@nr (IONProc@00) and exe@host (Storage_main@locus002)
if (sscanf(cString, "%[^@]@%d %s %s %s %[^\n]",
if (sscanf(cString, "%[^@]@%s %s %s %s %[^\n]",
&processName[0],
&processNr,
&processHost[0],
&date[0],
&time[0],
&loglevel[0],
......@@ -523,28 +567,59 @@ void CEPlogProcessor::_processLogLine(const char *cString)
time_t ts = _parseDateTime(&date[0], &time[0]);
if (!strcmp(&processName[0],"IONProc")) {
_processIONProcLine(processNr, ts, &loglevel[0], &msg[0]);
_processIONProcLine(&processHost[0], ts, &loglevel[0], &msg[0]);
} else if (!strcmp(&processName[0],"CNProc")) {
_processCNProcLine(processNr, ts, &loglevel[0], &msg[0]);
_processCNProcLine(&processHost[0], ts, &loglevel[0], &msg[0]);
} else if (!strcmp(&processName[0],"Storage")) {
_processStorageLine(processNr, ts, &loglevel[0], &msg[0]);
_processStorageLine(&processHost[0], ts, &loglevel[0], &msg[0]);
}
}
//
// _processIONProcLine(cstring)
//
void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const char *loglevel, const char *msg)
void CEPlogProcessor::_processIONProcLine(const char *host, time_t ts, const char *loglevel, const char *msg)
{
LOG_DEBUG_STR("_processIONProcLine(" << processNr << "," << ts << "," << loglevel << "," << msg << ")");
LOG_DEBUG_STR("_processIONProcLine(" << host << "," << ts << "," << loglevel << "," << msg << ")");
if (processNr < 0 || processNr >= itsNrInputBuffers) {
unsigned processNr = 0;
if (processNr >= itsNrInputBuffers) {
LOG_WARN_STR("Inputbuffer range = 0.." << itsNrInputBuffers << ". Index " << processNr << " is invalid");
return;
}
char* result;
// IONProc@00 31-03-11 00:17:22.438 INFO [obs 24811] ----- Creating new job
// IONProc@00 31-03-11 00:17:22.550 INFO [obs 24811] Waiting for job to start: sleeping until Thu Mar 31 00:18:50 2011
// IONProc@00 31-03-11 00:18:50.008 INFO Storage writer on lse012: starting as rank 0
// IONProc@00 31-03-11 00:18:50.031 INFO [obs 24811] ----- Observation start
int obsID;
unsigned bufsize = strlen( msg ) + 1;
if (sscanf(msg,"[obs %d] ----- Creating new job", &obsID) == 1) {
LOG_DEBUG_STR("obs " << obsID << " created");
}
if (sscanf(msg,"[obs %d] Waiting for job to start", &obsID) == 1) {
LOG_DEBUG_STR("obs " << obsID << " waiting to start");
}
{
vector<char> host(bufsize);
int rank;
if (sscanf(msg,"[obs %d] Storage writer on %[^:]: starting as rank %d", &obsID, &host[0], &rank) == 3) {
LOG_DEBUG_STR("obs " << obsID << " starts storage writer " << rank << " on host " << &host[0]);
}
}
if (sscanf(msg,"[obs %d] ----- Observation start", &obsID) == 1) {
LOG_DEBUG_STR("obs " << obsID << " run()");
}
//
// InputBuffer
//
......@@ -646,7 +721,7 @@ void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const char *
LOG_DEBUG(formatString("[%d] Dropping data started ",processNr));
itsAdders[processNr]->setValue(PN_ADD_DROPPING, GCFPVBool(true), ts);
itsAdders[processNr]->setValue(PN_ADD_LOG_LINE,GCFPVString(result),ts);
itsDroppingCount[processNr]=itsDroppingCount[processNr]+1;
itsDroppingCount[processNr]++;
LOG_DEBUG(formatString("Dropping count[%d] : %d", processNr,itsDroppingCount[processNr]));
return;
}
......@@ -659,7 +734,7 @@ void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const char *
itsAdders[processNr]->setValue(PN_ADD_NR_BLOCKS_DROPPED, GCFPVInteger(dropped), ts);
}
itsAdders[processNr]->setValue(PN_ADD_LOG_LINE,GCFPVString(result),ts);
itsDroppingCount[processNr]=itsDroppingCount[processNr]-1;
itsDroppingCount[processNr]--;
LOG_DEBUG(formatString("Dropping count[%d] : %d", processNr,itsDroppingCount[processNr]));
// if dropcount = 0 again, if so reset dropping flag
if (itsDroppingCount[processNr] <= 0) {
......@@ -670,18 +745,20 @@ void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const char *
}
}
void CEPlogProcessor::_processCNProcLine(int processNr, time_t ts, const char *loglevel, const char *msg)
{
LOG_DEBUG_STR("_processCNProcLine(" << processNr << "," << ts << "," << loglevel << "," << msg << ")");
void CEPlogProcessor::_processCNProcLine(const char *host, time_t ts, const char *loglevel, const char *msg)
{
LOG_DEBUG_STR("_processCNProcLine(" << host << "," << ts << "," << loglevel << "," << msg << ")");
// TODO
}
void CEPlogProcessor::_processStorageLine(int processNr, time_t ts, const char *loglevel, const char *msg)
void CEPlogProcessor::_processStorageLine(const char *host, time_t ts, const char *loglevel, const char *msg)
{
LOG_DEBUG_STR("_processStorageLine(" << processNr << "," << ts << "," << loglevel << "," << msg << ")");
LOG_DEBUG_STR("_processStorageLine(" << host << "," << ts << "," << loglevel << "," << msg << ")");
if (processNr < 0 || processNr >= itsNrStorage) {
unsigned processNr;
if (processNr >= itsNrStorage) {
LOG_WARN_STR("Storage range = 0.." << itsNrStorage << ". Index " << processNr << " is invalid");
return;
}
......@@ -740,9 +817,11 @@ GCFEvent::TResult CEPlogProcessor::finish_state(GCFEvent& event, GCFPortInterfac
case F_ENTRY: {
// update PVSS
itsOwnPropertySet->setValue(string(PN_FSM_CURRENT_ACTION),GCFPVString("finished"));
itsTimerPort->cancelAllTimers();
break;
}
case DP_SET:
break;
......
......@@ -35,13 +35,13 @@
#include <time.h>
namespace LOFAR {
using MACIO::GCFEvent;
using GCF::TM::GCFTask;
using GCF::TM::GCFTCPPort;
using GCF::TM::GCFTimerPort;
using GCF::TM::GCFPortInterface;
using GCF::RTDB::RTDBPropertySet;
namespace APL {
using MACIO::GCFEvent;
using GCF::TM::GCFTask;
using GCF::TM::GCFTCPPort;
using GCF::TM::GCFTimerPort;
using GCF::TM::GCFPortInterface;
using GCF::RTDB::RTDBPropertySet;
namespace APL {
// \addtogroup CEPCU
// @{
......@@ -51,76 +51,78 @@ namespace LOFAR {
class CEPlogProcessor : public GCFTask
{
public:
explicit CEPlogProcessor(const string& cntlrName);
~CEPlogProcessor();
// its processing states
GCFEvent::TResult initial_state (GCFEvent& event, GCFPortInterface& port);
GCFEvent::TResult createPropertySets(GCFEvent& event, GCFPortInterface& port);
GCFEvent::TResult startListener (GCFEvent& event, GCFPortInterface& port);
GCFEvent::TResult operational (GCFEvent& event, GCFPortInterface& port);
GCFEvent::TResult finish_state (GCFEvent& event, GCFPortInterface& port);
// Interrupthandler for switching to the finish state when exiting the program
static void signalHandler (int signum);
void finish();
explicit CEPlogProcessor(const string& cntlrName);
~CEPlogProcessor();
// its processing states
GCFEvent::TResult initial_state (GCFEvent& event, GCFPortInterface& port);
GCFEvent::TResult createPropertySets(GCFEvent& event, GCFPortInterface& port);
GCFEvent::TResult startListener (GCFEvent& event, GCFPortInterface& port);
GCFEvent::TResult operational (GCFEvent& event, GCFPortInterface& port);
GCFEvent::TResult finish_state (GCFEvent& event, GCFPortInterface& port);
// Interrupthandler for switching to the finish state when exiting the program
static void signalHandler (int signum);
void finish();
private:
// Copying is not allowed
CEPlogProcessor();
CEPlogProcessor(const CEPlogProcessor& that);
CEPlogProcessor& operator=(const CEPlogProcessor& that);
// Admin functions
void _deleteStream (GCFPortInterface& port);
void _handleConnectionRequest();
// Routines for processing the loglines.
void _handleDataStream (GCFPortInterface* port);
time_t _parseDateTime (const char *datestr, const char *timestr) const;
void _processLogLine (const char *cString);
void _processIONProcLine(int processNr, time_t ts, const char *loglevel, const char *msg);
void _processCNProcLine(int processNr, time_t ts, const char *loglevel, const char *msg);
void _processStorageLine(int processNr, time_t ts, const char *loglevel, const char *msg);
//# --- Datamembers ---
// The listener socket to receive the requests on.
GCFTCPPort* itsListener;
RTDBPropertySet* itsOwnPropertySet;
GCFTimerPort* itsTimerPort;
// internal structure for admin for 1 stream
typedef struct {
GCFTCPPort* socket;
CircularBuffer* buffer;
} streamBuffer_t;
// internal structure for lse based logging
typedef struct {
vector<string> timeStr;
vector<int> count;
vector<string> dropped;
} logBuffer_t;
// Map containing all the streambuffers.
map<GCFPortInterface*, streamBuffer_t> itsLogStreams;
vector<RTDBPropertySet*> itsInputBuffers;
vector<RTDBPropertySet*> itsAdders;
vector<RTDBPropertySet*> itsStorage;
vector<int> itsDroppingCount;
// Copying is not allowed
CEPlogProcessor();
CEPlogProcessor(const CEPlogProcessor& that);
CEPlogProcessor& operator=(const CEPlogProcessor& that);
// Admin functions
void _deleteStream (GCFPortInterface& port);
void _handleConnectionRequest();
// Routines for processing the loglines.
void _handleDataStream (GCFPortInterface* port);
time_t _parseDateTime (const char *datestr, const char *timestr) const;
void _processLogLine (const char *cString);
void collectGarbage();
void _processIONProcLine(const char *host, time_t ts, const char *loglevel, const char *msg);
void _processCNProcLine(const char *host, time_t ts, const char *loglevel, const char *msg);
void _processStorageLine(const char *host, time_t ts, const char *loglevel, const char *msg);
//# --- Datamembers ---
// The listener socket to receive the requests on.
GCFTCPPort* itsListener;
RTDBPropertySet* itsOwnPropertySet;
GCFTimerPort* itsTimerPort;
// internal structure for admin for 1 stream
typedef struct {
GCFTCPPort* socket;
CircularBuffer* buffer;
} streamBuffer_t;
// internal structure for lse based logging
typedef struct {
vector<string> timeStr;
vector<int> count;
vector<string> dropped;
} logBuffer_t;
// Map containing all the streambuffers.
map<GCFPortInterface*, streamBuffer_t> itsLogStreams;
vector<GCFPortInterface*> itsLogStreamsGarbage;
vector<RTDBPropertySet*> itsInputBuffers;
vector<RTDBPropertySet*> itsAdders;
vector<RTDBPropertySet*> itsStorage;
vector<int> itsDroppingCount;
vector<logBuffer_t> itsStorageBuf;
// values read from the conf file.
int itsNrInputBuffers;
int itsNrAdders;
int itsNrStorage;
int itsBufferSize;
// values read from the conf file.
unsigned itsNrInputBuffers;
unsigned itsNrAdders;
unsigned itsNrStorage;
unsigned itsBufferSize;
};
// @} addgroup
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment