diff --git a/RTCP/CNProc/src/CN_Processing.cc b/RTCP/CNProc/src/CN_Processing.cc index 01c0a528f0c21233ea7b7003f6270bfce1b68116..28b2ae67c8d48957b28215eca5673bae767eca52 100644 --- a/RTCP/CNProc/src/CN_Processing.cc +++ b/RTCP/CNProc/src/CN_Processing.cc @@ -90,20 +90,12 @@ CN_Processing_Base::~CN_Processing_Base() } -#if defined CLUSTER_SCHEDULING template <typename SAMPLE_TYPE> CN_Processing<SAMPLE_TYPE>::CN_Processing(const Parset &parset, const std::vector<SmartPtr<Stream> > &inputStreams, Stream *(*createStream)(unsigned, const LocationInfo &), const LocationInfo &locationInfo, Allocator &bigAllocator, unsigned firstBlock) -#else -template <typename SAMPLE_TYPE> CN_Processing<SAMPLE_TYPE>::CN_Processing(const Parset &parset, Stream *inputStream, Stream *(*createStream)(unsigned, const LocationInfo &), const LocationInfo &locationInfo, Allocator &bigAllocator, unsigned firstBlock) -#endif : itsBigAllocator(bigAllocator), itsBlock(firstBlock), itsParset(parset), -#if defined CLUSTER_SCHEDULING itsInputStreams(inputStreams), -#else - itsInputStream(inputStream), -#endif itsLocationInfo(locationInfo), #if defined HAVE_MPI itsTranspose2Logic(parset.CN_transposeLogic(itsLocationInfo.psetNumber(), CN_Mapping::reverseMapCoreOnPset(itsLocationInfo.rankInPset(), itsLocationInfo.psetNumber()))) @@ -392,7 +384,7 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::transposeInput( { #if defined HAVE_MPI if (itsHasPhaseOne) - itsInputSubbandMetaData->read(itsInputStream); // sync read the meta data + itsInputSubbandMetaData->read(itsInputStreams[0]); // sync read the meta data if (itsHasPhaseTwo && *itsCurrentSubband < itsNrSubbands) { NSTimer postAsyncReceives("post async receives", LOG_CONDITION, true); @@ -425,7 +417,7 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::transposeInput( LOG_DEBUG_STR("read subband " << subband << " from IO node at t = " << blockAge()); } readTimer.start(); - itsInputData->readOne(itsInputStream, i); // Synchronously read 1 subband from my IO node. + itsInputData->readOne(itsInputStreams[0], i); // Synchronously read 1 subband from my IO node. readTimer.stop(); asyncSendTimer.start(); if (LOG_CONDITION) { @@ -443,8 +435,8 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::transposeInput( if (itsHasPhaseOne) { static NSTimer readTimer("receive timer", true, true); readTimer.start(); - itsInputSubbandMetaData->read(itsInputStream); - itsInputData->read(itsInputStream, false); + itsInputSubbandMetaData->read(itsInputStreams[0]); + itsInputData->read(itsInputStreams[0], false); readTimer.stop(); } #endif // HAVE_MPI diff --git a/RTCP/CNProc/src/CN_Processing.h b/RTCP/CNProc/src/CN_Processing.h index f7a0e6ad16b2c824c866562c9ddeb9908199cd7e..d67e6aaf3d97cf4026b4df8c400210f666af454f 100644 --- a/RTCP/CNProc/src/CN_Processing.h +++ b/RTCP/CNProc/src/CN_Processing.h @@ -71,11 +71,7 @@ class CN_Processing_Base // untemplated helper class template <typename SAMPLE_TYPE> class CN_Processing : public CN_Processing_Base { public: -#if defined CLUSTER_SCHEDULING CN_Processing(const Parset &, const std::vector<SmartPtr<Stream> > &inputStream, Stream *(*createStream)(unsigned, const LocationInfo &), const LocationInfo &, Allocator & = heapAllocator, unsigned firstBlock = 0); -#else - CN_Processing(const Parset &, Stream *inputStream, Stream *(*createStream)(unsigned, const LocationInfo &), const LocationInfo &, Allocator & = heapAllocator, unsigned firstBlock = 0); -#endif ~CN_Processing(); virtual void process(unsigned); @@ -121,11 +117,7 @@ template <typename SAMPLE_TYPE> class CN_Processing : public CN_Processing_Base const Parset &itsParset; -#if defined CLUSTER_SCHEDULING const std::vector<SmartPtr<Stream> > &itsInputStreams; -#else - Stream *itsInputStream; -#endif SmartPtr<Stream> itsCorrelatedDataStream; SmartPtr<Stream> itsFinalBeamFormedDataStream; SmartPtr<Stream> itsTriggerDataStream; diff --git a/RTCP/CNProc/src/CN_Processing_main.cc b/RTCP/CNProc/src/CN_Processing_main.cc index efb5bbb1a80e5c4f117cee635afa5dc1c3417ec1..784271ad371e7e33846d912675069007d6cb8a01 100644 --- a/RTCP/CNProc/src/CN_Processing_main.cc +++ b/RTCP/CNProc/src/CN_Processing_main.cc @@ -37,6 +37,12 @@ #include <boost/lexical_cast.hpp> #include <execinfo.h> +#if defined CLUSTER_SCHEDULING +#define LOG_CONDITION 1 +#else +#define LOG_CONDITION (locationInfo.rankInPset() == 0) +#endif + #if defined HAVE_MPI #define MPICH_IGNORE_CXX_SEEK #include <mpi.h> @@ -193,27 +199,25 @@ int main(int argc, char **argv) getIONstreamType(); -#if defined CLUSTER_SCHEDULING - LOG_DEBUG("Creating connections to IONs ..."); + if (LOG_CONDITION) + LOG_DEBUG("Creating connection to ION ..."); - std::vector<SmartPtr<Stream> > ionStreams(locationInfo.nrPsets()); + std::vector<SmartPtr<Stream> > ionStreams; + +#if defined CLUSTER_SCHEDULING + ionStreams.resize(locationInfo.nrPsets()); for (unsigned ionode = 0; ionode < locationInfo.nrPsets(); ionode ++) { std::string descriptor = getStreamDescriptorBetweenIONandCN(ionStreamType, ionode, locationInfo.rankInPset(), locationInfo.nrPsets(), locationInfo.psetSize(), 0); ionStreams[ionode] = createStream(descriptor, false); } - - LOG_DEBUG("Creating connections to IONs done"); - SmartPtr<Stream> &ionStream = ionStreams[0]; #else - if (locationInfo.rankInPset() == 0) - LOG_DEBUG("Creating connection to ION ..."); - - SmartPtr<Stream> ionStream(createIONstream(0, locationInfo)); + ionStreams.resize(1); + ionStreams[0] = createIONstream(0, locationInfo); +#endif - if (locationInfo.rankInPset() == 0) + if (LOG_CONDITION) LOG_DEBUG("Creating connection to ION: done"); -#endif // an allocator for our big memory structures @@ -233,17 +237,16 @@ int main(int argc, char **argv) do { //LOG_DEBUG("Wait for command"); - command.read(ionStream); + command.read(ionStreams[0]); //LOG_DEBUG_STR("Received command " << command.value()); switch (command.value()) { case CN_Command::PREPROCESS : try { unsigned firstBlock = command.param(); - parset = new Parset(ionStream); + parset = new Parset(ionStreams[0]); switch (parset->nrBitsPerSample()) { -#if defined CLUSTER_SCHEDULING case 4: proc = new CN_Processing<i4complex>(*parset, ionStreams, &createIONstream, locationInfo, bigAllocator, firstBlock); break; @@ -252,16 +255,6 @@ int main(int argc, char **argv) case 16: proc = new CN_Processing<i16complex>(*parset, ionStreams, &createIONstream, locationInfo, bigAllocator, firstBlock); break; -#else - case 4: proc = new CN_Processing<i4complex>(*parset, ionStream, &createIONstream, locationInfo, bigAllocator, firstBlock); - break; - - case 8: proc = new CN_Processing<i8complex>(*parset, ionStream, &createIONstream, locationInfo, bigAllocator, firstBlock); - break; - - case 16: proc = new CN_Processing<i16complex>(*parset, ionStream, &createIONstream, locationInfo, bigAllocator, firstBlock); - break; -#endif } } catch (Exception &ex) { LOG_ERROR_STR("Caught Exception: " << ex); diff --git a/RTCP/CNProc/test/tCN_Processing_alloc.cc b/RTCP/CNProc/test/tCN_Processing_alloc.cc index e995b0eda74b09ec385382f007673fdb9a5f4334..046f0011dd8dbfe59f1c5177aab9dff4a0de934a 100644 --- a/RTCP/CNProc/test/tCN_Processing_alloc.cc +++ b/RTCP/CNProc/test/tCN_Processing_alloc.cc @@ -57,19 +57,20 @@ int main(int argc, char **argv) { LocationInfo locationInfo; CN_Processing_Base *proc; Parset parset; - NullStream inputStream; + std::vector<SmartPtr<Stream> > inputStreams(1); + inputStreams[0] = new NullStream; parset.adoptFile("tCN_Processing_alloc.parset"); // preprocess switch (parset.nrBitsPerSample()) { - case 4: proc = new CN_Processing<i4complex>(parset, &inputStream, &createIONstream, locationInfo); + case 4: proc = new CN_Processing<i4complex>(parset, inputStreams, &createIONstream, locationInfo); break; - case 8: proc = new CN_Processing<i8complex>(parset, &inputStream, &createIONstream, locationInfo); + case 8: proc = new CN_Processing<i8complex>(parset, inputStreams, &createIONstream, locationInfo); break; - case 16: proc = new CN_Processing<i16complex>(parset, &inputStream, &createIONstream, locationInfo); + case 16: proc = new CN_Processing<i16complex>(parset, inputStreams, &createIONstream, locationInfo); break; default: return 1;