diff --git a/MAC/GCF/_PAL/PA/pvss/scripts/libs/gcfpa-cwd.ctl b/MAC/GCF/_PAL/PA/pvss/scripts/libs/gcfpa-cwd.ctl index ec025b746ae6ec81714ed9542d72225fb48701ed..e6cbd5c1ef42efaf75936f17645ce887241b6ed2 100644 --- a/MAC/GCF/_PAL/PA/pvss/scripts/libs/gcfpa-cwd.ctl +++ b/MAC/GCF/_PAL/PA/pvss/scripts/libs/gcfpa-cwd.ctl @@ -113,8 +113,6 @@ void connectionsChanged(string dp, dyn_uint value, string manType) string sysNr = getSystemId(dpSubStr(dp, DPSUB_SYS)); dyn_string msg; - string manString; - dyn_uint deletedLocalManNums; int i, j; bool manNumFound = false; dyn_string newItem; @@ -122,7 +120,9 @@ void connectionsChanged(string dp, dyn_uint value, string manType) { for (j = 1; j <= dynlen(gConnManList); j++) { - if (gConnManList[j][1] == sysNr && gConnManList[j][2] == manType && value[i] == gConnManList[j][3]) + if (gConnManList[j][1] == sysNr && + gConnManList[j][2] == manType && + value[i] == gConnManList[j][3]) { manNumFound = true; break; @@ -141,7 +141,9 @@ void connectionsChanged(string dp, dyn_uint value, string manType) } for (i = 1; i <= dynlen(gConnManList); i++) { - if (gConnManList[i][1] == sysNr && gConnManList[i][2] == manType && !dynContains(value, gConnManList[i][3])) + if (gConnManList[i][1] == sysNr && + gConnManList[i][2] == manType && + !dynContains(value, gConnManList[i][3])) { // a (remote) manager is disconnected from PVSS so inform the local property agent msg = "d" + sysNr + ":" + manType + ":" + gConnManList[i][3] + ":"; diff --git a/MAC/GCF/_PAL/PI/src/GPI_PropertySet.cc b/MAC/GCF/_PAL/PI/src/GPI_PropertySet.cc index 7c6122363fa1219dcb15afb8fc81bc13afc41408..817f97c7314333694d1958925add599797ec0a9c 100644 --- a/MAC/GCF/_PAL/PI/src/GPI_PropertySet.cc +++ b/MAC/GCF/_PAL/PI/src/GPI_PropertySet.cc @@ -30,6 +30,11 @@ TPIResult convertPAToPIResult(TPAResult result); TPAResult convertPIToPAResult(TPIResult result); +GPIPropertySet::~GPIPropertySet() +{ + unsubscribeAllProps(); +} + void GPIPropertySet::propSubscribed(const string& /*propName*/) { assert(_state == S_LINKING || _state == S_DELAYED_DISABLING); @@ -161,17 +166,7 @@ void GPIPropertySet::disable(const PIUnregisterScopeEvent& requestIn) switch (_state) { case S_LINKED: - for (list<string>::iterator iter = _propsSubscribed.begin(); - iter != _propsSubscribed.end(); ++iter) - { - string fullName; - assert(_scope.length() > 0); - fullName = _scope + GCF_PROP_NAME_SEP + *iter; - if (GCFPVSSInfo::propExists(fullName)) - { - unsubscribeProp(fullName); - } - } + unsubscribeAllProps(); // intentional fall through case S_ENABLED: if (_savedSeqnr == 0) @@ -222,6 +217,7 @@ void GPIPropertySet::disabled(const PAScopeUnregisteredEvent& responseIn) { case S_DISABLING: { + responseOut.result = convertPAToPIResult(responseIn.result); _state = S_DISABLED; break; @@ -390,27 +386,7 @@ void GPIPropertySet::unlinkPropSet(const PAUnlinkPropSetEvent& requestIn) case S_LINKED: { _state = S_UNLINKING; - TPAResult result(PA_NO_ERROR); - for (list<string>::iterator iter = _propsSubscribed.begin(); - iter != _propsSubscribed.end(); ++iter) - { - string fullName; - if (_scope.length() > 0) - { - fullName = _scope + GCF_PROP_NAME_SEP + *iter; - } - else - { - fullName = *iter; - } - if (GCFPVSSInfo::propExists(fullName)) - { - if (unsubscribeProp(fullName) != GCF_NO_ERROR) - { - result = PA_PI_INTERNAL_ERROR; - } - } - } + TPAResult result = unsubscribeAllProps(); if (result == PA_NO_ERROR) { PIUnlinkPropSetEvent requestOut; @@ -477,6 +453,34 @@ void GPIPropertySet::propSetUnlinkedInPI(TPAResult result) sendMsgToPA(response); } + +TPAResult GPIPropertySet::unsubscribeAllProps() +{ + TPAResult result(PA_NO_ERROR); + for (list<string>::iterator iter = _propsSubscribed.begin(); + iter != _propsSubscribed.end(); ++iter) + { + string fullName; + if (_scope.length() > 0) + { + fullName = _scope + GCF_PROP_NAME_SEP + *iter; + } + else + { + fullName = *iter; + } + if (GCFPVSSInfo::propExists(fullName)) + { + if (unsubscribeProp(fullName) != GCF_NO_ERROR) + { + result = PA_PI_INTERNAL_ERROR; + } + } + } + _propsSubscribed.clear(); + return result; +} + void GPIPropertySet::wrongState(const char* request) { const char* stateString[] = diff --git a/MAC/GCF/_PAL/PI/src/GPI_PropertySet.h b/MAC/GCF/_PAL/PI/src/GPI_PropertySet.h index 2b08fd23bc83bd3bdf8f9dfc128cd5d19b266f7f..4a9a8d6f1221a8eaa75bde5c3ac45ad44825f544 100644 --- a/MAC/GCF/_PAL/PI/src/GPI_PropertySet.h +++ b/MAC/GCF/_PAL/PI/src/GPI_PropertySet.h @@ -50,7 +50,7 @@ class GPIPropertySet : public GCFPropertyProxy _tmpPIResult(PI_NO_ERROR) {} - virtual ~GPIPropertySet() { } + virtual ~GPIPropertySet(); // handling the enable request of a PI client void enable(const PIRegisterScopeEvent& requestIn); @@ -91,6 +91,7 @@ class GPIPropertySet : public GCFPropertyProxy void propValueChanged(const string& propName, const GCFPValue& value); private: //helper methods + TPAResult unsubscribeAllProps(); void sendMsgToPA(GCFEvent& msg); void sendMsgToClient(GCFEvent& msg); void wrongState(const char* request); diff --git a/MAC/GCF/_PAL/PML/src/GCF_Property.cc b/MAC/GCF/_PAL/PML/src/GCF_Property.cc index 76e3fd42e656e0f262d9f354492feaaa94d821e2..49f7ba255c98a6a6597d86c4ca7769469b1d1e3e 100644 --- a/MAC/GCF/_PAL/PML/src/GCF_Property.cc +++ b/MAC/GCF/_PAL/PML/src/GCF_Property.cc @@ -40,6 +40,10 @@ GCFProperty::GCFProperty (const TPropertyInfo& propInfo, GCFPropertySet* pProper GCFProperty::~GCFProperty() { assert (_pPropertySet == 0); + if (exists()) // can already be deleted by the Property Agent + { + unsubscribe(); + } delete _pPropService; _pPropService = 0; } diff --git a/MAC/GCF/_PAL/PML/src/GCF_PropertySet.cc b/MAC/GCF/_PAL/PML/src/GCF_PropertySet.cc index c0c1a5cba703105bd227fdf876ef78c64bb60d57..3e9fe505fcf01fe5da4038d8057e0c97618e6104 100644 --- a/MAC/GCF/_PAL/PML/src/GCF_PropertySet.cc +++ b/MAC/GCF/_PAL/PML/src/GCF_PropertySet.cc @@ -241,7 +241,8 @@ void GCFPropertySet::dispatchAnswer(unsigned short sig, TGCFResult result) if (_pAnswerObj != 0) { GCFPropSetAnswerEvent e(sig); - e.pScope = getScope().c_str(); + string fullScope(getFullScope()); + e.pScope = fullScope.c_str(); e.result = result; _pAnswerObj->handleAnswer(e); } diff --git a/MAC/GCF/_PAL/SAL/src/GSA_PortService.cc b/MAC/GCF/_PAL/SAL/src/GSA_PortService.cc index 5ff29d68aa9153f4ce1961144b2383ec9545ddf8..44a120ea227d3515daeff7b1a4a381cfe502dc6f 100644 --- a/MAC/GCF/_PAL/SAL/src/GSA_PortService.cc +++ b/MAC/GCF/_PAL/SAL/src/GSA_PortService.cc @@ -46,13 +46,19 @@ void GSAPortService::start () assert(!_isSubscribed); if (GCFPVSSInfo::propExists(_port.getPortAddr())) { - if (dpeSubscribe(_port.getPortAddr()) != SA_NO_ERROR) _port.serviceStarted(false); + if (dpeSubscribe(_port.getPortAddr()) != SA_NO_ERROR) + { + _port.serviceStarted(false); + } } else { string portAddr = _port.getPortAddr(); portAddr.erase(0, GCFPVSSInfo::getLocalSystemName().length() + 1); - if (dpCreate(portAddr, "GCFDistPort") != SA_NO_ERROR) _port.serviceStarted(false); + if (dpCreate(portAddr, "GCFDistPort") != SA_NO_ERROR) + { + _port.serviceStarted(false); + } } } @@ -99,7 +105,10 @@ ssize_t GSAPortService::recv (void* buf, size_t count) void GSAPortService::dpCreated(const string& dpName) { assert(dpName.find(_port.getPortAddr()) < dpName.length()); - if (dpeSubscribe(_port.getPortAddr()) != SA_NO_ERROR) _port.serviceStarted(false); + if (dpeSubscribe(_port.getPortAddr()) != SA_NO_ERROR) + { + _port.serviceStarted(false); + } } void GSAPortService::dpDeleted(const string& /*dpName*/) diff --git a/MAC/GCF/_PAL/SAL/test/Echo.cc b/MAC/GCF/_PAL/SAL/test/Echo.cc index bd55c202ab68be1b041c10d38f6340b52a354970..3147b56d023ec03787e3a84054ddc5b2ebbf915d 100644 --- a/MAC/GCF/_PAL/SAL/test/Echo.cc +++ b/MAC/GCF/_PAL/SAL/test/Echo.cc @@ -31,7 +31,7 @@ using std::cout; using std::endl; -Echo::Echo(string name) : GCFTask((State)&Echo::initial, name) +Echo::Echo(string name) : GCFTask((State)&Echo::initial, name) , _pService(0) { // register the protocol for debugging purposes registerProtocol(ECHO_PROTOCOL, ECHO_PROTOCOL_signalnames); @@ -88,8 +88,11 @@ GCFEvent::TResult Echo::connected(GCFEvent& e, GCFPortInterface& p) switch (e.signal) { case F_DISCONNECTED: - service.dpDelete(propName); - service.dpDelete(propName + "_test"); + if (_pService) + { + _pService->dpDelete(propName); + _pService->dpDelete(propName + "_test"); + } cout << "Lost connection to client" << endl; p.close(); break; @@ -104,67 +107,69 @@ GCFEvent::TResult Echo::connected(GCFEvent& e, GCFPortInterface& p) switch (ping.seqnr % 13) { case 0: - service.dpCreate(propName, "ExampleDP_Bit"); + if (_pService) delete _pService; + _pService = new Service(); + _pService->dpCreate(propName, "ExampleDP_Bit"); break; case 1: - service.dpCreate(propName + "_test", "ExampleDP_Int"); + _pService->dpCreate(propName + "_test", "ExampleDP_Int"); break; case 2: - service.dpeSubscribe(propName); + _pService->dpeSubscribe(propName); break; case 3: - service.dpeSubscribe(propName + "_test"); + _pService->dpeSubscribe(propName + "_test"); break; case 4: - service.dpeGet(propName); + _pService->dpeGet(propName); break; case 5: { GCFPVBool wrongTestVal(true); - service.dpeSet(propName + "_test", wrongTestVal); + _pService->dpeSet(propName + "_test", wrongTestVal); GCFPVInteger goodTestVal(1000); - service.dpeSet(propName + "_test", goodTestVal); + _pService->dpeSet(propName + "_test", goodTestVal); break; } case 6: - service.dpeUnsubscribe(propName + "_test1"); - service.dpeUnsubscribe(propName + "_test"); - service.dpeUnsubscribe(propName); + _pService->dpeUnsubscribe(propName + "_test1"); + _pService->dpeUnsubscribe(propName + "_test"); + _pService->dpeUnsubscribe(propName); break; case 7: { GCFPVInteger testVal(2000); - service.dpeSet(propName + "_test", testVal); + _pService->dpeSet(propName + "_test", testVal); break; } case 8: { - service.dpeSubscribe(propName); - service.dpeUnsubscribe(propName); + _pService->dpeSubscribe(propName); + _pService->dpeUnsubscribe(propName); GCFPVBool testVal(true); - service.dpeSet(propName, testVal); + _pService->dpeSet(propName, testVal); break; } case 9: - service.dpeSubscribe(propName); + _pService->dpeSubscribe(propName); break; case 10: - service.dpeGet(propName); - service.dpeUnsubscribe(propName); + _pService->dpeGet(propName); + _pService->dpeUnsubscribe(propName); break; case 11: { - service.dpeSubscribe(propName); - service.dpeUnsubscribe(propName); - service.dpeSubscribe(propName); + _pService->dpeSubscribe(propName); + _pService->dpeUnsubscribe(propName); + _pService->dpeSubscribe(propName); GCFPVBool testVal(false); - service.dpeSet(propName, testVal); + _pService->dpeSet(propName, testVal); break; } case 12: - service.dpDelete(propName); - service.dpDelete(propName + "_test"); - break; + _pService->dpDelete(propName); + _pService->dpDelete(propName + "_test"); + break; } diff --git a/MAC/GCF/_PAL/SAL/test/Echo.h b/MAC/GCF/_PAL/SAL/test/Echo.h index e2afdcf37a2ffb699cb3af02c436b1a1146a6ae1..611d79020e36ab2053110929e90655a67b4fe94b 100644 --- a/MAC/GCF/_PAL/SAL/test/Echo.h +++ b/MAC/GCF/_PAL/SAL/test/Echo.h @@ -70,7 +70,7 @@ class Echo : public GCFTask * are sent through the server port. */ GCFPort server; - Service service; + Service* _pService; }; #endif diff --git a/MAC/GCF/_PALlight/CEP-PMLlight/src/CEPPropertySet.cc b/MAC/GCF/_PALlight/CEP-PMLlight/src/CEPPropertySet.cc index ee4b45cc4c5e890186aad90eaf45ecb03df87da5..1f3f641c7138722cb3e78fe3edf81b8df50a3141 100644 --- a/MAC/GCF/_PALlight/CEP-PMLlight/src/CEPPropertySet.cc +++ b/MAC/GCF/_PALlight/CEP-PMLlight/src/CEPPropertySet.cc @@ -71,10 +71,14 @@ CEPPropertySet::~CEPPropertySet () { assert(_pClient); // delete this set from the PIClient administration permanent - _pClient->unregisterScope(*this); + if (_state != S_DISABLED && _state != S_DISABLING) + { + _pClient->unregisterScope(*this, (_state == S_ENABLING)); + } + _pClient->deletePropSet(*this); + clearAllProperties(); - _pClient->deletePropSet(*this); PIClient::release(); _pClient = 0; } @@ -86,6 +90,8 @@ bool CEPPropertySet::enable () LOG_INFO(formatString ( "REQ: Enable property set '%s'", getScope().c_str())); + + _stateMutex.lock(); if (_state != S_DISABLED) { wrongState("enable"); @@ -107,6 +113,7 @@ bool CEPPropertySet::enable () result = GCF_SCOPE_ALREADY_REG; } } + _stateMutex.unlock(); return (result == GCF_NO_ERROR); } @@ -117,13 +124,17 @@ bool CEPPropertySet::disable () LOG_INFO(formatString ( "REQ: Disable property set '%s'", getScope().c_str())); + + _stateMutex.lock(); + switch (_state) { case S_LINKED: case S_ENABLED: + case S_ENABLING: assert(_pClient); - _pClient->unregisterScope(*this); + _pClient->unregisterScope(*this, (_state == S_ENABLING)); _state = S_DISABLING; break; @@ -132,11 +143,14 @@ bool CEPPropertySet::disable () result = GCF_WRONG_STATE; break; } + _stateMutex.unlock(); return (result == GCF_NO_ERROR); } void CEPPropertySet::scopeRegistered (bool succeed) { + _stateMutex.lock(); + assert(_state == S_ENABLING); LOG_INFO(formatString ( "PA-RESP: Property set '%s' is enabled%s", @@ -144,10 +158,14 @@ void CEPPropertySet::scopeRegistered (bool succeed) (succeed ? "" : " (with errors)"))); _state = (succeed ? S_ENABLED : S_DISABLED); + + _stateMutex.unlock(); } void CEPPropertySet::scopeUnregistered (bool succeed) { + _stateMutex.lock(); + assert(_state == S_DISABLING); LOG_INFO(formatString ( @@ -156,11 +174,16 @@ void CEPPropertySet::scopeUnregistered (bool succeed) (succeed ? "" : " (with errors)"))); _state = S_DISABLED; + + _stateMutex.unlock(); } void CEPPropertySet::linkProperties() { assert(_pClient); + + _stateMutex.lock(); + switch (_state) { case S_ENABLED: @@ -178,11 +201,15 @@ void CEPPropertySet::linkProperties() _pClient->propertiesLinked(getScope(), PI_WRONG_STATE); break; } + _stateMutex.unlock(); } void CEPPropertySet::unlinkProperties() { assert(_pClient); + + _stateMutex.lock(); + switch (_state) { case S_DISABLED: @@ -200,6 +227,31 @@ void CEPPropertySet::unlinkProperties() _pClient->propertiesUnlinked(getScope(), PI_WRONG_STATE); break; } + + _stateMutex.unlock(); +} + +void CEPPropertySet::connectionLost() +{ + assert(_pClient); + _stateMutex.lock(); + + switch (_state) + { + case S_DISABLED: + break; + + case S_DISABLING: + _state = S_DISABLED; + break; + + default: + _pClient->deletePropSet(*this); + _pClient->registerScope(*this); + _state = S_ENABLING; + break; + } + _stateMutex.unlock(); } CEPProperty* CEPPropertySet::getProperty (const string& propName) const @@ -284,12 +336,12 @@ bool CEPPropertySet::exists (const string& propName) const return (pProperty != 0); } -void CEPPropertySet::valueSet(const string& propName, const GCFPValue& value) const +void CEPPropertySet::valueSet(const string& propName, const GCFPValue& value) { assert(_pClient); // a user has changed the property value and monitoring is switched on // changed value will be forward to the PIClient - _pClient->valueSet(propName, value); + _pClient->valueSet(*this, propName, value); } void CEPPropertySet::addProperty(const string& propName, CEPProperty& prop) diff --git a/MAC/GCF/_PALlight/CEP-PMLlight/src/PIClient.cc b/MAC/GCF/_PALlight/CEP-PMLlight/src/PIClient.cc index ff7d1b9624f39a2ded7256709f22151e00430be2..228e01ae58e7f803183fa1ec269f2ad10508f473 100644 --- a/MAC/GCF/_PALlight/CEP-PMLlight/src/PIClient.cc +++ b/MAC/GCF/_PALlight/CEP-PMLlight/src/PIClient.cc @@ -37,6 +37,13 @@ namespace LOFAR { PIClient* PIClient::_pInstance = 0; +bool operator == (const PIClient::TAction& a, const PIClient::TAction& b) +{ + return ((a.pPropSet == b.pPropSet) && + (a.eventID == b.eventID) && + (a.extraData == b.extraData)); +} + void logResult(TPIResult result, CEPPropertySet& propSet); PIClient::PIClient() : @@ -129,9 +136,9 @@ void PIClient::run() } } while (retry); - + bool connected(false); do - { + { retry = false; try { @@ -139,6 +146,7 @@ void PIClient::run() // real synchronous connect with the Property Interface if (_dhPIClient.init()) { + connected = true; LOG_DEBUG("Connected to PropertyInterface of MAC"); } else @@ -183,23 +191,36 @@ void PIClient::run() e.what())); retry = true; - _propSetMutex.lock(); - for (TMyPropertySets::iterator iter = _myPropertySets.begin(); - iter != _myPropertySets.end(); ++iter) - { - iter->second->scopeUnregistered(false); - } - TMyPropertySets tempMyPropertySets(_myPropertySets); - _myPropertySets.clear(); - - for (TMyPropertySets::iterator iter = tempMyPropertySets.begin(); - iter != tempMyPropertySets.end(); ++iter) + if (connected) { - iter->second->enable(); + _bufferMutex.lock(); + for (TBufferedActions::iterator iter = _bufferedActions.begin(); + iter != _bufferedActions.end(); ++iter) + { + delete [] iter->extraData; + } + for (TBufferedActions::iterator iter = _bufferedValues.begin(); + iter != _bufferedValues.end(); ++iter) + { + delete [] iter->extraData; + } + _bufferedActions.clear(); + _bufferedValues.clear(); + _bufferMutex.unlock(); + + _propSetMutex.lock(); + TMyPropertySets tempMyPropertySets(_myPropertySets); + _startedSequences.clear(); + _propSetMutex.unlock(); + + for (TMyPropertySets::iterator iter = tempMyPropertySets.begin(); + iter != tempMyPropertySets.end(); ++iter) + { + iter->second->connectionLost(); + } + connected = false; + sleep(10); } - - _propSetMutex.unlock(); - sleep(10); } } while (retry); @@ -217,24 +238,35 @@ void PIClient::deletePropSet(const CEPPropertySet& propSet) { if (iter->second == &propSet) { - seqToDelete.push_back(iter->first); + iter->second = 0; } } - for (list<uint16>::iterator iter = seqToDelete.begin(); - iter != seqToDelete.end(); ++iter) - { - _startedSequences.erase(*iter); - } for (TBufferedActions::iterator iter = _bufferedActions.begin(); iter != _bufferedActions.end(); ++iter) { if (iter->pPropSet == &propSet) { + delete [] iter->extraData; iter = _bufferedActions.erase(iter); - if (iter != _bufferedActions.end()) break; + iter--; + } + } + for (TBufferedActions::iterator iter = _bufferedValues.begin(); + iter != _bufferedValues.end(); ++iter) + { + if (iter->pPropSet == &propSet) + { + delete [] iter->extraData; + iter = _bufferedValues.erase(iter); + iter--; } } _bufferMutex.unlock(); + _propSetMutex.lock(); + + _myPropertySets.erase(propSet.getScope()); + + _propSetMutex.unlock(); } bool PIClient::registerScope(CEPPropertySet& propSet) @@ -255,6 +287,7 @@ bool PIClient::registerScope(CEPPropertySet& propSet) _myPropertySets[propSet.getScope()] = &propSet; _propSetMutex.unlock(); + _bufferMutex.lock(); BlobOStream extraData(_extraDataBuf); extraData.clear(); _extraDataBuf.clear(); @@ -264,21 +297,45 @@ bool PIClient::registerScope(CEPPropertySet& propSet) extraData << (char) propSet.getCategory(); bufferAction(DH_PIProtocol::REGISTER_SCOPE, &propSet); + _bufferMutex.unlock(); } return succeed; } -void PIClient::unregisterScope(CEPPropertySet& propSet) +void PIClient::unregisterScope(CEPPropertySet& propSet, bool stillEnabling) { + // will be called from user thread context + _bufferMutex.lock(); BlobOStream extraData(_extraDataBuf); extraData.clear(); _extraDataBuf.clear(); extraData.putStart(0); extraData << propSet.getScope(); - - // will be called from user thread context - bufferAction(DH_PIProtocol::UNREGISTER_SCOPE, &propSet); + uint16 waitForSeqNr = 0; + if (stillEnabling) + { + + for (TStartedSequences::iterator iter = _startedSequences.begin(); + iter != _startedSequences.end(); ++iter) + { + if (iter->second == &propSet) + { + LOG_DEBUG(formatString( + "Unregister of '%s' will be send delayed (after compl. seq. %d)!", + propSet.getScope().c_str(), + iter->first)); + assert(iter->second->_state == CEPPropertySet::S_ENABLING); + waitForSeqNr = iter->first; + // response not needed to be forwarded to the prop. set. + iter->second = 0; + break; + } + } + + } + bufferAction(DH_PIProtocol::UNREGISTER_SCOPE, &propSet, waitForSeqNr); + _bufferMutex.unlock(); _propSetMutex.lock(); @@ -289,20 +346,23 @@ void PIClient::unregisterScope(CEPPropertySet& propSet) void PIClient::propertiesLinked(const string& scope, TPIResult result) { + // will be called in piClient thread context + _bufferMutex.lock(); BlobOStream extraData(_extraDataBuf); extraData.clear(); _extraDataBuf.clear(); extraData.putStart(0); extraData << scope; extraData << (uint16) result; - - // will be called in piClient thread context + bufferAction(DH_PIProtocol::PROPSET_LINKED, 0); + _bufferMutex.unlock(); } void PIClient::propertiesUnlinked(const string& scope, TPIResult result) { // will be called in piClient thread context + _bufferMutex.lock(); BlobOStream extraData(_extraDataBuf); extraData.clear(); _extraDataBuf.clear(); @@ -310,11 +370,13 @@ void PIClient::propertiesUnlinked(const string& scope, TPIResult result) extraData << scope; extraData << (uint16) result; bufferAction(DH_PIProtocol::PROPSET_UNLINKED, 0); + _bufferMutex.unlock(); } -void PIClient::valueSet(const string& propName, const GCFPValue& value) +void PIClient::valueSet(CEPPropertySet& propSet, const string& propName, const GCFPValue& value) { // will be called from user thread context + _bufferMutex.lock(); BlobOStream extraData(_extraDataBuf); extraData.clear(); _extraDataBuf.clear(); @@ -335,7 +397,8 @@ void PIClient::valueSet(const string& propName, const GCFPValue& value) value.pack(_valueBuf); extraData.put(_valueBuf, valSize); - bufferAction(DH_PIProtocol::VALUE_SET, 0); + bufferAction(DH_PIProtocol::VALUE_SET, &propSet); + _bufferMutex.unlock(); } void PIClient::scopeRegistered() @@ -344,22 +407,36 @@ void PIClient::scopeRegistered() CEPPropertySet* pPropertySet = _startedSequences[_dhPIClient.getSeqNr()]; _startedSequences.erase(_dhPIClient.getSeqNr()); - assert(pPropertySet); - - // unpacks the extra blob - BlobIStream& blob = _dhPIClient.getExtraBlob(); - uint16 result; - blob >> result; - _propSetMutex.lock(); - logResult((TPIResult) result, *pPropertySet); - if (result != PI_NO_ERROR) + _bufferMutex.lock(); + for (TBufferedActions::iterator iter = _bufferedActions.begin(); + iter != _bufferedActions.end(); ++iter) + { + if (iter->waitForSeqNr == _dhPIClient.getSeqNr()) + { + // buffered action, which waits for this response can now send + iter->waitForSeqNr = 0; + break; + } + } + _bufferMutex.unlock(); + if (pPropertySet) { + // unpacks the extra blob + BlobIStream& blob = _dhPIClient.getExtraBlob(); + uint16 result; + blob >> result; - _myPropertySets.erase(pPropertySet->getScope()); - - } - pPropertySet->scopeRegistered((result == PI_NO_ERROR)); - _propSetMutex.unlock(); + _propSetMutex.lock(); + logResult((TPIResult) result, *pPropertySet); + + if (result != PI_NO_ERROR) + { + _myPropertySets.erase(pPropertySet->getScope()); + } + pPropertySet->scopeRegistered((result == PI_NO_ERROR)); + _propSetMutex.unlock(); + } + // else it is deleted in the meanwhile } void PIClient::scopeUnregistered() @@ -369,17 +446,21 @@ void PIClient::scopeUnregistered() _startedSequences.erase(_dhPIClient.getSeqNr()); - assert(pPropertySet); - BlobIStream& blob = _dhPIClient.getExtraBlob(); - uint16 result; - blob >> result; - - _propSetMutex.lock(); - - logResult((TPIResult) result, *pPropertySet); - pPropertySet->scopeUnregistered((result == PI_NO_ERROR)); - - _propSetMutex.unlock(); + if (pPropertySet) + { + // unpacks the extra blob + BlobIStream& blob = _dhPIClient.getExtraBlob(); + uint16 result; + blob >> result; + + _propSetMutex.lock(); + + logResult((TPIResult) result, *pPropertySet); + pPropertySet->scopeUnregistered((result == PI_NO_ERROR)); + + _propSetMutex.unlock(); + } + // else it is deleted in the meanwhile } void PIClient::linkPropSet() @@ -443,13 +524,14 @@ void PIClient::unlinkPropSet() } void PIClient::bufferAction(DH_PIProtocol::TEventID event, - CEPPropertySet* pPropSet) + CEPPropertySet* pPropSet, + uint16 waitForSeqNr) { // will be called in different thread context TAction action; action.eventID = event; action.pPropSet = pPropSet; - //action.sent = false; + action.waitForSeqNr = waitForSeqNr; uint32 neededSize = _extraDataBuf.size() - sizeof(BlobHeader); // _extraDataBuf is set in the calling method @@ -457,11 +539,18 @@ void PIClient::bufferAction(DH_PIProtocol::TEventID event, memcpy(action.extraData, _extraDataBuf.getBuffer() + sizeof(BlobHeader), neededSize); action.extraDataSize = neededSize; - _bufferMutex.lock(); - - _bufferedActions.push_back(action); - - _bufferMutex.unlock(); + if (event == DH_PIProtocol::VALUE_SET) + { + if (_bufferedValues.size() > 1000) + { + _bufferedValues.pop_front(); + } + _bufferedValues.push_back(action); + } + else + { + _bufferedActions.push_back(action); + } } uint16 PIClient::startSequence(CEPPropertySet& propSet) @@ -484,12 +573,27 @@ uint16 PIClient::startSequence(CEPPropertySet& propSet) void PIClient::processOutstandingActions() { // will be called in piClient thread context - _bufferMutex.lock(); bool sent(true); - CEPPropertySet* pPropSet; - for (TBufferedActions::iterator iter = _bufferedActions.begin(); - iter != _bufferedActions.end() && sent; ++iter) + CEPPropertySet* pPropSet(0); + TBufferedActions::iterator iter; + TAction action; + while (sent) { + _bufferMutex.lock(); + iter = _bufferedActions.begin(); + if (iter == _bufferedActions.end()) + { + _bufferMutex.unlock(); + break; + } + if (iter->waitForSeqNr > 0) + { + _bufferMutex.unlock(); + // pretend this action is sent, because it must be skipped + sent = true; + // skip this action, which waits for a response of another sequence + continue; + } _dhPIClient.setEventID(iter->eventID); BlobOStream& blob = _dhPIClient.createExtraBlob(); switch (iter->eventID) @@ -498,33 +602,84 @@ void PIClient::processOutstandingActions() case DH_PIProtocol::UNREGISTER_SCOPE: pPropSet = iter->pPropSet; assert(pPropSet); - _dhPIClient.setSeqNr(startSequence(*pPropSet)); - + LOG_DEBUG(formatString( + "Send request for '%s' to PI with seq. nr. %d.", + pPropSet->getScope().c_str(), + _dhPIClient.getSeqNr())); + break; default: break; } blob.put(iter->extraData, iter->extraDataSize); + action = *iter; + _bufferMutex.unlock(); - sent = false; - try + sent = _dhPIClient.write(); + if (sent) { - sent = _dhPIClient.write(); + // now the action can be removed + // because the action could be removed between the last unlock and + // lock of the mutex, we must be carefull + // thats why the 'action' is remembered and searched + // action, if still exists, should still the first in the buffer + // if 'action' differs the current first action in the buffer no erase + // is needed anymore + _bufferMutex.lock(); + iter = _bufferedActions.begin(); + if (iter != _bufferedActions.end()) + { + if (*iter == action) + { + delete [] action.extraData; + _bufferedActions.erase(iter); + } + } + _bufferMutex.unlock(); } - catch (std::exception& e) + } + while (sent) + { + // lock again for the value buffer + _bufferMutex.lock(); + iter = _bufferedValues.begin(); + if (iter == _bufferedValues.end()) { - LOG_ERROR(formatString ( - "Exception: %s", - e.what())); + _bufferMutex.unlock(); + break; } + if (iter->waitForSeqNr > 0) + { + _bufferMutex.unlock(); + // pretend this action is sent, because it must be skipped + sent = true; + // skip this action, which waits for a response of another sequence + continue; + } + _dhPIClient.setEventID(iter->eventID); + BlobOStream& blob = _dhPIClient.createExtraBlob(); + assert(iter->eventID == DH_PIProtocol::VALUE_SET); + blob.put(iter->extraData, iter->extraDataSize); + action = *iter; + _bufferMutex.unlock(); + + sent = _dhPIClient.write(); if (sent) { - delete [] iter->extraData; - _bufferedActions.erase(iter); - iter--; + // see description for the following above + _bufferMutex.lock(); + iter = _bufferedValues.begin(); + if (iter != _bufferedValues.end()) + { + if (*iter == action) + { + delete [] action.extraData; + _bufferedValues.erase(iter); + } + } + _bufferMutex.unlock(); } } - _bufferMutex.unlock(); } void logResult(TPIResult result, CEPPropertySet& propSet) diff --git a/MAC/GCF/_PALlight/CEP-PMLlight/src/PIClient.h b/MAC/GCF/_PALlight/CEP-PMLlight/src/PIClient.h index e948c8e5326af9f31493fad15329bc1255ef7eef..4e468873641cf85a5dd647cacc52507983909c90 100644 --- a/MAC/GCF/_PALlight/CEP-PMLlight/src/PIClient.h +++ b/MAC/GCF/_PALlight/CEP-PMLlight/src/PIClient.h @@ -68,13 +68,15 @@ class PIClient // called by CEPPropertySet // <group> bool registerScope (CEPPropertySet& propSet); - void unregisterScope (CEPPropertySet& propSet); + void unregisterScope (CEPPropertySet& propSet, bool stillEnabling); void propertiesLinked (const string& scope, TPIResult result); void propertiesUnlinked (const string& scope, TPIResult result); - void valueSet(const string& propName, const GCFPValue& value); + void valueSet(CEPPropertySet& propSet, + const string& propName, + const GCFPValue& value); void deletePropSet(const CEPPropertySet& propSet); // </group> @@ -83,7 +85,8 @@ class PIClient private: // helper methods void bufferAction (DH_PIProtocol::TEventID event, - CEPPropertySet* pPropSet); + CEPPropertySet* pPropSet, + uint16 waitForSeqNr = 0); void processOutstandingActions(); @@ -117,16 +120,19 @@ class PIClient typedef map<string /* scope */, CEPPropertySet*> TMyPropertySets; TMyPropertySets _myPropertySets; - private: // admistrative members + public: typedef struct { CEPPropertySet* pPropSet; DH_PIProtocol::TEventID eventID; char* extraData; uint32 extraDataSize; + uint16 waitForSeqNr; } TAction; + private: // admistrative members typedef list<TAction> TBufferedActions; TBufferedActions _bufferedActions; + TBufferedActions _bufferedValues; typedef map<uint16 /*seqnr*/, CEPPropertySet*> TStartedSequences; TStartedSequences _startedSequences; diff --git a/MAC/GCF/_PALlight/include/GCF/PALlight/CEPPropertySet.h b/MAC/GCF/_PALlight/include/GCF/PALlight/CEPPropertySet.h index 04bf76a7d738e4a0cd14f6ac0a4e6a287e0d2c82..33a6d0e0b239f797c9c2f12a2be552a86330635f 100644 --- a/MAC/GCF/_PALlight/include/GCF/PALlight/CEPPropertySet.h +++ b/MAC/GCF/_PALlight/include/GCF/PALlight/CEPPropertySet.h @@ -28,6 +28,7 @@ #include <Common/lofar_list.h> #include <Common/lofar_map.h> #include <GCF/PALlight/CEPProperty.h> +#include <GCF/Mutex.h> class GCFPValue; @@ -134,13 +135,13 @@ class CEPPropertySet // if not found GCFPValue* getValue (const string& propName); - private: + private: // methods called by CEPProperty friend class CEPProperty; // will be invoked by CEPProperty if the value of the property has changed // and the property set is in monitoring state - void valueSet(const string& propName, const GCFPValue& value) const; + void valueSet(const string& propName, const GCFPValue& value); - private: + private: // methods called by PIClient friend class PIClient; // response of an enable method call invoked by PIClient void scopeRegistered (bool succeed); @@ -151,6 +152,7 @@ class CEPPropertySet void linkProperties (); void unlinkProperties (); // </group> + void connectionLost(); private: // helper methods void addProperty(const string& propName, CEPProperty& prop); @@ -187,6 +189,7 @@ class CEPPropertySet CEPProperty _dummyProperty; // pointer to PIClient singleton PIClient* _pClient; + GCF::Thread::Mutex _stateMutex; }; inline const string& CEPPropertySet::getScope () const @@ -202,10 +205,14 @@ inline TPSCategory CEPPropertySet::getCategory () const { return _category; } inline bool CEPPropertySet::isEnabled () - { return (_state == S_ENABLED || _state == S_LINKED); } + { GCF::Thread::Mutex::Lock(_stateMutex); + return (_state == S_ENABLED || _state == S_LINKED); + } inline bool CEPPropertySet::isMonitoringOn() - { return _state == S_LINKED; } + { GCF::Thread::Mutex::Lock(_stateMutex); + return _state == S_LINKED; + } } // namespace CEPPMLlight } // namespace GCF