Skip to content
Snippets Groups Projects
Commit c200e3fa authored by Kenneth Hiemstra's avatar Kenneth Hiemstra
Browse files

Multi client now. Replaced boost tcp socket with normal.

parent bb209cee
No related branches found
No related tags found
No related merge requests found
...@@ -11,7 +11,7 @@ AM_CXXFLAGS = -std=c++11 -pedantic -Wall -Woverloaded-virtual -Wwrite-strings -D ...@@ -11,7 +11,7 @@ AM_CXXFLAGS = -std=c++11 -pedantic -Wall -Woverloaded-virtual -Wwrite-strings -D
# #
############################################################################ ############################################################################
io_SOURCES = udpsocket.cpp udpsocket.h unbos.cpp unbos.h io_SOURCES = udpsocket.cpp udpsocket.h unbos.cpp unbos.h tcpsocket.h tcpsocket.cpp
# the sources to add to the library and to add to the source distribution # the sources to add to the library and to add to the source distribution
libio_a_SOURCES = $(io_SOURCES) libio_a_SOURCES = $(io_SOURCES)
......
//
// Copyright (c) 2009 G.W. Kant, Astron, The Netherlands
// E. van der Wal, Astron, The Netherlands
//
#include <unistd.h>
#include <stdlib.h>
#include <sys/ioctl.h>
#include <arpa/inet.h> // htons
#include <sys/types.h>
#include <sys/wait.h>
#include <netdb.h>
#include <fcntl.h>
#include <syslog.h>
#include <errno.h>
#include <iostream>
#include <sstream>
#include <string.h>
#include "tcpsocket.h"
/*
TCP Server Socket listens to port p and binds to interface iface.
Set iface="" to not bind to an interface.
*/
TCPSSocket::TCPSSocket(uint16_t p, const char* iface, int maxservers) throw(const char *)
{
port = p;
MaxServers = maxservers;
sock = socket(PF_INET, SOCK_STREAM, 0);
if (sock < 0)
throw "TCPSSocket: socket creation error";
int val = 1;
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
&val, sizeof(val)) < 0) {
close(sock);
cerr << "Error: " << strerror(errno) << endl;
throw "TCPSSocket: cannot set socket option SO_REUSEADDR";
}
// this requires root permission:
// if (setsockopt(sock, SOL_SOCKET, SO_BINDTODEVICE,
// (void*) iface, strlen(iface)) < 0) {
// close(sock);
// cerr << "Error: " << strerror(errno) << endl;
//
// throw "TCPSSocket: cannot set socket option SO_BINDTODEVICE";
// }
/* Give the socket a name. */
name.sin_family = AF_INET;
name.sin_port = htons (port);
name.sin_addr.s_addr = htonl (INADDR_ANY);
if (bind (sock, (struct sockaddr *) &name, sizeof (name)) < 0) {
close(sock);
cerr << "Error: " << strerror(errno) << endl;
throw "TCPSSocket: cannot bind socket";
}
}
TCPSSocket::~TCPSSocket()
{
close(sock);
}
int TCPSSocket::listen(void) throw(const char *)
{
#define USE_NO_FORK
#ifdef USE_NO_FORK
ostringstream strs;
#else
int Servers=1, status;
#endif
syslog(LOG_INFO,"Listening to tcp port %d\n",port);
if (::listen (sock, 1) < 0)
throw "TCPSSocket::listen: cannot put socket into listening state";
#ifdef USE_NO_FORK
/* Block until input arrives on the server socket. */
int nsock;
struct sockaddr_in clientname;
socklen_t size = sizeof (clientname);
nsock = accept (sock,(struct sockaddr *) &clientname,&size);
if (nsock < 0) {
cerr << "Error TCPSSocket::listen() accept(): " << strerror(errno) << endl;
exit(-1);
}
struct hostent *he;
he = gethostbyaddr(&clientname.sin_addr, sizeof clientname.sin_addr, AF_INET);
strs << "Server: Client connect from host "
<< (he ? he->h_name : inet_ntoa (clientname.sin_addr)) << " (" << inet_ntoa (clientname.sin_addr)
<< "):" << ntohs(clientname.sin_port) << endl;
syslog(LOG_INFO,strs.str().c_str());
return nsock;
#else
while (1) {
/* Block until input arrives on the server socket. */
int nsock;
struct sockaddr_in clientname;
socklen_t size = sizeof (clientname);
pid_t pid = fork();
if (pid < 0)
throw "TCPSSocket::listen: cannot fork a child process";
else if (pid) { // We are the parent
++Servers;
cerr << "Child " << pid << endl;
if (Servers>MaxServers) {
cerr << "Maximum number of servers (" << MaxServers << ") reached" << endl;
int cpid = wait(&status);
cerr << "Child " << cpid << " terminated" << endl;
--Servers;
}
}
else { // We are child
nsock = accept (sock,
(struct sockaddr *) &clientname,
&size);
if (nsock < 0)
exit(-1);
cerr << "Server: Client connect from host "
<< inet_ntoa (clientname.sin_addr)
<< ":" << ntohs (clientname.sin_port)
<< endl;
return nsock;
}
}
#endif
}
TCPCSSocket::TCPCSSocket(int s)
{
sock = s;
}
TCPCSSocket::~TCPCSSocket()
{
close(sock);
}
size_t TCPCSSocket::rx(unsigned char *buf, size_t len) throw(const char*)
/*
Receive max len bytes from a TCP socket
This one stops after one packet and returns the
number of actual received bytes
*/
{
size_t nrx = 0;
ssize_t ret;
do {
fd_set readfds;
FD_ZERO(&readfds);
FD_SET(sock, &readfds);
// No time out
if(select(sock + 1, &readfds, NULL, NULL, NULL) == -1) {
throw "TCPCSSocket::rx: select error";
}
if FD_ISSET(sock, &readfds) {
ret = recvfrom(sock, buf, len, 0, NULL, NULL);
if(ret == -1) {
throw "TCPCSSocket::rx(): recvfrom=-1 error";
} else if(ret == 0) {
throw "TCPCSSocket::rx(): recvfrom=0 peer orderly shutdown";
}
nrx += ret;
break;
}
} while (1);
return nrx;
}
size_t TCPCSSocket::_rx(unsigned char *buf, size_t len) throw(const char*)
/*
Receive len bytes from a TCP socket
*/
{
size_t nrx = 0;
ssize_t ret;
//struct timeval timeout;
//timeout.tv_sec = timeoutms / 1000;
//timeout.tv_usec = ( timeoutms % 1000 ) * 1000;
// Receive a datagram from a tx source
do {
fd_set readfds;
FD_ZERO(&readfds);
FD_SET(sock, &readfds);
// No time out
if(select(sock + 1, &readfds, NULL, NULL, NULL) == -1) {
throw "TCPCSSocket::_rx: select error";
}
if FD_ISSET(sock, &readfds) {
ret = recvfrom(sock, &buf[nrx], len-nrx, 0, NULL, NULL);
if(ret == -1) {
throw "TCPCSSocket::_rx(): recvfrom=-1 error";
} else if(ret == 0) {
throw "TCPCSSocket::_rx(): recvfrom=0 peer orderly shutdown";
}
nrx += ret;
}
if (nrx==len)
break;
} while (1);
return nrx;
}
size_t TCPCSSocket::tx(const unsigned char* mes, size_t len) throw(const char*)
{
int ntxbytes = write(sock, mes, len);
if (ntxbytes < 0)
throw "TCPCSSocket::tx(): could not send message";
return ntxbytes;
}
#ifndef TCPSOCKET_H
#define TCPSOCKET_H
//
// $Id$
//
// Copyright (c) 2009 G.W. Kant, Astron, The Netherlands
// E. van der Wal, Astron, The Netherlands
//
#include <sys/socket.h>
#include <stdio.h>
using namespace std;
/*
TCP Server Socket listens to port p and binds to interface iface.
Set iface="" to not to bind to an interface.
*/
class TCPSSocket {
struct sockaddr_in name;
struct hostent* hostinfo;
int sock;
uint16_t port;
int MaxServers;
public:
TCPSSocket(uint16_t p, const char* ifname, int maxservers=1) throw(const char*);
~TCPSSocket();
int listen(void) throw(const char*);
};
// Connected TCP server socket
class TCPCSSocket {
protected:
int sock;
public:
TCPCSSocket(int descr = -1);
~TCPCSSocket();
void Shutdown() { shutdown(sock,SHUT_RDWR); }
size_t rx(unsigned char* buf, size_t size) throw(const char*);
size_t _rx(unsigned char* buf, size_t size) throw(const char*);
size_t tx(const unsigned char* data, size_t size) throw(const char*);
};
#endif // TCPSOCKET_H
...@@ -16,22 +16,17 @@ ...@@ -16,22 +16,17 @@
#include <condition_variable> #include <condition_variable>
#include <syslog.h> #include <syslog.h>
#include <signal.h> #include <signal.h>
#include <boost/asio.hpp>
#include <boost/iostreams/stream.hpp>
#include <boost/numeric/ublas/matrix.hpp>
#include <boost/numeric/ublas/io.hpp>
#include <boost/program_options.hpp> #include <boost/program_options.hpp>
#include <boost/program_options/parsers.hpp> #include <boost/program_options/parsers.hpp>
using namespace std; using namespace std;
using boost::asio::ip::tcp;
using namespace boost::numeric::ublas;
namespace po = boost::program_options; namespace po = boost::program_options;
#include "sdpunb.h" #include "sdpunb.h"
#include "tools/parse.h" #include "tools/parse.h"
#include "cmd.h" #include "cmd.h"
#include "unb_config.h" #include "unb_config.h"
#include "io/tcpsocket.h"
/* Global var */ /* Global var */
volatile int ServerQuit = 0; volatile int ServerQuit = 0;
...@@ -40,28 +35,34 @@ bool USE_THREADS=true; ...@@ -40,28 +35,34 @@ bool USE_THREADS=true;
const char* SDPUNB_VERSION= " (testing) " __DATE__ " " __TIME__; const char* SDPUNB_VERSION= " (testing) " __DATE__ " " __TIME__;
void print_termout(iostream *strm,TermOutput& termout)
string print_termout(TermOutput& termout)
{ {
*strm << "output={\n"; string s = "output={\n";
*strm << termout.strout.str(); s += termout.strout.str();
*strm << "}\n"; s += "}\n";
if(termout.Verbose) { if(termout.Verbose) {
*strm << "human={\n"; s += "human={\n";
*strm << termout.strhuman.str(); s += termout.strhuman.str();
*strm << "}\n"; s += "}\n";
} }
*strm << "errors={\n"; s += "errors={\n";
*strm << termout.strerr.str(); s += termout.strerr.str();
*strm << "}\n"; s += "}\n";
return s;
} }
int Run(Serverdat *sd, iostream *strm) void control_server(TCPSSocket *sock, Serverdat *sd, const int clientId, std::atomic<size_t> *nthreads)
{ {
bool quit=false; bool quit=false;
while(!ServerQuit) {
try { try {
*strm << "output={ SDP to Uniboard Translator " << SDPUNB_VERSION //cout << "control server thread[" << clientId << "] started" << endl;
<< ". (use 'help' for available commands) }" << endl; sd->controlSocket[clientId] = new TCPCSSocket(sock->listen());
string banner = "output={ SDP to Uniboard Translator " + string(SDPUNB_VERSION)
+ ". (use 'help' for available commands) }\n";
sd->controlSocket[clientId]->tx((unsigned char *)banner.c_str(),strlen(banner.c_str()));
CMD Cmd; CMD Cmd;
string line; string line;
...@@ -69,59 +70,63 @@ int Run(Serverdat *sd, iostream *strm) ...@@ -69,59 +70,63 @@ int Run(Serverdat *sd, iostream *strm)
CMDstatus cmdstatus={CMD_STATUS_OK, 0,0}; CMDstatus cmdstatus={CMD_STATUS_OK, 0,0};
CMDstatus cmdstatusnew=cmdstatus; CMDstatus cmdstatusnew=cmdstatus;
TermOutput termout(sd->verbose); TermOutput termout(sd->verbose[clientId]);
unsigned char tcpbuf[1000];
while(!quit) { while(!quit) {
*strm << "sdpunb:" <<cmdname<<":"<< cmdstatus.status << ":CMD>"; string prompt = "sdpunb:" + cmdname + ":" + to_string(cmdstatus.status) + ":CMD>";
if(!getline(*strm, line)) { throw runtime_error("lost connection"); } sd->controlSocket[clientId]->tx((unsigned char *)prompt.c_str(),strlen(prompt.c_str()));
cmdstatusnew=Cmd.command(line,termout,cmdname,sd);
if(cmdstatusnew.status!=CMD_EMPTY) cmdstatus=cmdstatusnew;
print_termout(strm,termout); int len = sd->controlSocket[clientId]->rx(tcpbuf,1000);
if(len < 1) {
quit = true;
} else {
// With telnet, a \r\n is added to the transmitted string, get rid of them.
string sbuf((char *)tcpbuf);
size_t pos = sbuf.find("\n");
if (pos!=string::npos) tcpbuf[pos]=0;
pos = sbuf.find("\r");
if (pos!=string::npos) tcpbuf[pos]=0;
cmdstatusnew=Cmd.command(string((char *)tcpbuf),termout,cmdname,sd);
if(cmdstatusnew.status!=CMD_EMPTY) cmdstatus=cmdstatusnew;
string s = print_termout(termout);
sd->controlSocket[clientId]->tx((unsigned char *)s.c_str(),strlen(s.c_str()));
termout.clear(); termout.clear();
if(cmdstatus.status==CMD_REQUEST_QUIT) { if(cmdstatus.status==CMD_REQUEST_QUIT) {
quit = true; quit = true;
} }
} }
return 0;
} catch(exception& e) {
syslog(LOG_WARNING,"Catch at Run(): %s (closing user connection)\n",e.what());
} catch(...) {
syslog(LOG_WARNING,"Catch ... at Run(): (closing user connection)\n");
} }
syslog(LOG_INFO,"Client is disconnected\n"); //cerr << "client disconnect" << endl;
return 1; sd->controlSocket[clientId]->Shutdown();
delete sd->controlSocket[clientId];
} catch(const char *str) {
//cerr << "catch at server: " << str << " (closing user connection)" << endl;
delete sd->controlSocket[clientId];
} }
void server(Serverdat *sd, std::atomic<size_t> *nthreads)
{
boost::asio::io_service io_service;
tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), sd->tcpport));
while (!ServerQuit) {
tcp::iostream stream;
acceptor.accept(*stream.rdbuf());
Run(sd,&stream);
} }
(*nthreads)--; (*nthreads)--;
} }
int main (int argc, char* argv[]) int main (int argc, char* argv[])
{ {
// defaults: // defaults:
int tcpport = 3338; int control_tcpport = 3335;
bool nodaemon = false; bool nodaemon = false;
int verbose = 1; int verbose = 1;
debug=0; debug=0;
string configfile = "/etc/uniboard.conf"; string configfile = "/etc/uniboard.conf";
std::atomic<size_t> nthreads(0);
USE_THREADS=true; USE_THREADS=true;
std::atomic<size_t> ncthreads(0);
std::thread *control_server_thread[c_MAX_CONTROL_SERVERS];
po::options_description desc("Allowed options"); po::options_description desc("Allowed options");
desc.add_options() desc.add_options()
("help,h", "shows this help text") ("help,h", "shows this help text")
("version,v", "prints sdpunb version info") ("version,v", "prints sdpunb version info")
("tcpport", po::value<int>(&tcpport)->default_value(tcpport), ("tcpport", po::value<int>(&control_tcpport)->default_value(control_tcpport),
"The TCP (ctrl) port to listen to (default=3338) (port+1 is 2nd user...)") "The TCP (ctrl) port to listen to (default=3335)")
("nodaemon", po::value<bool>(&nodaemon)->zero_tokens(), ("nodaemon", po::value<bool>(&nodaemon)->zero_tokens(),
"With this flag, the server runs NOT as daemon") "With this flag, the server runs NOT as daemon")
("nothreads", "With this flag, the server does not use threads for nodes") ("nothreads", "With this flag, the server does not use threads for nodes")
...@@ -192,26 +197,30 @@ int main (int argc, char* argv[]) ...@@ -192,26 +197,30 @@ int main (int argc, char* argv[])
Node *node = new Node(nc.ipaddr,uniboardnr,n,type,nc.firmware); Node *node = new Node(nc.ipaddr,uniboardnr,n,type,nc.firmware);
nodelist.push_back(node); nodelist.push_back(node);
} }
Serverdat serverdat;
serverdat.unb = new UniboardMap(nodelist);
if(!nodaemon) { if(!nodaemon) {
if(daemon(1,0) < 0) cerr << "Error fork as daemon: " << strerror(errno) << endl; if(daemon(1,0) < 0) cerr << "Error fork as daemon: " << strerror(errno) << endl;
} }
std::thread *serverthread;
serverdat.tcpport = tcpport;
serverdat.verbose = verbose;
serverthread = new std::thread(server, &serverdat, &nthreads);
++nthreads;
serverthread->join(); Serverdat control_server_dat;
delete serverthread; control_server_dat.unb = new UniboardMap(nodelist);
for(uint c=0;c<c_MAX_CONTROL_SERVERS;c++) {
control_server_dat.tcpport = control_tcpport;
control_server_dat.verbose[c] = verbose;
}
TCPSSocket sock(control_tcpport, "", c_MAX_CONTROL_SERVERS);
for(uint c=0;c<c_MAX_CONTROL_SERVERS;c++) {
control_server_thread[c] = new std::thread(control_server, &sock, &control_server_dat, c, &ncthreads);
++ncthreads;
}
for(uint c=0;c<c_MAX_CONTROL_SERVERS;c++) { control_server_thread[c]->join(); }
for(uint c=0;c<c_MAX_CONTROL_SERVERS;c++) { delete control_server_thread[c]; }
for(auto node : nodelist) { delete node; } for(auto node : nodelist) { delete node; }
closelog(); // close syslog connection closelog(); // close syslog connection
delete control_server_dat.unb;
exit(EXIT_SUCCESS); exit(EXIT_SUCCESS);
} catch(exception& e) { } catch(exception& e) {
......
...@@ -11,13 +11,17 @@ using namespace std; ...@@ -11,13 +11,17 @@ using namespace std;
#include <fstream> #include <fstream>
#include "map.h" #include "map.h"
#include "io/tcpsocket.h"
#define c_MAX_CONTROL_SERVERS 3
class Serverdat { class Serverdat {
public: public:
int tcpport; int tcpport;
int verbose; int verbose[c_MAX_CONTROL_SERVERS];
UniboardMap *unb; UniboardMap *unb;
struct timespec t0; struct timespec t0;
TCPCSSocket *controlSocket[c_MAX_CONTROL_SERVERS];
}; };
#endif #endif
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment