From 79cc5e1769f2c5939052343a97e872a6f514e3e3 Mon Sep 17 00:00:00 2001 From: Ruud Overeem <overeem@astron.nl> Date: Thu, 3 Aug 2006 09:07:54 +0000 Subject: [PATCH] BugID: 836 Made serviceBroker client suitable to connected to serviceBrokers at several machines. Extended the SB protocol with re-register events to update a restarted service- Broker with the port that are already in use. Banned 'localhost' from the code. Remove (IN) and (OUT) from the signal names of the framework. They only work confusing. --- MAC/GCF/TM/include/GCF/TM/GCF_TCPPort.h | 10 +- MAC/GCF/TM/src/GCF_Protocols.cc | 24 +- MAC/GCF/TM/src/PortImpl/GCF_TCPPort.cc | 17 +- .../TM/src/ServiceBroker/GSB_Controller.cc | 96 +++- MAC/GCF/TM/src/ServiceBroker/GSB_Controller.h | 22 +- MAC/GCF/TM/src/ServiceBroker/GSB_Defines.h | 9 +- MAC/GCF/TM/src/ServiceBroker/GTM_SBTCPPort.cc | 12 +- .../TM/src/ServiceBroker/GTM_ServiceBroker.cc | 492 +++++++++++++----- .../TM/src/ServiceBroker/GTM_ServiceBroker.h | 107 ++-- MAC/GCF/TM/src/ServiceBroker/SB_Protocol.prot | 97 +++- 10 files changed, 655 insertions(+), 231 deletions(-) diff --git a/MAC/GCF/TM/include/GCF/TM/GCF_TCPPort.h b/MAC/GCF/TM/include/GCF/TM/GCF_TCPPort.h index 20807888403..389d0e884b9 100644 --- a/MAC/GCF/TM/include/GCF/TM/GCF_TCPPort.h +++ b/MAC/GCF/TM/include/GCF/TM/GCF_TCPPort.h @@ -24,6 +24,8 @@ #define GCF_TCPPORT_H #include <GCF/TM/GCF_RawPort.h> +#include <GCF/TM/GCF_TimerPort.h> +#include <GCF/Utils.h> namespace LOFAR { namespace GCF { @@ -122,7 +124,13 @@ private: inline void GCFTCPPort::setHostName(const string& hostname) { - _host = hostname; + // assure that hostname is never filled with localname. + if (hostname.empty() || hostname == "localhost") { + _host = Common::myHostname(false); + } + else { + _host = hostname; + } } inline void GCFTCPPort::setPortNumber(unsigned int portNumber) diff --git a/MAC/GCF/TM/src/GCF_Protocols.cc b/MAC/GCF/TM/src/GCF_Protocols.cc index a96adb38801..903dd8e79fb 100644 --- a/MAC/GCF/TM/src/GCF_Protocols.cc +++ b/MAC/GCF/TM/src/GCF_Protocols.cc @@ -37,9 +37,9 @@ namespace LOFAR const char* F_FSM_PROTOCOL_names[] = { "F_FSM_PROTOCOL: invalid signal", - "F_ENTRY (IN)", - "F_EXIT (IN)", - "F_INIT (IN)", + "F_ENTRY", + "F_EXIT", + "F_INIT", }; /** @@ -48,15 +48,15 @@ const char* F_FSM_PROTOCOL_names[] = const char* F_PORT_PROTOCOL_names[] = { "F_PORT_PROTOCOL: invalid signal", - "F_CONNECT (OUT)", - "F_CONNECTED (IN)", - "F_DISCONNECTED (IN)", - "F_CLOSED (IN)", - "F_TIMER (IN)", - "F_DATAIN (IN)", - "F_DATAOUT (IN)", - "F_RAW_DATA (IN_OUT)", - "F_ACCEPT_REQ (IN)", + "F_CONNECT", + "F_CONNECTED", + "F_DISCONNECTED", + "F_CLOSED", + "F_TIMER", + "F_DATAIN", + "F_DATAOUT", + "F_RAW_DATA", + "F_ACCEPT_REQ", }; } // namespace TM } // namespace GCF diff --git a/MAC/GCF/TM/src/PortImpl/GCF_TCPPort.cc b/MAC/GCF/TM/src/PortImpl/GCF_TCPPort.cc index 851bd11a47a..a1371d2f7a7 100644 --- a/MAC/GCF/TM/src/PortImpl/GCF_TCPPort.cc +++ b/MAC/GCF/TM/src/PortImpl/GCF_TCPPort.cc @@ -36,6 +36,7 @@ namespace LOFAR { using namespace ACC::APS; namespace GCF { using namespace SB; + using namespace Common; namespace TM { @@ -50,6 +51,8 @@ GCFTCPPort::GCFTCPPort(GCFTask& task, : GCFRawPort(task, name, type, protocol, transportRawData), _pSocket(0), _addrIsSet(false), + _addr(), + _host(myHostname(false)), _portNumber(0), _broker(0) { @@ -62,12 +65,14 @@ GCFTCPPort::GCFTCPPort(GCFTask& task, } // -// GCFTCPPorT() +// GCFTCPPort() // GCFTCPPort::GCFTCPPort() : GCFRawPort(), _pSocket(0), _addrIsSet(false), + _addr(), + _host(myHostname(false)), _portNumber(0), _broker(0) { @@ -102,7 +107,7 @@ void GCFTCPPort::init(GCFTask& task, _state = S_DISCONNECTED; GCFRawPort::init(task, name, type, protocol, transportRawData); _portNumber = 0; - _host = ""; + _host = myHostname(false); _addrIsSet = false; if (_pSocket) { delete _pSocket; @@ -143,7 +148,7 @@ bool GCFTCPPort::open() setState(S_CONNECTING); if (getType() == SAP) { // client socket? - if (_host != "" && _portNumber != 0) { // dest. overruled by user? + if (_portNumber != 0) { // dest. overruled by user? // Try to 'open' en 'connect' to port serviceInfo(SB_NO_ERROR, _portNumber, _host); return (true); @@ -178,7 +183,7 @@ bool GCFTCPPort::open() _broker = GTMServiceBroker::instance(); } ASSERT(_broker); - _broker->getServiceinfo(*this, remoteServiceName); + _broker->getServiceinfo(*this, remoteServiceName, _host); // a (dis)connect event will be scheduled return (true); } @@ -278,7 +283,7 @@ void GCFTCPPort::serviceInfo(unsigned int result, unsigned int portNumber, const // Note: Is also called by the GTM_ServiceBroker void GCFTCPPort::serviceGone() { - _host = ""; + _host = myHostname(false); _portNumber = 0; } @@ -353,7 +358,7 @@ void GCFTCPPort::setAddr(const TPeerAddr& addr) // Is new address different from current address? if (_addr.taskname != addr.taskname || _addr.portname != addr.portname) { - _host = ""; // clear current settings + _host = myHostname(false); // clear current settings _portNumber = 0; } _addr = addr; diff --git a/MAC/GCF/TM/src/ServiceBroker/GSB_Controller.cc b/MAC/GCF/TM/src/ServiceBroker/GSB_Controller.cc index acad7de86d7..8a6f0279da5 100644 --- a/MAC/GCF/TM/src/ServiceBroker/GSB_Controller.cc +++ b/MAC/GCF/TM/src/ServiceBroker/GSB_Controller.cc @@ -169,7 +169,13 @@ GCFEvent::TResult GSBController::operational(GCFEvent& event, GCFPortInterface& case SB_UNREGISTER_SERVICE: { SBUnregisterServiceEvent request(event); + SBServiceUnregisteredEvent response; + response.seqnr = request.seqnr; + releaseService(request.servicename); + + response.result = SB_NO_ERROR; + port.send(response); break; } @@ -183,7 +189,7 @@ GCFEvent::TResult GSBController::operational(GCFEvent& event, GCFPortInterface& LOG_INFO(formatString ("Serviceinfo for %s is %d", request.servicename.c_str(), portNr)); response.portnumber = portNr; - response.hostname = Common::myHostname(true); + response.hostname = Common::myHostname(false); response.result = SB_NO_ERROR; } else { @@ -195,6 +201,15 @@ GCFEvent::TResult GSBController::operational(GCFEvent& event, GCFPortInterface& break; } + case SB_REREGISTER_SERVICE: { + SBReregisterServiceEvent request(event); + SBServiceReregisteredEvent response; + response.seqnr = request.seqnr; + response.servicename = request.servicename; + response.result = reRegisterService(request.servicename, request.portnumber, &port); + port.send(response); + break; + } default: status = GCFEvent::NOT_HANDLED; break; @@ -259,7 +274,7 @@ void GSBController::readRanges() } // -// claimPortNumber(serviceName, port) +// claimPortNumber(servicename, port) // uint16 GSBController::claimPortNumber(const string& aServiceName, GCFPortInterface* aPort) @@ -298,6 +313,56 @@ uint16 GSBController::claimPortNumber(const string& aServiceName, return (itsServiceList[idx].portNumber); } +// +// reRegisterService(servicename, portnr) +// +TSBResult GSBController::reRegisterService(const string& servicename, uint16 oldPortNr, + GCFPortInterface* thePort) +{ + int32 idx = 0; + int32 nrElems2Check = itsNrPorts - itsNrFreePorts;// prevent checking whole array + + while (idx < itsNrPorts && nrElems2Check > 0) { + if (itsServiceList[idx].portNumber) { + nrElems2Check--; + if (itsServiceList[idx].serviceName == servicename) { + if (!itsServiceList[idx].ownerPort && itsServiceList[idx].portNumber == oldPortNr) { + itsServiceList[idx].ownerPort = thePort; + LOG_INFO_STR("Service " << servicename << " confirmed at " << oldPortNr); + saveAdministration (itsAdminFile); // to survive crashes + return (SB_NO_ERROR); + } + else { + LOG_ERROR_STR("Recovering of service " << servicename << " at " << + oldPortNr << " not possible. Service was at " << + itsServiceList[idx].portNumber); + return (SB_CANT_RECOVER); + } + } + } + idx++; + } + + // service not in our admin anymore, try to fullfill the question. + idx = portNr2Index(oldPortNr); // convert portnr to array index + if (itsServiceList[idx].ownerPort == 0) { // still free? + // assign port to service + itsServiceList[idx].portNumber = oldPortNr; + itsServiceList[idx].serviceName = servicename; + itsServiceList[idx].ownerPort = thePort; + itsNrFreePorts--; + + LOG_INFO_STR("Service " << servicename << " reregistered at " << oldPortNr); + saveAdministration (itsAdminFile); // to survive crashes + return (SB_NO_ERROR); + } + + LOG_ERROR_STR("Recovering of service " << servicename << " at " << + oldPortNr << " not possible. Portnr taken by " << + itsServiceList[idx].serviceName); + return (SB_CANT_RECOVER); +} + // // releaseService(servicename) // @@ -362,7 +427,7 @@ void GSBController::releasePort(GCFPortInterface* aPort) } // -// findService(serviceName) +// findService(servicename) // uint16 GSBController::findService(const string& aServiceName, bool usedOnly) { @@ -405,13 +470,18 @@ void GSBController::saveAdministration(const string& aFileName) uint16 writeVersion = SB_ADMIN_VERSION; uint16 count = itsNrPorts - itsNrFreePorts; - outFile << writeVersion << count << itsLowerLimit << itsUpperLimit; - + outFile.write((char*)&writeVersion, sizeof(writeVersion)); + outFile.write((char*)&count, sizeof(count)); + outFile.write((char*)&itsLowerLimit, sizeof(itsLowerLimit)); + outFile.write((char*)&itsUpperLimit, sizeof(itsUpperLimit)); + uint16 idx = 0; while (idx < itsNrPorts && count > 0) { if (itsServiceList[idx].portNumber) { // note: the TCPport is not saved because it can not be restored. - outFile << itsServiceList[idx].portNumber << itsServiceList[idx].serviceName; + outFile.write((char*)&itsServiceList[idx].portNumber, + sizeof(itsServiceList[idx].portNumber)); + outFile << itsServiceList[idx].serviceName; count--; } idx++; @@ -436,24 +506,28 @@ void GSBController::loadAdministration(const string& aFileName) uint16 count; // nr of entries in the file uint16 lowerLimit; // range settings uint16 upperLimit; - inFile >> readVersion >> count >> lowerLimit >> upperLimit; + inFile.read((char*)&readVersion, sizeof(readVersion)); + inFile.read((char*)&count, sizeof(count)); + inFile.read((char*)&lowerLimit, sizeof(lowerLimit)); + inFile.read((char*)&upperLimit, sizeof(upperLimit)); LOG_DEBUG_STR ("Loading " << count << " old registrations from file " << aFileName); while (count) { uint16 portNumber; - string serviceName; - inFile >> portNumber >> serviceName; + string servicename; + inFile.read((char*)&portNumber, sizeof(portNumber)); + inFile >> servicename; if (portNumber < itsLowerLimit || portNumber >= itsUpperLimit) { LOG_DEBUG_STR ("Portnumber " << portNumber << " not in current range, ignoring"); } else { itsServiceList[portNr2Index(portNumber)].portNumber = portNumber; - itsServiceList[portNr2Index(portNumber)].serviceName = serviceName; + itsServiceList[portNr2Index(portNumber)].serviceName = servicename; itsServiceList[portNr2Index(portNumber)].ownerPort = 0; itsNrFreePorts--; - LOG_DEBUG_STR ("Loading " << serviceName << "@" << portNumber); + LOG_DEBUG_STR ("Loading " << servicename << "@" << portNumber); } count--; } diff --git a/MAC/GCF/TM/src/ServiceBroker/GSB_Controller.h b/MAC/GCF/TM/src/ServiceBroker/GSB_Controller.h index 019a27728a2..e0980454995 100644 --- a/MAC/GCF/TM/src/ServiceBroker/GSB_Controller.h +++ b/MAC/GCF/TM/src/ServiceBroker/GSB_Controller.h @@ -25,6 +25,7 @@ #include <GCF/TM/GCF_Task.h> #include <GTM_SBTCPPort.h> +#include "GSB_Defines.h" namespace LOFAR { namespace GCF { @@ -60,15 +61,18 @@ private: TM::GCFPortInterface* ownerPort; } TServiceInfo; - void acceptConnectRequest(); - void readRanges (); - uint16 claimPortNumber (const string& aServiceName, TM::GCFPortInterface* aPort); - void releaseService (const string& aServiceName); - void releasePort (TM::GCFPortInterface* aPort); - uint16 findService (const string& aServiceName, bool usedOnly); - void saveAdministration (const string& aFileName); - void loadAdministration (const string& aFileName); - void cleanupOldRegistrations(); + void acceptConnectRequest (); + void readRanges (); + uint16 claimPortNumber (const string& aServiceName, + TM::GCFPortInterface* aPort); + TSBResult reRegisterService (const string& aServicename, uint16 portnr, + TM::GCFPortInterface* aPort); + void releaseService (const string& aServiceName); + void releasePort (TM::GCFPortInterface* aPort); + uint16 findService (const string& aServiceName, bool usedOnly); + void saveAdministration (const string& aFileName); + void loadAdministration (const string& aFileName); + void cleanupOldRegistrations(); // define conversions between portnumber and index. inline uint16 portNr2Index(uint16 portNumber) diff --git a/MAC/GCF/TM/src/ServiceBroker/GSB_Defines.h b/MAC/GCF/TM/src/ServiceBroker/GSB_Defines.h index b6dcde1e2fd..1eb170b97e7 100644 --- a/MAC/GCF/TM/src/ServiceBroker/GSB_Defines.h +++ b/MAC/GCF/TM/src/ServiceBroker/GSB_Defines.h @@ -31,19 +31,18 @@ namespace LOFAR { namespace GCF { namespace SB { -class GCFPValue; +//class GCFPValue; enum TSBResult { SB_NO_ERROR, SB_UNKNOWN_ERROR, SB_SERVICE_ALREADY_EXIST, SB_NO_FREE_PORTNR, - SB_UNKNOWN_SERVICE + SB_UNKNOWN_SERVICE, + SB_NO_CONNECTION, + SB_CANT_RECOVER }; -//#define PARAM_SB_SERVER_PORT "mac.gcf.sb.port" -//#define PARAM_SB_SERVER_HOST "mac.gcf.sb.host" - } // namespace SB } // namespace GCF } // namespace LOFAR diff --git a/MAC/GCF/TM/src/ServiceBroker/GTM_SBTCPPort.cc b/MAC/GCF/TM/src/ServiceBroker/GTM_SBTCPPort.cc index 3012cefc7b6..319cbff0287 100644 --- a/MAC/GCF/TM/src/ServiceBroker/GTM_SBTCPPort.cc +++ b/MAC/GCF/TM/src/ServiceBroker/GTM_SBTCPPort.cc @@ -93,15 +93,15 @@ bool GTMSBTCPPort::open() } uint32 sbPortNumber(MAC_SERVICEBROKER_PORT); - string sbHost ("localhost"); - char hostname[256]; - if (gethostname(hostname, 256) == 0) { - sbHost = hostname; - } +// string sbHost ("localhost"); +// char hostname[256]; +// if (gethostname(hostname, 256) == 0) { +// sbHost = hostname; +// } if (_pSocket->open(sbPortNumber)) { if (SAP == getType()) { - if (_pSocket->connect(sbPortNumber, sbHost)) { + if (_pSocket->connect(sbPortNumber, getHostName())) { setState(S_CONNECTING); schedule_connected(); } diff --git a/MAC/GCF/TM/src/ServiceBroker/GTM_ServiceBroker.cc b/MAC/GCF/TM/src/ServiceBroker/GTM_ServiceBroker.cc index 91690b5e2dc..91f01c80c5c 100644 --- a/MAC/GCF/TM/src/ServiceBroker/GTM_ServiceBroker.cc +++ b/MAC/GCF/TM/src/ServiceBroker/GTM_ServiceBroker.cc @@ -23,16 +23,17 @@ #include <lofar_config.h> #include <Common/LofarLocators.h> -#include "GTM_ServiceBroker.h" -#include <SB_Protocol.ph> #include <APS/ParameterSet.h> +#include <GCF/Utils.h> +#include <SB_Protocol.ph> #include <GTM_Defines.h> -#include "GSB_Defines.h" +#include "GTM_ServiceBroker.h" #include <unistd.h> namespace LOFAR { namespace GCF { using namespace TM; + using namespace Common; namespace SB { // @@ -40,31 +41,6 @@ namespace LOFAR { static string sSBTaskName("GCF-SB"); GTMSBHandler* GTMSBHandler::_pInstance = 0; -// -// logResult -// -void logResult(TSBResult result, const string& servicename) -{ - switch (result) { - case SB_NO_ERROR: - break; - case SB_UNKNOWN_ERROR: - LOG_FATAL("Unknown error"); - break; - case SB_SERVICE_ALREADY_EXIST: - LOG_ERROR(formatString ( "Service %s already exist", servicename.c_str())); - break; - case SB_NO_FREE_PORTNR: - LOG_ERROR(formatString ( "No free portnumber for this service: %s", servicename.c_str())); - break; - case SB_UNKNOWN_SERVICE: - LOG_FATAL(formatString ( "Unknown remote service: %s", servicename.c_str())); - break; - default: - break; - } -} - // // GTMSBHandler() // @@ -76,13 +52,12 @@ GTMSBHandler::GTMSBHandler() // GTMServiceBroker() // GTMServiceBroker::GTMServiceBroker() : - GCFTask((State)>MServiceBroker::initial, sSBTaskName) + GCFTask((State)>MServiceBroker::operational, sSBTaskName), + itsMaxResponse (15), + itsTimerPort (*this, "timerport") { // register the protocol for debugging purposes registerProtocol(SB_PROTOCOL, SB_PROTOCOL_signalnames); - - // initialize the port - _serviceBroker.init(*this, "client", GCFPortInterface::SAP, SB_PROTOCOL); } // @@ -124,6 +99,7 @@ void GTMServiceBroker::release() } } +// -------------------- USER FUNCTIONS -------------------- // // registerService(servicePort) // @@ -133,16 +109,19 @@ void GTMServiceBroker::registerService(GCFTCPPort& servicePort) SBRegisterServiceEvent request; - TAction action; - action.action = request.signal; - action.pPort = &servicePort; - action.servicename = servicename; + Action action; + action.type = request.signal; + action.pPort = &servicePort; + action.servicename = servicename; + action.hostname = myHostname(false); // always local! + action.timestamp = time(0); - request.seqnr = registerAction(action); + request.seqnr = _registerAction(action); request.servicename = servicename; - if (_serviceBroker.isConnected()) { - _serviceBroker.send(request); + BMiter serviceBroker = _getBroker(action.hostname); + if (serviceBroker->second->isConnected()) { + serviceBroker->second->send(request); // will result in SB_SERVICE_REGISTERED } } @@ -151,12 +130,22 @@ void GTMServiceBroker::registerService(GCFTCPPort& servicePort) // void GTMServiceBroker::unregisterService(GCFTCPPort& servicePort) { + string servicename = servicePort.makeServiceName(); SBUnregisterServiceEvent request; - request.servicename = servicePort.makeServiceName(); + Action action; + action.type = request.signal; + action.pPort = &servicePort; + action.servicename = servicename; + action.hostname = myHostname(false); // always local! + action.timestamp = time(0); + + request.seqnr = _registerAction(action); + request.servicename = servicename; - if (_serviceBroker.isConnected()) { - _serviceBroker.send(request); + BMiter serviceBroker = _getBroker(servicePort.getHostName()); + if (serviceBroker->second->isConnected()) { + serviceBroker->second->send(request); // will result in SB_SERVICE_UNREGISTERED } } @@ -169,160 +158,405 @@ void GTMServiceBroker::getServiceinfo(GCFTCPPort& clientPort, { SBGetServiceinfoEvent request; - TAction action; - action.action = request.signal; + Action action; + action.type = request.signal; action.pPort = &clientPort; action.servicename = remoteServiceName; + action.hostname = hostname; + action.timestamp = time(0); - request.seqnr = registerAction(action); + request.seqnr = _registerAction(action); request.servicename = remoteServiceName; request.hostname = hostname; - if (_serviceBroker.isConnected()) { - _serviceBroker.send(request); + BMiter serviceBroker = _getBroker(action.hostname); + if (serviceBroker->second->isConnected()) { + serviceBroker->second->send(request); // will result in SB_SERVICE_INFO } } - // // deletePort(port) // -void GTMServiceBroker::deletePort(GCFTCPPort& port) +void GTMServiceBroker::deletePort(GCFTCPPort& aPort) { - TAction* pAction; - for (TActionSeqList::iterator iter = _actionSeqList.begin(); - iter != _actionSeqList.end(); ++iter) { - pAction = &iter->second; - if (pAction->pPort == &port) { - _actionSeqList.erase(iter); - break; + // clean up all action that refer to this port + ALiter end = itsActionList.end(); + ALiter iter = itsActionList.begin(); + while (iter != end) { + if (iter->pPort == &aPort) { + ALiter itercopy = iter; + iter++; + itsActionList.erase(itercopy); + continue; } - } - for (TServiceClients::iterator iter = _serviceClients.begin(); - iter != _serviceClients.end(); ++iter) { - list<GCFTCPPort*>* pClientPorts = &iter->second; - pClientPorts->remove(&port); + iter++; } + + // unregister service of this port if any + _deleteService(aPort); } +// -------------------- INTERNAL FUNCTIONS -------------------- +#if 0 // -// registerAction(action) +// _deleteBroker(Brokerport) // -unsigned short GTMServiceBroker::registerAction(TAction action) +void GTMServiceBroker::_deleteBroker(GTMSBTCPPort& aPort) { - unsigned short seqnr(1); // 0 is reserved for internal msg. in SB - TActionSeqList::const_iterator iter; - do { - seqnr++; - iter = _actionSeqList.find(seqnr); - } while (iter != _actionSeqList.end()); + // remove port from admin + BMiter end = itsBrokerMap.end(); + BMiter iter = itsBrokerMap.begin(); + while (iter != end) { + if (iter->second == aPort) { + itsBrokerMap.erase(iter); + return ; + } + iter++; + } +} +#endif - _actionSeqList[seqnr] = action; +// +// _deleteService(aClientPort) +// +void GTMServiceBroker::_deleteService(GCFTCPPort& aPort) +{ + // its there a service registered at this port? + SMiter service = itsServiceMap.find(&aPort); + if (service != itsServiceMap.end()) { + unregisterService(aPort); + itsServiceMap.erase(service); + } +} - return seqnr; +// +// _actionName(type) +// +string GTMServiceBroker::_actionName(uint16 type) const +{ + switch (type) { + case SB_REGISTER_SERVICE: return ("RegisterService"); + case SB_UNREGISTER_SERVICE: return ("UnregisterService"); + case SB_GET_SERVICEINFO: return ("GetServiceInfo"); + default: return (formatString("%d???", type)); + } } // -// initial(event, port) +// _logResult // -GCFEvent::TResult GTMServiceBroker::initial(GCFEvent& e, GCFPortInterface& /*p*/) +void GTMServiceBroker::_logResult(uint16 result, + const string& servicename, + const string& hostname) const { - GCFEvent::TResult status = GCFEvent::HANDLED; - switch (e.signal) { - case F_INIT: + switch (result) { + case SB_NO_ERROR: break; - case F_ENTRY: - case F_TIMER: - _serviceBroker.open(); + case SB_UNKNOWN_ERROR: + LOG_FATAL("Unknown error"); break; - case F_CONNECTED: - TRAN(GTMServiceBroker::operational); + case SB_SERVICE_ALREADY_EXIST: + LOG_ERROR_STR("Service " << servicename << " already exist"); break; - case F_DISCONNECTED: - _serviceBroker.setTimer(1.0); // try again after 1 second + case SB_NO_FREE_PORTNR: + LOG_ERROR_STR("No free portnumber for service: " << servicename); break; - default: - status = GCFEvent::NOT_HANDLED; + case SB_UNKNOWN_SERVICE: + LOG_FATAL_STR("Unknown remote service: "<< servicename << "@" << hostname); + break; + + case SB_NO_CONNECTION: + LOG_ERROR_STR("No connection with serviceBroker at " << hostname); + break; + + case SB_CANT_RECOVER: + LOG_ERROR_STR("Unable to recover service " << servicename << + " at serviceBroker, no new connections can't be made."); + break; + + default: break; } +} - return status; +// +// _registerAction(action) +// +unsigned short GTMServiceBroker::_registerAction(Action action) +{ + // reset number when list is empty + if (itsActionList.empty()) { + itsSeqnr = 0; + } + + action.seqnr = ++itsSeqnr; + itsActionList.push_back(action); + + return (itsSeqnr); } // -// operational(event, port) +// _reRegisterServices(hostname) // -GCFEvent::TResult GTMServiceBroker::operational(GCFEvent& e, GCFPortInterface& p) +void GTMServiceBroker::_reRegisterServices(GCFPortInterface* brokerPort) { - GCFEvent::TResult status = GCFEvent::HANDLED; + // nothing to do? + if (itsServiceMap.empty()) { + return; + } - switch (e.signal) { - case F_DISCONNECTED: - LOG_FATAL("Connection lost to Service Broker deamon"); - p.close(); - break; + SMiter end = itsServiceMap.end(); + SMiter iter = itsServiceMap.begin(); + while (iter != end) { + SBReregisterServiceEvent request; + request.seqnr = 0; + request.servicename = iter->second.servicename; + request.portnumber = iter->second.portNr; + brokerPort->send(request); - case F_CLOSED: - TRAN(GTMServiceBroker::initial); - break; + iter++; + } +} + +// +// _doActionList(hostname) +// +void GTMServiceBroker::_doActionList(const string& hostname) +{ + // nothing to do? + if (itsActionList.empty()) { + return; + } + + // Note: while processing the list, the list grows. Therefore we use actionsLeft. + ALiter end = itsActionList.end(); + ALiter iter = itsActionList.begin(); + uint32 actionsLeft = itsActionList.size(); + while (actionsLeft && iter != end) { + actionsLeft--; + // only process the actions for this host + if (iter->hostname != hostname) { + iter++; + } + + // its an action for this host, process it. action is added to the list again + switch (iter->type) { + case SB_REGISTER_SERVICE: + registerService(*(iter->pPort)); + break; + case SB_UNREGISTER_SERVICE: + unregisterService(*(iter->pPort)); + break; + case SB_GET_SERVICEINFO: + getServiceinfo(*(iter->pPort), iter->servicename, iter->hostname); + break; + default: + ASSERTSTR(false, "Unknown action in actionlist: " << iter->type + << ":" << iter->servicename << "@" << iter->hostname); + } + + iter = itsActionList.erase(iter); // remove original action + } +} + +// +// _getBroker(hostname) +// +GTMServiceBroker::BMiter GTMServiceBroker::_getBroker(const string& hostname) +{ + // do we have a connection to this broker already? + BMiter serviceBroker = itsBrokerMap.find(hostname); + if (serviceBroker != itsBrokerMap.end()) { + return (serviceBroker); // return pointer to broker. + } + + // broker to this system not found, create a port to this host. + GTMSBTCPPort* aBrokerPort = new GTMSBTCPPort(*this, hostname, + GCFPortInterface::SAP, SB_PROTOCOL); + ASSERTSTR(aBrokerPort, "Unable to allocate a socket to ServiceBroker@" + << hostname); + + // Add broker to collection and open connection + aBrokerPort->setHostName(hostname); + itsBrokerMap[hostname] = aBrokerPort; + itsBrokerMap[hostname]->open(); + + return (itsBrokerMap.find(hostname)); +} + +// +// _findAction(seqnr) +// +GTMServiceBroker::ALiter GTMServiceBroker::_findAction(uint16 seqnr) +{ + ALiter end = itsActionList.end(); + ALiter iter = itsActionList.begin(); + while (iter != end) { + if (iter->seqnr == seqnr) { + return (iter); + } - case F_ENTRY: { - TAction* pAction(0); - TActionSeqList tmpSeqList(_actionSeqList); - _actionSeqList.clear(); - for (TActionSeqList::iterator iter = tmpSeqList.begin(); - iter != tmpSeqList.end(); ++iter) { - pAction = &iter->second; - switch (pAction->action) { + iter++; + } + return (iter); +} + +// +// _reconnectBrokers() +// +void GTMServiceBroker::_reconnectBrokers() +{ + BMiter end = itsBrokerMap.end(); + BMiter iter = itsBrokerMap.begin(); + + bool newOpen(false); + while (iter != end) { + if (!iter->second->isConnected()) { + iter->second->open(); // will result in F_CONN or F_DISCONN + newOpen = true; + // don't let clients wait forever for this host. + _checkActionList(iter->second->getHostName()); + } + iter++; + } + +// if (newOpen) { +// itsTimerPort.setTimer(5.0); +// } +} + +// +// _checkActionList(hostname); +// +void GTMServiceBroker::_checkActionList(const string& hostname) +{ + LOG_TRACE_FLOW_STR("_checkActionList(" << hostname <<")"); + + ALiter end = itsActionList.end(); + ALiter iter = itsActionList.begin(); + time_t currentTime = time(0); + + // check for which actions we are late. + while (iter != end) { + if (iter->hostname == hostname && currentTime > iter->timestamp+itsMaxResponse) { + LOG_ERROR_STR ("Responsetime expired for " << _actionName(iter->type) << + "(" << iter->servicename << "," << iter->hostname << ")"); + _logResult(SB_NO_CONNECTION, iter->servicename, iter->hostname); + switch (iter->type) { case SB_REGISTER_SERVICE: - registerService(*pAction->pPort); - break; + // pass response to waiting client + iter->pPort->serviceRegistered(SB_NO_CONNECTION, 0); + iter = itsActionList.erase(iter); + continue; case SB_UNREGISTER_SERVICE: - unregisterService(*pAction->pPort); - break; + iter = itsActionList.erase(iter); + continue; case SB_GET_SERVICEINFO: - getServiceinfo(*pAction->pPort, pAction->servicename, pAction->hostname); + iter->pPort->serviceInfo(SB_NO_CONNECTION, 0, ""); + continue; + iter = itsActionList.erase(iter); break; default: - ASSERT(0); + ASSERTSTR(false, "Unknown action in actionlist: " << iter->type); } } + iter++; + } +} + +// +// operational(event, port) +// +GCFEvent::TResult GTMServiceBroker::operational(GCFEvent& event, GCFPortInterface& port) +{ + LOG_DEBUG_STR ("operational:" << evtstr(event) << "@" << port.getName()); + + GCFEvent::TResult status = GCFEvent::HANDLED; + switch (event.signal) { + case F_INIT: + case F_ENTRY: + break; + + case F_CONNECTED: + // we succeeded in making a (re)connection to a serviceBroker. + // register old services and handle waiting actions if any + if(static_cast<GTMSBTCPPort*>(&port)->getHostName() == myHostname(false)) { + _reRegisterServices(&port); + } + _doActionList(static_cast<GTMSBTCPPort*>(&port)->getHostName()); + break; + + case F_DISCONNECTED: + // lost a connection with a service broker + // close port and remove waiting actions. + LOG_DEBUG_STR("Connection lost with service broker at " << + static_cast<GTMSBTCPPort*>(&port)->getHostName()); + port.close(); + // start reconnect sequence + itsTimerPort.setTimer(1.0); + break; + + case F_CLOSED: + break; + + case F_TIMER: + // reconnect timer expired, try to reconnect + _reconnectBrokers(); // reopen not-connected broker ports break; - } case SB_SERVICE_REGISTERED: { - SBServiceRegisteredEvent response(e); - TAction* pAction = &_actionSeqList[response.seqnr]; - if (pAction) { - logResult(response.result, pAction->servicename); - pAction->pPort->serviceRegistered(response.result, response.portnumber); - _actionSeqList.erase(response.seqnr); + SBServiceRegisteredEvent response(event); + ALiter action = _findAction(response.seqnr); + if (action != itsActionList.end()) { + _logResult(response.result, action->servicename, action->hostname); + // remember we registered this service (for reregistering) + if (response.result == SB_NO_ERROR) { + itsServiceMap[action->pPort] = KnownService(action->servicename, response.portnumber); + } + // pass response to waiting client + action->pPort->serviceRegistered(response.result, response.portnumber); + itsActionList.erase(action); } break; } case SB_SERVICE_INFO: { - SBServiceInfoEvent response(e); - TAction* pAction = &_actionSeqList[response.seqnr]; - if (pAction) { - logResult(response.result, pAction->servicename); - if (response.result == SB_NO_ERROR) { - _serviceClients[pAction->servicename].push_back(pAction->pPort); + SBServiceInfoEvent response(event); + ALiter action = _findAction(response.seqnr); + if (action != itsActionList.end()) { + _logResult(response.result, action->servicename, action->hostname); + // pass response to waiting client + action->pPort->serviceInfo(response.result, response.portnumber, + response.hostname); + itsActionList.erase(action); + } + break; + } + + case SB_SERVICE_UNREGISTERED: { + SBServiceUnregisteredEvent response(event); + // remove from 'registered services' map. + ALiter action = _findAction(response.seqnr); + if (action != itsActionList.end()) { + SMiter service = itsServiceMap.find(action->pPort); + if (service != itsServiceMap.end()) { + itsServiceMap.erase(service); } - pAction->pPort->serviceInfo(response.result, response.portnumber, response.hostname); - _actionSeqList.erase(response.seqnr); + itsActionList.erase(action); } break; } - case SB_SERVICE_GONE: { - SBServiceGoneEvent indication(e); - _serviceClients.erase(indication.servicename); + case SB_SERVICE_REREGISTERED: { + SBServiceReregisteredEvent response(event); + _logResult(response.result, response.servicename, ""); + // Note: action was not in the action list. break; } diff --git a/MAC/GCF/TM/src/ServiceBroker/GTM_ServiceBroker.h b/MAC/GCF/TM/src/ServiceBroker/GTM_ServiceBroker.h index 49a77bc223b..f0a8db8f955 100644 --- a/MAC/GCF/TM/src/ServiceBroker/GTM_ServiceBroker.h +++ b/MAC/GCF/TM/src/ServiceBroker/GTM_ServiceBroker.h @@ -24,16 +24,15 @@ #ifndef GTM_SERVICEBROKER_H #define GTM_SERVICEBROKER_H +#include <Common/lofar_map.h> +#include <Common/lofar_list.h> #include <GCF/TM/GCF_Task.h> #include "GTM_SBTCPPort.h" #include <GCF/TM/GCF_Handler.h> -namespace LOFAR -{ - namespace GCF - { - namespace SB - { +namespace LOFAR { + namespace GCF { + namespace SB { /** */ @@ -42,67 +41,91 @@ class GTMSBHandler; class GTMServiceBroker : public TM::GCFTask { - public: +public: ~GTMServiceBroker (); static GTMServiceBroker* instance(bool temporary = false); static void release(); - public: // member functions + // member functions void registerService (TM::GCFTCPPort& servicePort); void unregisterService(TM::GCFTCPPort& servicePort); void getServiceinfo (TM::GCFTCPPort& clientPort, const string& remoteServiceName, - const string& hostname = "localhost"); + const string& hostname); void deletePort (TM::GCFTCPPort& port); - private: +private: friend class GTMSBHandler; GTMServiceBroker (); - private: // state methods - GCFEvent::TResult initial (TM::GCFEvent& e, TM::GCFPortInterface& p); + // state methods GCFEvent::TResult operational (TM::GCFEvent& e, TM::GCFPortInterface& p); - private: // helper methods - typedef struct Action - { - unsigned short action; - TM::GCFTCPPort* pPort; - string servicename; - string hostname; - Action& operator= (const Action& other) - { - if (this != &other) - { - action = other.action; - pPort = other.pPort; - servicename = other.servicename; - hostname = other.hostname; - } - return *this; - } - } TAction; - unsigned short registerAction (TAction action); + // helper structures and classes + typedef struct action_t { + uint16 seqnr; + uint16 type; + TM::GCFTCPPort* pPort; + string servicename; + string hostname; + time_t timestamp; + action_t& operator= (const action_t& other) { + if (this != &other) { + seqnr = other.seqnr; + type = other.type; + pPort = other.pPort; + servicename = other.servicename; + hostname = other.hostname; + timestamp = other.timestamp; + } + return (*this); + } + } Action; + typedef struct service_t { + string servicename; + uint16 portNr; + service_t(string s, uint16 p) : servicename(s), portNr(p) {}; + service_t() : servicename(), portNr(0) {}; + } KnownService; + typedef list<Action> actionList_t; + typedef map<string /*hostname*/, GTMSBTCPPort*> brokerMap_t; + typedef map<GCFTCPPort*, KnownService> serviceMap_t; + typedef actionList_t::iterator ALiter; + typedef brokerMap_t::iterator BMiter; + typedef serviceMap_t::iterator SMiter; + + // helper methods + void _deleteService (GCFTCPPort& aPort); + uint16 _registerAction (Action action); + void _doActionList (const string& hostname); + ALiter _findAction (uint16 seqnr); + BMiter _getBroker (const string& hostname); + void _reconnectBrokers (); + void _checkActionList (const string& hostname); + void _reRegisterServices(GCFPortInterface* brokerPort); + string _actionName (uint16 actionType) const; + void _logResult (uint16 result, + const string& servicename, + const string& hostname) const; - private: // data members - GTMSBTCPPort _serviceBroker; + // data members + uint16 itsSeqnr; // for actionlist. + int32 itsMaxResponse; // never let client wait longer. + GCFTimerPort itsTimerPort; // for reconnecting to brokers + brokerMap_t itsBrokerMap; // connections to all Brokers + actionList_t itsActionList; // deferred actions + serviceMap_t itsServiceMap; // services I registered - private: // admin members - typedef map<unsigned short /*seqnr*/, TAction> TActionSeqList; - TActionSeqList _actionSeqList; - typedef map<string /*remoteservicename*/, list<TM::GCFTCPPort*> > TServiceClients; - TServiceClients _serviceClients; }; class GTMSBHandler : public TM::GCFHandler { - public: - +public: ~GTMSBHandler() { _pInstance = 0; } void workProc() {} void stop () {} - private: +private: friend class GTMServiceBroker; GTMSBHandler(); diff --git a/MAC/GCF/TM/src/ServiceBroker/SB_Protocol.prot b/MAC/GCF/TM/src/ServiceBroker/SB_Protocol.prot index cf7b5ef319c..1411ad53833 100644 --- a/MAC/GCF/TM/src/ServiceBroker/SB_Protocol.prot +++ b/MAC/GCF/TM/src/ServiceBroker/SB_Protocol.prot @@ -1,6 +1,25 @@ -// -// Protocol definition for the Service Broker deamon -// +//# SB_Protocol.prot: Protocol definition for the Service Broker Daemon +//# +//# Copyright (C) 2006 +//# ASTRON (Netherlands Foundation for Research in Astronomy) +//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl +//# +//# This program is free software; you can redistribute it and/or modify +//# it under the terms of the GNU General Public License as published by +//# the Free Software Foundation; either version 2 of the License, or +//# (at your option) any later version. +//# +//# This program is distributed in the hope that it will be useful, +//# but WITHOUT ANY WARRANTY; without even the implied warranty of +//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//# GNU General Public License for more details. +//# +//# You should have received a copy of the GNU General Public License +//# along with this program; if not, write to the Free Software +//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +//# +//# $Id$ + autogen definitions protocol; description = "Protocol for the Service Broker deamon"; @@ -10,6 +29,22 @@ id = "(GCF::TM::F_GCF_PROTOCOL + 3)"; // specify extra include files include = '<GSB_Defines.h>'; +prelude = << PRELUDE_END + +// The following messages are defined in the servicebroker protocol +// +// REGISTER_SERVICE (uint16 seqnr, string servicename); +// SERVICE_REGISTERED (uint16 seqnr, int32 portNumber, uint16 result); +// GET_SERVICEINFO (uint16 seqnr, string servicename, string hostname); +// SERVICE_INFO (uint16 seqnr, int32 portNumber, string hostname, uint16 result); +// UNREGISTER_SERVICE (uint16 seqnr, string servicename); +// SERVICE_UNREGISTERED(uint16 seqnr, uint16 result); +// REREGISTER_SERVICE (uint16 seqnr, string servicename, int32 portNumber); +// SERVICE_REREGISTERED(uint16 seqnr, uint16 result); +// + +PRELUDE_END; + // // An "event" has a "signal" and a "dir" (direction) // and zero or more "param"s. @@ -21,7 +56,7 @@ event = { dir = IN; param = { name = "seqnr"; - type = "unsigned short"; + type = "uint16"; }; param = { name = "servicename"; @@ -32,6 +67,10 @@ event = { event = { signal = UNREGISTER_SERVICE; dir = IN; + param = { + name = "seqnr"; + type = "uint16"; + }; param = { name = "servicename"; type = "string"; @@ -43,7 +82,7 @@ event = { dir = IN; param = { name = "seqnr"; - type = "unsigned short"; + type = "uint16"; }; param = { name = "servicename"; @@ -60,11 +99,11 @@ event = { dir = OUT; param = { name = "seqnr"; - type = "unsigned short"; + type = "uint16"; }; param = { name = "portnumber"; - type = "unsigned int"; + type = "uint32"; }; param = { name = "result"; @@ -77,11 +116,11 @@ event = { dir = OUT; param = { name = "seqnr"; - type = "unsigned short"; + type = "uint16"; }; param = { name = "portnumber"; - type = "unsigned int"; + type = "uint32"; }; param = { name = "hostname"; @@ -94,10 +133,48 @@ event = { }; event = { - signal = SERVICE_GONE; + signal = SERVICE_UNREGISTERED; dir = OUT; + param = { + name = "seqnr"; + type = "uint16"; + }; + param = { + name = "result"; + type = "GCF::SB::TSBResult"; + }; +}; + +event = { + signal = REREGISTER_SERVICE; + dir = IN; + param = { + name = "seqnr"; + type = "uint16"; + }; param = { name = "servicename"; type = "string"; }; + param = { + name = "portnumber"; + type = "uint32"; + }; +}; + +event = { + signal = SERVICE_REREGISTERED; + dir = OUT; + param = { + name = "seqnr"; + type = "uint16"; + }; + param = { + name = "servicename"; + type = "string"; + }; + param = { + name = "result"; + type = "GCF::SB::TSBResult"; + }; }; -- GitLab