From 51db46d93d02142b7214e605a32c91b312f3142e Mon Sep 17 00:00:00 2001
From: Chris Broekema <broekema@astron.nl>
Date: Wed, 7 Feb 2007 07:50:43 +0000
Subject: [PATCH] BugID: 1011 Add support for UDP sockets to the inputsection.
 Also rename the SOCKET input method to TCP and introduce the UDP input
 method.

---
 .../CS1_InputSection/src/AH_InputSection.cc   |  5 +++++
 .../CEP/CS1/CS1_InputSection/src/Connector.cc | 19 ++++++++++++++++++-
 .../CS1/CS1_InputSection/src/WH_RSPInput.cc   | 10 +++++++++-
 3 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/Appl/CEP/CS1/CS1_InputSection/src/AH_InputSection.cc b/Appl/CEP/CS1/CS1_InputSection/src/AH_InputSection.cc
index ac9e6c78ced..b0db7547a91 100644
--- a/Appl/CEP/CS1/CS1_InputSection/src/AH_InputSection.cc
+++ b/Appl/CEP/CS1/CS1_InputSection/src/AH_InputSection.cc
@@ -133,9 +133,14 @@ namespace LOFAR {
 
 	  // Connect splitters to mergers (transpose)
 #ifndef HAVE_MPI
+
+	  vector <int> stations;
 	  for (int station = 0; station < nStations; station++) {
 	    itsConnector.connectSteps(RSPSteps[station], cell, collectSteps.back(), station);
+	    stations.push_back(station);
 	  }
+
+	  RSPSteps.back()->getOutDataManager(0).setOutRoundRobinPolicy(stations, nStations);
 #else
 #if MPICH_WORKING_ON_INFINI_BAND
 	  for (int station = 0; station < nStations; station++) {
diff --git a/Appl/CEP/CS1/CS1_InputSection/src/Connector.cc b/Appl/CEP/CS1/CS1_InputSection/src/Connector.cc
index 0852c43bd32..14b1e15f855 100644
--- a/Appl/CEP/CS1/CS1_InputSection/src/Connector.cc
+++ b/Appl/CEP/CS1/CS1_InputSection/src/Connector.cc
@@ -65,7 +65,7 @@ namespace LOFAR {
 	}	  
       } else if (transportType=="NULL") {
 	theTH = new TH_Null();
-      } else if (transportType=="SOCKET") {
+      } else if (transportType=="TCP") {
 	string service = ps.getString(key + ".port");
 	if (!isReceiver) {
 	  theTH = new TH_Socket(service, 
@@ -80,6 +80,23 @@ namespace LOFAR {
 				true,
 				Socket::TCP,
 				false);
+	}
+      } else if (transportType == "UDP") {
+	string service = ps.getString(key+".port");
+	if (!isReceiver) {
+	  theTH = new TH_Socket(service,
+				true, 
+				Socket::UDP, 
+				false);
+	} else { 
+	  string host = ps.getString(key+".host");
+	  theTH = new TH_Socket(host, 
+				service, 
+				true, 
+				Socket::UDP, 
+				false);
+	    
+
 	}
       } else {
 	ASSERTSTR(false, "TransportHolder " << transportType << " unknown to Connector");
diff --git a/Appl/CEP/CS1/CS1_InputSection/src/WH_RSPInput.cc b/Appl/CEP/CS1/CS1_InputSection/src/WH_RSPInput.cc
index 3ce6f5e80ba..5a53533fe15 100644
--- a/Appl/CEP/CS1/CS1_InputSection/src/WH_RSPInput.cc
+++ b/Appl/CEP/CS1/CS1_InputSection/src/WH_RSPInput.cc
@@ -35,6 +35,8 @@
 #include <CS1_Interface/RSPTimeStamp.h>
 #include <CS1_InputSection/BeamletBuffer.h>
 #include <CS1_InputSection/InputThread.h>
+#include <tinyCEP/Sel_RoundRobin.h>
+
 
 // for timer
 #include <signal.h>
@@ -80,10 +82,16 @@ namespace LOFAR {
       getDataManager().addInDataHolder(0, new DH_Delay("DH_Delay", ps.getInt32("Input.NRSPBoards")));
  
       // create a outgoing dataholder for each subband
-      for (int s=0; s < itsNoutputs; s++) {
+      vector<int> subbands;
+     for (int s=0; s < itsNoutputs; s++) {
 	snprintf(str, 32, "DH_RSP_out_%d", s);
 	getDataManager().addOutDataHolder(s, new DH_RSP(str, itsPS)); 
+	subbands.push_back(s);
       }
+     // set round-robin output selector, but set start index as well
+     // to optimize P2P transpose
+     getDataManager().setOutputSelector(new Sel_RoundRobin(subbands,itsStationNr));
+
     }
 
 
-- 
GitLab