diff --git a/MAC/GCF/TM/include/GCF/TM/EventPort.h b/MAC/GCF/TM/include/GCF/TM/EventPort.h index 7793cb96ebd6fe1a447c4f6cd85a2d93309635bc..a75d7989c00f2290935bc53eece6cf912a665875 100644 --- a/MAC/GCF/TM/include/GCF/TM/EventPort.h +++ b/MAC/GCF/TM/include/GCF/TM/EventPort.h @@ -77,21 +77,25 @@ public: // receive() : Event GCFEvent* receive(); + // getStatus() + int getStatus() { return (itsStatus); } + private: // static receiveEvent(aSocket) - static GCFEvent* receiveEvent(Socket* aSocket); + GCFEvent* receiveEvent(Socket* aSocket); // static sendEvent(Socket*, Event*) - static void sendEvent(Socket* aSocket, + void sendEvent(Socket* aSocket, GCFEvent* anEvent); // _internal routines: see source code for description string _makeServiceName(const string& aServiceMask, int32 aNumber); bool _setupConnection(); - bool _askBrokerThePortNumber(); - bool _waitForSBAnswer(); - bool _startConnectionToPeer(); - bool _waitForPeerResponse(); + int32 _askBrokerThePortNumber(); + int32 _waitForSBAnswer(); + int32 _startConnectionToPeer(); + int32 _waitForPeerResponse(); + void _peerClosedConnection(); EventPort(); diff --git a/MAC/GCF/TM/src/ServiceBroker/EventPort.cc b/MAC/GCF/TM/src/ServiceBroker/EventPort.cc index 35a12f1932db9ecfcda00b39b063ff70dc748b75..9c6bda6d3974a128de720700bdbbd962e96e924e 100644 --- a/MAC/GCF/TM/src/ServiceBroker/EventPort.cc +++ b/MAC/GCF/TM/src/ServiceBroker/EventPort.cc @@ -68,7 +68,6 @@ EventPort::EventPort(const string& aServiceMask, // We always need a socket to the serviceBroker for getting a portnumber we may use. itsBrokerSocket = new Socket("ServiceBroker", itsHost, toString(MAC_SERVICEBROKER_PORT)); ASSERTSTR(itsBrokerSocket, "can't allocate socket to serviceBroker"); -cout << "BS0=" << itsBrokerSocket << endl; _setupConnection(); } @@ -99,7 +98,6 @@ EventPort::~EventPort() // bool EventPort::connect() { -cout << "BS1=" << itsBrokerSocket << endl; return (_setupConnection()); } @@ -125,7 +123,6 @@ bool EventPort::_setupConnection() // No connection to the SB yet? if (itsStatus == EP_CREATED) { LOG_DEBUG ("Trying to make connection with the ServiceBroker"); -cout << "BS2=" << itsBrokerSocket << endl; itsBrokerSocket->connect(itsSyncComm ? -1 : 500); // try to connect, wait forever or 0.5 sec if (!itsBrokerSocket->isConnected()) { // failed? if (itsSyncComm) { @@ -139,46 +136,46 @@ cout << "BS2=" << itsBrokerSocket << endl; // second step: ask service broker the portnumber if (itsStatus == EP_SEES_SB) { - if (!_askBrokerThePortNumber()) { + itsStatus += _askBrokerThePortNumber(); + if (itsStatus <= EP_SEES_SB) { if (itsSyncComm) { ASSERTSTR(false, "Cannot contact the ServiceBroker"); } return (false); // async socket allows failures. } - itsStatus = EP_WAIT_FOR_SB_ANSWER; } // third step: wait for the answer of the SB if (itsStatus == EP_WAIT_FOR_SB_ANSWER) { - if (!_waitForSBAnswer()) { + itsStatus += _waitForSBAnswer(); + if (itsStatus <= EP_WAIT_FOR_SB_ANSWER) { if (itsSyncComm) { ASSERTSTR(false, "Cannot contact the other party"); } return (false); // async socket allows failures. } - itsStatus = EP_KNOWS_DEST; } // fourth step: try to connect to the real socket. if (itsStatus == EP_KNOWS_DEST) { - if (!_startConnectionToPeer()) { + itsStatus += _startConnectionToPeer(); + if (itsStatus <= EP_KNOWS_DEST) { if (itsSyncComm) { ASSERTSTR(false, "Cannot contact the other party"); } return (false); // async socket allows failures. } - itsStatus = EP_CONNECTING; } // fifth step: wait for response of connection request if (itsStatus == EP_CONNECTING) { - if (!_waitForPeerResponse()) { + itsStatus += _waitForPeerResponse(); + if (itsStatus <= EP_CONNECTING) { if (itsSyncComm) { ASSERTSTR(false, "Cannot contact the other party"); } return (false); // async socket allows failures. } - itsStatus = EP_CONNECTED; } return (itsStatus == EP_CONNECTED); @@ -187,7 +184,7 @@ cout << "BS2=" << itsBrokerSocket << endl; // // send a message to the message broker requesting a portnumber // -bool EventPort::_askBrokerThePortNumber() +int32 EventPort::_askBrokerThePortNumber() { // construct the question. if (itsIsServer) { @@ -210,7 +207,7 @@ bool EventPort::_askBrokerThePortNumber() sendEvent(itsBrokerSocket, &request); } - return (true); + return (1); // goto next state } // @@ -218,12 +215,12 @@ bool EventPort::_askBrokerThePortNumber() // // Wait until we received a response from the service broker. // -bool EventPort::_waitForSBAnswer() +int32 EventPort::_waitForSBAnswer() { // wait for response GCFEvent* answerEventPtr(receiveEvent(itsBrokerSocket)); if (!answerEventPtr) { - return (false); + return (0); // stay in this mode } // a complete event was received, handle it. @@ -231,7 +228,7 @@ bool EventPort::_waitForSBAnswer() SBServiceRegisteredEvent response(*answerEventPtr); if (response.result != 0) { LOG_ERROR_STR("Service " << itsServiceName << " is already on the air."); - return (false); + return (-1); // next time ask again } itsPort = response.portnumber; LOG_DEBUG_STR("Service " << itsServiceName << " will be at port " << itsPort); @@ -241,7 +238,7 @@ bool EventPort::_waitForSBAnswer() SBServiceInfoEvent response(*answerEventPtr); if (response.result != 0) { LOG_ERROR_STR("Service " << itsServiceName << " is unknown"); - return (false); + return (-1); // next time ask again } itsPort = response.portnumber; LOG_DEBUG_STR("Service " << itsServiceName << " is at port " << itsPort); @@ -251,13 +248,13 @@ bool EventPort::_waitForSBAnswer() itsBrokerSocket = 0; } - return (true); + return (1); // continue with next stage } // // _startConnectionToPeer() // -bool EventPort::_startConnectionToPeer() +int32 EventPort::_startConnectionToPeer() { // Finally we can make the real connection if (itsIsServer) { @@ -270,18 +267,18 @@ bool EventPort::_startConnectionToPeer() itsSocket = new Socket(itsServiceName, itsHost, toString(itsPort), Socket::TCP); itsSocket->connect(itsSyncComm ? -1 : 500); // try to connect, wait forever or 0.5 sec } - return (true); + return (1); } // // _waitForPeerResponse() // -bool EventPort::_waitForPeerResponse() +int32 EventPort::_waitForPeerResponse() { // connection (already) successful? if (itsSocket->isConnected()) { itsSocket->setBlocking(itsSyncComm); - return (true); + return (1); } // do a retry @@ -292,7 +289,17 @@ bool EventPort::_waitForPeerResponse() itsSocket->connect(itsSyncComm ? -1 : 500); // try to connect, wait forever or 0.5 sec } - return (false); + return (0); +} + +// +// _peerClosedConnection() +// +void EventPort::_peerClosedConnection() +{ + // yeah.... what is wise to do here???? + ASSERTSTR(false, "Other side closed the connection, bailing out"); + } // @@ -360,6 +367,10 @@ GCFEvent* EventPort::receiveEvent(Socket* aSocket) // first read signal (= eventtype) field if (gReadState == 0) { btsRead = aSocket->read((void*) &(header->signal), sizeof(header->signal)); + if (btsRead < 0) { + _peerClosedConnection(); + return (0); + } if (btsRead != sizeof(header->signal)) { if (aSocket->isBlocking()) { ASSERTSTR(false, "Event-type was not received"); @@ -372,6 +383,10 @@ GCFEvent* EventPort::receiveEvent(Socket* aSocket) // next read the length of the rest of the message if (gReadState == 1) { btsRead = aSocket->read((void*) &(header->length), sizeof(header->length)); + if (btsRead < 0) { + _peerClosedConnection(); + return (0); + } if (btsRead != sizeof(header->length)) { if (aSocket->isBlocking()) { ASSERTSTR(false, "Event-length was not received"); @@ -389,6 +404,10 @@ GCFEvent* EventPort::receiveEvent(Socket* aSocket) btsRead = 0; if (gBtsToRead) { btsRead = aSocket->read(&receiveBuffer[sizeof(GCFEvent) + gTotalBtsRead], gBtsToRead); + if (btsRead < 0) { + _peerClosedConnection(); + return (0); + } if (btsRead != gBtsToRead) { if (aSocket->isBlocking()) { ASSERTSTR(false, "Only " << btsRead << " bytes of msg read: " << gBtsToRead); diff --git a/MAC/GCF/TM/test/Echo.cc b/MAC/GCF/TM/test/Echo.cc index f636d3b3dca344b0c2a6081a46e8644c13113143..38cf27a13e5e14291fdee271f34d425db6091c6a 100644 --- a/MAC/GCF/TM/test/Echo.cc +++ b/MAC/GCF/TM/test/Echo.cc @@ -26,6 +26,10 @@ #include "Echo_Protocol.ph" #include <Common/lofar_iostream.h> +static int gDelay = 0; +static timeval gTime; +static int gSeqnr; + namespace LOFAR { namespace GCF { namespace TM { @@ -43,90 +47,104 @@ GCFEvent::TResult Echo::initial(GCFEvent& e, GCFPortInterface& /*p*/) { GCFEvent::TResult status = GCFEvent::HANDLED; - switch (e.signal) { - case F_INIT: - break; + switch (e.signal) { + case F_INIT: + break; case F_ENTRY: case F_TIMER: - if (!server.isConnected()) - server.open(); - break; + if (!server.isConnected()) + server.open(); + break; case F_CONNECTED: - if (server.isConnected()) { - TRAN(Echo::connected); - } - break; + if (server.isConnected()) { + TRAN(Echo::connected); + } + break; case F_DISCONNECTED: - if (!server.isConnected()) { - server.setTimer(1.0); // try again after 1 second - } - break; + if (!server.isConnected()) { + server.setTimer(1.0); // try again after 1 second + } + break; default: - status = GCFEvent::NOT_HANDLED; - break; - } + status = GCFEvent::NOT_HANDLED; + break; + } - return status; + return (status); } GCFEvent::TResult Echo::connected(GCFEvent& e, GCFPortInterface& p) { - GCFEvent::TResult status = GCFEvent::HANDLED; - - switch (e.signal) { - case F_DISCONNECTED: - cout << "Lost connection to client" << endl; - p.close(); - break; - case F_CLOSED: - TRAN(Echo::initial); - break; - - case ECHO_PING: { - EchoPingEvent ping(e); - // for instance these 3 lines can force an interrupt on the parallele port if - // the pin 9 and 10 are connected together. The interrupt can be seen by means of - // the F_DATAIN signal (see below) - // Send a string, which starts with a 'S'|'s' means simulate an clock pulse interrupt - // (in the case below only one character is even valid) - // otherwise an interrupt will be forced by means of setting the pin 9. - cout << "PING received (seqnr=" << ping.seqnr << ")" << endl; - - timeval echo_time; - gettimeofday(&echo_time, 0); - EchoEchoEvent echo; - echo.seqnr = ping.seqnr; - echo.ping_time = ping.ping_time; - echo.echo_time = echo_time; - - server.send(echo); - - cout << "ECHO sent" << endl; - break; - } - - case F_DATAIN: { - cout << "Clock pulse: "; - // always the recv has to be invoked. Otherwise this F_DATAIN keeps comming - // on each select - char pulse[4096]; // size >= 1 - p.recv(pulse, 4096); // will always return 1 if an interrupt was occured and 0 if not - pulse[1] = 0; // if interrupt occured the first char is filled with a '0' + number of occured interrupts - // in the driver sinds the last recv - cout << pulse << endl; - break; - } - - default: - status = GCFEvent::NOT_HANDLED; - break; - } - - return status; + GCFEvent::TResult status = GCFEvent::HANDLED; + + switch (e.signal) { + case F_DISCONNECTED: + cout << "Lost connection to client" << endl; + p.close(); + break; + + case F_CLOSED: + TRAN(Echo::initial); + break; + + case ECHO_PING: { + EchoPingEvent ping(e); + // for instance these 3 lines can force an interrupt on the parallele port if + // the pin 9 and 10 are connected together. The interrupt can be seen by means of + // the F_DATAIN signal (see below) + // Send a string, which starts with a 'S'|'s' means simulate an clock pulse interrupt + // (in the case below only one character is even valid) + // otherwise an interrupt will be forced by means of setting the pin 9. + cout << "PING received (seqnr=" << ping.seqnr << ")" << endl; + cout << "delay = " << gDelay << endl; + server.setTimer((gDelay < 0) ? -1.0 * gDelay : 1.0 * gDelay); + gSeqnr = ping.seqnr; + gTime = ping.ping_time; + break; + } + + case F_TIMER: { + if (gDelay < 0) { + p.close(); + cout << "connection closed by me." << endl; + break; + } + + timeval echo_time; + gettimeofday(&echo_time, 0); + EchoEchoEvent echo; + echo.seqnr = gSeqnr; + echo.ping_time = gTime; + echo.echo_time = echo_time; + + server.send(echo); + + cout << "ECHO sent" << endl; + break; + } + + case F_DATAIN: { + cout << "Clock pulse: "; + // always the recv has to be invoked. Otherwise this F_DATAIN keeps comming + // on each select + char pulse[4096]; // size >= 1 + p.recv(pulse, 4096); // will always return 1 if an interrupt was occured and 0 if not + pulse[1] = 0; // if interrupt occured the first char is filled with a '0' + number of occured interrupts + // in the driver sinds the last recv + cout << pulse << endl; + break; + } + + default: + status = GCFEvent::NOT_HANDLED; + break; + } + + return (status); } } // namespace TM @@ -134,16 +152,29 @@ GCFEvent::TResult Echo::connected(GCFEvent& e, GCFPortInterface& p) } // namespace LOFAR using namespace LOFAR::GCF::TM; +using namespace std; int main(int argc, char* argv[]) { - GCFTask::init(argc, argv); - - Echo echo_task("ECHO"); - - echo_task.start(); // make initial transition - - GCFTask::run(); - - return 0; + GCFTask::init(argc, argv); + + switch (argc) { + case 1: gDelay = 0; + break; + case 2: gDelay = atoi(argv[1]); + break; + default: + cout << "Syntax: " << argv[0] << " [delay]" << endl; + cout << " When delay is a positive value the server will wait that many seconds " << + "before sending the respons." << endl; + cout << " When delay is a negative value the server will disconnect after that " << + "many seconds without sending the answer." << endl; + return (1); + } + + Echo echo_task("ECHO"); + echo_task.start(); // make initial transition + GCFTask::run(); + + return (0); } diff --git a/MAC/GCF/TM/test/tEventPort.cc b/MAC/GCF/TM/test/tEventPort.cc index 110a78ea1b74790146b551540f40da6dcd66fab2..4d4e63dc7c25f108db08906908ebab0bbfcdffb9 100644 --- a/MAC/GCF/TM/test/tEventPort.cc +++ b/MAC/GCF/TM/test/tEventPort.cc @@ -30,42 +30,72 @@ using namespace LOFAR; using namespace LOFAR::GCF; using namespace LOFAR::GCF::TM; -int main (int32 argc, char*argv[]) { +static EchoPingEvent pingEvent; +static EventPort* echoPort; + +int main (int32 argc, char*argv[]) +{ + bool syncMode; + switch (argc) { + case 2: + syncMode = (argv[1][0] == 's' || argv[1][0] == 'S'); + break; + default: + cout << "Syntax: " << argv[0] << " a | s" << endl; + cout << " the argument ('a' or 's') chooses the asynchrone or synchrone behaviour" << endl + << "of the EventPort. The synchrone mode expects the ServiceBroker and the" << endl + << "Echo server running. If not it will assert. In this mode the EventPort" << endl + << "wait forever for the answer." << endl + << "In asynchrone mode the EventPort will never complain and try forever to" << endl + << "reach each of the connection stages and receive the messages. You can" << endl + << "check the EventPort.status() to see in what state the port is." << endl; + return (1); + } INIT_LOGGER("tEventPort"); - - EventPort echoPort("ECHO:EchoServer", false, ECHO_PROTOCOL, "", true); // syncComm -#if 0 - RSPGetconfigEvent getConfig; - rspPort.send(&getConfig); + // open port + LOG_DEBUG_STR ("Operating in " << ((syncMode) ? "" : "a") << "synchrone mode."); + echoPort = new EventPort("ECHO:EchoServer", false, ECHO_PROTOCOL, "", syncMode); - RSPGetconfigackEvent ack(rspPort.receive()); - cout << "NrRCUs = " << ack.n_rcus << endl; - cout << "NrRSPboards = " << ack.n_rspboards << endl; - cout << "MaxRSPboards = " << ack.max_rspboards << endl; -#else - // note this code will not work but it compiles. - LOG_DEBUG("going to create an event"); - EchoPingEvent pingEvent; + // construct event pingEvent.seqnr = 25; timeval pingTime; gettimeofday(&pingTime, 0); pingEvent.ping_time = pingTime; - LOG_DEBUG("going to send the event"); - echoPort.send(&pingEvent); + LOG_DEBUG("sending the ping event"); + if (syncMode) { + // NOTE: we could also use the while-loop of the asyncmode here. + echoPort->send(&pingEvent); + } + else { + while (!echoPort->send(&pingEvent)) { + cout << "state = " << echoPort->getStatus() << endl; + sleep (1); + ; + } + } LOG_DEBUG("going to wait for the answer event"); - EchoEchoEvent ack(*(echoPort.receive())); - LOG_DEBUG_STR("seqnr: " << ack.seqnr); - double someTime; - someTime = 1.0 * ack.ping_time.tv_sec + (ack.ping_time.tv_usec / 1000000); - LOG_DEBUG_STR("ping : " << someTime); - someTime = 1.0 * ack.echo_time.tv_sec + (ack.echo_time.tv_usec / 1000000); - LOG_DEBUG_STR("pong : " << someTime); + GCFEvent* ackPtr; + if (syncMode) { + // NOTE: we could also use the while-loop of the asyncmode here. + ackPtr = echoPort->receive(); + } + else { + while (!(ackPtr = echoPort->receive())) { + cout << "state = " << echoPort->getStatus() << endl; + sleep (1); + ; + } + } + EchoEchoEvent ack(*ackPtr); -#endif + LOG_DEBUG_STR("seqnr: " << ack.seqnr); + double delta = (1.0 * ack.echo_time.tv_sec + (ack.echo_time.tv_usec / 1000000.0)); + delta -= (1.0 * ack.ping_time.tv_sec + (ack.ping_time.tv_usec / 1000000.0)); + LOG_DEBUG_STR("dTime: " << delta << " sec"); return (0); }