diff --git a/LCS/Stream/include/Stream/PortBroker.h b/LCS/Stream/include/Stream/PortBroker.h index 05e9685a156ce921e4b74317a48ffde8df493179..33f7d67b7b20d7f288867e4f58d9e15b339f0411 100644 --- a/LCS/Stream/include/Stream/PortBroker.h +++ b/LCS/Stream/include/Stream/PortBroker.h @@ -76,7 +76,7 @@ class PortBroker: protected SocketStream { */ class ClientStream: public SocketStream { public: - ClientStream( const std::string &hostname, uint16 port, const std::string &resource, time_t deadline = 0 ); + ClientStream( const std::string &hostname, uint16 port, const std::string &resource, time_t deadline = 0, const std::string &bind_local_iface = "" ); }; protected: diff --git a/LCS/Stream/include/Stream/SocketStream.h b/LCS/Stream/include/Stream/SocketStream.h index c6834cc62781d90401ed1f1a2061393edc752aa1..5af1ad02bd812c919cb19ec3cfe265a5b6f62bc3 100644 --- a/LCS/Stream/include/Stream/SocketStream.h +++ b/LCS/Stream/include/Stream/SocketStream.h @@ -46,7 +46,7 @@ class SocketStream : public FileDescriptorBasedStream }; SocketStream(const std::string &hostname, uint16 _port, Protocol, Mode, - time_t deadline = 0, bool doAccept = true); + time_t deadline = 0, bool doAccept = true, const std::string &bind_local_iface = ""); virtual ~SocketStream(); FileDescriptorBasedStream *detach(); @@ -71,6 +71,8 @@ class SocketStream : public FileDescriptorBasedStream // Allow individual recv()/send() calls to last for 'timeout' seconds before returning EWOULDBLOCK void setTimeout(double timeout); + int getPort() const { return port; } + private: const std::string hostname; uint16 port; diff --git a/LCS/Stream/include/Stream/StreamFactory.h b/LCS/Stream/include/Stream/StreamFactory.h index c0202747364d10eb99596fa6011c5c485928a8b4..c1c79488363e499d44e8b3af215bd5e5dd70474d 100644 --- a/LCS/Stream/include/Stream/StreamFactory.h +++ b/LCS/Stream/include/Stream/StreamFactory.h @@ -30,6 +30,8 @@ namespace LOFAR // Create a stream from a descriptor. // Caller should wrap the returned pointer in some smart ptr type. + // + // deadline: absolute deadline for creating the connection Stream *createStream(const std::string &descriptor, bool asReader, time_t deadline = 0); } // namespace LOFAR diff --git a/LCS/Stream/src/CMakeLists.txt b/LCS/Stream/src/CMakeLists.txt index 355f76184272d710dd565918352b39aa1ec89b4c..d2385ee5975d3cd7a57d53e49e4233ae1805c92f 100644 --- a/LCS/Stream/src/CMakeLists.txt +++ b/LCS/Stream/src/CMakeLists.txt @@ -8,6 +8,7 @@ lofar_add_library(stream FileStream.cc FixedBufferStream.cc NamedPipeStream.cc + NetFuncs.cc NullStream.cc PortBroker.cc SharedMemoryStream.cc diff --git a/LCS/Stream/src/NetFuncs.cc b/LCS/Stream/src/NetFuncs.cc new file mode 100644 index 0000000000000000000000000000000000000000..d0c6179a1888c3be25556f07d40f5170af034427 --- /dev/null +++ b/LCS/Stream/src/NetFuncs.cc @@ -0,0 +1,125 @@ +//# NetFuncs.cc: +//# +//# Copyright (C) 2008 +//# ASTRON (Netherlands Institute for Radio Astronomy) +//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands +//# +//# This file is part of the LOFAR software suite. +//# The LOFAR software suite is free software: you can redistribute it and/or +//# modify it under the terms of the GNU General Public License as published +//# by the Free Software Foundation, either version 3 of the License, or +//# (at your option) any later version. +//# +//# The LOFAR software suite is distributed in the hope that it will be useful, +//# but WITHOUT ANY WARRANTY; without even the implied warranty of +//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//# GNU General Public License for more details. +//# +//# You should have received a copy of the GNU General Public License along +//# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. +//# +//# $Id$ +#include <lofar_config.h> +#include "NetFuncs.h" + +#include <Common/Thread/Mutex.h> +#include <Common/LofarLogger.h> +#include <Common/SystemCallException.h> + +#include <cstring> +#include <cstdio> +#include <netdb.h> +#include <net/if.h> +#include <sys/ioctl.h> +#include <sys/types.h> +#include <sys/socket.h> + +#include <boost/format.hpp> +using boost::format; + +namespace { + LOFAR::Mutex getAddrInfoMutex; +} + +namespace LOFAR { + + safeAddrInfo::safeAddrInfo() + : + addrinfo(0) + { + } + + safeAddrInfo::~safeAddrInfo() { + if(addrinfo) freeaddrinfo(addrinfo); + } + + void safeGetAddrInfo(safeAddrInfo &result, bool TCP, const std::string &hostname, uint16 port) { + struct addrinfo hints; + char portStr[16]; + + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_INET; // IPv4 + hints.ai_flags = AI_NUMERICSERV; // we only use numeric port numbers, not strings like "smtp" + + if (TCP) { + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = IPPROTO_TCP; + } else { + hints.ai_socktype = SOCK_DGRAM; + hints.ai_protocol = IPPROTO_UDP; + } + + snprintf(portStr, sizeof portStr, "%hu", port); + + { + // getaddrinfo does not seem to be thread safe + ScopedLock sl(getAddrInfoMutex); + + int retval; + + if ((retval = getaddrinfo(hostname.c_str(), portStr, &hints, &result.addrinfo)) != 0) { + const string errorstr = gai_strerror(retval); + + throw SystemCallException(str(format("getaddrinfo(%s): %s") % hostname % errorstr), 0, THROW_ARGS); // TODO: SystemCallException also adds strerror(0), which is useless here + } + } + } + + struct sockaddr getInterfaceIP(const std::string &iface) { + int fd = -1; + struct ifreq ifr; + + if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) + THROW_SYSCALL("socket"); + + try { + memset(&ifr, 0, sizeof ifr); + snprintf(ifr.ifr_name, sizeof ifr.ifr_name, "%s", iface.c_str()); + + if (ioctl(fd, SIOCGIFADDR, &ifr) < 0) + THROW_SYSCALL("ioctl"); + } catch(...) { + close(fd); + throw; + } + + close(fd); + return ifr.ifr_addr; + } + + int getSocketPort(int fd) { + struct sockaddr_in sin; + socklen_t addrlen = sizeof sin; + + if (getsockname(fd, (struct sockaddr *)&sin, &addrlen) < 0) + THROW_SYSCALL("getsockname"); + + if (sin.sin_family != AF_INET) + return 0; + + if (addrlen != sizeof sin) + return 0; + + return ntohs(sin.sin_port); + } +} diff --git a/LCS/Stream/src/NetFuncs.h b/LCS/Stream/src/NetFuncs.h new file mode 100644 index 0000000000000000000000000000000000000000..5c5aa59dee444f7b05a8f057288ec97b764dbab8 --- /dev/null +++ b/LCS/Stream/src/NetFuncs.h @@ -0,0 +1,51 @@ +//# NetFuncs.h: +//# +//# Copyright (C) 2008 +//# ASTRON (Netherlands Institute for Radio Astronomy) +//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands +//# +//# This file is part of the LOFAR software suite. +//# The LOFAR software suite is free software: you can redistribute it and/or +//# modify it under the terms of the GNU General Public License as published +//# by the Free Software Foundation, either version 3 of the License, or +//# (at your option) any later version. +//# +//# The LOFAR software suite is distributed in the hope that it will be useful, +//# but WITHOUT ANY WARRANTY; without even the implied warranty of +//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//# GNU General Public License for more details. +//# +//# You should have received a copy of the GNU General Public License along +//# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. +//# +//# $Id$ + +#ifndef LOFAR_LCS_STREAM_NETFUNCS_H +#define LOFAR_LCS_STREAM_NETFUNCS_H + +#include <Common/LofarTypes.h> + +#include <string> +#include <netdb.h> + +namespace LOFAR { + // A wrapper on 'struct addrInfo' to make sure it is freed correctly + struct safeAddrInfo { + struct addrinfo *addrinfo; + + safeAddrInfo(); + ~safeAddrInfo(); + }; + + // Do a thread-safe lookup of 'hostname:port', and store the result in 'result'. + // TCP=false means UDP. + void safeGetAddrInfo(safeAddrInfo &result, bool TCP, const std::string &hostname, uint16 port); + + // Retrieve the IP of an interface ("eth0") as an 'ifreq' struct, which can be used as sockaddr. + struct sockaddr getInterfaceIP(const std::string &iface); + + // Returns the TCP/UDP port number allocated to a socket, or 0 if unknown. + int getSocketPort(int fd); +} + +#endif diff --git a/LCS/Stream/src/PortBroker.cc b/LCS/Stream/src/PortBroker.cc index fb07012c961d98267e0fdd410494e71f03992c89..0b8185f2fde0aa46ef470f16e09faad83ea17780 100644 --- a/LCS/Stream/src/PortBroker.cc +++ b/LCS/Stream/src/PortBroker.cc @@ -259,10 +259,10 @@ std::string PortBroker::ServerStream::getResource() const } -PortBroker::ClientStream::ClientStream( const string &hostname, uint16 port, const string &resource, time_t deadline ) +PortBroker::ClientStream::ClientStream( const string &hostname, uint16 port, const string &resource, time_t deadline, const std::string &bind_local_iface ) : // connect to port broker - SocketStream(hostname, port, SocketStream::TCP, SocketStream::Client, deadline) + SocketStream(hostname, port, SocketStream::TCP, SocketStream::Client, deadline, true, bind_local_iface) { // request service PortBroker::requestResource(*this, resource); diff --git a/LCS/Stream/src/SocketStream.cc b/LCS/Stream/src/SocketStream.cc index cb8f43186f81b4776dbe3dc2118c04d09ecf9050..eacf2dd1560a3549d351ddad97c33eda1ba48516 100644 --- a/LCS/Stream/src/SocketStream.cc +++ b/LCS/Stream/src/SocketStream.cc @@ -35,15 +35,18 @@ #include <sys/socket.h> #include <netdb.h> #include <dirent.h> +#include <net/if.h> +#include <arpa/inet.h> #include <boost/lexical_cast.hpp> #include <boost/format.hpp> #include <Common/SystemCallException.h> -#include <Common/Thread/Mutex.h> #include <Common/Thread/Cancellation.h> #include <Common/LofarLogger.h> +#include "NetFuncs.h" + //# AI_NUMERICSERV is not defined on OS-X #ifndef AI_NUMERICSERV # define AI_NUMERICSERV 0 @@ -54,27 +57,8 @@ using boost::format; namespace LOFAR { -// port range for unused ports search -const int MINPORT = 10000; -const int MAXPORT = 30000; - - -static struct RandomState { - RandomState() { - xsubi[0] = getpid(); - xsubi[1] = time(0); - xsubi[2] = time(0) >> 16; - } - - unsigned short xsubi[3]; -} randomState; - -namespace { - Mutex getAddrInfoMutex; -}; - SocketStream::SocketStream(const std::string &hostname, uint16 _port, Protocol protocol, - Mode mode, time_t deadline, bool doAccept) + Mode mode, time_t deadline, bool doAccept, const std::string &bind_local_iface) : protocol(protocol), mode(mode), @@ -87,117 +71,100 @@ SocketStream::SocketStream(const std::string &hostname, uint16 _port, Protocol p % hostname % _port); - struct addrinfo hints; - const bool autoPort = (port == 0); - - // use getaddrinfo, because gethostbyname is not thread safe - memset(&hints, 0, sizeof(struct addrinfo)); - hints.ai_family = AF_INET; // IPv4 - hints.ai_flags = AI_NUMERICSERV; // we only use numeric port numbers, not strings like "smtp" - - if (protocol == TCP) { - hints.ai_socktype = SOCK_STREAM; - hints.ai_protocol = IPPROTO_TCP; - } else { - hints.ai_socktype = SOCK_DGRAM; - hints.ai_protocol = IPPROTO_UDP; - } - for(;;) { try { - try { - // Not all potentially blocking calls below have a timeout arg. - // So check early every round. It may abort hanging tests early (=good). - if (deadline > 0 && deadline <= time(0)) - THROW(TimeOutException, "SocketStream"); - - char portStr[16]; - int retval; - struct addrinfo *result; - - if (mode == Server && autoPort) - port = MINPORT + static_cast<unsigned short>((MAXPORT - MINPORT) * erand48(randomState.xsubi)); // erand48() not thread safe, but not a problem. - - snprintf(portStr, sizeof portStr, "%hu", port); - - { - // getaddrinfo does not seem to be thread safe - ScopedLock sl(getAddrInfoMutex); - - if ((retval = getaddrinfo(hostname.c_str(), portStr, &hints, &result)) != 0) { - const string errorstr = gai_strerror(retval); - - throw SystemCallException(str(format("getaddrinfo(%s): %s") % hostname % errorstr), 0, THROW_ARGS); // TODO: SystemCallException also adds strerror(0), which is useless here + // Not all potentially blocking calls below have a timeout arg. + // So check early every round. It may abort hanging tests early (=good). + if (deadline > 0 && deadline <= time(0)) + THROW(TimeOutException, "SocketStream"); + + // Resolve host + port to a 'struct addrinfo' + safeAddrInfo result; + safeGetAddrInfo(result, protocol == SocketStream::TCP, hostname, port); + + // result is a linked list of resolved addresses, we only use the first + + if ((fd = socket(result.addrinfo->ai_family, result.addrinfo->ai_socktype, result.addrinfo->ai_protocol)) < 0) + THROW_SYSCALL("socket"); + + if (bind_local_iface != "") { + // Bind socket to a specific network interface, causing packets to be + // emitted through this interface. + // + // Requires CAP_NET_RAW (or root) + LOG_DEBUG_STR("Binding socket " << description << " to interface " << bind_local_iface); + + struct ifreq ifr; + memset(&ifr, 0, sizeof ifr); + snprintf(ifr.ifr_name, sizeof ifr.ifr_name, "%s", bind_local_iface.c_str()); + + if (setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, &ifr, sizeof ifr) < 0) + try { + THROW_SYSCALL("setsockopt(SO_BINDTODEVICE)"); + } catch(Exception &ex) { + LOG_ERROR_STR("Could not bind socket to device " << bind_local_iface << ": " << ex); } - } - - // make sure result will be freed - struct D { - ~D() { - freeaddrinfo(result); - } - - struct addrinfo *result; - } onDestruct = { result }; - (void) onDestruct; - - // result is a linked list of resolved addresses, we only use the first - - if ((fd = socket(result->ai_family, result->ai_socktype, result->ai_protocol)) < 0) - THROW_SYSCALL("socket"); - - if (mode == Client) { - while (connect(fd, result->ai_addr, result->ai_addrlen) < 0) - if (errno == ECONNREFUSED || errno == ETIMEDOUT) { - if (deadline > 0 && time(0) >= deadline) - throw TimeOutException("client socket", THROW_ARGS); - - if (usleep(999999) < 0) { // near 1 sec; max portably safe arg val - // interrupted by a signal handler -- abort to allow this thread to - // be forced to continue after receiving a SIGINT, as with any other - // system call in this constructor - THROW_SYSCALL("usleep"); - } - } else - THROW_SYSCALL(str(boost::format("connect [%s]") % description)); - } else { - const int on = 1; + } - if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof on) < 0) - THROW_SYSCALL("setsockopt(SO_REUSEADDR)"); + if (mode == Client) { + if (bind_local_iface != "") { + struct sockaddr sa = getInterfaceIP(bind_local_iface); + LOG_DEBUG_STR("Binding socket " << description << " to IP " << inet_ntoa(((struct sockaddr_in*)&sa)->sin_addr) << " of interface " << bind_local_iface); - if (bind(fd, result->ai_addr, result->ai_addrlen) < 0) + if (bind(fd, &sa, sizeof sa) < 0) THROW_SYSCALL(str(boost::format("bind [%s]") % description)); + } - if (protocol == TCP) { - listen_sk = fd; - fd = -1; - - const int listenBacklog = 15; - if (listen(listen_sk, listenBacklog) < 0) - THROW_SYSCALL(str(boost::format("listen [%s]") % description)); - - if (doAccept) - accept(deadline); - else - break; - } + while (connect(fd, result.addrinfo->ai_addr, result.addrinfo->ai_addrlen) < 0) + if (errno == ECONNREFUSED || errno == ETIMEDOUT) { + if (deadline > 0 && time(0) >= deadline) + throw TimeOutException("client socket", THROW_ARGS); + + if (usleep(999999) < 0) { // near 1 sec; max portably safe arg val + // interrupted by a signal handler -- abort to allow this thread to + // be forced to continue after receiving a SIGINT, as with any other + // system call in this constructor + THROW_SYSCALL("usleep"); + } + } else + THROW_SYSCALL(str(boost::format("connect [%s]") % description)); + } else { + const int on = 1; + + if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof on) < 0) + THROW_SYSCALL("setsockopt(SO_REUSEADDR)"); + + if (bind(fd, result.addrinfo->ai_addr, result.addrinfo->ai_addrlen) < 0) + THROW_SYSCALL(str(boost::format("bind [%s]") % description)); + + if (port == 0) { + // we let OS search for a free port + port = getSocketPort(fd); + + LOG_DEBUG(str(boost::format("Bound socket %s to port %s") % description % port)); } - // we have an fd! break out of the infinite loop - break; - } catch (...) { - if (listen_sk >= 0) { close(listen_sk); listen_sk = -1; } - if (fd >= 0) { close(fd); fd = -1; } + if (protocol == TCP) { + listen_sk = fd; + fd = -1; - throw; - } - } catch (SystemCallException &exc) { - if ( (exc.syscall() == "bind" || exc.syscall() == "listen") && - mode == Server && autoPort ) { - continue; // try listening on / binding to another server port + const int listenBacklog = 15; + if (listen(listen_sk, listenBacklog) < 0) + THROW_SYSCALL(str(boost::format("listen [%s]") % description)); + + if (doAccept) + accept(deadline); + else + break; + } } + // we have an fd! break out of the infinite loop + break; + } catch (...) { + if (listen_sk >= 0) { close(listen_sk); listen_sk = -1; } + if (fd >= 0) { close(fd); fd = -1; } + throw; } } diff --git a/LCS/Stream/src/StreamFactory.cc b/LCS/Stream/src/StreamFactory.cc index 25bdeb364c34225bb290f10e97a03862469c3eb9..0bd1c6281d8de929ae18f2fa39966929678028d4 100644 --- a/LCS/Stream/src/StreamFactory.cc +++ b/LCS/Stream/src/StreamFactory.cc @@ -43,22 +43,38 @@ namespace LOFAR { std::vector<std::string> split = StringUtil::split(descriptor, ':'); + // null: if (descriptor == "null:") return new NullStream; - else if (split.size() == 3 && split[0] == "udp") - return new SocketStream(split[1].c_str(), boost::lexical_cast<unsigned short>(split[2]), SocketStream::UDP, asServer ? SocketStream::Server : SocketStream::Client, deadline); - else if (split.size() == 3 && split[0] == "tcp") - return new SocketStream(split[1].c_str(), boost::lexical_cast<unsigned short>(split[2]), SocketStream::TCP, asServer ? SocketStream::Server : SocketStream::Client, deadline); - else if (split.size() == 4 && split[0] == "tcpbroker") - return asServer ? static_cast<Stream*>(new PortBroker::ServerStream(split[3])) : static_cast<Stream*>(new PortBroker::ClientStream(split[1], boost::lexical_cast<unsigned short>(split[2]), split[3])); + + // udp:HOST:PORT[:LOCAL_IFACE] + else if (split.size() >= 3 && split[0] == "udp") + return new SocketStream(split[1].c_str(), boost::lexical_cast<unsigned short>(split[2]), SocketStream::UDP, asServer ? SocketStream::Server : SocketStream::Client, deadline, true, split.size() > 3 ? split[3] : ""); + + // tcp:HOST:PORT[:LOCAL_IFACE] + else if (split.size() >= 3 && split[0] == "tcp") + return new SocketStream(split[1].c_str(), boost::lexical_cast<unsigned short>(split[2]), SocketStream::TCP, asServer ? SocketStream::Server : SocketStream::Client, deadline, true, split.size() > 3 ? split[3] : ""); + + // tcpbroker:HOST:BROKERPORT:KEY[:LOCAL_IFACE] + else if (split.size() >= 4 && split[0] == "tcpbroker") + return asServer ? static_cast<Stream*>(new PortBroker::ServerStream(split[3])) : static_cast<Stream*>(new PortBroker::ClientStream(split[1], boost::lexical_cast<unsigned short>(split[2]), split[3], deadline, split.size() > 4 ? split[4] : "")); + + // file:PATH else if (split.size() > 1 && split[0] == "file") { // don't use split[1] to allow : in filenames const std::string filename = descriptor.substr(5); return asServer ? new FileStream(filename.c_str()) : new FileStream(filename.c_str(), 0666); - } else if (split.size() == 2 && split[0] == "pipe") + } + + // pipe:PATH + else if (split.size() == 2 && split[0] == "pipe") return new NamedPipeStream(split[1].c_str(), asServer); + + // HOST:PORT (udp) else if (split.size() == 2) - return new SocketStream(split[0].c_str(), boost::lexical_cast<unsigned short>(split[1]), SocketStream::UDP, asServer ? SocketStream::Server : SocketStream::Client, deadline); + return new SocketStream(split[0].c_str(), boost::lexical_cast<unsigned short>(split[1]), SocketStream::UDP, asServer ? SocketStream::Server : SocketStream::Client, deadline, true, ""); + + // PATH (file) else if (split.size() == 1) return asServer ? new FileStream(split[0].c_str()) : new FileStream(split[0].c_str(), 0666); diff --git a/LCS/Stream/test/CMakeLists.txt b/LCS/Stream/test/CMakeLists.txt index 5d2be860d26b7f60e23dded86e9ed2f34f1f3bf7..0bfe459bfb8080f15fe9312e7d11fb225f7cbcf9 100644 --- a/LCS/Stream/test/CMakeLists.txt +++ b/LCS/Stream/test/CMakeLists.txt @@ -1,5 +1,7 @@ include(LofarCTest) lofar_add_test(tFixedBufferStream tFixedBufferStream.cc) +lofar_add_test(tNetFuncs tNetFuncs.cc) lofar_add_test(tPortBroker tPortBroker.cc) +lofar_add_test(tSocketStream tSocketStream.cc) lofar_add_test(tStringStream tStringStream.cc) diff --git a/LCS/Stream/test/tNetFuncs.cc b/LCS/Stream/test/tNetFuncs.cc new file mode 100644 index 0000000000000000000000000000000000000000..ba334cb0b6f07cf370ea5cbd633db0e919d7f7d1 --- /dev/null +++ b/LCS/Stream/test/tNetFuncs.cc @@ -0,0 +1,80 @@ +//# tNetFuncs.cc +//# +//# Copyright (C) 2006 +//# ASTRON (Netherlands Institute for Radio Astronomy) +//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands +//# +//# This file is part of the LOFAR software suite. +//# The LOFAR software suite is free software: you can redistribute it and/or +//# modify it under the terms of the GNU General Public License as published +//# by the Free Software Foundation, either version 3 of the License, or +//# (at your option) any later version. +//# +//# The LOFAR software suite is distributed in the hope that it will be useful, +//# but WITHOUT ANY WARRANTY; without even the implied warranty of +//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//# GNU General Public License for more details. +//# +//# You should have received a copy of the GNU General Public License along +//# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. +//# +//# $Id$ + +//# Always #include <lofar_config.h> first! +#include <lofar_config.h> + +//# Includes +#include "../src/NetFuncs.h" + +#include <Common/LofarLogger.h> + +using namespace LOFAR; +using namespace std; + +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> + +void test_getInterfaceIP() +{ + // Loopback device (lo) should have IP 127.0.0.1 + struct sockaddr sa = getInterfaceIP("lo"); + + struct sockaddr_in *sai = (struct sockaddr_in*)&sa; + const string ip = inet_ntoa(sai->sin_addr); + ASSERTSTR(ip == "127.0.0.1", "Loopback device reports IP " << ip); +} + +void test_safeGetAddrInfo() +{ + // "localhost" should also resolve to 127.0.0.1 + safeAddrInfo addr; + safeGetAddrInfo(addr, true, "localhost", 0); + + struct sockaddr_in *sai = (struct sockaddr_in*)addr.addrinfo->ai_addr; + const string ip = inet_ntoa(sai->sin_addr); + ASSERTSTR(ip == "127.0.0.1", "localhost resolves to " << ip); +} + +void test() +{ + test_getInterfaceIP(); + test_safeGetAddrInfo(); +} + + +int main(int /*argc*/, const char* argv[]) +{ + INIT_LOGGER(argv[0]); + try { + alarm(30); + + test(); + + } catch (Exception& e) { + LOG_ERROR_STR(e); + return 1; + } + LOG_INFO("Program terminated successfully"); + return 0; +} diff --git a/LCS/Stream/test/tSocketStream.cc b/LCS/Stream/test/tSocketStream.cc new file mode 100644 index 0000000000000000000000000000000000000000..ed0e689be8ef06aebca0645db29405cb768a21b0 --- /dev/null +++ b/LCS/Stream/test/tSocketStream.cc @@ -0,0 +1,97 @@ +//# tSocketStream.cc +//# +//# Copyright (C) 2006 +//# ASTRON (Netherlands Institute for Radio Astronomy) +//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands +//# +//# This file is part of the LOFAR software suite. +//# The LOFAR software suite is free software: you can redistribute it and/or +//# modify it under the terms of the GNU General Public License as published +//# by the Free Software Foundation, either version 3 of the License, or +//# (at your option) any later version. +//# +//# The LOFAR software suite is distributed in the hope that it will be useful, +//# but WITHOUT ANY WARRANTY; without even the implied warranty of +//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//# GNU General Public License for more details. +//# +//# You should have received a copy of the GNU General Public License along +//# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. +//# +//# $Id$ + +//# Always #include <lofar_config.h> first! +#include <lofar_config.h> + +//# Includes +#include <Stream/SocketStream.h> +#include <Common/LofarLogger.h> +#include <Common/Thread/Thread.h> + +using namespace LOFAR; +using namespace std; + +class Server { +public: + Server(SocketStream::Protocol protocol) + : + protocol(protocol), + ss("localhost", 0, protocol, SocketStream::Server, 0, false), + thread(this, &Server::mainLoop) + { + LOG_INFO_STR("Server listening on port " << ss.getPort()); + } + + ~Server() + { + thread.cancel(); + } + + void mainLoop() { + if (protocol == SocketStream::TCP) + ss.reaccept(); + } + + int getPort() const { return ss.getPort(); } + +private: + SocketStream::Protocol protocol; + SocketStream ss; + Thread thread; +}; + +void test_client_server(SocketStream::Protocol protocol) +{ + // Waits for a connection in the background + Server s(protocol); + + // Setup clients, bound to loopi-back interface + SocketStream c("localhost", s.getPort(), protocol, SocketStream::Client, 0, false); +} + +void test() +{ + LOG_INFO("Testing TCP..."); + test_client_server(SocketStream::TCP); + + LOG_INFO("Testing UDP..."); + test_client_server(SocketStream::UDP); +} + + +int main(int /*argc*/, const char* argv[]) +{ + INIT_LOGGER(argv[0]); + + try { + alarm(30); + + test(); + + } catch (Exception& e) { + LOG_ERROR_STR(e); + return 1; + } + LOG_INFO("Program terminated successfully"); + return 0; +} diff --git a/RTCP/Cobalt/CoInterface/src/Parset.cc b/RTCP/Cobalt/CoInterface/src/Parset.cc index 47f3e8cbfd7ec9edde528952da2ea312476a5dee..c9e30c708e8f9e1e7a4ea709f92cb9de4a32074b 100644 --- a/RTCP/Cobalt/CoInterface/src/Parset.cc +++ b/RTCP/Cobalt/CoInterface/src/Parset.cc @@ -479,7 +479,8 @@ namespace LOFAR node.hostName = getString(prefix + "host", "localhost"); node.cpu = getUint32(prefix + "cpu", 0); - node.nic = getString(prefix + "nic", ""); + node.mpi_nic = getString(prefix + "mpi_nic", ""); + node.out_nic = getString(prefix + "out_nic", ""); node.gpus = getUint32Vector(prefix + "gpus", vector<unsigned>(1,0)); // default to [0] settings.nodes.push_back(node); diff --git a/RTCP/Cobalt/CoInterface/src/Parset.h b/RTCP/Cobalt/CoInterface/src/Parset.h index fb7394a373e51f5e5674eb40f3347389707e41f0..dc266f4d9e47c7ea9e8ddd735a4334f9562b66e5 100644 --- a/RTCP/Cobalt/CoInterface/src/Parset.h +++ b/RTCP/Cobalt/CoInterface/src/Parset.h @@ -268,7 +268,8 @@ namespace LOFAR // NIC(s) to bind to (comma seperated) // // E.g. "mlx4_0", "mlx4_1", "eth0", etc - std::string nic; + std::string mpi_nic; // for MPI + std::string out_nic; // to outputProc }; std::vector<struct Node> nodes; diff --git a/RTCP/Cobalt/CoInterface/src/Stream.cc b/RTCP/Cobalt/CoInterface/src/Stream.cc index f86a9c37fa04c1b3fc627e174f5e7e8a3f1f6237..b9fd08e85bb933e27892d8685ef47c92ebc6d249 100644 --- a/RTCP/Cobalt/CoInterface/src/Stream.cc +++ b/RTCP/Cobalt/CoInterface/src/Stream.cc @@ -61,7 +61,7 @@ namespace LOFAR // The returned descriptor can be supplied to LCS/Stream StreamFactory.h - string getStreamDescriptorBetweenIONandStorage(const Parset &parset, OutputType outputType, unsigned streamNr) + string getStreamDescriptorBetweenIONandStorage(const Parset &parset, OutputType outputType, unsigned streamNr, const std::string &bind_local_iface) { string host = parset.getHostName(outputType, streamNr); uint16 port = storageBrokerPort(parset.settings.observationID); @@ -69,7 +69,7 @@ namespace LOFAR if (host == "") return str(format("file:%s") % parset.getFileName(outputType, streamNr)); else - return str(format("tcpbroker:%s:%u:ion-storage-obs-%u-type-%u-stream-%u") % host % port % parset.settings.observationID % outputType % streamNr); + return str(format("tcpbroker:%s:%u:ion-storage-obs-%u-type-%u-stream-%u:%s") % host % port % parset.settings.observationID % outputType % streamNr % bind_local_iface); } } // namespace Cobalt diff --git a/RTCP/Cobalt/CoInterface/src/Stream.h b/RTCP/Cobalt/CoInterface/src/Stream.h index 8f4797dc77ff00d327d6ac4b335dbaa126a89e89..030395c96be3f00fcac62d108bef25cef861e0b4 100644 --- a/RTCP/Cobalt/CoInterface/src/Stream.h +++ b/RTCP/Cobalt/CoInterface/src/Stream.h @@ -36,7 +36,7 @@ namespace LOFAR std::string getStorageControlDescription(int observationID, int rank); // The returned descriptor can be supplied to LCS/Stream StreamFactory.h - std::string getStreamDescriptorBetweenIONandStorage(const Parset &parset, OutputType outputType, unsigned streamNr); + std::string getStreamDescriptorBetweenIONandStorage(const Parset &parset, OutputType outputType, unsigned streamNr, const std::string &bind_local_iface = ""); } // namespace Cobalt } // namespace LOFAR diff --git a/RTCP/Cobalt/CoInterface/src/TABTranspose.cc b/RTCP/Cobalt/CoInterface/src/TABTranspose.cc index 3cfc8a2cc5cdbd572e21cb0abee18ade279abb57..2189a8cac57f801d20c894d70d8aab7ea67fab2f 100644 --- a/RTCP/Cobalt/CoInterface/src/TABTranspose.cc +++ b/RTCP/Cobalt/CoInterface/src/TABTranspose.cc @@ -589,11 +589,12 @@ void MultiReceiver::dispatch( PortBroker::ServerStream *stream ) // Maintains the connections of an rtcp process with all its outputProc processes // it needs to send data to. MultiSender::MultiSender( const HostMap &hostMap, const Parset &parset, - double maxRetentionTime ) + double maxRetentionTime, const std::string &bind_local_iface ) : hostMap(hostMap), itsParset(parset), - maxRetentionTime(maxRetentionTime) + maxRetentionTime(maxRetentionTime), + bind_local_iface(bind_local_iface) { for (HostMap::const_iterator i = hostMap.begin(); i != hostMap.end(); ++i) { // keep a list of unique hosts @@ -639,7 +640,7 @@ void MultiSender::process( OMPThreadSet *threadSet ) LOG_DEBUG_STR(logPrefix << "MultiSender: Connecting to " << host.hostName << ":" << host.brokerPort << ":" << host.service); - PortBroker::ClientStream stream(host.hostName, host.brokerPort, host.service); + PortBroker::ClientStream stream(host.hostName, host.brokerPort, host.service, 0, bind_local_iface); LOG_DEBUG_STR(logPrefix << "Connected"); diff --git a/RTCP/Cobalt/CoInterface/src/TABTranspose.h b/RTCP/Cobalt/CoInterface/src/TABTranspose.h index c129230b845bbcc7c0e7500563f491aaaa60c2e6..d302d6726cb83d75f94849c680fb54eec4bc0989 100644 --- a/RTCP/Cobalt/CoInterface/src/TABTranspose.h +++ b/RTCP/Cobalt/CoInterface/src/TABTranspose.h @@ -351,8 +351,9 @@ namespace LOFAR // hostMap: the mapping fileIdx -> Host // parset: the parset (i.e. observation configuration) // maxRetentionTime: drop data older than this from the queue + // bind_local_iface: local NIC to bind to (or "" for any) MultiSender( const HostMap &hostMap, const Parset &parset, - double maxRetentionTime = 3.0 ); + double maxRetentionTime = 3.0, const std::string &bind_local_iface = "" ); ~MultiSender(); // Send the data from the queues to the receiving hosts. Will run until @@ -389,6 +390,9 @@ namespace LOFAR // 'maxRetentionTime' seconds. const double maxRetentionTime; + // Local NIC to bind network connections to, or "" if no binding is required + const std::string bind_local_iface; + // Set of hosts to connect to (the list of unique values in hostMap) std::vector<struct Host> hosts; diff --git a/RTCP/Cobalt/GPUProc/etc/parset-additions.d/default/HardwareList.parset b/RTCP/Cobalt/GPUProc/etc/parset-additions.d/default/HardwareList.parset index 490ff0df856981f5b3660f7a84a78800e5e20bc7..7ec295e125469c4909516b5a053cd08e75dae45b 100644 --- a/RTCP/Cobalt/GPUProc/etc/parset-additions.d/default/HardwareList.parset +++ b/RTCP/Cobalt/GPUProc/etc/parset-additions.d/default/HardwareList.parset @@ -5,119 +5,141 @@ PIC.Core.Cobalt.localhost.host=localhost PIC.Core.Cobalt.localhost.cpu=0 -PIC.Core.Cobalt.localhost.nic= +PIC.Core.Cobalt.localhost.mpi_nic= +PIC.Core.Cobalt.localhost.out_nic= PIC.Core.Cobalt.localhost.gpus=[0] # DAS-4 nodes PIC.Core.Cobalt.gpu01_0.host=gpu01 PIC.Core.Cobalt.gpu01_0.cpu=0 -PIC.Core.Cobalt.gpu01_0.nic= +PIC.Core.Cobalt.gpu01_0.mpi_nic= +PIC.Core.Cobalt.gpu01_0.out_nic= PIC.Core.Cobalt.gpu01_0.gpus=[0, 1] PIC.Core.Cobalt.gpu01_1.host=gpu01 PIC.Core.Cobalt.gpu01_1.cpu=1 -PIC.Core.Cobalt.gpu01_1.nic= +PIC.Core.Cobalt.gpu01_1.mpi_nic= PIC.Core.Cobalt.gpu01_1.gpus=[2, 3] # The Cobalt cluster PIC.Core.Cobalt.cbt001_0.host=cbt001 PIC.Core.Cobalt.cbt001_0.cpu=0 -PIC.Core.Cobalt.cbt001_0.nic=mlx4_0 +PIC.Core.Cobalt.cbt001_0.mpi_nic=mlx4_0 +PIC.Core.Cobalt.cbt001_0.out_nic= PIC.Core.Cobalt.cbt001_0.gpus=[0, 1] PIC.Core.Cobalt.cbt001_1.host=cbt001 PIC.Core.Cobalt.cbt001_1.cpu=1 -PIC.Core.Cobalt.cbt001_1.nic=mlx4_1 +PIC.Core.Cobalt.cbt001_1.mpi_nic=mlx4_1 +PIC.Core.Cobalt.cbt001_1.out_nic= PIC.Core.Cobalt.cbt001_1.gpus=[2, 3] PIC.Core.Cobalt.cbt002_0.host=cbt002 PIC.Core.Cobalt.cbt002_0.cpu=0 -PIC.Core.Cobalt.cbt002_0.nic=mlx4_0 +PIC.Core.Cobalt.cbt002_0.mpi_nic=mlx4_0 +PIC.Core.Cobalt.cbt002_0.out_nic= PIC.Core.Cobalt.cbt002_0.gpus=[0, 1] PIC.Core.Cobalt.cbt002_1.host=cbt002 PIC.Core.Cobalt.cbt002_1.cpu=1 -PIC.Core.Cobalt.cbt002_1.nic=mlx4_1 +PIC.Core.Cobalt.cbt002_1.mpi_nic=mlx4_1 +PIC.Core.Cobalt.cbt002_1.out_nic= PIC.Core.Cobalt.cbt002_1.gpus=[2, 3] PIC.Core.Cobalt.cbt003_0.host=cbt003 PIC.Core.Cobalt.cbt003_0.cpu=0 -PIC.Core.Cobalt.cbt003_0.nic=mlx4_0 +PIC.Core.Cobalt.cbt003_0.mpi_nic=mlx4_0 +PIC.Core.Cobalt.cbt003_0.out_nic= PIC.Core.Cobalt.cbt003_0.gpus=[0, 1] PIC.Core.Cobalt.cbt003_1.host=cbt003 PIC.Core.Cobalt.cbt003_1.cpu=1 -PIC.Core.Cobalt.cbt003_1.nic=mlx4_1 +PIC.Core.Cobalt.cbt003_1.mpi_nic=mlx4_1 +PIC.Core.Cobalt.cbt003_1.out_nic= PIC.Core.Cobalt.cbt003_1.gpus=[2, 3] PIC.Core.Cobalt.cbt004_0.host=cbt004 PIC.Core.Cobalt.cbt004_0.cpu=0 -PIC.Core.Cobalt.cbt004_0.nic=mlx4_0 +PIC.Core.Cobalt.cbt004_0.mpi_nic=mlx4_0 +PIC.Core.Cobalt.cbt004_0.out_nic= PIC.Core.Cobalt.cbt004_0.gpus=[0, 1] PIC.Core.Cobalt.cbt004_1.host=cbt004 PIC.Core.Cobalt.cbt004_1.cpu=1 -PIC.Core.Cobalt.cbt004_1.nic=mlx4_1 +PIC.Core.Cobalt.cbt004_1.mpi_nic=mlx4_1 +PIC.Core.Cobalt.cbt004_1.out_nic= PIC.Core.Cobalt.cbt004_1.gpus=[2, 3] PIC.Core.Cobalt.cbt005_0.host=cbt005 PIC.Core.Cobalt.cbt005_0.cpu=0 -PIC.Core.Cobalt.cbt005_0.nic=mlx4_0 +PIC.Core.Cobalt.cbt005_0.mpi_nic=mlx4_0 +PIC.Core.Cobalt.cbt005_0.out_nic= PIC.Core.Cobalt.cbt005_0.gpus=[0, 1] PIC.Core.Cobalt.cbt005_1.host=cbt005 PIC.Core.Cobalt.cbt005_1.cpu=1 -PIC.Core.Cobalt.cbt005_1.nic=mlx4_1 +PIC.Core.Cobalt.cbt005_1.mpi_nic=mlx4_1 +PIC.Core.Cobalt.cbt005_1.out_nic= PIC.Core.Cobalt.cbt005_1.gpus=[2, 3] PIC.Core.Cobalt.cbt006_0.host=cbt006 PIC.Core.Cobalt.cbt006_0.cpu=0 -PIC.Core.Cobalt.cbt006_0.nic=mlx4_0 +PIC.Core.Cobalt.cbt006_0.mpi_nic=mlx4_0 +PIC.Core.Cobalt.cbt006_0.out_nic= PIC.Core.Cobalt.cbt006_0.gpus=[0, 1] PIC.Core.Cobalt.cbt006_1.host=cbt006 PIC.Core.Cobalt.cbt006_1.cpu=1 -PIC.Core.Cobalt.cbt006_1.nic=mlx4_1 +PIC.Core.Cobalt.cbt006_1.mpi_nic=mlx4_1 +PIC.Core.Cobalt.cbt006_1.out_nic= PIC.Core.Cobalt.cbt006_1.gpus=[2, 3] PIC.Core.Cobalt.cbt007_0.host=cbt007 PIC.Core.Cobalt.cbt007_0.cpu=0 -PIC.Core.Cobalt.cbt007_0.nic=mlx4_0 +PIC.Core.Cobalt.cbt007_0.mpi_nic=mlx4_0 +PIC.Core.Cobalt.cbt007_0.out_nic= PIC.Core.Cobalt.cbt007_0.gpus=[0, 1] PIC.Core.Cobalt.cbt007_1.host=cbt007 PIC.Core.Cobalt.cbt007_1.cpu=1 -PIC.Core.Cobalt.cbt007_1.nic=mlx4_1 +PIC.Core.Cobalt.cbt007_1.mpi_nic=mlx4_1 +PIC.Core.Cobalt.cbt007_1.out_nic= PIC.Core.Cobalt.cbt007_1.gpus=[2, 3] PIC.Core.Cobalt.cbt008_0.host=cbt008 PIC.Core.Cobalt.cbt008_0.cpu=0 -PIC.Core.Cobalt.cbt008_0.nic=mlx4_0 +PIC.Core.Cobalt.cbt008_0.mpi_nic=mlx4_0 +PIC.Core.Cobalt.cbt008_0.out_nic= PIC.Core.Cobalt.cbt008_0.gpus=[0, 1] PIC.Core.Cobalt.cbt008_1.host=cbt008 PIC.Core.Cobalt.cbt008_1.cpu=1 -PIC.Core.Cobalt.cbt008_1.nic=mlx4_1 +PIC.Core.Cobalt.cbt008_1.mpi_nic=mlx4_1 +PIC.Core.Cobalt.cbt008_1.out_nic= PIC.Core.Cobalt.cbt008_1.gpus=[2, 3] PIC.Core.Cobalt.cbt009_0.host=cbt009 PIC.Core.Cobalt.cbt009_0.cpu=0 -PIC.Core.Cobalt.cbt009_0.nic=mlx4_0 +PIC.Core.Cobalt.cbt009_0.mpi_nic=mlx4_0 +PIC.Core.Cobalt.cbt009_0.out_nic= PIC.Core.Cobalt.cbt009_0.gpus=[0, 1] PIC.Core.Cobalt.cbt009_1.host=cbt009 PIC.Core.Cobalt.cbt009_1.cpu=1 -PIC.Core.Cobalt.cbt009_1.nic=mlx4_1 +PIC.Core.Cobalt.cbt009_1.mpi_nic=mlx4_1 +PIC.Core.Cobalt.cbt009_1.out_nic= PIC.Core.Cobalt.cbt009_1.gpus=[2, 3] PIC.Core.Cobalt.cbt010_0.host=cbt010 PIC.Core.Cobalt.cbt010_0.cpu=0 -PIC.Core.Cobalt.cbt010_0.nic=mlx4_0 +PIC.Core.Cobalt.cbt010_0.mpi_nic=mlx4_0 +PIC.Core.Cobalt.cbt010_0.out_nic= PIC.Core.Cobalt.cbt010_0.gpus=[0, 1] PIC.Core.Cobalt.cbt010_1.host=cbt010 PIC.Core.Cobalt.cbt010_1.cpu=1 -PIC.Core.Cobalt.cbt010_1.nic=mlx4_1 +PIC.Core.Cobalt.cbt010_1.mpi_nic=mlx4_1 +PIC.Core.Cobalt.cbt010_1.out_nic= PIC.Core.Cobalt.cbt010_1.gpus=[2, 3] diff --git a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc index 9ff49da785934ee2f3e09f69e84c45e7dac719bd..e84a3048457c4254749abbb2ee17ed272e8b8956 100644 --- a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc +++ b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc @@ -128,7 +128,7 @@ namespace LOFAR const std::vector<gpu::Device> &devices, Pool<struct MPIRecvData> &pool, RTmetadata &mdLogger, const std::string &mdKeyPrefix, - int hostID) + unsigned hostID) : subbandProcs(std::max(1UL, (profiling ? 1 : NR_WORKQUEUES_PER_DEVICE) * devices.size())), ps(ps), @@ -146,7 +146,8 @@ namespace LOFAR // be in bulk: if processing is cheap, all subbands will be output right after they have been received. // // Allow queue to drop items older than 3 seconds. - multiSender(hostMap(ps, subbandIndices, hostID), ps, 3.0) + multiSender(hostMap(ps, subbandIndices, hostID), ps, 3.0, hostID < ps.settings.nodes.size() ? ps.settings.nodes.at(hostID).out_nic : ""), + hostID(hostID) { ASSERTSTR(!devices.empty(), "Not bound to any GPU!"); @@ -801,10 +802,11 @@ namespace LOFAR SmartPtr<Stream> outputStream; if (ps.settings.correlator.enabled) { - const string desc = getStreamDescriptorBetweenIONandStorage(ps, CORRELATED_DATA, globalSubbandIdx); + const string desc = getStreamDescriptorBetweenIONandStorage(ps, CORRELATED_DATA, globalSubbandIdx, + hostID < ps.settings.nodes.size() ? ps.settings.nodes.at(hostID).out_nic : ""); try { - outputStream = createStream(desc, false); + outputStream = createStream(desc, false, 0); } catch (Exception &ex) { LOG_ERROR_STR("Error writing subband " << globalSubbandIdx << ", dropping all subsequent blocks: " << ex.what()); return; diff --git a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.h b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.h index 8620fb4fa58bd6201cc6553e073f7b9015437cdb..b09516353d02eed582d95e36bb683953b9fe1ba1 100644 --- a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.h +++ b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.h @@ -52,7 +52,7 @@ namespace LOFAR const std::vector<gpu::Device> &devices, Pool<struct MPIRecvData> &pool, RTmetadata &mdLogger, const std::string &mdKeyPrefix, - int hostID = 0); + unsigned hostID = 0); ~Pipeline(); @@ -136,6 +136,9 @@ namespace LOFAR // Output send engine, takes care of the host connections and the multiplexing. TABTranspose::MultiSender multiSender; + + // MPI rank for this node + const unsigned hostID; }; } } diff --git a/RTCP/Cobalt/GPUProc/src/rtcp.cc b/RTCP/Cobalt/GPUProc/src/rtcp.cc index 9bb98da80fea31ed678219c5ea0789d5803c3d09..26940c1e74582505f057ed42bab9ad52f80dfe98 100644 --- a/RTCP/Cobalt/GPUProc/src/rtcp.cc +++ b/RTCP/Cobalt/GPUProc/src/rtcp.cc @@ -330,10 +330,10 @@ int main(int argc, char **argv) } // Select on the local NUMA InfiniBand interface (OpenMPI only, for now) - if (mynode.nic != "") { - LOG_DEBUG_STR("Binding to interface " << mynode.nic); + if (mynode.mpi_nic != "") { + LOG_DEBUG_STR("Binding MPI to interface " << mynode.mpi_nic); - if (setenv("OMPI_MCA_btl_openib_if_include", mynode.nic.c_str(), 1) < 0) + if (setenv("OMPI_MCA_btl_openib_if_include", mynode.mpi_nic.c_str(), 1) < 0) THROW_SYSCALL("setenv(OMPI_MCA_btl_openib_if_include)"); } } else { diff --git a/RTCP/Cobalt/GPUProc/src/scripts/Cobalt_install.sh b/RTCP/Cobalt/GPUProc/src/scripts/Cobalt_install.sh index 0e594f82c00c88b753c045fda6688d15d3b56443..fa1531c76d5c22877d949e502ee72ffe64dac1bf 100755 --- a/RTCP/Cobalt/GPUProc/src/scripts/Cobalt_install.sh +++ b/RTCP/Cobalt/GPUProc/src/scripts/Cobalt_install.sh @@ -45,9 +45,14 @@ for HOST in ${HOSTS:-cbm001 cbm002 cbm003 cbm004 cbm005 cbm006 cbm007 cbm008 cbm ln -sfT /localhome/lofarsystem/lofar/var var # Set capabilities so our soft real-time programs can elevate prios. - COBALT_CAPABILITIES='cap_sys_admin,cap_sys_nice,cap_ipc_lock' -#disabled until we've updated /etc/sudoers to allow lofarbuild to do this -#also, we don't need cap_sys_admin and should drop it, idem on CEP2 - #sudo /sbin/setcap \"${COBALT_CAPABILITIES}\"=ep bin/rtcp bin/outputProc + # + # cap_sys_nice: allow real-time priority for threads + # cap_ipc_lock: allow app to lock in memory (prevent swap) + # cap_net_raw: allow binding sockets to NICs + OUTPUTPROC_CAPABILITIES='cap_sys_nice,cap_ipc_lock' + sudo /sbin/setcap \"${OUTPUTPROC_CAPABILITIES}\"=ep bin/outputProc || true + RTCP_CAPABILITIES='cap_net_raw,cap_sys_nice,cap_ipc_lock' + sudo /sbin/setcap \"${RTCP_CAPABILITIES}\"=ep bin/rtcp || true " || exit 1 done + diff --git a/SubSystems/Online_Cobalt/install/postinstall_root.sh b/SubSystems/Online_Cobalt/install/postinstall_root.sh index b0c069d0813281f95e004203e399c6fee48e977c..029a8a2395ed64fe1ba77cfb07476bf2ce8cd380 100755 --- a/SubSystems/Online_Cobalt/install/postinstall_root.sh +++ b/SubSystems/Online_Cobalt/install/postinstall_root.sh @@ -7,6 +7,11 @@ echo "Giving /localhome/lofar to lofarbuild..." mkdir /localhome/lofar chown lofarbuild.lofarbuild /localhome/lofar +echo "Giving capabilities to lofarbuild..." +addgroup --system capabilities +usermod -a -G capabilities lofarbuild +echo "%capabilities ALL= NOPASSWD:/sbin/setcap" >> /etc/sudoers + # # Casacore #