Commit 67a03e81 authored by Leon Hiemstra's avatar Leon Hiemstra

implemented mutexes properly when server (re)init

parent e7603605
......@@ -86,7 +86,9 @@ static UA_StatusCode UNB_MethodCallback(UA_Server *server,
cout <<"inputStr->data=" << inputStr->data <<endl;
try {
while(!SD.opcua_mutex.try_lock()) { cerr << "mutex not ready\n"; usleep(100000); }
SD.unb->read(termout,std::string((char *)inputStr->data),0,-1); // TODO: make read/write
SD.opcua_mutex.unlock();
} catch(runtime_error& e) {
cerr << "UNB_MethodCallback: " << e.what() << endl;
termout.strout << e.what();
......@@ -139,9 +141,13 @@ static void add_UNB_Method(UA_Server *server, std::string regname)
1, &inputArgument, 1, &outputArgument, NULL, NULL);
}
// FIXME: test the SD.ready* lines first
int ua_server_init(void)
int ua_server_init(bool warm_start)
{
if(!warm_start) {
mUaServer = UA_Server_new();
UA_ServerConfig_setDefault(UA_Server_getConfig(mUaServer));
}
TermOutput termout;
std::vector<int> nodes = SD.unb->get_nodes(); // all nodes
......@@ -160,19 +166,8 @@ int ua_server_init(void)
return 0;
}
int ua_server_delete(void)
{
//UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "UA Server delete nodes");
return 0;
}
int ua_server(void)
{
mUaServer = UA_Server_new();
UA_ServerConfig_setDefault(UA_Server_getConfig(mUaServer));
ua_server_init();
UA_StatusCode retval = UA_Server_run(mUaServer, &ServerRunning);
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "UA Server Stopped");
......
......@@ -25,8 +25,7 @@
using namespace std;
int ua_server_init(void);
int ua_server_delete(void);
int ua_server_init(bool warm_start);
int ua_server(void);
#endif // UA_SERVER_H
......@@ -76,7 +76,9 @@ void monitor(void)
while(ServerRunning) {
usleep(1000000);
while(!SD.monitor_mutex.try_lock()) { cerr << "mutex not ready\n"; usleep(100000); }
cmdstatusnew = Cmd.command("read /unb0/pn[0:2]/dev/system", termout, cmdname, &SD);
SD.monitor_mutex.unlock();
if(cmdstatusnew.status != CMD_EMPTY) cmdstatus = cmdstatusnew;
cout << "Monitor thread: " << print_termout(termout) << endl;
termout.clear();
......@@ -88,7 +90,6 @@ void control_server(TCPSSocket *sock, const int clientId, std::atomic<size_t> *n
bool quit=false;
while(ServerRunning) {
try {
SD.ready_client[clientId] = true;
SD.controlSocket[clientId] = new TCPCSSocket(sock->listen());
string banner = "output={ SDP to Uniboard Translator " + string(SDPUNB_VERSION)
+ ". (use 'help' for available commands) }\n";
......@@ -118,10 +119,9 @@ void control_server(TCPSSocket *sock, const int clientId, std::atomic<size_t> *n
pos = sbuf.find("\r");
if (pos != string::npos) tcpbuf[pos] = 0;
while(!SD.ready_init) { cout << "init not ready\n"; usleep(100000); }
SD.ready_client[clientId] = false;
while(!SD.control_mutex[clientId].try_lock()) { cerr << "mutex not ready\n"; usleep(100000); }
cmdstatusnew = Cmd.command(string((char *)tcpbuf), termout, cmdname, &SD);
SD.ready_client[clientId] = true;
SD.control_mutex[clientId].unlock();
if(cmdstatusnew.status != CMD_EMPTY) cmdstatus = cmdstatusnew;
string s = print_termout(termout);
SD.controlSocket[clientId]->tx((unsigned char *)s.c_str(), strlen(s.c_str()));
......@@ -140,16 +140,7 @@ void control_server(TCPSSocket *sock, const int clientId, std::atomic<size_t> *n
(*nthreads)--;
}
// FIXME: find a way to 'disable interrupts'
void wait_for_clients_to_start_init(void)
{
for(uint c=0; c<c_MAX_CONTROL_SERVERS; c++) {
while(!SD.ready_client[c]) { cout << "client not ready\n"; usleep(100000); }
}
SD.ready_init = false;
}
void server_init(void)
void server_init(bool warm_start)
{
ostringstream sstr;
list<class NODE_config> NC;
......@@ -169,7 +160,17 @@ void server_init(void)
}
cout << "Hint: follow the syslog: tail -f /var/log/localmessages" << endl;
wait_for_clients_to_start_init();
cerr << "locking all mutexes for (re)init..." << endl;
for(uint c=0; c<c_MAX_CONTROL_SERVERS; c++) {
SD.control_mutex[c].lock();
}
SD.monitor_mutex.lock();
SD.opcua_mutex.lock();
cerr << "done, it is mine. Now (re)init all" << endl;
if(warm_start) {
delete SD.unb;
}
list<class Node*> nodelist;
for(auto nc : NC) {
......@@ -182,13 +183,16 @@ void server_init(void)
nodelist.push_back(node);
}
SD.unb = new UniboardMap(nodelist);
SD.ready_init = true;
}
void server_delete(void)
{
wait_for_clients_to_start_init();
delete SD.unb;
ua_server_init(warm_start);
cout << "unlocking all mutexes...";
for(uint c=0; c<c_MAX_CONTROL_SERVERS; c++) {
SD.control_mutex[c].unlock();
}
SD.monitor_mutex.unlock();
SD.opcua_mutex.unlock();
cout << "...done" << endl;
}
static void stopHandler(int signal)
......@@ -202,11 +206,7 @@ static void stopHandler(int signal)
static void hupHandler(int signal)
{
cerr << "received HUP signal: reload" << endl;
server_delete();
ua_server_delete();
server_init();
ua_server_init();
server_init(true);
}
int main (int argc, char* argv[])
......@@ -277,10 +277,9 @@ int main (int argc, char* argv[])
for(uint c=0; c<c_MAX_CONTROL_SERVERS; c++) {
SD.tcpport = control_tcpport;
SD.verbose[c] = verbose;
SD.ready_client[c] = true;
}
server_init();
server_init(false);
TCPSSocket sock(control_tcpport, "", c_MAX_CONTROL_SERVERS);
for(uint c=0; c<c_MAX_CONTROL_SERVERS; c++) {
......@@ -298,7 +297,7 @@ int main (int argc, char* argv[])
closelog(); // close syslog connection
server_delete();
delete SD.unb;
exit(EXIT_SUCCESS);
} catch(exception& e) {
......
......@@ -25,12 +25,13 @@ using namespace std;
#include <list>
#include <iostream>
#include <fstream>
#include <atomic>
#include <mutex>
#include "map.h"
#include "io/tcpsocket.h"
#define c_MAX_CONTROL_SERVERS 3
#define c_MAX_CONTROL_SERVERS 3
#define c_TCP_CONTROL_SOCKET 3335
#define c_TCP_CONTROL_BUFFERSIZE 1000
......@@ -42,8 +43,9 @@ public:
struct timespec t0;
TCPCSSocket *controlSocket[c_MAX_CONTROL_SERVERS];
std::string configfile;
std::atomic<bool> ready_client[c_MAX_CONTROL_SERVERS];
std::atomic<bool> ready_init;
std::mutex control_mutex[c_MAX_CONTROL_SERVERS];
std::mutex monitor_mutex;
std::mutex opcua_mutex;
};
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment