diff --git a/LCS/Stream/include/Stream/NetFuncs.h b/LCS/Stream/include/Stream/NetFuncs.h index a71639f80b7a45d8ac6847bb3ad2ee97fc9df7a2..c0abaa8f7e5e752550a9cbf3bba78c3fd58bf0fb 100644 --- a/LCS/Stream/include/Stream/NetFuncs.h +++ b/LCS/Stream/include/Stream/NetFuncs.h @@ -1,6 +1,6 @@ //# NetFuncs.h: //# -//# Copyright (C) 2015 +//# Copyright (C) 2015, 2017 //# ASTRON (Netherlands Institute for Radio Astronomy) //# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands //# @@ -26,6 +26,9 @@ #include <Common/LofarTypes.h> #include <string> +#include <set> +#include <sys/types.h> +#include <ifaddrs.h> #include <netdb.h> namespace LOFAR { @@ -37,10 +40,24 @@ namespace LOFAR { ~safeAddrInfo(); }; + // A wrapper on 'struct ifaddrs' to make sure it is freed correctly + struct IfAddrs { + struct ifaddrs *ifa; + + IfAddrs(); + ~IfAddrs(); + }; + // Do a thread-safe lookup of 'hostname:port', and store the result in 'result'. // TCP=false means UDP. void safeGetAddrInfo(safeAddrInfo &result, bool TCP, const std::string &hostname, uint16 port); + // Returns full hostnames (FQDNs) of the network interfaces in this machine. + // Includes names (if avail) of IPv4, IPv6, and loopback interfaces. + // Excludes names of interfaces that are down. + // Also not included is the system hostname if it is different from any interface hostname (e.g. on cbt nodes). + std::set<std::string> myInterfaceHostnames(); + // Retrieve the IP of an interface ("eth0") as an 'ifreq' struct, which can be used as sockaddr. struct sockaddr getInterfaceIP(const std::string &iface); diff --git a/LCS/Stream/src/NetFuncs.cc b/LCS/Stream/src/NetFuncs.cc index a44bdf7e0290e8fc6b3e032b61beef45965832cd..d843da657e3caf3cf8d4e4a1817f15f8f3812d07 100644 --- a/LCS/Stream/src/NetFuncs.cc +++ b/LCS/Stream/src/NetFuncs.cc @@ -1,6 +1,6 @@ //# NetFuncs.cc: //# -//# Copyright (C) 2008 +//# Copyright (C) 2015, 2017 //# ASTRON (Netherlands Institute for Radio Astronomy) //# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands //# @@ -79,13 +79,77 @@ namespace LOFAR { int retval; if ((retval = getaddrinfo(hostname.c_str(), portStr, &hints, &result.addrinfo)) != 0) { - const string errorstr = gai_strerror(retval); + const std::string errorstr = gai_strerror(retval); throw SystemCallException(str(format("getaddrinfo(%s): %s") % hostname % errorstr), 0, THROW_ARGS); // TODO: SystemCallException also adds strerror(0), which is useless here } } } + IfAddrs::IfAddrs() { + if (getifaddrs(&ifa) != 0) { + THROW_SYSCALL("getifaddrs"); + } + } + + IfAddrs::~IfAddrs() { + if (ifa) { + freeifaddrs(ifa); + } + } + + std::set<std::string> myInterfaceHostnames() { + IfAddrs ifas; + std::set<std::string> hostnames; + + struct ifaddrs *ap; + for (ap = ifas.ifa; ap != NULL; ap = ap->ifa_next) { + char hostname[NI_MAXHOST]; + int salen; + + if (ap->ifa_addr == NULL) { + continue; + } + if (!(ap->ifa_flags & IFF_UP)) { + continue; + } + + int family = ap->ifa_addr->sa_family; + if (family == AF_INET) { + salen = sizeof(struct sockaddr_in); + } else if (family == AF_INET6) { + struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)ap->ifa_addr; + if (IN6_IS_ADDR_LINKLOCAL(&sin6->sin6_addr) || + IN6_IS_ADDR_MC_LINKLOCAL(&sin6->sin6_addr)) { + continue; // IPv6 link-local + } + + salen = sizeof(struct sockaddr_in6); + } else { + continue; + } + + { + // See safeGetAddrInfo() above. If getaddrinfo does not seem to be thread safe, + // then better lock getnameinfo too, and with the same mutex. + ScopedLock sl(getAddrInfoMutex); + + int retval = getnameinfo(ap->ifa_addr, salen, hostname, sizeof(hostname), + NULL, 0, NI_NAMEREQD); + if (retval != 0) { + const std::string errorstr = gai_strerror(retval); + LOG_WARN_STR(str(format("myHostnames(): failed to getnameinfo() for some addr of type %d on interface %s: %s") % + family % ap->ifa_name % errorstr)); + continue; + } + } + + hostnames.insert(hostname); + } + + return hostnames; + } + struct sockaddr getInterfaceIP(const std::string &iface) { int fd = -1; struct ifreq ifr; diff --git a/LCS/Stream/test/tNetFuncs.cc b/LCS/Stream/test/tNetFuncs.cc index 89f1a27e48fa31f6abea37e9c7c8fe2176f00400..7580fef98b3df1e83cc65b114e671dff3ac380fb 100644 --- a/LCS/Stream/test/tNetFuncs.cc +++ b/LCS/Stream/test/tNetFuncs.cc @@ -37,6 +37,8 @@ using namespace std; void test_getInterfaceIP() { + LOG_INFO("test_getInterfaceIP"); + // Loopback device (lo) should have IP 127.0.0.1 struct sockaddr sa = getInterfaceIP("lo"); @@ -47,6 +49,8 @@ void test_getInterfaceIP() void test_safeGetAddrInfo() { + LOG_INFO("test_safeGetAddrInfo"); + // "localhost" should also resolve to 127.0.0.1 safeAddrInfo addr; safeGetAddrInfo(addr, true, "localhost", 0); @@ -56,10 +60,31 @@ void test_safeGetAddrInfo() ASSERTSTR(ip == "127.0.0.1", "localhost resolves to " << ip); } +void test_myInterfaceHostnames() +{ + LOG_INFO("test_myInterfaceHostnames"); + + set<string> names = myInterfaceHostnames(); + size_t sz = names.size(); + ASSERTSTR(sz != 0, "Size of myInterfaceHostnames() retval is " << sz); + + bool loopbackNameSeen = false; + for (set<string>::iterator it = names.begin(); it != names.end(); ++it) { + LOG_INFO_STR("test_myInterfaceHostnames(): hostname = " << *it); + + if (it->find("localhost") != std::string::npos) { // "localhost", "ip6-localhost", "localhost6", ... + loopbackNameSeen = true; + } + } + + ASSERTSTR(loopbackNameSeen, "myInterfaceHostnames() did not return any loopback interface name"); +} + void test() { test_getInterfaceIP(); test_safeGetAddrInfo(); + test_myInterfaceHostnames(); } diff --git a/RTCP/Cobalt/OutputProc/src/GPUProcIO.cc b/RTCP/Cobalt/OutputProc/src/GPUProcIO.cc index 68c9b7b91f30aaa9dad1670ab802d9700dc0423a..f9e5db1a9a6974845325a249687d354ca1e31ec7 100644 --- a/RTCP/Cobalt/OutputProc/src/GPUProcIO.cc +++ b/RTCP/Cobalt/OutputProc/src/GPUProcIO.cc @@ -39,6 +39,7 @@ #include <MessageBus/ToBus.h> #include <MessageBus/Protocols/TaskFeedbackDataproducts.h> #include <Stream/PortBroker.h> +#include <Stream/NetFuncs.h> #include <ApplCommon/PVSSDatapointDefs.h> #include <ApplCommon/StationInfo.h> #include <MACIO/RTmetadata.h> @@ -108,6 +109,26 @@ bool process(Stream &controlStream) alarm(0); string myHostName = myHostname(false); + // Get all my hostnames to see which output files are my responsibility. + set<string> myHostNames = myInterfaceHostnames(); // includes IPv4, IPv6 loopback name(s) + myHostNames.insert(myHostname(true)); // might be different (e.g. on cbt nodes) + set<string> myShortHostNames; // to avoid changing myHostNames while iterating + string hostNameConcat; + for (set<string>::iterator it = myHostNames.begin(); it != myHostNames.end(); ++it) { + size_t pos = it->find('.'); + if (pos != string::npos) { + string shortName = it->substr(0, pos); + myShortHostNames.insert(shortName); + + hostNameConcat += shortName; + hostNameConcat += ' '; + } + + hostNameConcat += *it; + hostNameConcat += ' '; + } + myHostNames.insert(myShortHostNames.begin(), myShortHostNames.end()); + LOG_INFO_STR("Expecting and writing data for data products for hostnames: " << hostNameConcat); if (parset.settings.realTime) { /* @@ -158,10 +179,9 @@ bool process(Stream &controlStream) { struct ObservationSettings::Correlator::File &file = parset.settings.correlator.files[fileIdx]; - if (file.location.host != myHostName - && file.location.host.find(myHostName + ".") != 0 - && file.location.host != "localhost") + if (myHostNames.find(file.location.host) == myHostNames.end()) { continue; + } LOG_INFO_STR("starting with fileIdx " << fileIdx); @@ -189,10 +209,9 @@ bool process(Stream &controlStream) { struct ObservationSettings::BeamFormer::File &file = parset.settings.beamFormer.files[fileIdx]; - if (file.location.host != myHostName - && file.location.host.find(myHostName + ".") != 0 - && file.location.host != "localhost") + if (myHostNames.find(file.location.host) == myHostNames.end()) { continue; + } const unsigned allFileIdx = fileIdx + parset.settings.correlator.files.size(); mdLogger.log(mdKeyPrefix + PN_COP_LOCUS_NODE + '[' + lexical_cast<string>(allFileIdx) + ']',