diff --git a/MAC/GCF/LogSys/KVLogSys/src/GCF_KeyValueLogger.cc b/MAC/GCF/LogSys/KVLogSys/src/GCF_KeyValueLogger.cc index 0198822bc547c066c4a7e201febe2777a1037f2a..493987109ed81a232692ec1df0486f7e9671e4b9 100644 --- a/MAC/GCF/LogSys/KVLogSys/src/GCF_KeyValueLogger.cc +++ b/MAC/GCF/LogSys/KVLogSys/src/GCF_KeyValueLogger.cc @@ -51,7 +51,8 @@ GCFKeyValueLogger* GCFKeyValueLogger::instance() } GCFKeyValueLogger::GCFKeyValueLogger() : - GCFTask((State)&GCFKeyValueLogger::initial, KVL_CLIENT_TASK_NAME) + GCFTask((State)&GCFKeyValueLogger::initial, KVL_CLIENT_TASK_NAME), + _manIdToSkip(-1) { // register the protocol for debugging purposes registerProtocol(KVL_PROTOCOL, KVL_PROTOCOL_signalnames); @@ -79,7 +80,7 @@ GCFEvent::TResult GCFKeyValueLogger::initial(GCFEvent& e, GCFPortInterface& /*p* break; case F_DISCONNECTED: - _kvlClientPort.setTimer(1.0); // try again after 1 second + _kvlClientPort.setTimer(TO_TRY_RECONNECT); // try again after 1 second break; default: @@ -100,9 +101,11 @@ GCFEvent::TResult GCFKeyValueLogger::operational(GCFEvent& e, GCFPortInterface& LOG_FATAL("Connection lost to KeyValue Logger deamon"); p.close(); break; + case F_CLOSED: TRAN(GCFKeyValueLogger::initial); break; + case F_ENTRY: { KVLUpdateEvent* pUpdateEvent; @@ -120,6 +123,10 @@ GCFEvent::TResult GCFKeyValueLogger::operational(GCFEvent& e, GCFPortInterface& delete pEvent; } _msgQueue.clear(); + if (_manIdToSkip > -1) + { + skipUpdatesFrom(_manIdToSkip); + } break; } default: @@ -194,17 +201,12 @@ void GCFKeyValueLogger::addAction(const string& key, uint8 action, void GCFKeyValueLogger::skipUpdatesFrom(uint8 manId) { - KVLSkipUpdatesFromEvent* pIndication = new KVLSkipUpdatesFromEvent; - pIndication->man_id = manId; - + KVLSkipUpdatesFromEvent indication; + indication.man_id = manId; + _manIdToSkip = manId; if (_kvlClientPort.isConnected()) { - _kvlClientPort.send(*pIndication); - delete pIndication; - } - else - { - _msgQueue.push_back(pIndication); + _kvlClientPort.send(indication); } } } // namespace LogSys diff --git a/MAC/GCF/LogSys/KVLogSys/src/KVLDefines.h b/MAC/GCF/LogSys/KVLogSys/src/KVLDefines.h index 0dce8e146fcd45cfb0ecbcc22e82803000744e12..a2c6e52c5a20393e3c1a7e124df8d4f757915288 100644 --- a/MAC/GCF/LogSys/KVLogSys/src/KVLDefines.h +++ b/MAC/GCF/LogSys/KVLogSys/src/KVLDefines.h @@ -36,7 +36,8 @@ namespace LOFAR #define MAX_EVENTS_BUFF_SIZE 1400 #define MAX_NR_OF_EVENTS 250 #define MAX_NR_OF_RETRY_MSG 10 -#define CONNECTION_TIMEOUT 3600.0 // == 1 hour +#define TO_DISCONNECTED 3600.0 // == 1 hour +#define TO_TRY_RECONNECT 5.0 const string KVL_CLIENT_TASK_NAME("GCF-KVLC"); const string KVL_DAEMON_TASK_NAME("GCF-KVLD"); diff --git a/MAC/GCF/LogSys/KVLogSys/src/KeyValueLoggerDaemon.cc b/MAC/GCF/LogSys/KVLogSys/src/KeyValueLoggerDaemon.cc index 360a619c87095c9d53f2bc2783dde1317e2561d3..ac81fdd2ee10a95937d9009cde7345f3975a4d5f 100644 --- a/MAC/GCF/LogSys/KVLogSys/src/KeyValueLoggerDaemon.cc +++ b/MAC/GCF/LogSys/KVLogSys/src/KeyValueLoggerDaemon.cc @@ -49,7 +49,8 @@ KeyValueLoggerDaemon::KeyValueLoggerDaemon() : _registerID(0), _curSeqNr(0), _oldestUnanswerdSeqNr(1), - _propertyLogger(*this) + _propertyLogger(*this), + _waitForAnswer(false) { // register the protocol for debugging purposes registerProtocol(KVL_PROTOCOL, KVL_PROTOCOL_signalnames); @@ -87,7 +88,7 @@ GCFEvent::TResult KeyValueLoggerDaemon::initial(GCFEvent& e, GCFPortInterface& p break; case F_DISCONNECTED: - p.setTimer(1.0); // try again after 1 second + p.setTimer(TO_TRY_RECONNECT); // try again after 1 second break; default: @@ -106,21 +107,72 @@ GCFEvent::TResult KeyValueLoggerDaemon::operational(GCFEvent& e, GCFPortInterfac switch (e.signal) { + case F_ENTRY: + _kvlDaemonPortProvider.setTimer(1.0, 5.0); // garbage timer + if (!_kvlMasterClientPort.isConnected()) + { + _kvlMasterClientPort.open(); + } + break; + + case F_CONNECTED: + DBGFAILWHEN(&_kvlDaemonPortProvider == &p); + if (&_kvlMasterClientPort == &p) + { + _kvlDaemonPortProvider.cancelTimer(hourTimerID); + KVLRegisterEvent request; + request.curID = _registerID; + if (_seqList.size() > 0) + { + request.firstSeqNr = _oldestUnanswerdSeqNr - 1; + } + else + { + request.firstSeqNr = _curSeqNr; + } + _kvlMasterClientPort.send(request); + hourTimerID = -1; + } + break; + + case F_ACCEPT_REQ: + { + LOG_INFO("New daemon client accepted!"); + GCFTCPPort* pNewDCPort = new GCFTCPPort(); + ASSERT(pNewDCPort); + pNewDCPort->init(*this, "kvld-client", GCFPortInterface::SPP, KVL_PROTOCOL); + _kvlDaemonPortProvider.accept(*pNewDCPort); + break; + } case F_DISCONNECTED: DBGFAILWHEN(&_kvlDaemonPortProvider == &p && "Daemon port provider may not be disconnected."); if (&_kvlMasterClientPort == &p) { - LOG_WARN("Connection lost to KeyValue Logger master. Tries a reconnect!!!"); + LOG_WARN("Connection lost to KeyValueLogger master. Tries a reconnect!!!"); _kvlMasterClientPort.cancelAllTimers(); - hourTimerID = _kvlMasterClientPort.setTimer(CONNECTION_TIMEOUT); + hourTimerID = _kvlMasterClientPort.setTimer(TO_DISCONNECTED); + _waitForAnswer = false; } else { - LOG_INFO("Connection lost to KeyValue Logger daemon client."); + LOG_INFO("Connection lost to a KeyValueLogger client of the deamon."); } p.close(); break; + case F_CLOSED: + DBGFAILWHEN(&_kvlDaemonPortProvider == &p); + if (&_kvlMasterClientPort == &p) + { + _kvlMasterClientPort.setTimer(TO_TRY_RECONNECT); + } + else + { + _clientsGarbage.push_back(&p); + _propertyLogger.clientGone(p); + } + break; + case F_TIMER: if (&_kvlDaemonPortProvider == &p) { @@ -128,10 +180,8 @@ GCFEvent::TResult KeyValueLoggerDaemon::operational(GCFEvent& e, GCFPortInterfac if (hourTimerID == (long) pTimer->id) { // about 1 hour no connection: - // reset register ID, master also will release this number - // all buffered updates will lost + // reset register ID, master also will release this number and the client administration _registerID = 0; - _curSeqNr = 0; hourTimerID = -2; // still no connection } else @@ -156,118 +206,49 @@ GCFEvent::TResult KeyValueLoggerDaemon::operational(GCFEvent& e, GCFPortInterfac } else { - if (_seqList.empty()) + if (_nrOfBufferedEvents > 0) { - if (_nrOfBufferedEvents > 0) - { - // send current collected updates - sendEventsBuffer(); - } - } - else if (_kvlMasterClientPort.isConnected()) - { - // resend max. 10 not answered messages - uint8 i = 0; - KVLEventCollectionEvent* pUpdateEvents; - TSequenceList::iterator iter; - // find the first not yet sent events buffer - for (iter = _seqList.find(_oldestUnanswerdSeqNr); - i < MAX_NR_OF_RETRY_MSG; ++iter) - { - if (iter == _seqList.end()) - { - _oldestUnanswerdSeqNr++; - } - else - { - // found - break; - } - i++; - } - for (i = 0; i < MAX_NR_OF_RETRY_MSG; i++) - { - if (iter == _seqList.end()) - { - // because the map is sorted, we have to force jump from end to begin; - iter = _seqList.begin(); - } - if (iter != _seqList.end()) - { - pUpdateEvents = iter->second; - if (pUpdateEvents->daemonID == 0) - { - pUpdateEvents->daemonID = _registerID; - } - _kvlMasterClientPort.send(*pUpdateEvents); - } - else - { - break; - } - ++iter; - } + // send current collected updates + sendEventsBuffer(); } } } break; - - case F_CLOSED: - DBGFAILWHEN(&_kvlDaemonPortProvider == &p); - if (&_kvlMasterClientPort == &p) - { - _kvlMasterClientPort.setTimer(1.0); // try to reconnect again after 1 second; - } - else + + case KVL_REGISTERED: + { + KVLRegisteredEvent response(e); + if (_registerID != response.ID) { - _clientsGarbage.push_back(&p); - _propertyLogger.clientGone(p); + LOG_DEBUG(formatString( + "Registered on master with nr. %d", + response.ID)); } - break; - - case F_CONNECTED: - DBGFAILWHEN(&_kvlDaemonPortProvider == &p); - if (&_kvlMasterClientPort == &p) + _registerID = response.ID; + _kvlMasterClientPort.setTimer(1.0, 1.0); // start the send heartbeat + if (response.curSeqNr != (_oldestUnanswerdSeqNr - 1)) { - _kvlDaemonPortProvider.cancelTimer(hourTimerID); - KVLRegisterEvent request; - request.curID = _registerID; - if (_seqList.size() > 0) + // collection with this seqNr was received successful, + // but no answer was received before the connection was broken + // so the collection can be removed and the following message in + // the queue can be send + TSequenceList::iterator iter = _seqList.find(response.curSeqNr); + if (iter != _seqList.end()) { - request.firstSeqNr = _oldestUnanswerdSeqNr - 1; + delete iter->second; } - else - { - request.firstSeqNr = _curSeqNr; - } - _kvlMasterClientPort.send(request); - hourTimerID = -1; - } + _seqList.erase(response.curSeqNr); + _oldestUnanswerdSeqNr++; + } + sendOldestCollection(); break; - - case F_ACCEPT_REQ: - { - LOG_INFO("New daemon client accepted!"); - GCFTCPPort* pNewDCPort = new GCFTCPPort(); - ASSERT(pNewDCPort); - pNewDCPort->init(*this, "kvld-client", GCFPortInterface::SPP, KVL_PROTOCOL); - _kvlDaemonPortProvider.accept(*pNewDCPort); - break; - } - case F_ENTRY: - _kvlDaemonPortProvider.setTimer(1.0, 5.0); // garbage timer - if (!_kvlMasterClientPort.isConnected()) - { - _kvlMasterClientPort.open(); - } - break; - + } case KVL_UPDATE: case KVL_ADD_ACTION: { if (hourTimerID == -2) { - LOG_DEBUG("More than 1 hour no connection with the master, so dump all key value updates."); + LOG_DEBUG("More than 1 hour no connection with the master, so dump all receiving key value updates."); break; } @@ -298,50 +279,19 @@ GCFEvent::TResult KeyValueLoggerDaemon::operational(GCFEvent& e, GCFPortInterfac } break; } - case KVL_ANSWER: - { - KVLAnswerEvent answer(e); - TSequenceList::iterator iter = _seqList.find(answer.seqNr); - LOG_DEBUG(formatString( - "Message with nr. %d was successfully received by the master.", - answer.seqNr)); - - if (iter != _seqList.end()) - { - delete iter->second; - } - _seqList.erase(answer.seqNr); - ASSERT(answer.seqNr == _oldestUnanswerdSeqNr); - _oldestUnanswerdSeqNr++; - break; - } - - case KVL_REGISTERED: + case F_VCHANGEMSG: { - KVLRegisteredEvent response(e); - if (_registerID != response.ID) + if (hourTimerID == -2) { - LOG_DEBUG(formatString( - "Registered on master with nr. %d", - response.ID)); + LOG_DEBUG("More than 1 hour no connection with the master, so dump all receiving key value updates."); + break; } - _registerID = response.ID; - - for (uint64 i = 0; i < MAX_NR_OF_RETRY_MSG; i++) + + if (_seqList.size() == 0xFFFF) { - _seqList.erase(response.curSeqNr - i); + LOG_DEBUG("Cannot buffer more events. Dump as long as the buffer not decreases."); + break; } - _kvlMasterClientPort.setTimer(1.0, 1.0); // start the (re)send heartbeat - break; - } - case KVL_SKIP_UPDATES_FROM: - { - KVLSkipUpdatesFromEvent request(e); - _propertyLogger.skipUpdatesFrom(request.man_id, p); - break; - } - case F_VCHANGEMSG: - { GCFPropValueEvent& pve = (GCFPropValueEvent&) e; KVLUpdateEvent ue; ue.key = pve.pPropName; @@ -369,6 +319,31 @@ GCFEvent::TResult KeyValueLoggerDaemon::operational(GCFEvent& e, GCFPortInterfac } break; } + case KVL_ANSWER: + { + KVLAnswerEvent answer(e); + LOG_DEBUG(formatString( + "Message with nr. %d was successfully received by the master.", + answer.seqNr)); + + TSequenceList::iterator iter = _seqList.find(answer.seqNr); + if (iter != _seqList.end()) + { + delete iter->second; + } + _seqList.erase(answer.seqNr); + ASSERT(answer.seqNr == _oldestUnanswerdSeqNr); + _waitForAnswer = false; + _oldestUnanswerdSeqNr++; + sendOldestCollection(); + break; + } + case KVL_SKIP_UPDATES_FROM: + { + KVLSkipUpdatesFromEvent request(e); + _propertyLogger.skipUpdatesFrom(request.man_id, p); + break; + } default: status = GCFEvent::NOT_HANDLED; break; @@ -391,15 +366,31 @@ void KeyValueLoggerDaemon::sendEventsBuffer() pCollectionEvent->nrOfEvents = _nrOfBufferedEvents; pCollectionEvent->events.buf.setValue(_eventsBuf, _curEventsBufSize, true); - if (_kvlMasterClientPort.isConnected()) - { - _kvlMasterClientPort.send(*pCollectionEvent); - } _seqList[_curSeqNr] = pCollectionEvent; + sendOldestCollection(); + _curEventsBufSize = 0; _nrOfBufferedEvents = 0; } +void KeyValueLoggerDaemon::sendOldestCollection() +{ + if (!_seqList.empty() && !_waitForAnswer && _kvlMasterClientPort.isConnected()) + { + TSequenceList::iterator iter = _seqList.find(_oldestUnanswerdSeqNr); + ASSERT(iter != _seqList.end()); + + KVLEventCollectionEvent* pUpdateEvents = iter->second; + if (pUpdateEvents->daemonID == 0) + { + pUpdateEvents->daemonID = _registerID; + } + + _waitForAnswer = true; + _kvlMasterClientPort.send(*pUpdateEvents); + } +} + } // namespace LogSys } // namespace GCF } // namespace LOFAR diff --git a/MAC/GCF/LogSys/KVLogSys/src/KeyValueLoggerDaemon.h b/MAC/GCF/LogSys/KVLogSys/src/KeyValueLoggerDaemon.h index 65f77a13432c50ee05d176f6980a25f08782df3c..0c5144c4a4e67418999e05f3059071cc7794c271 100644 --- a/MAC/GCF/LogSys/KVLogSys/src/KeyValueLoggerDaemon.h +++ b/MAC/GCF/LogSys/KVLogSys/src/KeyValueLoggerDaemon.h @@ -54,6 +54,7 @@ class KeyValueLoggerDaemon : public TM::GCFTask private: // helper methods void sendEventsBuffer(); + void sendOldestCollection(); private: // data members TM::GCFTCPPort _kvlDaemonPortProvider; @@ -71,6 +72,7 @@ class KeyValueLoggerDaemon : public TM::GCFTask uint64 _curSeqNr; uint64 _oldestUnanswerdSeqNr; PropertyLogger _propertyLogger; + bool _waitForAnswer; }; } // namespace LogSys diff --git a/MAC/GCF/LogSys/KVLogSys/src/KeyValueLoggerMaster.cc b/MAC/GCF/LogSys/KVLogSys/src/KeyValueLoggerMaster.cc index d7ff961818518fb0f07b7378437b4a9eb78bca61..6ee4251c52cb7998414b74911b1331ef9821597d 100644 --- a/MAC/GCF/LogSys/KVLogSys/src/KeyValueLoggerMaster.cc +++ b/MAC/GCF/LogSys/KVLogSys/src/KeyValueLoggerMaster.cc @@ -94,7 +94,7 @@ GCFEvent::TResult KeyValueLoggerMaster::initial(GCFEvent& e, GCFPortInterface& p break; case F_DISCONNECTED: - p.setTimer(1.0); // try again after 1 second + p.setTimer(TO_TRY_RECONNECT); // try again after 1 second break; default: @@ -162,7 +162,7 @@ GCFEvent::TResult KeyValueLoggerMaster::operational(GCFEvent& e, GCFPortInterfac if (iter->second.pPort == &p) { iter->second.pPort = 0; - iter->second.hourTimerID = _kvlMasterPortProvider.setTimer(CONNECTION_TIMEOUT); + iter->second.hourTimerID = _kvlMasterPortProvider.setTimer(TO_DISCONNECTED); LOG_DEBUG(formatString( "Hour timer %d for disconnected daemon %d is started.", iter->second.hourTimerID, @@ -298,7 +298,7 @@ GCFEvent::TResult KeyValueLoggerMaster::operational(GCFEvent& e, GCFPortInterfac { TClient client; client.pPort = &p; - client.curSeqNr = 0; + client.curSeqNr = request.firstSeqNr; uint8 newClientID = 0; do { @@ -312,7 +312,7 @@ GCFEvent::TResult KeyValueLoggerMaster::operational(GCFEvent& e, GCFPortInterfac newClientID)); _clients[newClientID] = client; response.ID = newClientID; - response.curSeqNr = 0; + response.curSeqNr = client.curSeqNr; } else { diff --git a/MAC/GCF/LogSys/include/GCF/LogSys/GCF_KeyValueLogger.h b/MAC/GCF/LogSys/include/GCF/LogSys/GCF_KeyValueLogger.h index 268415f895b890e943a1cd0ebf486d9568969c2f..7a318ae14e99a03fab209b58a3d86a0d81f6d6b9 100644 --- a/MAC/GCF/LogSys/include/GCF/LogSys/GCF_KeyValueLogger.h +++ b/MAC/GCF/LogSys/include/GCF/LogSys/GCF_KeyValueLogger.h @@ -71,6 +71,7 @@ class GCFKeyValueLogger : public TM::GCFTask private: // data members TM::GCFPort _kvlClientPort; + int _manIdToSkip; static GCFKeyValueLogger* _pInstance; private: // admin members