diff --git a/LCS/Stream/include/Stream/StreamFactory.h b/LCS/Stream/include/Stream/StreamFactory.h index 0e9dd777a6516eff3449649ed45d170a55499738..c1c79488363e499d44e8b3af215bd5e5dd70474d 100644 --- a/LCS/Stream/include/Stream/StreamFactory.h +++ b/LCS/Stream/include/Stream/StreamFactory.h @@ -32,8 +32,7 @@ namespace LOFAR // Caller should wrap the returned pointer in some smart ptr type. // // deadline: absolute deadline for creating the connection - // bind_local_iface: bind to this NIC (eth0, etc), or "" if none - Stream *createStream(const std::string &descriptor, bool asReader, time_t deadline = 0, const std::string &bind_local_iface = ""); + Stream *createStream(const std::string &descriptor, bool asReader, time_t deadline = 0); } // namespace LOFAR diff --git a/LCS/Stream/src/StreamFactory.cc b/LCS/Stream/src/StreamFactory.cc index c9e100f25249c20cf945ac4f140a2d24078f1b3e..0bd1c6281d8de929ae18f2fa39966929678028d4 100644 --- a/LCS/Stream/src/StreamFactory.cc +++ b/LCS/Stream/src/StreamFactory.cc @@ -39,7 +39,7 @@ namespace LOFAR { // Caller should wrap the returned pointer in some smart ptr type. - Stream *createStream(const std::string &descriptor, bool asServer, time_t deadline, const std::string &bind_local_iface) + Stream *createStream(const std::string &descriptor, bool asServer, time_t deadline) { std::vector<std::string> split = StringUtil::split(descriptor, ':'); @@ -47,17 +47,17 @@ namespace LOFAR if (descriptor == "null:") return new NullStream; - // udp:HOST:PORT - else if (split.size() == 3 && split[0] == "udp") - return new SocketStream(split[1].c_str(), boost::lexical_cast<unsigned short>(split[2]), SocketStream::UDP, asServer ? SocketStream::Server : SocketStream::Client, deadline, true, bind_local_iface); + // udp:HOST:PORT[:LOCAL_IFACE] + else if (split.size() >= 3 && split[0] == "udp") + return new SocketStream(split[1].c_str(), boost::lexical_cast<unsigned short>(split[2]), SocketStream::UDP, asServer ? SocketStream::Server : SocketStream::Client, deadline, true, split.size() > 3 ? split[3] : ""); - // tcp:HOST:PORT - else if (split.size() == 3 && split[0] == "tcp") - return new SocketStream(split[1].c_str(), boost::lexical_cast<unsigned short>(split[2]), SocketStream::TCP, asServer ? SocketStream::Server : SocketStream::Client, deadline, true, bind_local_iface); + // tcp:HOST:PORT[:LOCAL_IFACE] + else if (split.size() >= 3 && split[0] == "tcp") + return new SocketStream(split[1].c_str(), boost::lexical_cast<unsigned short>(split[2]), SocketStream::TCP, asServer ? SocketStream::Server : SocketStream::Client, deadline, true, split.size() > 3 ? split[3] : ""); - // tcpbroker:HOST:BROKERPORT:KEY - else if (split.size() == 4 && split[0] == "tcpbroker") - return asServer ? static_cast<Stream*>(new PortBroker::ServerStream(split[3])) : static_cast<Stream*>(new PortBroker::ClientStream(split[1], boost::lexical_cast<unsigned short>(split[2]), split[3], deadline, bind_local_iface)); + // tcpbroker:HOST:BROKERPORT:KEY[:LOCAL_IFACE] + else if (split.size() >= 4 && split[0] == "tcpbroker") + return asServer ? static_cast<Stream*>(new PortBroker::ServerStream(split[3])) : static_cast<Stream*>(new PortBroker::ClientStream(split[1], boost::lexical_cast<unsigned short>(split[2]), split[3], deadline, split.size() > 4 ? split[4] : "")); // file:PATH else if (split.size() > 1 && split[0] == "file") { @@ -72,7 +72,7 @@ namespace LOFAR // HOST:PORT (udp) else if (split.size() == 2) - return new SocketStream(split[0].c_str(), boost::lexical_cast<unsigned short>(split[1]), SocketStream::UDP, asServer ? SocketStream::Server : SocketStream::Client, deadline, true, bind_local_iface); + return new SocketStream(split[0].c_str(), boost::lexical_cast<unsigned short>(split[1]), SocketStream::UDP, asServer ? SocketStream::Server : SocketStream::Client, deadline, true, ""); // PATH (file) else if (split.size() == 1) diff --git a/RTCP/Cobalt/CoInterface/src/Stream.cc b/RTCP/Cobalt/CoInterface/src/Stream.cc index f86a9c37fa04c1b3fc627e174f5e7e8a3f1f6237..b9fd08e85bb933e27892d8685ef47c92ebc6d249 100644 --- a/RTCP/Cobalt/CoInterface/src/Stream.cc +++ b/RTCP/Cobalt/CoInterface/src/Stream.cc @@ -61,7 +61,7 @@ namespace LOFAR // The returned descriptor can be supplied to LCS/Stream StreamFactory.h - string getStreamDescriptorBetweenIONandStorage(const Parset &parset, OutputType outputType, unsigned streamNr) + string getStreamDescriptorBetweenIONandStorage(const Parset &parset, OutputType outputType, unsigned streamNr, const std::string &bind_local_iface) { string host = parset.getHostName(outputType, streamNr); uint16 port = storageBrokerPort(parset.settings.observationID); @@ -69,7 +69,7 @@ namespace LOFAR if (host == "") return str(format("file:%s") % parset.getFileName(outputType, streamNr)); else - return str(format("tcpbroker:%s:%u:ion-storage-obs-%u-type-%u-stream-%u") % host % port % parset.settings.observationID % outputType % streamNr); + return str(format("tcpbroker:%s:%u:ion-storage-obs-%u-type-%u-stream-%u:%s") % host % port % parset.settings.observationID % outputType % streamNr % bind_local_iface); } } // namespace Cobalt diff --git a/RTCP/Cobalt/CoInterface/src/Stream.h b/RTCP/Cobalt/CoInterface/src/Stream.h index 8f4797dc77ff00d327d6ac4b335dbaa126a89e89..030395c96be3f00fcac62d108bef25cef861e0b4 100644 --- a/RTCP/Cobalt/CoInterface/src/Stream.h +++ b/RTCP/Cobalt/CoInterface/src/Stream.h @@ -36,7 +36,7 @@ namespace LOFAR std::string getStorageControlDescription(int observationID, int rank); // The returned descriptor can be supplied to LCS/Stream StreamFactory.h - std::string getStreamDescriptorBetweenIONandStorage(const Parset &parset, OutputType outputType, unsigned streamNr); + std::string getStreamDescriptorBetweenIONandStorage(const Parset &parset, OutputType outputType, unsigned streamNr, const std::string &bind_local_iface = ""); } // namespace Cobalt } // namespace LOFAR diff --git a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc index 60b3c9e820a70c41785cd8413b57718ccb2f83e7..e84a3048457c4254749abbb2ee17ed272e8b8956 100644 --- a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc +++ b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc @@ -802,10 +802,11 @@ namespace LOFAR SmartPtr<Stream> outputStream; if (ps.settings.correlator.enabled) { - const string desc = getStreamDescriptorBetweenIONandStorage(ps, CORRELATED_DATA, globalSubbandIdx); + const string desc = getStreamDescriptorBetweenIONandStorage(ps, CORRELATED_DATA, globalSubbandIdx, + hostID < ps.settings.nodes.size() ? ps.settings.nodes.at(hostID).out_nic : ""); try { - outputStream = createStream(desc, false, 0, hostID < ps.settings.nodes.size() ? ps.settings.nodes.at(hostID).out_nic : ""); + outputStream = createStream(desc, false, 0); } catch (Exception &ex) { LOG_ERROR_STR("Error writing subband " << globalSubbandIdx << ", dropping all subsequent blocks: " << ex.what()); return;