From 320a108d35bf7e6ee92acd25c7107cd52ed1000c Mon Sep 17 00:00:00 2001 From: Alexander Mueller <alexander.mueller@hs.uni-hamburg.de> Date: Fri, 14 Oct 2005 15:17:34 +0000 Subject: [PATCH] BugID: 447 During further tests and the LDR1.7 integration the KeyValueLoggerMaster and Daemon are not working very efficient together. So this is changed too. Fruther at the moment a API manager has requested the KVLDaemon to skip an PVSS update value the value was not remembered for the situation the connection to the KVLDaemon was broken. Now the remembered man ID will be resend if the connection is reestablished. --- .../LogSys/KVLogSys/src/GCF_KeyValueLogger.cc | 24 +- MAC/GCF/LogSys/KVLogSys/src/KVLDefines.h | 3 +- .../KVLogSys/src/KeyValueLoggerDaemon.cc | 285 +++++++++--------- .../KVLogSys/src/KeyValueLoggerDaemon.h | 2 + .../KVLogSys/src/KeyValueLoggerMaster.cc | 8 +- .../include/GCF/LogSys/GCF_KeyValueLogger.h | 1 + 6 files changed, 160 insertions(+), 163 deletions(-) diff --git a/MAC/GCF/LogSys/KVLogSys/src/GCF_KeyValueLogger.cc b/MAC/GCF/LogSys/KVLogSys/src/GCF_KeyValueLogger.cc index 0198822bc54..493987109ed 100644 --- a/MAC/GCF/LogSys/KVLogSys/src/GCF_KeyValueLogger.cc +++ b/MAC/GCF/LogSys/KVLogSys/src/GCF_KeyValueLogger.cc @@ -51,7 +51,8 @@ GCFKeyValueLogger* GCFKeyValueLogger::instance() } GCFKeyValueLogger::GCFKeyValueLogger() : - GCFTask((State)&GCFKeyValueLogger::initial, KVL_CLIENT_TASK_NAME) + GCFTask((State)&GCFKeyValueLogger::initial, KVL_CLIENT_TASK_NAME), + _manIdToSkip(-1) { // register the protocol for debugging purposes registerProtocol(KVL_PROTOCOL, KVL_PROTOCOL_signalnames); @@ -79,7 +80,7 @@ GCFEvent::TResult GCFKeyValueLogger::initial(GCFEvent& e, GCFPortInterface& /*p* break; case F_DISCONNECTED: - _kvlClientPort.setTimer(1.0); // try again after 1 second + _kvlClientPort.setTimer(TO_TRY_RECONNECT); // try again after 1 second break; default: @@ -100,9 +101,11 @@ GCFEvent::TResult GCFKeyValueLogger::operational(GCFEvent& e, GCFPortInterface& LOG_FATAL("Connection lost to KeyValue Logger deamon"); p.close(); break; + case F_CLOSED: TRAN(GCFKeyValueLogger::initial); break; + case F_ENTRY: { KVLUpdateEvent* pUpdateEvent; @@ -120,6 +123,10 @@ GCFEvent::TResult GCFKeyValueLogger::operational(GCFEvent& e, GCFPortInterface& delete pEvent; } _msgQueue.clear(); + if (_manIdToSkip > -1) + { + skipUpdatesFrom(_manIdToSkip); + } break; } default: @@ -194,17 +201,12 @@ void GCFKeyValueLogger::addAction(const string& key, uint8 action, void GCFKeyValueLogger::skipUpdatesFrom(uint8 manId) { - KVLSkipUpdatesFromEvent* pIndication = new KVLSkipUpdatesFromEvent; - pIndication->man_id = manId; - + KVLSkipUpdatesFromEvent indication; + indication.man_id = manId; + _manIdToSkip = manId; if (_kvlClientPort.isConnected()) { - _kvlClientPort.send(*pIndication); - delete pIndication; - } - else - { - _msgQueue.push_back(pIndication); + _kvlClientPort.send(indication); } } } // namespace LogSys diff --git a/MAC/GCF/LogSys/KVLogSys/src/KVLDefines.h b/MAC/GCF/LogSys/KVLogSys/src/KVLDefines.h index 0dce8e146fc..a2c6e52c5a2 100644 --- a/MAC/GCF/LogSys/KVLogSys/src/KVLDefines.h +++ b/MAC/GCF/LogSys/KVLogSys/src/KVLDefines.h @@ -36,7 +36,8 @@ namespace LOFAR #define MAX_EVENTS_BUFF_SIZE 1400 #define MAX_NR_OF_EVENTS 250 #define MAX_NR_OF_RETRY_MSG 10 -#define CONNECTION_TIMEOUT 3600.0 // == 1 hour +#define TO_DISCONNECTED 3600.0 // == 1 hour +#define TO_TRY_RECONNECT 5.0 const string KVL_CLIENT_TASK_NAME("GCF-KVLC"); const string KVL_DAEMON_TASK_NAME("GCF-KVLD"); diff --git a/MAC/GCF/LogSys/KVLogSys/src/KeyValueLoggerDaemon.cc b/MAC/GCF/LogSys/KVLogSys/src/KeyValueLoggerDaemon.cc index 360a619c870..ac81fdd2ee1 100644 --- a/MAC/GCF/LogSys/KVLogSys/src/KeyValueLoggerDaemon.cc +++ b/MAC/GCF/LogSys/KVLogSys/src/KeyValueLoggerDaemon.cc @@ -49,7 +49,8 @@ KeyValueLoggerDaemon::KeyValueLoggerDaemon() : _registerID(0), _curSeqNr(0), _oldestUnanswerdSeqNr(1), - _propertyLogger(*this) + _propertyLogger(*this), + _waitForAnswer(false) { // register the protocol for debugging purposes registerProtocol(KVL_PROTOCOL, KVL_PROTOCOL_signalnames); @@ -87,7 +88,7 @@ GCFEvent::TResult KeyValueLoggerDaemon::initial(GCFEvent& e, GCFPortInterface& p break; case F_DISCONNECTED: - p.setTimer(1.0); // try again after 1 second + p.setTimer(TO_TRY_RECONNECT); // try again after 1 second break; default: @@ -106,21 +107,72 @@ GCFEvent::TResult KeyValueLoggerDaemon::operational(GCFEvent& e, GCFPortInterfac switch (e.signal) { + case F_ENTRY: + _kvlDaemonPortProvider.setTimer(1.0, 5.0); // garbage timer + if (!_kvlMasterClientPort.isConnected()) + { + _kvlMasterClientPort.open(); + } + break; + + case F_CONNECTED: + DBGFAILWHEN(&_kvlDaemonPortProvider == &p); + if (&_kvlMasterClientPort == &p) + { + _kvlDaemonPortProvider.cancelTimer(hourTimerID); + KVLRegisterEvent request; + request.curID = _registerID; + if (_seqList.size() > 0) + { + request.firstSeqNr = _oldestUnanswerdSeqNr - 1; + } + else + { + request.firstSeqNr = _curSeqNr; + } + _kvlMasterClientPort.send(request); + hourTimerID = -1; + } + break; + + case F_ACCEPT_REQ: + { + LOG_INFO("New daemon client accepted!"); + GCFTCPPort* pNewDCPort = new GCFTCPPort(); + ASSERT(pNewDCPort); + pNewDCPort->init(*this, "kvld-client", GCFPortInterface::SPP, KVL_PROTOCOL); + _kvlDaemonPortProvider.accept(*pNewDCPort); + break; + } case F_DISCONNECTED: DBGFAILWHEN(&_kvlDaemonPortProvider == &p && "Daemon port provider may not be disconnected."); if (&_kvlMasterClientPort == &p) { - LOG_WARN("Connection lost to KeyValue Logger master. Tries a reconnect!!!"); + LOG_WARN("Connection lost to KeyValueLogger master. Tries a reconnect!!!"); _kvlMasterClientPort.cancelAllTimers(); - hourTimerID = _kvlMasterClientPort.setTimer(CONNECTION_TIMEOUT); + hourTimerID = _kvlMasterClientPort.setTimer(TO_DISCONNECTED); + _waitForAnswer = false; } else { - LOG_INFO("Connection lost to KeyValue Logger daemon client."); + LOG_INFO("Connection lost to a KeyValueLogger client of the deamon."); } p.close(); break; + case F_CLOSED: + DBGFAILWHEN(&_kvlDaemonPortProvider == &p); + if (&_kvlMasterClientPort == &p) + { + _kvlMasterClientPort.setTimer(TO_TRY_RECONNECT); + } + else + { + _clientsGarbage.push_back(&p); + _propertyLogger.clientGone(p); + } + break; + case F_TIMER: if (&_kvlDaemonPortProvider == &p) { @@ -128,10 +180,8 @@ GCFEvent::TResult KeyValueLoggerDaemon::operational(GCFEvent& e, GCFPortInterfac if (hourTimerID == (long) pTimer->id) { // about 1 hour no connection: - // reset register ID, master also will release this number - // all buffered updates will lost + // reset register ID, master also will release this number and the client administration _registerID = 0; - _curSeqNr = 0; hourTimerID = -2; // still no connection } else @@ -156,118 +206,49 @@ GCFEvent::TResult KeyValueLoggerDaemon::operational(GCFEvent& e, GCFPortInterfac } else { - if (_seqList.empty()) + if (_nrOfBufferedEvents > 0) { - if (_nrOfBufferedEvents > 0) - { - // send current collected updates - sendEventsBuffer(); - } - } - else if (_kvlMasterClientPort.isConnected()) - { - // resend max. 10 not answered messages - uint8 i = 0; - KVLEventCollectionEvent* pUpdateEvents; - TSequenceList::iterator iter; - // find the first not yet sent events buffer - for (iter = _seqList.find(_oldestUnanswerdSeqNr); - i < MAX_NR_OF_RETRY_MSG; ++iter) - { - if (iter == _seqList.end()) - { - _oldestUnanswerdSeqNr++; - } - else - { - // found - break; - } - i++; - } - for (i = 0; i < MAX_NR_OF_RETRY_MSG; i++) - { - if (iter == _seqList.end()) - { - // because the map is sorted, we have to force jump from end to begin; - iter = _seqList.begin(); - } - if (iter != _seqList.end()) - { - pUpdateEvents = iter->second; - if (pUpdateEvents->daemonID == 0) - { - pUpdateEvents->daemonID = _registerID; - } - _kvlMasterClientPort.send(*pUpdateEvents); - } - else - { - break; - } - ++iter; - } + // send current collected updates + sendEventsBuffer(); } } } break; - - case F_CLOSED: - DBGFAILWHEN(&_kvlDaemonPortProvider == &p); - if (&_kvlMasterClientPort == &p) - { - _kvlMasterClientPort.setTimer(1.0); // try to reconnect again after 1 second; - } - else + + case KVL_REGISTERED: + { + KVLRegisteredEvent response(e); + if (_registerID != response.ID) { - _clientsGarbage.push_back(&p); - _propertyLogger.clientGone(p); + LOG_DEBUG(formatString( + "Registered on master with nr. %d", + response.ID)); } - break; - - case F_CONNECTED: - DBGFAILWHEN(&_kvlDaemonPortProvider == &p); - if (&_kvlMasterClientPort == &p) + _registerID = response.ID; + _kvlMasterClientPort.setTimer(1.0, 1.0); // start the send heartbeat + if (response.curSeqNr != (_oldestUnanswerdSeqNr - 1)) { - _kvlDaemonPortProvider.cancelTimer(hourTimerID); - KVLRegisterEvent request; - request.curID = _registerID; - if (_seqList.size() > 0) + // collection with this seqNr was received successful, + // but no answer was received before the connection was broken + // so the collection can be removed and the following message in + // the queue can be send + TSequenceList::iterator iter = _seqList.find(response.curSeqNr); + if (iter != _seqList.end()) { - request.firstSeqNr = _oldestUnanswerdSeqNr - 1; + delete iter->second; } - else - { - request.firstSeqNr = _curSeqNr; - } - _kvlMasterClientPort.send(request); - hourTimerID = -1; - } + _seqList.erase(response.curSeqNr); + _oldestUnanswerdSeqNr++; + } + sendOldestCollection(); break; - - case F_ACCEPT_REQ: - { - LOG_INFO("New daemon client accepted!"); - GCFTCPPort* pNewDCPort = new GCFTCPPort(); - ASSERT(pNewDCPort); - pNewDCPort->init(*this, "kvld-client", GCFPortInterface::SPP, KVL_PROTOCOL); - _kvlDaemonPortProvider.accept(*pNewDCPort); - break; - } - case F_ENTRY: - _kvlDaemonPortProvider.setTimer(1.0, 5.0); // garbage timer - if (!_kvlMasterClientPort.isConnected()) - { - _kvlMasterClientPort.open(); - } - break; - + } case KVL_UPDATE: case KVL_ADD_ACTION: { if (hourTimerID == -2) { - LOG_DEBUG("More than 1 hour no connection with the master, so dump all key value updates."); + LOG_DEBUG("More than 1 hour no connection with the master, so dump all receiving key value updates."); break; } @@ -298,50 +279,19 @@ GCFEvent::TResult KeyValueLoggerDaemon::operational(GCFEvent& e, GCFPortInterfac } break; } - case KVL_ANSWER: - { - KVLAnswerEvent answer(e); - TSequenceList::iterator iter = _seqList.find(answer.seqNr); - LOG_DEBUG(formatString( - "Message with nr. %d was successfully received by the master.", - answer.seqNr)); - - if (iter != _seqList.end()) - { - delete iter->second; - } - _seqList.erase(answer.seqNr); - ASSERT(answer.seqNr == _oldestUnanswerdSeqNr); - _oldestUnanswerdSeqNr++; - break; - } - - case KVL_REGISTERED: + case F_VCHANGEMSG: { - KVLRegisteredEvent response(e); - if (_registerID != response.ID) + if (hourTimerID == -2) { - LOG_DEBUG(formatString( - "Registered on master with nr. %d", - response.ID)); + LOG_DEBUG("More than 1 hour no connection with the master, so dump all receiving key value updates."); + break; } - _registerID = response.ID; - - for (uint64 i = 0; i < MAX_NR_OF_RETRY_MSG; i++) + + if (_seqList.size() == 0xFFFF) { - _seqList.erase(response.curSeqNr - i); + LOG_DEBUG("Cannot buffer more events. Dump as long as the buffer not decreases."); + break; } - _kvlMasterClientPort.setTimer(1.0, 1.0); // start the (re)send heartbeat - break; - } - case KVL_SKIP_UPDATES_FROM: - { - KVLSkipUpdatesFromEvent request(e); - _propertyLogger.skipUpdatesFrom(request.man_id, p); - break; - } - case F_VCHANGEMSG: - { GCFPropValueEvent& pve = (GCFPropValueEvent&) e; KVLUpdateEvent ue; ue.key = pve.pPropName; @@ -369,6 +319,31 @@ GCFEvent::TResult KeyValueLoggerDaemon::operational(GCFEvent& e, GCFPortInterfac } break; } + case KVL_ANSWER: + { + KVLAnswerEvent answer(e); + LOG_DEBUG(formatString( + "Message with nr. %d was successfully received by the master.", + answer.seqNr)); + + TSequenceList::iterator iter = _seqList.find(answer.seqNr); + if (iter != _seqList.end()) + { + delete iter->second; + } + _seqList.erase(answer.seqNr); + ASSERT(answer.seqNr == _oldestUnanswerdSeqNr); + _waitForAnswer = false; + _oldestUnanswerdSeqNr++; + sendOldestCollection(); + break; + } + case KVL_SKIP_UPDATES_FROM: + { + KVLSkipUpdatesFromEvent request(e); + _propertyLogger.skipUpdatesFrom(request.man_id, p); + break; + } default: status = GCFEvent::NOT_HANDLED; break; @@ -391,15 +366,31 @@ void KeyValueLoggerDaemon::sendEventsBuffer() pCollectionEvent->nrOfEvents = _nrOfBufferedEvents; pCollectionEvent->events.buf.setValue(_eventsBuf, _curEventsBufSize, true); - if (_kvlMasterClientPort.isConnected()) - { - _kvlMasterClientPort.send(*pCollectionEvent); - } _seqList[_curSeqNr] = pCollectionEvent; + sendOldestCollection(); + _curEventsBufSize = 0; _nrOfBufferedEvents = 0; } +void KeyValueLoggerDaemon::sendOldestCollection() +{ + if (!_seqList.empty() && !_waitForAnswer && _kvlMasterClientPort.isConnected()) + { + TSequenceList::iterator iter = _seqList.find(_oldestUnanswerdSeqNr); + ASSERT(iter != _seqList.end()); + + KVLEventCollectionEvent* pUpdateEvents = iter->second; + if (pUpdateEvents->daemonID == 0) + { + pUpdateEvents->daemonID = _registerID; + } + + _waitForAnswer = true; + _kvlMasterClientPort.send(*pUpdateEvents); + } +} + } // namespace LogSys } // namespace GCF } // namespace LOFAR diff --git a/MAC/GCF/LogSys/KVLogSys/src/KeyValueLoggerDaemon.h b/MAC/GCF/LogSys/KVLogSys/src/KeyValueLoggerDaemon.h index 65f77a13432..0c5144c4a4e 100644 --- a/MAC/GCF/LogSys/KVLogSys/src/KeyValueLoggerDaemon.h +++ b/MAC/GCF/LogSys/KVLogSys/src/KeyValueLoggerDaemon.h @@ -54,6 +54,7 @@ class KeyValueLoggerDaemon : public TM::GCFTask private: // helper methods void sendEventsBuffer(); + void sendOldestCollection(); private: // data members TM::GCFTCPPort _kvlDaemonPortProvider; @@ -71,6 +72,7 @@ class KeyValueLoggerDaemon : public TM::GCFTask uint64 _curSeqNr; uint64 _oldestUnanswerdSeqNr; PropertyLogger _propertyLogger; + bool _waitForAnswer; }; } // namespace LogSys diff --git a/MAC/GCF/LogSys/KVLogSys/src/KeyValueLoggerMaster.cc b/MAC/GCF/LogSys/KVLogSys/src/KeyValueLoggerMaster.cc index d7ff9618185..6ee4251c52c 100644 --- a/MAC/GCF/LogSys/KVLogSys/src/KeyValueLoggerMaster.cc +++ b/MAC/GCF/LogSys/KVLogSys/src/KeyValueLoggerMaster.cc @@ -94,7 +94,7 @@ GCFEvent::TResult KeyValueLoggerMaster::initial(GCFEvent& e, GCFPortInterface& p break; case F_DISCONNECTED: - p.setTimer(1.0); // try again after 1 second + p.setTimer(TO_TRY_RECONNECT); // try again after 1 second break; default: @@ -162,7 +162,7 @@ GCFEvent::TResult KeyValueLoggerMaster::operational(GCFEvent& e, GCFPortInterfac if (iter->second.pPort == &p) { iter->second.pPort = 0; - iter->second.hourTimerID = _kvlMasterPortProvider.setTimer(CONNECTION_TIMEOUT); + iter->second.hourTimerID = _kvlMasterPortProvider.setTimer(TO_DISCONNECTED); LOG_DEBUG(formatString( "Hour timer %d for disconnected daemon %d is started.", iter->second.hourTimerID, @@ -298,7 +298,7 @@ GCFEvent::TResult KeyValueLoggerMaster::operational(GCFEvent& e, GCFPortInterfac { TClient client; client.pPort = &p; - client.curSeqNr = 0; + client.curSeqNr = request.firstSeqNr; uint8 newClientID = 0; do { @@ -312,7 +312,7 @@ GCFEvent::TResult KeyValueLoggerMaster::operational(GCFEvent& e, GCFPortInterfac newClientID)); _clients[newClientID] = client; response.ID = newClientID; - response.curSeqNr = 0; + response.curSeqNr = client.curSeqNr; } else { diff --git a/MAC/GCF/LogSys/include/GCF/LogSys/GCF_KeyValueLogger.h b/MAC/GCF/LogSys/include/GCF/LogSys/GCF_KeyValueLogger.h index 268415f895b..7a318ae14e9 100644 --- a/MAC/GCF/LogSys/include/GCF/LogSys/GCF_KeyValueLogger.h +++ b/MAC/GCF/LogSys/include/GCF/LogSys/GCF_KeyValueLogger.h @@ -71,6 +71,7 @@ class GCFKeyValueLogger : public TM::GCFTask private: // data members TM::GCFPort _kvlClientPort; + int _manIdToSkip; static GCFKeyValueLogger* _pInstance; private: // admin members -- GitLab