diff --git a/Appl/CEP/CS1/CS1_InputSection/src/AH_InputSection.cc b/Appl/CEP/CS1/CS1_InputSection/src/AH_InputSection.cc index c03b77cb3b7c7e4ecd585ddf423632416489b4cc..5052b1f6312972786d420d90cdeb66ef2033047e 100644 --- a/Appl/CEP/CS1/CS1_InputSection/src/AH_InputSection.cc +++ b/Appl/CEP/CS1/CS1_InputSection/src/AH_InputSection.cc @@ -26,6 +26,7 @@ //# Includes #include <Common/LofarLogger.h> #include <CS1_InputSection/AH_InputSection.h> +#include <CS1_Interface/RSPTimeStamp.h> //# Workholders #include <CS1_InputSection/WH_RSPInput.h> @@ -33,6 +34,7 @@ #include <Transport/TransportHolder.h> #include <Transport/TH_MPI.h> +#include <Transport/TH_Socket.h> #define IS_MULTIPLE(number, bignumber) (floor(bignumber / number) == (1.0 * bignumber / number)) @@ -70,6 +72,8 @@ namespace LOFAR { int lowestFreeNode = 0; #endif + TimeStamp::setMaxBlockId(itsParamSet.getDouble("Observation.SampleRate")); + int psetsPerCell = itsParamSet.getInt32("BGLProc.PsetsPerCell"); int nCells = itsParamSet.getInt32("Observation.NSubbands") / (itsParamSet.getInt32("General.SubbandsPerPset") * psetsPerCell); // number of SubBand filters in the application int nNodesPerCell = itsParamSet.getInt32("BGLProc.NodesPerPset") * psetsPerCell; @@ -112,7 +116,7 @@ namespace LOFAR { comp.addBlock(RSPSteps.back()); // Connect the Delay Controller - itsInputStub->connect(ic * nStations + station, (RSPSteps.back())->getInDataManager(0), 0); + //itsInputStub->connect(ic * nStations + station, (RSPSteps.back())->getInDataManager(0), 0); } LOG_TRACE_FLOW_STR("Create the Subband merger workholders"); @@ -129,23 +133,64 @@ namespace LOFAR { comp.addBlock(collectSteps.back()); // Connect splitters to mergers (transpose) +#ifndef HAVE_MPI + for (int station = 0; station < nStations; station++) { + itsConnector.connectSteps(RSPSteps[station], cell, collectSteps.back(), station); + } +#else +#if MPICH_WORKING_ON_INFINI_BAND for (int station = 0; station < nStations; station++) { itsConnector.connectSteps(RSPSteps[station], cell, collectSteps.back(), station); } +#else + vector<string> transposeHosts = itsParamSet.getStringVector("TransposeHosts"); + vector<string> transposePorts = itsParamSet.getStringVector("TransposePorts"); + for (int station = 0; station < nStations; station++) { + // We need to find out if we are on the client or the server + // because TH_Socket doesn't find it out itself. + if (collectSteps.back()->getNode() == TH_MPI::getCurrentRank()) { + // create server socket + collectSteps.back()->connect(station, + RSPSteps[station], + cell, + 1, + new TH_Socket(transposePorts[station], + true, + Socket::TCP, + 5, + false), + false); + } else { + // create client socket + collectSteps.back()->connect(station, + RSPSteps[station], + cell, + 1, + new TH_Socket(transposeHosts[cell], + transposePorts[station], + true, + Socket::TCP, + false), + false); + } + } +#endif +#endif // connect outputs to Subband stub vector<int> channels; - for (int core = 0; core < nNodesPerCell; core++) { - collectSteps.back()->getOutDataManager(0).setOutBuffer(core, false, 10); -#if 1 - itsOutputStub->connect(cell + ic * nCells / inputCells, - core, - (collectSteps.back())->getOutDataManager(0), - core); -#endif - channels.push_back(core); + bool makeConnections = itsParamSet.getBool("Connect"); + if (makeConnections) { + for (int core = 0; core < nNodesPerCell; core++) { + collectSteps.back()->getOutDataManager(0).setOutBuffer(core, false, 3); + itsOutputStub->connect(cell + ic * nCells / inputCells, + core, + (collectSteps.back())->getOutDataManager(0), + core); + channels.push_back(core); + } + collectSteps.back()->getOutDataManager(0).setOutRoundRobinPolicy(channels, itsParamSet.getInt32("BGLProc.MaxConcurrentCommunications")); } - collectSteps.back()->getOutDataManager(0).setOutRoundRobinPolicy(channels, itsParamSet.getInt32("BGLProc.MaxConcurrentCommunications")); } } LOG_TRACE_FLOW_STR("Finished define()"); @@ -163,7 +208,6 @@ namespace LOFAR { LOG_TRACE_FLOW_STR("Start AH_InputSection::run() " ); for (int i = 0; i < steps; i++) { LOG_TRACE_LOOP_STR("processing run " << i ); - cout<<"run "<<i+1<<" of "<<steps<<endl; getComposite().process(); } LOG_TRACE_FLOW_STR("Finished AH_InputSection::run() " );