diff --git a/MAC/APL/CEPCU/src/CEPlogProcessor/CEPlogProcessor.cc b/MAC/APL/CEPCU/src/CEPlogProcessor/CEPlogProcessor.cc index 1f32aaaf49f720c3fa2aef8acdc536df112b9591..e166c0b945e34c99f93ec8f03f94f83fd4bcd596 100644 --- a/MAC/APL/CEPCU/src/CEPlogProcessor/CEPlogProcessor.cc +++ b/MAC/APL/CEPCU/src/CEPlogProcessor/CEPlogProcessor.cc @@ -84,6 +84,7 @@ CEPlogProcessor::CEPlogProcessor(const string& cntlrName) : registerProtocol(DP_PROTOCOL, DP_PROTOCOL_STRINGS); + thisLogProcessor = this; } @@ -105,12 +106,21 @@ CEPlogProcessor::~CEPlogProcessor() delete itsStorage[storage]; } + // close all streams + while( !itsLogStreams.empty() ) + _deleteStream( *((*itsLogStreams.begin()).first) ); + + // and reap the port objects immediately + collectGarbage(); + if (itsListener) { itsListener->close(); delete itsListener; } delete itsTimerPort; + + delete itsOwnPropertySet; } // @@ -219,22 +229,24 @@ GCFEvent::TResult CEPlogProcessor::createPropertySets(GCFEvent& event, // create propSets for the inputbuffer processes itsInputBuffers.resize(itsNrInputBuffers, 0); string inputBufferNameMask (createPropertySetName(PSN_INPUT_BUFFER, getName())); - for (int32 inputBuffer = 0; inputBuffer < itsNrInputBuffers; inputBuffer++) { - string PSname(formatString(inputBufferNameMask.c_str(), inputBuffer)); + for (unsigned inputBuffer = 0; inputBuffer < itsNrInputBuffers; inputBuffer++) { if (!itsInputBuffers[inputBuffer]) { + string PSname(formatString(inputBufferNameMask.c_str(), inputBuffer)); itsInputBuffers[inputBuffer] = new RTDBPropertySet(PSname, PST_INPUT_BUFFER, PSAT_WO | PSAT_CW, this); } + usleep (2000); // wait 2 ms in order not to overload the system } // create propSets for the adder processes itsAdders.resize (itsNrAdders, 0); string adderNameMask(createPropertySetName(PSN_ADDER, getName())); - for (int32 adder = 0; adder < itsNrAdders; adder++) { - string PSname(formatString(adderNameMask.c_str(), adder)); + for (unsigned adder = 0; adder < itsNrAdders; adder++) { if (!itsAdders[adder]) { + string PSname(formatString(adderNameMask.c_str(), adder)); itsAdders[adder] = new RTDBPropertySet(PSname, PST_ADDER, PSAT_WO | PSAT_CW, this); } + usleep (2000); // wait 2 ms in order not to overload the system } itsDroppingCount.resize (itsNrAdders, 0); @@ -243,23 +255,23 @@ GCFEvent::TResult CEPlogProcessor::createPropertySets(GCFEvent& event, // create propSets for the storage processes itsStorage.resize (itsNrStorage, 0); string storageNameMask(createPropertySetName(PSN_STORAGE, getName())); - for (int32 storage = 0; storage < itsNrStorage; storage++) { - string PSname(formatString(storageNameMask.c_str(), storage)); + for (unsigned storage = 0; storage < itsNrStorage; storage++) { if (!itsStorage[storage]) { + string PSname(formatString(storageNameMask.c_str(), storage)); itsStorage[storage] = new RTDBPropertySet(PSname, PST_STORAGE, PSAT_WO | PSAT_CW, this); } + usleep (2000); // wait 2 ms in order not to overload the system } - itsStorageBuf.resize(itsNrStorage); - for (int i=0; i < itsNrStorage; i++) { + + itsStorageBuf.resize (itsNrStorage); + for (unsigned i = 0; i < itsNrStorage; i++) { //set array sizes itsStorageBuf[i].timeStr.resize(MPIProcs); itsStorageBuf[i].count.resize(MPIProcs,0); itsStorageBuf[i].dropped.resize(MPIProcs); } - - LOG_INFO("Giving PVSS 5 seconds to process the requests"); itsTimerPort->setTimer(5.0); // give database some time to finish the job } @@ -267,13 +279,13 @@ GCFEvent::TResult CEPlogProcessor::createPropertySets(GCFEvent& event, case F_TIMER: { // database should be ready by ts, check if allocation was succesfull - for (int32 inputBuffer = 0; inputBuffer < itsNrInputBuffers; inputBuffer++) { + for (unsigned inputBuffer = 0; inputBuffer < itsNrInputBuffers; inputBuffer++) { ASSERTSTR(itsInputBuffers[inputBuffer], "Allocation of PS for inputBuffer " << inputBuffer << " failed."); } - for (int32 adder = 0; adder < itsNrAdders; adder++) { + for (unsigned adder = 0; adder < itsNrAdders; adder++) { ASSERTSTR(itsAdders[adder], "Allocation of PS for adder " << adder << " failed."); } - for (int32 storage = 0; storage < itsNrStorage; storage++) { + for (unsigned storage = 0; storage < itsNrStorage; storage++) { ASSERTSTR(itsStorage[storage], "Allocation of PS for storage " << storage << " failed."); } LOG_DEBUG_STR("Allocation of all propertySets successfull, going to open the listener"); @@ -323,6 +335,15 @@ GCFEvent::TResult CEPlogProcessor::startListener(GCFEvent& event, GCFPortInterf return (GCFEvent::HANDLED); } +void CEPlogProcessor::collectGarbage() +{ + LOG_DEBUG("Cleaning up garbage"); + for (unsigned i = 0; i < itsLogStreamsGarbage.size(); i++) + delete itsLogStreamsGarbage[i]; + + itsLogStreamsGarbage.clear(); +} + // // operational(event, port) @@ -338,21 +359,34 @@ GCFEvent::TResult CEPlogProcessor::operational(GCFEvent& event, GCFPortInterface case F_TIMER: LOG_DEBUG("Timer event, preparing PVSS arrays"); - for (int j=0; j < itsNrStorage; j++) { + + for (unsigned j = 0; j < itsNrStorage; j++) { GCFPValueArray timeArray; GCFPValueArray countArray; GCFPValueArray droppedArray; - for (int i = 0; i<MPIProcs;i++) { - timeArray.push_back(new GCFPVString(itsStorageBuf[j].timeStr[i])); - countArray.push_back(new GCFPVInteger(itsStorageBuf[j].count[i])); - droppedArray.push_back(new GCFPVString(itsStorageBuf[j].dropped[i])); + timeArray.resize(MPIProcs,0); + countArray.resize(MPIProcs,0); + droppedArray.resize(MPIProcs,0); + + for (unsigned i = 0; i<MPIProcs;i++) { + timeArray[i] = new GCFPVString(itsStorageBuf[j].timeStr[i]); + countArray[i] = new GCFPVInteger(itsStorageBuf[j].count[i]); + droppedArray[i] = new GCFPVString(itsStorageBuf[j].dropped[i]); } itsStorage[j]->setValue(PN_STR_TIME, GCFPVDynArr(LPT_DYNSTRING, timeArray)); itsStorage[j]->setValue(PN_STR_COUNT, GCFPVDynArr(LPT_DYNINTEGER, countArray)); itsStorage[j]->setValue(PN_STR_DROPPED, GCFPVDynArr(LPT_DYNSTRING, droppedArray)); + + for (unsigned i = 0; i<MPIProcs;i++) { + delete timeArray[i]; + delete countArray[i]; + delete droppedArray[i]; + } } + + collectGarbage(); break; case F_ACCEPT_REQ: _handleConnectionRequest(); @@ -362,7 +396,6 @@ GCFEvent::TResult CEPlogProcessor::operational(GCFEvent& event, GCFPortInterface break; case F_DISCONNECTED: { - port.close(); _deleteStream(port); break; } @@ -380,12 +413,19 @@ GCFEvent::TResult CEPlogProcessor::operational(GCFEvent& event, GCFPortInterface void CEPlogProcessor::_deleteStream(GCFPortInterface& port) { LOG_DEBUG_STR("_deleteStream"); + port.close(); + map<GCFPortInterface*, streamBuffer_t>::iterator theStream = itsLogStreams.find(&port); if (theStream != itsLogStreams.end()) { - delete theStream->second.buffer; - theStream->second.buffer = 0; + streamBuffer_t &sb = theStream->second; + delete sb.buffer; + itsLogStreams.erase(theStream); } + + // schedule to delete, since the parent may still be referring to + // port and require info from it + itsLogStreamsGarbage.push_back(&port); } // @@ -420,7 +460,6 @@ void CEPlogProcessor::_handleDataStream(GCFPortInterface* port) int newBytes = stream.socket->recv( stream.buffer->tail, stream.buffer->tailFreeSpace() ); if (newBytes < 0) { LOG_DEBUG_STR("Closing connection."); - port->close(); _deleteStream(*port); return; } @@ -430,7 +469,7 @@ void CEPlogProcessor::_handleDataStream(GCFPortInterface* port) char lineBuf[1024]; while (stream.buffer->getLine( lineBuf, sizeof lineBuf )) { - LOG_DEBUG_STR("Read log line " << lineBuf ); + LOG_DEBUG_STR("Read log line '" << lineBuf << "'" ); _processLogLine(lineBuf); } } @@ -491,17 +530,22 @@ void CEPlogProcessor::_processLogLine(const char *cString) return; } + // debug hack + if (!strcmp(cString,"quit")) { + finish(); + return; + } + // example log line: // Storage@00 09-12-10 11:33:13.240 DEBUG [obs 21855 output 1 subband 223] InputThread::~InputThread() unsigned bufsize = strlen( cString ) + 1; - vector<char> processName(bufsize), date(bufsize), time(bufsize), loglevel(bufsize), msg(bufsize); - int processNr; + vector<char> processName(bufsize), processHost(bufsize), date(bufsize), time(bufsize), loglevel(bufsize), msg(bufsize); // TODO: support both exe@nr (IONProc@00) and exe@host (Storage_main@locus002) - if (sscanf(cString, "%[^@]@%d %s %s %s %[^\n]", + if (sscanf(cString, "%[^@]@%s %s %s %s %[^\n]", &processName[0], - &processNr, + &processHost[0], &date[0], &time[0], &loglevel[0], @@ -523,28 +567,59 @@ void CEPlogProcessor::_processLogLine(const char *cString) time_t ts = _parseDateTime(&date[0], &time[0]); if (!strcmp(&processName[0],"IONProc")) { - _processIONProcLine(processNr, ts, &loglevel[0], &msg[0]); + _processIONProcLine(&processHost[0], ts, &loglevel[0], &msg[0]); } else if (!strcmp(&processName[0],"CNProc")) { - _processCNProcLine(processNr, ts, &loglevel[0], &msg[0]); + _processCNProcLine(&processHost[0], ts, &loglevel[0], &msg[0]); } else if (!strcmp(&processName[0],"Storage")) { - _processStorageLine(processNr, ts, &loglevel[0], &msg[0]); + _processStorageLine(&processHost[0], ts, &loglevel[0], &msg[0]); } } // // _processIONProcLine(cstring) // -void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const char *loglevel, const char *msg) +void CEPlogProcessor::_processIONProcLine(const char *host, time_t ts, const char *loglevel, const char *msg) { - LOG_DEBUG_STR("_processIONProcLine(" << processNr << "," << ts << "," << loglevel << "," << msg << ")"); + LOG_DEBUG_STR("_processIONProcLine(" << host << "," << ts << "," << loglevel << "," << msg << ")"); - if (processNr < 0 || processNr >= itsNrInputBuffers) { + unsigned processNr = 0; + + if (processNr >= itsNrInputBuffers) { LOG_WARN_STR("Inputbuffer range = 0.." << itsNrInputBuffers << ". Index " << processNr << " is invalid"); return; } char* result; + // IONProc@00 31-03-11 00:17:22.438 INFO [obs 24811] ----- Creating new job + // IONProc@00 31-03-11 00:17:22.550 INFO [obs 24811] Waiting for job to start: sleeping until Thu Mar 31 00:18:50 2011 + // IONProc@00 31-03-11 00:18:50.008 INFO Storage writer on lse012: starting as rank 0 + // IONProc@00 31-03-11 00:18:50.031 INFO [obs 24811] ----- Observation start + + int obsID; + unsigned bufsize = strlen( msg ) + 1; + + if (sscanf(msg,"[obs %d] ----- Creating new job", &obsID) == 1) { + LOG_DEBUG_STR("obs " << obsID << " created"); + } + + if (sscanf(msg,"[obs %d] Waiting for job to start", &obsID) == 1) { + LOG_DEBUG_STR("obs " << obsID << " waiting to start"); + } + + { + vector<char> host(bufsize); + int rank; + + if (sscanf(msg,"[obs %d] Storage writer on %[^:]: starting as rank %d", &obsID, &host[0], &rank) == 3) { + LOG_DEBUG_STR("obs " << obsID << " starts storage writer " << rank << " on host " << &host[0]); + } + } + + if (sscanf(msg,"[obs %d] ----- Observation start", &obsID) == 1) { + LOG_DEBUG_STR("obs " << obsID << " run()"); + } + // // InputBuffer // @@ -646,7 +721,7 @@ void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const char * LOG_DEBUG(formatString("[%d] Dropping data started ",processNr)); itsAdders[processNr]->setValue(PN_ADD_DROPPING, GCFPVBool(true), ts); itsAdders[processNr]->setValue(PN_ADD_LOG_LINE,GCFPVString(result),ts); - itsDroppingCount[processNr]=itsDroppingCount[processNr]+1; + itsDroppingCount[processNr]++; LOG_DEBUG(formatString("Dropping count[%d] : %d", processNr,itsDroppingCount[processNr])); return; } @@ -659,7 +734,7 @@ void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const char * itsAdders[processNr]->setValue(PN_ADD_NR_BLOCKS_DROPPED, GCFPVInteger(dropped), ts); } itsAdders[processNr]->setValue(PN_ADD_LOG_LINE,GCFPVString(result),ts); - itsDroppingCount[processNr]=itsDroppingCount[processNr]-1; + itsDroppingCount[processNr]--; LOG_DEBUG(formatString("Dropping count[%d] : %d", processNr,itsDroppingCount[processNr])); // if dropcount = 0 again, if so reset dropping flag if (itsDroppingCount[processNr] <= 0) { @@ -670,18 +745,20 @@ void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const char * } } -void CEPlogProcessor::_processCNProcLine(int processNr, time_t ts, const char *loglevel, const char *msg) -{ - LOG_DEBUG_STR("_processCNProcLine(" << processNr << "," << ts << "," << loglevel << "," << msg << ")"); +void CEPlogProcessor::_processCNProcLine(const char *host, time_t ts, const char *loglevel, const char *msg) +{ + LOG_DEBUG_STR("_processCNProcLine(" << host << "," << ts << "," << loglevel << "," << msg << ")"); // TODO } -void CEPlogProcessor::_processStorageLine(int processNr, time_t ts, const char *loglevel, const char *msg) +void CEPlogProcessor::_processStorageLine(const char *host, time_t ts, const char *loglevel, const char *msg) { - LOG_DEBUG_STR("_processStorageLine(" << processNr << "," << ts << "," << loglevel << "," << msg << ")"); + LOG_DEBUG_STR("_processStorageLine(" << host << "," << ts << "," << loglevel << "," << msg << ")"); - if (processNr < 0 || processNr >= itsNrStorage) { + unsigned processNr; + + if (processNr >= itsNrStorage) { LOG_WARN_STR("Storage range = 0.." << itsNrStorage << ". Index " << processNr << " is invalid"); return; } @@ -740,9 +817,11 @@ GCFEvent::TResult CEPlogProcessor::finish_state(GCFEvent& event, GCFPortInterfac case F_ENTRY: { // update PVSS itsOwnPropertySet->setValue(string(PN_FSM_CURRENT_ACTION),GCFPVString("finished")); + + itsTimerPort->cancelAllTimers(); break; } - + case DP_SET: break; diff --git a/MAC/APL/CEPCU/src/CEPlogProcessor/CEPlogProcessor.h b/MAC/APL/CEPCU/src/CEPlogProcessor/CEPlogProcessor.h index a5c067155113d5c2c189a52fa5086fb05d33d449..cac8007d73273daa95b8d4adf409df774fd483b4 100644 --- a/MAC/APL/CEPCU/src/CEPlogProcessor/CEPlogProcessor.h +++ b/MAC/APL/CEPCU/src/CEPlogProcessor/CEPlogProcessor.h @@ -35,13 +35,13 @@ #include <time.h> namespace LOFAR { - using MACIO::GCFEvent; - using GCF::TM::GCFTask; - using GCF::TM::GCFTCPPort; - using GCF::TM::GCFTimerPort; - using GCF::TM::GCFPortInterface; - using GCF::RTDB::RTDBPropertySet; - namespace APL { + using MACIO::GCFEvent; + using GCF::TM::GCFTask; + using GCF::TM::GCFTCPPort; + using GCF::TM::GCFTimerPort; + using GCF::TM::GCFPortInterface; + using GCF::RTDB::RTDBPropertySet; + namespace APL { // \addtogroup CEPCU // @{ @@ -51,76 +51,78 @@ namespace LOFAR { class CEPlogProcessor : public GCFTask { public: - explicit CEPlogProcessor(const string& cntlrName); - ~CEPlogProcessor(); - - // its processing states - GCFEvent::TResult initial_state (GCFEvent& event, GCFPortInterface& port); - GCFEvent::TResult createPropertySets(GCFEvent& event, GCFPortInterface& port); - GCFEvent::TResult startListener (GCFEvent& event, GCFPortInterface& port); - GCFEvent::TResult operational (GCFEvent& event, GCFPortInterface& port); - GCFEvent::TResult finish_state (GCFEvent& event, GCFPortInterface& port); - - // Interrupthandler for switching to the finish state when exiting the program - static void signalHandler (int signum); - void finish(); - + explicit CEPlogProcessor(const string& cntlrName); + ~CEPlogProcessor(); + + // its processing states + GCFEvent::TResult initial_state (GCFEvent& event, GCFPortInterface& port); + GCFEvent::TResult createPropertySets(GCFEvent& event, GCFPortInterface& port); + GCFEvent::TResult startListener (GCFEvent& event, GCFPortInterface& port); + GCFEvent::TResult operational (GCFEvent& event, GCFPortInterface& port); + GCFEvent::TResult finish_state (GCFEvent& event, GCFPortInterface& port); + + // Interrupthandler for switching to the finish state when exiting the program + static void signalHandler (int signum); + void finish(); + private: - // Copying is not allowed - CEPlogProcessor(); - CEPlogProcessor(const CEPlogProcessor& that); - CEPlogProcessor& operator=(const CEPlogProcessor& that); - - // Admin functions - void _deleteStream (GCFPortInterface& port); - void _handleConnectionRequest(); - - // Routines for processing the loglines. - void _handleDataStream (GCFPortInterface* port); - time_t _parseDateTime (const char *datestr, const char *timestr) const; - void _processLogLine (const char *cString); - - void _processIONProcLine(int processNr, time_t ts, const char *loglevel, const char *msg); - void _processCNProcLine(int processNr, time_t ts, const char *loglevel, const char *msg); - void _processStorageLine(int processNr, time_t ts, const char *loglevel, const char *msg); - - //# --- Datamembers --- - // The listener socket to receive the requests on. - GCFTCPPort* itsListener; - - RTDBPropertySet* itsOwnPropertySet; - GCFTimerPort* itsTimerPort; - - // internal structure for admin for 1 stream - typedef struct { - GCFTCPPort* socket; - CircularBuffer* buffer; - } streamBuffer_t; - - // internal structure for lse based logging - typedef struct { - vector<string> timeStr; - vector<int> count; - vector<string> dropped; - } logBuffer_t; - - - - // Map containing all the streambuffers. - map<GCFPortInterface*, streamBuffer_t> itsLogStreams; - - vector<RTDBPropertySet*> itsInputBuffers; - vector<RTDBPropertySet*> itsAdders; - vector<RTDBPropertySet*> itsStorage; - vector<int> itsDroppingCount; + // Copying is not allowed + CEPlogProcessor(); + CEPlogProcessor(const CEPlogProcessor& that); + CEPlogProcessor& operator=(const CEPlogProcessor& that); + + // Admin functions + void _deleteStream (GCFPortInterface& port); + void _handleConnectionRequest(); + + // Routines for processing the loglines. + void _handleDataStream (GCFPortInterface* port); + time_t _parseDateTime (const char *datestr, const char *timestr) const; + void _processLogLine (const char *cString); + + void collectGarbage(); + + void _processIONProcLine(const char *host, time_t ts, const char *loglevel, const char *msg); + void _processCNProcLine(const char *host, time_t ts, const char *loglevel, const char *msg); + void _processStorageLine(const char *host, time_t ts, const char *loglevel, const char *msg); + + //# --- Datamembers --- + // The listener socket to receive the requests on. + GCFTCPPort* itsListener; + + RTDBPropertySet* itsOwnPropertySet; + GCFTimerPort* itsTimerPort; + + // internal structure for admin for 1 stream + typedef struct { + GCFTCPPort* socket; + CircularBuffer* buffer; + } streamBuffer_t; + + // internal structure for lse based logging + typedef struct { + vector<string> timeStr; + vector<int> count; + vector<string> dropped; + } logBuffer_t; + + + // Map containing all the streambuffers. + map<GCFPortInterface*, streamBuffer_t> itsLogStreams; + vector<GCFPortInterface*> itsLogStreamsGarbage; + + vector<RTDBPropertySet*> itsInputBuffers; + vector<RTDBPropertySet*> itsAdders; + vector<RTDBPropertySet*> itsStorage; + vector<int> itsDroppingCount; vector<logBuffer_t> itsStorageBuf; - // values read from the conf file. - int itsNrInputBuffers; - int itsNrAdders; - int itsNrStorage; - int itsBufferSize; + // values read from the conf file. + unsigned itsNrInputBuffers; + unsigned itsNrAdders; + unsigned itsNrStorage; + unsigned itsBufferSize; }; // @} addgroup