diff --git a/RTCP/Cobalt/CoInterface/src/Parset.cc b/RTCP/Cobalt/CoInterface/src/Parset.cc index 47f3e8cbfd7ec9edde528952da2ea312476a5dee..c9e30c708e8f9e1e7a4ea709f92cb9de4a32074b 100644 --- a/RTCP/Cobalt/CoInterface/src/Parset.cc +++ b/RTCP/Cobalt/CoInterface/src/Parset.cc @@ -479,7 +479,8 @@ namespace LOFAR node.hostName = getString(prefix + "host", "localhost"); node.cpu = getUint32(prefix + "cpu", 0); - node.nic = getString(prefix + "nic", ""); + node.mpi_nic = getString(prefix + "mpi_nic", ""); + node.out_nic = getString(prefix + "out_nic", ""); node.gpus = getUint32Vector(prefix + "gpus", vector<unsigned>(1,0)); // default to [0] settings.nodes.push_back(node); diff --git a/RTCP/Cobalt/CoInterface/src/Parset.h b/RTCP/Cobalt/CoInterface/src/Parset.h index fb7394a373e51f5e5674eb40f3347389707e41f0..dc266f4d9e47c7ea9e8ddd735a4334f9562b66e5 100644 --- a/RTCP/Cobalt/CoInterface/src/Parset.h +++ b/RTCP/Cobalt/CoInterface/src/Parset.h @@ -268,7 +268,8 @@ namespace LOFAR // NIC(s) to bind to (comma seperated) // // E.g. "mlx4_0", "mlx4_1", "eth0", etc - std::string nic; + std::string mpi_nic; // for MPI + std::string out_nic; // to outputProc }; std::vector<struct Node> nodes; diff --git a/RTCP/Cobalt/CoInterface/src/TABTranspose.cc b/RTCP/Cobalt/CoInterface/src/TABTranspose.cc index 3cfc8a2cc5cdbd572e21cb0abee18ade279abb57..2189a8cac57f801d20c894d70d8aab7ea67fab2f 100644 --- a/RTCP/Cobalt/CoInterface/src/TABTranspose.cc +++ b/RTCP/Cobalt/CoInterface/src/TABTranspose.cc @@ -589,11 +589,12 @@ void MultiReceiver::dispatch( PortBroker::ServerStream *stream ) // Maintains the connections of an rtcp process with all its outputProc processes // it needs to send data to. 
MultiSender::MultiSender( const HostMap &hostMap, const Parset &parset, - double maxRetentionTime ) + double maxRetentionTime, const std::string &bind_local_iface ) : hostMap(hostMap), itsParset(parset), - maxRetentionTime(maxRetentionTime) + maxRetentionTime(maxRetentionTime), + bind_local_iface(bind_local_iface) { for (HostMap::const_iterator i = hostMap.begin(); i != hostMap.end(); ++i) { // keep a list of unique hosts @@ -639,7 +640,7 @@ void MultiSender::process( OMPThreadSet *threadSet ) LOG_DEBUG_STR(logPrefix << "MultiSender: Connecting to " << host.hostName << ":" << host.brokerPort << ":" << host.service); - PortBroker::ClientStream stream(host.hostName, host.brokerPort, host.service); + PortBroker::ClientStream stream(host.hostName, host.brokerPort, host.service, 0, bind_local_iface); LOG_DEBUG_STR(logPrefix << "Connected"); diff --git a/RTCP/Cobalt/CoInterface/src/TABTranspose.h b/RTCP/Cobalt/CoInterface/src/TABTranspose.h index c129230b845bbcc7c0e7500563f491aaaa60c2e6..d302d6726cb83d75f94849c680fb54eec4bc0989 100644 --- a/RTCP/Cobalt/CoInterface/src/TABTranspose.h +++ b/RTCP/Cobalt/CoInterface/src/TABTranspose.h @@ -351,8 +351,9 @@ namespace LOFAR // hostMap: the mapping fileIdx -> Host // parset: the parset (i.e. observation configuration) // maxRetentionTime: drop data older than this from the queue + // bind_local_iface: local NIC to bind to (or "" for any) MultiSender( const HostMap &hostMap, const Parset &parset, - double maxRetentionTime = 3.0 ); + double maxRetentionTime = 3.0, const std::string &bind_local_iface = "" ); ~MultiSender(); // Send the data from the queues to the receiving hosts. Will run until @@ -389,6 +390,9 @@ namespace LOFAR // 'maxRetentionTime' seconds. 
const double maxRetentionTime; + // Local NIC to bind network connections to, or "" if no binding is required + const std::string bind_local_iface; + // Set of hosts to connect to (the list of unique values in hostMap) std::vector<struct Host> hosts; diff --git a/RTCP/Cobalt/GPUProc/etc/parset-additions.d/default/HardwareList.parset b/RTCP/Cobalt/GPUProc/etc/parset-additions.d/default/HardwareList.parset index 490ff0df856981f5b3660f7a84a78800e5e20bc7..7ec295e125469c4909516b5a053cd08e75dae45b 100644 --- a/RTCP/Cobalt/GPUProc/etc/parset-additions.d/default/HardwareList.parset +++ b/RTCP/Cobalt/GPUProc/etc/parset-additions.d/default/HardwareList.parset @@ -5,119 +5,142 @@ PIC.Core.Cobalt.localhost.host=localhost PIC.Core.Cobalt.localhost.cpu=0 -PIC.Core.Cobalt.localhost.nic= +PIC.Core.Cobalt.localhost.mpi_nic= +PIC.Core.Cobalt.localhost.out_nic= PIC.Core.Cobalt.localhost.gpus=[0] # DAS-4 nodes PIC.Core.Cobalt.gpu01_0.host=gpu01 PIC.Core.Cobalt.gpu01_0.cpu=0 -PIC.Core.Cobalt.gpu01_0.nic= +PIC.Core.Cobalt.gpu01_0.mpi_nic= +PIC.Core.Cobalt.gpu01_0.out_nic= PIC.Core.Cobalt.gpu01_0.gpus=[0, 1] PIC.Core.Cobalt.gpu01_1.host=gpu01 PIC.Core.Cobalt.gpu01_1.cpu=1 -PIC.Core.Cobalt.gpu01_1.nic= +PIC.Core.Cobalt.gpu01_1.mpi_nic= +PIC.Core.Cobalt.gpu01_1.out_nic= PIC.Core.Cobalt.gpu01_1.gpus=[2, 3] # The Cobalt cluster PIC.Core.Cobalt.cbt001_0.host=cbt001 PIC.Core.Cobalt.cbt001_0.cpu=0 -PIC.Core.Cobalt.cbt001_0.nic=mlx4_0 +PIC.Core.Cobalt.cbt001_0.mpi_nic=mlx4_0 +PIC.Core.Cobalt.cbt001_0.out_nic= PIC.Core.Cobalt.cbt001_0.gpus=[0, 1] PIC.Core.Cobalt.cbt001_1.host=cbt001 PIC.Core.Cobalt.cbt001_1.cpu=1 -PIC.Core.Cobalt.cbt001_1.nic=mlx4_1 +PIC.Core.Cobalt.cbt001_1.mpi_nic=mlx4_1 +PIC.Core.Cobalt.cbt001_1.out_nic= PIC.Core.Cobalt.cbt001_1.gpus=[2, 3] PIC.Core.Cobalt.cbt002_0.host=cbt002 PIC.Core.Cobalt.cbt002_0.cpu=0 -PIC.Core.Cobalt.cbt002_0.nic=mlx4_0 +PIC.Core.Cobalt.cbt002_0.mpi_nic=mlx4_0 +PIC.Core.Cobalt.cbt002_0.out_nic= PIC.Core.Cobalt.cbt002_0.gpus=[0, 1] PIC.Core.Cobalt.cbt002_1.host=cbt002 
PIC.Core.Cobalt.cbt002_1.cpu=1 -PIC.Core.Cobalt.cbt002_1.nic=mlx4_1 +PIC.Core.Cobalt.cbt002_1.mpi_nic=mlx4_1 +PIC.Core.Cobalt.cbt002_1.out_nic= PIC.Core.Cobalt.cbt002_1.gpus=[2, 3] PIC.Core.Cobalt.cbt003_0.host=cbt003 PIC.Core.Cobalt.cbt003_0.cpu=0 -PIC.Core.Cobalt.cbt003_0.nic=mlx4_0 +PIC.Core.Cobalt.cbt003_0.mpi_nic=mlx4_0 +PIC.Core.Cobalt.cbt003_0.out_nic= PIC.Core.Cobalt.cbt003_0.gpus=[0, 1] PIC.Core.Cobalt.cbt003_1.host=cbt003 PIC.Core.Cobalt.cbt003_1.cpu=1 -PIC.Core.Cobalt.cbt003_1.nic=mlx4_1 +PIC.Core.Cobalt.cbt003_1.mpi_nic=mlx4_1 +PIC.Core.Cobalt.cbt003_1.out_nic= PIC.Core.Cobalt.cbt003_1.gpus=[2, 3] PIC.Core.Cobalt.cbt004_0.host=cbt004 PIC.Core.Cobalt.cbt004_0.cpu=0 -PIC.Core.Cobalt.cbt004_0.nic=mlx4_0 +PIC.Core.Cobalt.cbt004_0.mpi_nic=mlx4_0 +PIC.Core.Cobalt.cbt004_0.out_nic= PIC.Core.Cobalt.cbt004_0.gpus=[0, 1] PIC.Core.Cobalt.cbt004_1.host=cbt004 PIC.Core.Cobalt.cbt004_1.cpu=1 -PIC.Core.Cobalt.cbt004_1.nic=mlx4_1 +PIC.Core.Cobalt.cbt004_1.mpi_nic=mlx4_1 +PIC.Core.Cobalt.cbt004_1.out_nic= PIC.Core.Cobalt.cbt004_1.gpus=[2, 3] PIC.Core.Cobalt.cbt005_0.host=cbt005 PIC.Core.Cobalt.cbt005_0.cpu=0 -PIC.Core.Cobalt.cbt005_0.nic=mlx4_0 +PIC.Core.Cobalt.cbt005_0.mpi_nic=mlx4_0 +PIC.Core.Cobalt.cbt005_0.out_nic= PIC.Core.Cobalt.cbt005_0.gpus=[0, 1] PIC.Core.Cobalt.cbt005_1.host=cbt005 PIC.Core.Cobalt.cbt005_1.cpu=1 -PIC.Core.Cobalt.cbt005_1.nic=mlx4_1 +PIC.Core.Cobalt.cbt005_1.mpi_nic=mlx4_1 +PIC.Core.Cobalt.cbt005_1.out_nic= PIC.Core.Cobalt.cbt005_1.gpus=[2, 3] PIC.Core.Cobalt.cbt006_0.host=cbt006 PIC.Core.Cobalt.cbt006_0.cpu=0 -PIC.Core.Cobalt.cbt006_0.nic=mlx4_0 +PIC.Core.Cobalt.cbt006_0.mpi_nic=mlx4_0 +PIC.Core.Cobalt.cbt006_0.out_nic= PIC.Core.Cobalt.cbt006_0.gpus=[0, 1] PIC.Core.Cobalt.cbt006_1.host=cbt006 PIC.Core.Cobalt.cbt006_1.cpu=1 -PIC.Core.Cobalt.cbt006_1.nic=mlx4_1 +PIC.Core.Cobalt.cbt006_1.mpi_nic=mlx4_1 +PIC.Core.Cobalt.cbt006_1.out_nic= PIC.Core.Cobalt.cbt006_1.gpus=[2, 3] PIC.Core.Cobalt.cbt007_0.host=cbt007 PIC.Core.Cobalt.cbt007_0.cpu=0 
-PIC.Core.Cobalt.cbt007_0.nic=mlx4_0 +PIC.Core.Cobalt.cbt007_0.mpi_nic=mlx4_0 +PIC.Core.Cobalt.cbt007_0.out_nic= PIC.Core.Cobalt.cbt007_0.gpus=[0, 1] PIC.Core.Cobalt.cbt007_1.host=cbt007 PIC.Core.Cobalt.cbt007_1.cpu=1 -PIC.Core.Cobalt.cbt007_1.nic=mlx4_1 +PIC.Core.Cobalt.cbt007_1.mpi_nic=mlx4_1 +PIC.Core.Cobalt.cbt007_1.out_nic= PIC.Core.Cobalt.cbt007_1.gpus=[2, 3] PIC.Core.Cobalt.cbt008_0.host=cbt008 PIC.Core.Cobalt.cbt008_0.cpu=0 -PIC.Core.Cobalt.cbt008_0.nic=mlx4_0 +PIC.Core.Cobalt.cbt008_0.mpi_nic=mlx4_0 +PIC.Core.Cobalt.cbt008_0.out_nic= PIC.Core.Cobalt.cbt008_0.gpus=[0, 1] PIC.Core.Cobalt.cbt008_1.host=cbt008 PIC.Core.Cobalt.cbt008_1.cpu=1 -PIC.Core.Cobalt.cbt008_1.nic=mlx4_1 +PIC.Core.Cobalt.cbt008_1.mpi_nic=mlx4_1 +PIC.Core.Cobalt.cbt008_1.out_nic= PIC.Core.Cobalt.cbt008_1.gpus=[2, 3] PIC.Core.Cobalt.cbt009_0.host=cbt009 PIC.Core.Cobalt.cbt009_0.cpu=0 -PIC.Core.Cobalt.cbt009_0.nic=mlx4_0 +PIC.Core.Cobalt.cbt009_0.mpi_nic=mlx4_0 +PIC.Core.Cobalt.cbt009_0.out_nic= PIC.Core.Cobalt.cbt009_0.gpus=[0, 1] PIC.Core.Cobalt.cbt009_1.host=cbt009 PIC.Core.Cobalt.cbt009_1.cpu=1 -PIC.Core.Cobalt.cbt009_1.nic=mlx4_1 +PIC.Core.Cobalt.cbt009_1.mpi_nic=mlx4_1 +PIC.Core.Cobalt.cbt009_1.out_nic= PIC.Core.Cobalt.cbt009_1.gpus=[2, 3] PIC.Core.Cobalt.cbt010_0.host=cbt010 PIC.Core.Cobalt.cbt010_0.cpu=0 -PIC.Core.Cobalt.cbt010_0.nic=mlx4_0 +PIC.Core.Cobalt.cbt010_0.mpi_nic=mlx4_0 +PIC.Core.Cobalt.cbt010_0.out_nic= PIC.Core.Cobalt.cbt010_0.gpus=[0, 1] PIC.Core.Cobalt.cbt010_1.host=cbt010 PIC.Core.Cobalt.cbt010_1.cpu=1 -PIC.Core.Cobalt.cbt010_1.nic=mlx4_1 +PIC.Core.Cobalt.cbt010_1.mpi_nic=mlx4_1 +PIC.Core.Cobalt.cbt010_1.out_nic= PIC.Core.Cobalt.cbt010_1.gpus=[2, 3] diff --git a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc index 9ff49da785934ee2f3e09f69e84c45e7dac719bd..60b3c9e820a70c41785cd8413b57718ccb2f83e7 100644 --- a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc +++ 
b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc @@ -128,7 +128,7 @@ namespace LOFAR const std::vector<gpu::Device> &devices, Pool<struct MPIRecvData> &pool, RTmetadata &mdLogger, const std::string &mdKeyPrefix, - int hostID) + unsigned hostID) : subbandProcs(std::max(1UL, (profiling ? 1 : NR_WORKQUEUES_PER_DEVICE) * devices.size())), ps(ps), @@ -146,7 +146,8 @@ namespace LOFAR // be in bulk: if processing is cheap, all subbands will be output right after they have been received. // // Allow queue to drop items older than 3 seconds. - multiSender(hostMap(ps, subbandIndices, hostID), ps, 3.0) + multiSender(hostMap(ps, subbandIndices, hostID), ps, 3.0, hostID < ps.settings.nodes.size() ? ps.settings.nodes.at(hostID).out_nic : ""), + hostID(hostID) { ASSERTSTR(!devices.empty(), "Not bound to any GPU!"); @@ -804,7 +805,7 @@ namespace LOFAR const string desc = getStreamDescriptorBetweenIONandStorage(ps, CORRELATED_DATA, globalSubbandIdx); try { - outputStream = createStream(desc, false); + outputStream = createStream(desc, false, 0, hostID < ps.settings.nodes.size() ? ps.settings.nodes.at(hostID).out_nic : ""); } catch (Exception &ex) { LOG_ERROR_STR("Error writing subband " << globalSubbandIdx << ", dropping all subsequent blocks: " << ex.what()); return; diff --git a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.h b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.h index 8620fb4fa58bd6201cc6553e073f7b9015437cdb..b09516353d02eed582d95e36bb683953b9fe1ba1 100644 --- a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.h +++ b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.h @@ -52,7 +52,7 @@ namespace LOFAR const std::vector<gpu::Device> &devices, Pool<struct MPIRecvData> &pool, RTmetadata &mdLogger, const std::string &mdKeyPrefix, - int hostID = 0); + unsigned hostID = 0); ~Pipeline(); @@ -136,6 +136,9 @@ namespace LOFAR // Output send engine, takes care of the host connections and the multiplexing. 
TABTranspose::MultiSender multiSender; + + // MPI rank for this node + const unsigned hostID; }; } } diff --git a/RTCP/Cobalt/GPUProc/src/rtcp.cc b/RTCP/Cobalt/GPUProc/src/rtcp.cc index 9bb98da80fea31ed678219c5ea0789d5803c3d09..1335c03e74a59e8cb31bd4f4c7c41a81f9fe91a5 100644 --- a/RTCP/Cobalt/GPUProc/src/rtcp.cc +++ b/RTCP/Cobalt/GPUProc/src/rtcp.cc @@ -330,10 +330,10 @@ int main(int argc, char **argv) } // Select on the local NUMA InfiniBand interface (OpenMPI only, for now) - if (mynode.nic != "") { - LOG_DEBUG_STR("Binding to interface " << mynode.nic); + if (mynode.mpi_nic != "") { + LOG_DEBUG_STR("Binding to interface " << mynode.mpi_nic); - if (setenv("OMPI_MCA_btl_openib_if_include", mynode.nic.c_str(), 1) < 0) + if (setenv("OMPI_MCA_btl_openib_if_include", mynode.mpi_nic.c_str(), 1) < 0) THROW_SYSCALL("setenv(OMPI_MCA_btl_openib_if_include)"); } } else { diff --git a/RTCP/Cobalt/GPUProc/src/scripts/Cobalt_install.sh b/RTCP/Cobalt/GPUProc/src/scripts/Cobalt_install.sh index 0e594f82c00c88b753c045fda6688d15d3b56443..fa1531c76d5c22877d949e502ee72ffe64dac1bf 100755 --- a/RTCP/Cobalt/GPUProc/src/scripts/Cobalt_install.sh +++ b/RTCP/Cobalt/GPUProc/src/scripts/Cobalt_install.sh @@ -45,9 +45,14 @@ for HOST in ${HOSTS:-cbm001 cbm002 cbm003 cbm004 cbm005 cbm006 cbm007 cbm008 cbm ln -sfT /localhome/lofarsystem/lofar/var var # Set capabilities so our soft real-time programs can elevate prios. 
- COBALT_CAPABILITIES='cap_sys_admin,cap_sys_nice,cap_ipc_lock' -#disabled until we've updated /etc/sudoers to allow lofarbuild to do this -#also, we don't need cap_sys_admin and should drop it, idem on CEP2 - #sudo /sbin/setcap \"${COBALT_CAPABILITIES}\"=ep bin/rtcp bin/outputProc + # NOTE: dollar signs are escaped so expansion happens remotely, after the assignments below. + # cap_sys_nice: allow real-time priority for threads + # cap_ipc_lock: allow app to lock in memory (prevent swap) + # cap_net_raw: allow binding sockets to NICs + OUTPUTPROC_CAPABILITIES='cap_sys_nice,cap_ipc_lock' + sudo /sbin/setcap \"\${OUTPUTPROC_CAPABILITIES}\"=ep bin/outputProc || true + RTCP_CAPABILITIES='cap_net_raw,cap_sys_nice,cap_ipc_lock' + sudo /sbin/setcap \"\${RTCP_CAPABILITIES}\"=ep bin/rtcp || true " || exit 1 done +