diff --git a/.gitattributes b/.gitattributes index e44a1fb2038e157e8d3328b8c0d6d2989cb4c920..3007c602ffcb885aec658750c310b735d18c56b7 100644 --- a/.gitattributes +++ b/.gitattributes @@ -2568,8 +2568,10 @@ MAC/APL/CURTDBDaemons/src/LogProcessor/LogProcessor.log_prop -text MAC/APL/CURTDBDaemons/src/PVSSGateway/CMakeLists.txt -text MAC/APL/CURTDBDaemons/src/PVSSGateway/PVSSDatapointDefs.h -text MAC/APL/CURTDBDaemons/src/PVSSGateway/PVSSGateway.cc -text +MAC/APL/CURTDBDaemons/src/PVSSGateway/PVSSGateway.conf -text MAC/APL/CURTDBDaemons/src/PVSSGateway/PVSSGateway.h -text MAC/APL/CURTDBDaemons/src/PVSSGateway/PVSSGatewayMain.cc -text +MAC/APL/CURTDBDaemons/test/tPVSSGateway.cc -text MAC/APL/DEPENDENCIES -text svneol=native#application/octet-stream MAC/APL/MD_Protocol/src/MD_Protocol.prot -text MAC/APL/MainCU/src/CRTriggerControl/CMakeLists.txt -text diff --git a/MAC/APL/CURTDBDaemons/src/PVSSGateway/CMakeLists.txt b/MAC/APL/CURTDBDaemons/src/PVSSGateway/CMakeLists.txt index b6c9ebf5e906e9ed06ff50dce7bb341257832388..fd7140a9d52f226077e5e48dcf106f715cc5dca2 100644 --- a/MAC/APL/CURTDBDaemons/src/PVSSGateway/CMakeLists.txt +++ b/MAC/APL/CURTDBDaemons/src/PVSSGateway/CMakeLists.txt @@ -1,3 +1,7 @@ # $Id: CMakeLists.txt 14273 2009-10-16 10:08:29Z loose $ +install(FILES + PVSSGateway.conf + DESTINATION etc) + lofar_add_bin_program(PVSSGateway PVSSGatewayMain.cc PVSSGateway.cc) diff --git a/MAC/APL/CURTDBDaemons/src/PVSSGateway/PVSSGateway.cc b/MAC/APL/CURTDBDaemons/src/PVSSGateway/PVSSGateway.cc index 073a02770b22766827d116eba05b274b353031d9..0ee7c025975bfceabaf2183f0cf4faddb35df86e 100644 --- a/MAC/APL/CURTDBDaemons/src/PVSSGateway/PVSSGateway.cc +++ b/MAC/APL/CURTDBDaemons/src/PVSSGateway/PVSSGateway.cc @@ -24,6 +24,7 @@ #include <Common/LofarLogger.h> #include <Common/Version.h> #include <Common/ParameterSet.h> +#include <Common/NsTimestamp.h> #include <ApplCommon/PosixTime.h> #include <ApplCommon/StationInfo.h> #include <MACIO/GCF_Event.h> @@ -31,6 +32,7 @@ #include <MACIO/KVT_Protocol.ph> #include <GCF/PVSS/GCF_PVTypes.h> #include <GCF/PVSS/PVSSresult.h> +#include <GCF/PVSS/PVSSinfo.h> #include <GCF/RTDB/DP_Protocol.ph> #include "PVSSGateway.h" #include "PVSSDatapointDefs.h" @@ -56,9 +58,13 @@ static PVSSGateway* thisPVSSGateway = 0; // PVSSGateway::PVSSGateway(const string& myName) : GCFTask((State)&PVSSGateway::initial, myName), - itsListener (0), - itsDPservice (0), - itsTimerPort (0) + itsListener (0), + itsDPservice (0), + itsTimerPort (0), + itsMsgBufTimer (0), + itsRemovalDelay (30.0), + itsFlushInterval(5.0), + itsMaxExpandSize(10) { LOG_DEBUG_STR("PVSSGateway(" << myName << ")"); LOG_INFO(Version::getInfo<CURTDBDaemonsVersion>("PVSSGateway")); @@ -66,6 +72,10 @@ PVSSGateway::PVSSGateway(const string& myName) : registerProtocol(DP_PROTOCOL, DP_PROTOCOL_STRINGS); registerProtocol(KVT_PROTOCOL, KVT_PROTOCOL_STRINGS); + itsRemovalDelay = globalParameterSet()->getDouble ("removalDelay", 30.0); + itsFlushInterval = globalParameterSet()->getDouble ("flushInterval", 5.0); + itsMaxExpandSize = globalParameterSet()->getInt32 ("expandSize", 100); + // initialize the ports itsListener = new GCFTCPPort(*this, MAC_SVCMASK_PVSSGATEWAY, GCFPortInterface::MSPP, KVT_PROTOCOL); ASSERTSTR(itsListener, "Can't allocate a listener port"); @@ -220,7 +230,8 @@ GCFEvent::TResult PVSSGateway::operational(GCFEvent& event, GCFPortInterface& p switch (event.signal) { case F_ENTRY: itsPropertySet->setValue(PN_FSM_CURRENT_ACTION, GCFPVString("Active")); - itsTimerPort->setTimer(1.0, 5.0); + itsTimerPort->setTimer(1.0, itsFlushInterval); + itsMsgBufTimer = itsTimerPort->setTimer(0.5, 0.2); break; // Catch incoming connections of new clients @@ -234,8 +245,7 @@ GCFEvent::TResult PVSSGateway::operational(GCFEvent& event, GCFPortInterface& p client->init(*this, "application", GCFPortInterface::SPP, KVT_PROTOCOL); itsListener->accept(*client); } - } - break; + } break; case F_CONNECTED: break; @@ -261,20 +271,19 @@ GCFEvent::TResult PVSSGateway::operational(GCFEvent& event, GCFPortInterface& p port.close(); itsClients.erase(&port); itsClientsGarbage.push_back(&port); - } - break; + } break; case F_TIMER: { - // cleanup the garbage of closed ports to master clients - GCFPortInterface* pPort; - for (TClients::iterator iter = itsClientsGarbage.begin(); - iter != itsClientsGarbage.end(); ++iter) { - pPort = *iter; - delete pPort; + GCFTimerEvent& timerEvent=static_cast<GCFTimerEvent&>(event); + if (timerEvent.id == itsMsgBufTimer) { // fast, multiple times per second + _processMsgBuffer(); } - itsClientsGarbage.clear(); - } - break; + else { // slow timer, once every few seconds + _garbageCollection(); + _flushValueCache(); + _cleanValueCache(); + } + } break; case KVT_REGISTER: { KVTRegisterEvent registerEvent(event); @@ -284,13 +293,12 @@ GCFEvent::TResult PVSSGateway::operational(GCFEvent& event, GCFPortInterface& p answer.obsID = registerEvent.obsID; answer.name = registerEvent.name; port.send(answer); - } - break; + } break; case KVT_SEND_MSG: { KVTSendMsgEvent logEvent(event); LOG_DEBUG_STR("Received: " << logEvent); - bool sendOk(_addKVT(logEvent.kvp)); + bool sendOk(_add2MsgBuffer(logEvent.kvp)); itsClients[&port].msgCnt++; if (logEvent.seqnr > 0) { @@ -299,16 +307,14 @@ GCFEvent::TResult PVSSGateway::operational(GCFEvent& event, GCFPortInterface& p answer.result = !sendOk; port.send(answer); } - } - break; - + } break; case KVT_SEND_MSG_POOL: { KVTSendMsgPoolEvent logEvent(event); LOG_DEBUG_STR("Received: " << logEvent); bool sendOk(true); - for (uint32 i = 0; i < logEvent.kvps.size(); i++) { - sendOk &= _addKVT(logEvent.kvps[i]); + for (size_t i = 0; i < logEvent.kvps.size(); i++) { + sendOk &= _add2MsgBuffer(logEvent.kvps[i]); } itsClients[&port].msgCnt += logEvent.kvps.size(); if (logEvent.seqnr > 0) { @@ -317,8 +323,20 @@ GCFEvent::TResult PVSSGateway::operational(GCFEvent& event, GCFPortInterface& p answer.result = !sendOk; // bool -> int: 0=Ok port.send(answer); } - } - break; + } break; + + case DP_GET: { + DPGetEvent dpgEvent(event); + string DPname(PVSSinfo::getDPbasename(dpgEvent.DPname)); // strip off DBname + LOG_DEBUG_STR("Add to ValueCache: " << DPname); + dynArr_t DA; + DA.lastModify = NsTimestamp::now(); + DA.lastFlush = DA.lastModify; + DA.valArr = (GCFPVDynArr*)(dpgEvent.value._pValue->clone()); + DA.valType = (TMACValueType) (dpgEvent.value._pValue->getType() & ~LPT_DYNARR); + itsValueCache[DPname] = DA; + _adoptRequestPool(DPname); + } break; default: status = GCFEvent::NOT_HANDLED; @@ -409,14 +427,196 @@ PVSS::TMACValueType PVSSGateway::_KVpairType2PVSStype(int kvpType) } // -// _addKVT(key, value, time) +// _writeKVT(key, value, time) // -bool PVSSGateway::_addKVT(const KVpair& kvp) +bool PVSSGateway::_writeKVT(const KVpair& kvp) { PVSSresult result = itsDPservice->setValue(kvp.first, kvp.second, _KVpairType2PVSStype(kvp.valueType), kvp.timestamp, true); return (result == SA_NO_ERROR); } +// ---------------------------------------- MSGbuffer administration ---------------------------------------- +// +// _add2MsgBuffer(KVpair) +// +bool PVSSGateway::_add2MsgBuffer(const KVpair& kvp) +{ + if ((kvp.first.find('[')!=string::npos) && (kvp.first.find(']')!=string::npos)) { + itsMsgBuffer.push(kvp); + return (true); + } + + return (_writeKVT(kvp)); +} + + +// +// _processMsgBuffer() +// +// process all KV events that are in the MsgBuffer at the moment. +// +void PVSSGateway::_processMsgBuffer() +{ + while (!itsMsgBuffer.empty()) { + KVpair kvp = itsMsgBuffer.front(); + itsMsgBuffer.pop(); + // plain variable? + string::size_type pos = kvp.first.find('['); + if (pos == string::npos) { + _writeKVT(kvp); // just write and forget. + break; + } + // its an dynarray. + string keyName(kvp.first.substr(0,pos)); + VCiter cacheIter = itsValueCache.find(keyName); + // seen it before? + if (cacheIter == itsValueCache.end()) { + // no, get the value from PVSS if not already requested.... + if (itsRequestBuffer.find(keyName) == itsRequestBuffer.end()) { + itsDPservice->getValue(keyName); + } + // park valuechange. + itsRequestBuffer.insert(make_pair(keyName, kvp)); + } + else { // update the element in the valueBuffer + string::size_type epos = kvp.first.find(']', pos); + if (epos == string::npos) { + LOG_ERROR_STR("Ill formatted key will not be written: " << kvp.first); + break; + } + int index = atoi(kvp.first.substr(pos+1,epos).c_str()); + _setIndexedValue(keyName, index, kvp.second); + } + } +} + +// ---------------------------------------- requestPool administration ---------------------------------------- +// +// _adoptRequestPool(dpgEvent.DPname); +// +void PVSSGateway::_adoptRequestPool(const string& DPname) +{ + LOG_DEBUG_STR("_adoptRequestPool(" << DPname << ")"); + multimap<string,KVpair>::iterator iter = itsRequestBuffer.begin(); + multimap<string,KVpair>::iterator end = itsRequestBuffer.end(); + while (iter != end) { + if (iter->first == DPname) { + string::size_type pos = iter->second.first.find('['); + string::size_type epos = iter->second.first.find(']', pos); + int index = atoi(iter->second.first.substr(pos+1,epos).c_str()); + _setIndexedValue(DPname, index, iter->second.second); + + multimap<string,KVpair>::iterator obsolete = iter; + ++iter; + itsRequestBuffer.erase(obsolete); + } + else { + ++iter; + } + } +} + +// ---------------------------------------- valueCache administration ---------------------------------------- +// +// _setIndexedValue(keyname, index, value) +// +bool PVSSGateway::_setIndexedValue(const string& keyName, uint index, const string& value) +{ + // search the requested dynArray + VCiter cacheIter = itsValueCache.find(keyName); + if (cacheIter == itsValueCache.end()) { + LOG_ERROR_STR(keyName << " not in valueCache! Cannot set element " << index << " to " << value); + return (false); + } + + // check its size + // Note: Normally you would not allow updating elements that are beyond the size of an array but the problem is that + // the database might contain new DP's that are still empty, so these could never be updated. Therefor we MUST + // allow writing values outside the current size of the dynArray. Since updates come in randomly we cannot grow + // the dynArray element by element (e.g. index 23 may occure before index 4). + // Solution: To protect the dynArray against unlimited grow (when a faulthy index was received) the maxsize of a dynArray + // is limited to itsMaxExpandSize (userdefined). When the current dynArray size is smaller than the requested + // index the dynarray is extended with elements till is can handle the index (limited to itsMaxExpandSize) + // Note: The implementation still allows updates of dynArrays that have more than itsMaxExpandSize elements when the + // database already contained more than itsMaxExpandSize elements, it just limits the creation of new elements. + if (index >= cacheIter->second.valArr->count()) { + if (index > itsMaxExpandSize) { + LOG_ERROR_STR(keyName << " has " << cacheIter->second.valArr->count() << " elements, grow to " << index << " is not allowed"); + return (false); + } + // add 'empty' elements till the index-th element fits. + for (int i = index - cacheIter->second.valArr->count(); i >= 0; --i) { + cacheIter->second.valArr->push_back(GCFPValue::createMACTypeObject(cacheIter->second.valType)); + } + } + + // finally update the value + (cacheIter->second.valArr->getValue()[index])->setValue(value); + cacheIter->second.lastModify = NsTimestamp::now(); + LOG_DEBUG_STR("ValueCache: " << cacheIter->first << "[" << index << "]=" << value); + return(true); +} + +// +// _flushValueCache() +// +void PVSSGateway::_flushValueCache() +{ + double now(NsTimestamp::now()); + VCiter iter = itsValueCache.begin(); + VCiter end = itsValueCache.end(); + while (iter != end) { + if (iter->second.lastModify > iter->second.lastFlush) { + itsDPservice->setValue(iter->first, *(iter->second.valArr)); + iter->second.lastFlush = now; + LOG_DEBUG_STR("ValueCache: flushed " << iter->first); + } + ++iter; + } + +} + +// +// _cleanValueCache() +// +void PVSSGateway::_cleanValueCache() +{ + double obsoleteTime = NsTimestamp::now() - itsRemovalDelay; + VCiter iter = itsValueCache.begin(); + VCiter end = itsValueCache.end(); + while (iter != end) { + if (iter->second.lastModify < obsoleteTime) { + VCiter expired = iter; + ++iter; + LOG_DEBUG_STR("ValueCache: remove " << expired->first); + itsValueCache.erase(expired); + } + else { + ++iter; + } + } +} + +// ---------------------------------------- client socket administration ---------------------------------------- +// +// _garbageCollection() +// +void PVSSGateway::_garbageCollection() +{ + // cleanup the garbage of closed ports to master clients + if (itsClientsGarbage.empty()) { + return; + } + + LOG_DEBUG_STR("_garbageCollection:" << itsClientsGarbage.size()); + GCFPortInterface* pPort; + for (TClients::iterator iter = itsClientsGarbage.begin(); + iter != itsClientsGarbage.end(); ++iter) { + pPort = *iter; + delete pPort; + } + itsClientsGarbage.clear(); +} } // namespace RTDBDaemons } // namespace GCF diff --git a/MAC/APL/CURTDBDaemons/src/PVSSGateway/PVSSGateway.conf b/MAC/APL/CURTDBDaemons/src/PVSSGateway/PVSSGateway.conf new file mode 100644 index 0000000000000000000000000000000000000000..c88c72b4611f385a6bb6380e97a71957659f9f89 --- /dev/null +++ b/MAC/APL/CURTDBDaemons/src/PVSSGateway/PVSSGateway.conf @@ -0,0 +1,20 @@ +# +# PVSSGateway.conf +# + +# The PVSSGateway allows updating of a single element of a dynArray DataPoint. To support this the PVSSGateway +# temporarely stores the dynArrays in memory and collects the element changes before flushing it to the database +# again. There are three parameters to control the characteristics of this feature. +# Note: updates of non-dynarray values are always applied immediately to the database. + +# To prevent unlimited need of memory the variables are deleted from memory when no updates are received +# anymore for a certain amount of time. Default dynArrays are removed after 30 seconds of inactivity. +removalDelay = 30 + +# A DynArray is not written to the database after each element modification to prevent overloading the database. +# Default each dynArray is updated to the database once every 5 seconds. +flushInterval = 5 + +# Max number of elements a dynArray may be expanded to when the element-index of the update request is +# larger than the dynArray size in the database. Default value 100. +expandSize = 100 diff --git a/MAC/APL/CURTDBDaemons/src/PVSSGateway/PVSSGateway.h b/MAC/APL/CURTDBDaemons/src/PVSSGateway/PVSSGateway.h index 9d3814447d7742fd9af0100ee172be1d1733ad0d..b93fb1187156db91037b276562dc927c323c467e 100644 --- a/MAC/APL/CURTDBDaemons/src/PVSSGateway/PVSSGateway.h +++ b/MAC/APL/CURTDBDaemons/src/PVSSGateway/PVSSGateway.h @@ -28,18 +28,22 @@ //# Never #include <config.h> or #include <lofar_config.h> in a header file! //# Includes +#include <queue> #include <Common/LofarLogger.h> #include <Common/lofar_map.h> #include <Common/lofar_list.h> #include <Common/KVpair.h> #include <MACIO/GCF_Event.h> #include <GCF/TM/GCF_Control.h> +#include <GCF/PVSS/GCF_Defines.h> +#include <GCF/PVSS/GCF_PVDynArr.h> #include <GCF/RTDB/RTDB_PropertySet.h> #include <GCF/RTDB/DPservice.h> // Avoid 'using namespace' in headerfiles namespace LOFAR { + using std::queue; using MACIO::GCFEvent; namespace GCF { namespace RTDBDaemons { @@ -71,16 +75,27 @@ private: void finish(); // helper methods - void _registerClient (TM::GCFPortInterface& port, const string& name, uint32 obsID); - void _registerFailure(TM::GCFPortInterface& port); - bool _addKVT (const KVpair& kvp); + void _registerClient (TM::GCFPortInterface& port, const string& name, uint32 obsID); + void _registerFailure (TM::GCFPortInterface& port); + bool _writeKVT (const KVpair& kvp); + bool _setIndexedValue (const string& keyname, uint index, const string& value); + bool _add2MsgBuffer (const KVpair& kvp); + void _processMsgBuffer (); + void _adoptRequestPool (const string& DPname); + void _flushValueCache (); + void _cleanValueCache (); + void _garbageCollection (); PVSS::TMACValueType _KVpairType2PVSStype(int kvpType); - // data members + // ----- data members ----- TM::GCFTCPPort* itsListener; // application inpt RTDB::DPservice* itsDPservice; // connection to PVSS TM::GCFTimerPort* itsTimerPort; // timer - RTDB::RTDBPropertySet* itsPropertySet; + RTDB::RTDBPropertySet* itsPropertySet; // for updating my own state + uint itsMsgBufTimer; // fast timer for processing the MsgBuffer + double itsRemovalDelay; // time in s after which cached values are considered obsolete. + double itsFlushInterval; // time in s between database flushes of modified dynArrays + uint itsMaxExpandSize; // max size to expand an dynarray to. typedef map<TM::GCFPortInterface*, LogClient> LogClientMap; LogClientMap itsClients; @@ -89,6 +104,21 @@ private: typedef list<TM::GCFPortInterface*> TClients; TClients itsClientsGarbage; + // temp storage + queue<KVpair> itsMsgBuffer; // mutex protected queue for user delivered KVpairs + + // value cache for caching updates of dynarr changes + typedef struct dynArr_t { + double lastModify; + double lastFlush; + PVSS::GCFPVDynArr* valArr; + PVSS::TMACValueType valType; + dynArr_t() : lastModify(0.0), lastFlush(0.0),valArr(0) {}; + } dynArr_t; + typedef map<string,dynArr_t>::iterator VCiter; + map<string,dynArr_t> itsValueCache; // cache with retrieved dynarray variables + + multimap<string,KVpair> itsRequestBuffer; // temp. stack for storing 'get PVSS value' requests }; } // namespace RTDBDaemons diff --git a/MAC/APL/CURTDBDaemons/test/CMakeLists.txt b/MAC/APL/CURTDBDaemons/test/CMakeLists.txt index 71da9900b04d2acc238e705594d4ba7a2d1d291e..3033d3cf78c0c71969caa1a04ac992f500b49d7e 100644 --- a/MAC/APL/CURTDBDaemons/test/CMakeLists.txt +++ b/MAC/APL/CURTDBDaemons/test/CMakeLists.txt @@ -3,4 +3,5 @@ include(LofarCTest) lofar_add_test(tLoggingProcessor tLoggingProcessor.cc) +lofar_add_test(tPVSSGateway tPVSSGateway.cc) #lofar_add_test(tKeyValueLogger tKeyValueLogger.cc) diff --git a/MAC/APL/CURTDBDaemons/test/tPVSSGateway.cc b/MAC/APL/CURTDBDaemons/test/tPVSSGateway.cc new file mode 100644 index 0000000000000000000000000000000000000000..63d3ba9220511543306f76b533b5131849333510 --- /dev/null +++ b/MAC/APL/CURTDBDaemons/test/tPVSSGateway.cc @@ -0,0 +1,47 @@ +//# tPVSSGateway.cc +//# +//# Copyright (C) 2014 +//# ASTRON (Netherlands Foundation for Research in Astronomy) +//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl +//# +//# This program is free software; you can redistribute it and/or modify +//# it under the terms of the GNU General Public License as published by +//# the Free Software Foundation; either version 2 of the License, or +//# (at your option) any later version. +//# +//# This program is distributed in the hope that it will be useful, +//# but WITHOUT ANY WARRANTY; without even the implied warranty of +//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//# GNU General Public License for more details. +//# +//# You should have received a copy of the GNU General Public License +//# along with this program; if not, write to the Free Software +//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +//# +//# $Id: tProtocol.cc 15266 2010-03-18 08:52:04Z overeem $ + +//# Always #include <lofar_config.h> first! +#include <lofar_config.h> + +//# Includes +#include <Common/LofarLogger.h> +#include <MACIO/RTmetadata.h> + +using namespace LOFAR; +using namespace LOFAR::MACIO; + +int main (int argc, char* argv[]) +{ + if (argc < 3) { + cout << "Syntax: tPVSSGateway DPname value"; + return (1); + } + + INIT_LOGGER(argv[0]); + + RTmetadata myLogger(3125, "ObservationControl", "localhost"); + + myLogger.log(argv[1], strtod(argv[2],NULL)); + return (0); +} +