Skip to content
Snippets Groups Projects
Commit f03f072e authored by Jan David Mol's avatar Jan David Mol
Browse files

Task #8444: Move bind_local_iface parameter from createStream to the stream description

parent 6a74d043
No related branches found
No related tags found
No related merge requests found
...@@ -32,8 +32,7 @@ namespace LOFAR ...@@ -32,8 +32,7 @@ namespace LOFAR
// Caller should wrap the returned pointer in some smart ptr type. // Caller should wrap the returned pointer in some smart ptr type.
// //
// deadline: absolute deadline for creating the connection // deadline: absolute deadline for creating the connection
// bind_local_iface: bind to this NIC (eth0, etc), or "" if none Stream *createStream(const std::string &descriptor, bool asReader, time_t deadline = 0);
Stream *createStream(const std::string &descriptor, bool asReader, time_t deadline = 0, const std::string &bind_local_iface = "");
} // namespace LOFAR } // namespace LOFAR
......
...@@ -39,7 +39,7 @@ namespace LOFAR ...@@ -39,7 +39,7 @@ namespace LOFAR
{ {
// Caller should wrap the returned pointer in some smart ptr type. // Caller should wrap the returned pointer in some smart ptr type.
Stream *createStream(const std::string &descriptor, bool asServer, time_t deadline, const std::string &bind_local_iface) Stream *createStream(const std::string &descriptor, bool asServer, time_t deadline)
{ {
std::vector<std::string> split = StringUtil::split(descriptor, ':'); std::vector<std::string> split = StringUtil::split(descriptor, ':');
...@@ -47,17 +47,17 @@ namespace LOFAR ...@@ -47,17 +47,17 @@ namespace LOFAR
if (descriptor == "null:") if (descriptor == "null:")
return new NullStream; return new NullStream;
// udp:HOST:PORT // udp:HOST:PORT[:LOCAL_IFACE]
else if (split.size() == 3 && split[0] == "udp") else if (split.size() >= 3 && split[0] == "udp")
return new SocketStream(split[1].c_str(), boost::lexical_cast<unsigned short>(split[2]), SocketStream::UDP, asServer ? SocketStream::Server : SocketStream::Client, deadline, true, bind_local_iface); return new SocketStream(split[1].c_str(), boost::lexical_cast<unsigned short>(split[2]), SocketStream::UDP, asServer ? SocketStream::Server : SocketStream::Client, deadline, true, split.size() > 3 ? split[3] : "");
// tcp:HOST:PORT // tcp:HOST:PORT[:LOCAL_IFACE]
else if (split.size() == 3 && split[0] == "tcp") else if (split.size() >= 3 && split[0] == "tcp")
return new SocketStream(split[1].c_str(), boost::lexical_cast<unsigned short>(split[2]), SocketStream::TCP, asServer ? SocketStream::Server : SocketStream::Client, deadline, true, bind_local_iface); return new SocketStream(split[1].c_str(), boost::lexical_cast<unsigned short>(split[2]), SocketStream::TCP, asServer ? SocketStream::Server : SocketStream::Client, deadline, true, split.size() > 3 ? split[3] : "");
// tcpbroker:HOST:BROKERPORT:KEY // tcpbroker:HOST:BROKERPORT:KEY[:LOCAL_IFACE]
else if (split.size() == 4 && split[0] == "tcpbroker") else if (split.size() >= 4 && split[0] == "tcpbroker")
return asServer ? static_cast<Stream*>(new PortBroker::ServerStream(split[3])) : static_cast<Stream*>(new PortBroker::ClientStream(split[1], boost::lexical_cast<unsigned short>(split[2]), split[3], deadline, bind_local_iface)); return asServer ? static_cast<Stream*>(new PortBroker::ServerStream(split[3])) : static_cast<Stream*>(new PortBroker::ClientStream(split[1], boost::lexical_cast<unsigned short>(split[2]), split[3], deadline, split.size() > 4 ? split[4] : ""));
// file:PATH // file:PATH
else if (split.size() > 1 && split[0] == "file") { else if (split.size() > 1 && split[0] == "file") {
...@@ -72,7 +72,7 @@ namespace LOFAR ...@@ -72,7 +72,7 @@ namespace LOFAR
// HOST:PORT (udp) // HOST:PORT (udp)
else if (split.size() == 2) else if (split.size() == 2)
return new SocketStream(split[0].c_str(), boost::lexical_cast<unsigned short>(split[1]), SocketStream::UDP, asServer ? SocketStream::Server : SocketStream::Client, deadline, true, bind_local_iface); return new SocketStream(split[0].c_str(), boost::lexical_cast<unsigned short>(split[1]), SocketStream::UDP, asServer ? SocketStream::Server : SocketStream::Client, deadline, true, "");
// PATH (file) // PATH (file)
else if (split.size() == 1) else if (split.size() == 1)
......
...@@ -61,7 +61,7 @@ namespace LOFAR ...@@ -61,7 +61,7 @@ namespace LOFAR
// The returned descriptor can be supplied to LCS/Stream StreamFactory.h // The returned descriptor can be supplied to LCS/Stream StreamFactory.h
string getStreamDescriptorBetweenIONandStorage(const Parset &parset, OutputType outputType, unsigned streamNr) string getStreamDescriptorBetweenIONandStorage(const Parset &parset, OutputType outputType, unsigned streamNr, const std::string &bind_local_iface)
{ {
string host = parset.getHostName(outputType, streamNr); string host = parset.getHostName(outputType, streamNr);
uint16 port = storageBrokerPort(parset.settings.observationID); uint16 port = storageBrokerPort(parset.settings.observationID);
...@@ -69,7 +69,7 @@ namespace LOFAR ...@@ -69,7 +69,7 @@ namespace LOFAR
if (host == "") if (host == "")
return str(format("file:%s") % parset.getFileName(outputType, streamNr)); return str(format("file:%s") % parset.getFileName(outputType, streamNr));
else else
return str(format("tcpbroker:%s:%u:ion-storage-obs-%u-type-%u-stream-%u") % host % port % parset.settings.observationID % outputType % streamNr); return str(format("tcpbroker:%s:%u:ion-storage-obs-%u-type-%u-stream-%u:%s") % host % port % parset.settings.observationID % outputType % streamNr % bind_local_iface);
} }
} // namespace Cobalt } // namespace Cobalt
......
...@@ -36,7 +36,7 @@ namespace LOFAR ...@@ -36,7 +36,7 @@ namespace LOFAR
std::string getStorageControlDescription(int observationID, int rank); std::string getStorageControlDescription(int observationID, int rank);
// The returned descriptor can be supplied to LCS/Stream StreamFactory.h // The returned descriptor can be supplied to LCS/Stream StreamFactory.h
std::string getStreamDescriptorBetweenIONandStorage(const Parset &parset, OutputType outputType, unsigned streamNr); std::string getStreamDescriptorBetweenIONandStorage(const Parset &parset, OutputType outputType, unsigned streamNr, const std::string &bind_local_iface = "");
} // namespace Cobalt } // namespace Cobalt
} // namespace LOFAR } // namespace LOFAR
......
...@@ -802,10 +802,11 @@ namespace LOFAR ...@@ -802,10 +802,11 @@ namespace LOFAR
SmartPtr<Stream> outputStream; SmartPtr<Stream> outputStream;
if (ps.settings.correlator.enabled) { if (ps.settings.correlator.enabled) {
const string desc = getStreamDescriptorBetweenIONandStorage(ps, CORRELATED_DATA, globalSubbandIdx); const string desc = getStreamDescriptorBetweenIONandStorage(ps, CORRELATED_DATA, globalSubbandIdx,
hostID < ps.settings.nodes.size() ? ps.settings.nodes.at(hostID).out_nic : "");
try { try {
outputStream = createStream(desc, false, 0, hostID < ps.settings.nodes.size() ? ps.settings.nodes.at(hostID).out_nic : ""); outputStream = createStream(desc, false, 0);
} catch (Exception &ex) { } catch (Exception &ex) {
LOG_ERROR_STR("Error writing subband " << globalSubbandIdx << ", dropping all subsequent blocks: " << ex.what()); LOG_ERROR_STR("Error writing subband " << globalSubbandIdx << ", dropping all subsequent blocks: " << ex.what());
return; return;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment