diff --git a/RTCP/CNProc/src/CN_Processing.cc b/RTCP/CNProc/src/CN_Processing.cc index a5dd61759d5cd0c8e59d5be680ec8e27c5ed100a..5ab8c98938e16091b74e38f824b739d4224b2ec7 100644 --- a/RTCP/CNProc/src/CN_Processing.cc +++ b/RTCP/CNProc/src/CN_Processing.cc @@ -28,6 +28,7 @@ #include <Common/Timer.h> #include <Interface/CN_Configuration.h> #include <Interface/CN_Mapping.h> +#include <FCNP_ClientStream.h> #include <complex> #include <cmath> #include <iomanip> @@ -61,9 +62,10 @@ CN_Processing_Base::~CN_Processing_Base() { } -template <typename SAMPLE_TYPE> CN_Processing<SAMPLE_TYPE>::CN_Processing(Stream *str, const LocationInfo &locationInfo) +template <typename SAMPLE_TYPE> CN_Processing<SAMPLE_TYPE>::CN_Processing(Stream *str, Stream *(*createStream)(unsigned), const LocationInfo &locationInfo) : itsStream(str), + itsCreateStream(createStream), itsLocationInfo(locationInfo), itsPlan(0), #if defined HAVE_MPI @@ -171,6 +173,11 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::preprocess(CN_C // create the arenas and allocate the data sets itsMapping.allocate(); + itsOutputStreams.resize(itsPlan->nrOutputTypes()); + for( unsigned i = 0; i < itsPlan->nrOutputTypes(); i++ ) { + itsOutputStreams[i] = itsCreateStream(i + 1); + } + if (itsHasPhaseTwo) { std::vector<unsigned> usedCoresInPset = configuration.usedCoresInPset(); unsigned usedCoresPerPset = usedCoresInPset.size(); @@ -202,6 +209,7 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::preprocess(CN_C myCoreInPset, itsLocationInfo, phaseOnePsets, phaseTwoPsets ); } #endif // HAVE_MPI + } template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::transpose() @@ -350,7 +358,7 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::correlate() computeTimer.stop(); } -template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::sendOutput( StreamableData *outputData ) +template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::sendOutput( unsigned outputNr, StreamableData *outputData ) { #if defined HAVE_MPI if (LOG_CONDITION) @@ -359,7 +367,7 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::sendOutput( Str static NSTimer writeTimer("send timer", true, true); writeTimer.start(); - outputData->write(itsStream, false); + outputData->write(itsOutputStreams[outputNr], false); writeTimer.stop(); } @@ -439,11 +447,11 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::process() } // send all requested outputs - for( unsigned i = 0; i < itsPlan->plan.size(); i++ ) { + for( unsigned i = 0, outputNr = 0; i < itsPlan->plan.size(); i++ ) { const ProcessingPlan::planlet &p = itsPlan->plan[i]; if( p.output ) { - sendOutput( p.source ); + sendOutput( outputNr++, p.source ); } } } @@ -499,6 +507,11 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::postprocess() delete itsIncoherentStokes; } + for (unsigned i = 0; i < itsOutputStreams.size(); i++) { + delete itsOutputStreams[i]; + } + itsOutputStreams.clear(); + delete itsPlan; } diff --git a/RTCP/CNProc/src/CN_Processing.h b/RTCP/CNProc/src/CN_Processing.h index 6833daeb3eba0f71abb47bb96fe33c628199b584..b85e6a0fabf770db0d0ec086c14023f96d4b5ec9 100644 --- a/RTCP/CNProc/src/CN_Processing.h +++ b/RTCP/CNProc/src/CN_Processing.h @@ -65,7 +65,7 @@ class CN_Processing_Base // untemplated helper class template <typename SAMPLE_TYPE> class CN_Processing : public CN_Processing_Base, boost::noncopyable { public: - CN_Processing(Stream *, const LocationInfo &); + CN_Processing(Stream *, Stream *(*createStream)(unsigned), const LocationInfo &); ~CN_Processing(); virtual void preprocess(CN_Configuration &); @@ -81,7 +81,7 @@ template <typename SAMPLE_TYPE> class CN_Processing : public CN_Processing_Base, void calculateIncoherentStokes(); void correlate(); - void sendOutput( StreamableData *outputData ); + void sendOutput( unsigned outputNr, StreamableData *outputData ); void finishSendingInput(); #if 0 @@ -101,6 +101,8 @@ template <typename SAMPLE_TYPE> class CN_Processing : public CN_Processing_Base, unsigned itsComputeGroupRank; unsigned itsPhaseTwoPsetSize, itsPhaseThreePsetSize; Stream *itsStream; + Stream *(*itsCreateStream)(unsigned); + std::vector<Stream*> itsOutputStreams; const LocationInfo &itsLocationInfo; std::vector<double> itsCenterFrequencies; unsigned itsFirstSubband, itsCurrentSubband, itsLastSubband, itsSubbandIncrement; diff --git a/RTCP/CNProc/src/CN_Processing_main.cc b/RTCP/CNProc/src/CN_Processing_main.cc index b8ecf38d217b34afde14dcb2d1faed0bbf2eaa60..e270440ebbd71b8e6fbbab436a90149ca34ab00d 100644 --- a/RTCP/CNProc/src/CN_Processing_main.cc +++ b/RTCP/CNProc/src/CN_Processing_main.cc @@ -74,6 +74,26 @@ void terminate_with_backtrace() #endif +static Stream *createIONstream( unsigned channel ) +{ +#if 1 && defined HAVE_FCNP && defined HAVE_BGP_CN && !defined VALGRIND + /* preferred */ + FCNP_CN::init(); + return new FCNP_ClientStream(channel); +#elif 1 + LocationInfo locationInfo; + + /* used by default for !HAVE_FCNP && !HAVE_BGP */ + usleep(10000 * locationInfo.rankInPset()); // do not connect all at the same time + + return new SocketStream("127.0.0.1", 5000 + locationInfo.rankInPset() + 1000 * channel, SocketStream::TCP, SocketStream::Client); +#elif 0 + /* used for testing */ + return new NullStream; +#else + THROW(CNProcException, "unknown Stream type between ION and CN"); +#endif +} int main(int argc, char **argv) { @@ -114,22 +134,7 @@ int main(int argc, char **argv) LOG_DEBUG("creating connection to ION ..."); - Stream *ionStream; -#if 1 && defined HAVE_FCNP && defined HAVE_BGP_CN - /* preferred */ - FCNP_CN::init(); - ionStream = new FCNP_ClientStream(0); -#elif 1 - /* used by default for !HAVE_FCNP && !HAVE_BGP */ - usleep(10000 * locationInfo.rankInPset()); // do not connect all at the same time - - ionStream = new SocketStream("127.0.0.1", 5000 + locationInfo.rankInPset(), SocketStream::TCP, SocketStream::Client); -#elif 0 - /* used for testing */ - ionStream = new NullStream; -#else - THROW(CNProcException, "unknown Stream type between ION and CN"); -#endif + Stream *ionStream = createIONstream(0); LOG_DEBUG("connection successful"); @@ -144,13 +149,13 @@ int main(int argc, char **argv) case CN_Command::PREPROCESS : configuration.read(ionStream); switch (configuration.nrBitsPerSample()) { - case 4: proc = new CN_Processing<i4complex>(ionStream, locationInfo); + case 4: proc = new CN_Processing<i4complex>(ionStream, &createIONstream, locationInfo); break; - case 8: proc = new CN_Processing<i8complex>(ionStream, locationInfo); + case 8: proc = new CN_Processing<i8complex>(ionStream, &createIONstream, locationInfo); break; - case 16: proc = new CN_Processing<i16complex>(ionStream, locationInfo); + case 16: proc = new CN_Processing<i16complex>(ionStream, &createIONstream, locationInfo); break; } diff --git a/RTCP/IONProc/src/ION_main.cc b/RTCP/IONProc/src/ION_main.cc index bef9a42e3886dcc61f80dd2bdfc7b90de785bdfd..c563efabaa0be87963b231d43b5444304848ca5c 100644 --- a/RTCP/IONProc/src/ION_main.cc +++ b/RTCP/IONProc/src/ION_main.cc @@ -134,17 +134,36 @@ static const unsigned nrCNcoresInPset = 64; // TODO: how to figure out the static pthread_mutex_t allocationMutex = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t reevaluate = PTHREAD_COND_INITIALIZER; +#if defined HAVE_VALGRIND || !defined HAVE_FCNP // FIXME +static const std::string streamType = "TCP"; +#else +static const std::string streamType = "FCNP"; +#endif #if defined HAVE_FCNP && defined __PPC__ static bool fcnp_inited; #endif +static Stream *createCNstream(unsigned core, unsigned channel) +{ +#if defined HAVE_FCNP && defined __PPC__ + if (streamType == "FCNP") + return new FCNP_ServerStream(core, channel); + else +#endif + if (streamType == "NULL") + return new NullStream; + else if (streamType == "TCP") + return new SocketStream("127.0.0.1", 5000 + core + 1000 * channel, SocketStream::TCP, SocketStream::Server); + else + THROW(IONProcException, "unknown Stream type between ION and CN"); +} -static void createAllCNstreams(const std::string &streamType) +static void createAllCNstreams() { #if defined HAVE_FCNP && defined __PPC__ - if (streamType == "FCNP") { + if (streamType == "FCNP" && !fcnp_inited) { FCNP_ION::init(true); fcnp_inited = true; } @@ -153,18 +172,7 @@ static void createAllCNstreams(const std::string &streamType) allCNstreams.resize(nrCNcoresInPset); for (unsigned core = 0; core < nrCNcoresInPset; core ++) { -#if defined HAVE_FCNP && defined __PPC__ - if (streamType == "FCNP") - allCNstreams[core] = new FCNP_ServerStream(core, 0); - else -#endif - - if (streamType == "NULL") - allCNstreams[core] = new NullStream; - else if (streamType == "TCP") - allCNstreams[core] = new SocketStream("127.0.0.1", 5000 + core, SocketStream::TCP, SocketStream::Server); - else - THROW(IONProcException, "unknown Stream type between ION and CN"); + allCNstreams[core] = createCNstream(core, 0); } } @@ -416,7 +424,6 @@ void Job::jobThread() deallocateResources(); LOG_DEBUG_STR("resources of job " << itsObservationID << " deallocated"); - delete this; } @@ -510,7 +517,7 @@ template <typename SAMPLE_TYPE> void Job::toCNthread() beamletBufferToComputeNode.preprocess(&itsParset); - for (unsigned run = 0; run < itsNrRuns; run ++) + for (unsigned run = 0; !itsToCNthread->stop && run < itsNrRuns; run ++) beamletBufferToComputeNode.process(); beamletBufferToComputeNode.postprocess(); @@ -571,7 +578,7 @@ void Job::fromCNthread() continue; } - outputSections[output] = new OutputSection(&itsParset, list, maxlistsize, output, itsCNstreams, output == nrOutputTypes - 1); + outputSections[output] = new OutputSection(&itsParset, list, maxlistsize, output, &createCNstream ); } // destructor of OutputSections will wait for threads to complete @@ -620,12 +627,7 @@ void master_thread(int argc, char **argv) exit(1); } - //createAllCNstreams(parset.getTransportType("OLAP.OLAP_Conn.IONProc_CNProc")); -#if defined HAVE_VALGRIND // FIXME - createAllCNstreams("TCP"); -#else - createAllCNstreams("FCNP"); -#endif + createAllCNstreams(); pthread_mutex_lock(&allocationMutex); diff --git a/RTCP/IONProc/src/OutputSection.cc b/RTCP/IONProc/src/OutputSection.cc index efcb55b7ea66584b096f52de9c4de4607e8f5870..4fae1748c431570c2c8bbebdf6353ba9ac54c3b1 100644 --- a/RTCP/IONProc/src/OutputSection.cc +++ b/RTCP/IONProc/src/OutputSection.cc @@ -51,19 +51,18 @@ using boost::format; namespace LOFAR { namespace RTCP { -OutputSection::OutputSection(const Parset *ps, std::vector<unsigned> &itemList, unsigned nrUsedCores, unsigned outputType, const std::vector<Stream *> &streamsFromCNs, bool lastOutput) +OutputSection::OutputSection(const Parset *ps, std::vector<unsigned> &itemList, unsigned nrUsedCores, unsigned outputType, Stream *(*createStream)(unsigned,unsigned)) : itsParset(ps), itsItemList(itemList), - itsNrUsedCores(nrUsedCores), itsOutputType(outputType), + itsNrUsedCores(nrUsedCores), itsNrComputeCores(ps->nrCoresPerPset()), itsCurrentComputeCore(0), itsRealTime(ps->realTime()), itsPlan(0), - itsStreamsFromCNs(streamsFromCNs), - thread(0), - lastOutput(lastOutput) + itsStreamsFromCNs(ps->nrCoresPerPset(),0), + thread(0) { itsDroppedCount.resize(itsItemList.size()); @@ -96,6 +95,11 @@ OutputSection::OutputSection(const Parset *ps, std::vector<unsigned> &itemList, itsOutputThreads.push_back(new OutputThread(*itsParset, itsItemList[i], itsOutputType, dataTemplate)); } + // create the streams to the compute cores + for (unsigned core = 0; core < itsNrComputeCores; core++) { + itsStreamsFromCNs[core] = createStream(core, outputType + 1); + } + itsCurrentIntegrationStep = 0; itsSequenceNumber = 0; itsTmpSum = dataTemplate; @@ -115,11 +119,14 @@ OutputSection::OutputSection(const Parset *ps, std::vector<unsigned> &itemList, OutputSection::~OutputSection() { + // WAIT for our thread to finish delete thread; for (unsigned i = 0; i < itsItemList.size(); i++ ) { notDroppingData(i); // for final warning message + + // STOP our output threads delete itsOutputThreads[i]; } @@ -155,31 +162,12 @@ void OutputSection::mainLoop() { const unsigned nrRuns = static_cast<unsigned>(ceil((itsParset->stopTime() - itsParset->startTime()) / itsParset->CNintegrationTime())); - static pthread_mutex_t mutex[64]; - static pthread_cond_t condition[64]; - static unsigned computeCoreStates[64]; - - if( itsOutputType == 0 ) { - for( unsigned i = 0; i < 64; i++ ) { - pthread_mutex_init( &mutex[i], 0 ); - pthread_cond_init( &condition[i], 0 ); - computeCoreStates[i] = 0; - } - } - for( unsigned r = 0; r < nrRuns && !thread->stop; r++ ) { // process data from current core, even if we don't have a subband for this // core (to stay in sync with other psets). for (unsigned i = 0; i < itsNrUsedCores; i++ ) { // TODO: make sure that there are more free buffers than subbandsPerPset - // wait for our turn for this core - pthread_mutex_lock(&mutex[itsCurrentComputeCore]); - while( computeCoreStates[itsCurrentComputeCore] != itsOutputType ) { - pthread_cond_wait(&condition[itsCurrentComputeCore], &mutex[itsCurrentComputeCore]); - } - pthread_mutex_unlock(&mutex[itsCurrentComputeCore]); - if (i < itsItemList.size()) { OutputThread *outputThread = itsOutputThreads[i]; @@ -210,12 +198,6 @@ void OutputSection::mainLoop() } } - // signal next output that we're done with this one - pthread_mutex_lock(&mutex[itsCurrentComputeCore]); - computeCoreStates[itsCurrentComputeCore] = lastOutput ? 0 : itsOutputType + 1; - pthread_cond_broadcast(&condition[itsCurrentComputeCore]); - pthread_mutex_unlock(&mutex[itsCurrentComputeCore]); - if (++ itsCurrentComputeCore == itsNrComputeCores) { itsCurrentComputeCore = 0; } diff --git a/RTCP/IONProc/src/OutputSection.h b/RTCP/IONProc/src/OutputSection.h index fa930cb0d3c2740178f037f93aed4d5b1683766a..143498115b00ee22add1988097c3c3a6ff2c71b1 100644 --- a/RTCP/IONProc/src/OutputSection.h +++ b/RTCP/IONProc/src/OutputSection.h @@ -36,7 +36,7 @@ namespace RTCP { class OutputSection { public: - OutputSection(const Parset *ps, std::vector<unsigned> &itemList, unsigned nrUsedCores, unsigned outputType, const std::vector<Stream *> &streamsFromCNs, bool lastOutput); + OutputSection(const Parset *ps, std::vector<unsigned> &itemList, unsigned nrUsedCores, unsigned outputType, Stream *(*createStream)(unsigned,unsigned)); ~OutputSection(); private: @@ -66,7 +66,7 @@ class OutputSection // the main plan, also holds temporary results CN_ProcessingPlan<> *itsPlan; - const std::vector<Stream *> &itsStreamsFromCNs; + std::vector<Stream *> itsStreamsFromCNs; Thread *thread; diff --git a/RTCP/IONProc/src/OutputThread.cc b/RTCP/IONProc/src/OutputThread.cc index 13fbbd34ad1d9a6b96d7d023a36caff73922ee7d..63768f047e132a6f3b5bad00c023cd89299fcc77 100644 --- a/RTCP/IONProc/src/OutputThread.cc +++ b/RTCP/IONProc/src/OutputThread.cc @@ -67,6 +67,7 @@ OutputThread::OutputThread(const Parset &ps, const unsigned subband, const unsig OutputThread::~OutputThread() { + // STOP our thread itsSendQueue.append(0); // 0 indicates that no more messages will be sent if (connecting) { diff --git a/RTCP/Run/src/LOFAR/Sections.py b/RTCP/Run/src/LOFAR/Sections.py index 912c5acb4a6e23a47e7e3f6f2aa848c9aea89fba..62bc3f9ca665d565ef3e2c9ca0b5b99dfac0f6f1 100644 --- a/RTCP/Run/src/LOFAR/Sections.py +++ b/RTCP/Run/src/LOFAR/Sections.py @@ -147,7 +147,7 @@ class IONProcSection(Section): logfiles = ["%s/run.IONProc.%s.log" % (Locations.files["logdir"],self.partition)] + self.logoutputs if VALGRIND_ION: - valgrind = "/globalhome/mol/root-ppc/bin/valgrind --suppressions=%s --leak-check=full --show-reachable=yes" % (Locations.files["ionsuppfile"],) + valgrind = "/globalhome/mol/root-ppc/bin/valgrind --suppressions=%s --leak-check=no --show-reachable=no" % (Locations.files["ionsuppfile"],) else: valgrind = "" diff --git a/RTCP/Run/src/RTCP.parset b/RTCP/Run/src/RTCP.parset index 2ab178d92cdc91b6f476ac2d5647bca35188c704..f114b2c4f7f3b1fcf0add9ad935696dc6013d75e 100644 --- a/RTCP/Run/src/RTCP.parset +++ b/RTCP/Run/src/RTCP.parset @@ -22,10 +22,14 @@ Observation.subbandList = [154..401] Observation.beamList = [248*0] Observation.rspBoardList = [62*0,62*1,62*2,62*3] Observation.rspSlotList = [0..61,0..61,0..61,0..61] +#Observation.subbandList = [154] +#Observation.beamList = [0] +#Observation.rspBoardList = [0] +#Observation.rspSlotList = [0] Observation.sampleClock = 200 OLAP.nrBitsPerSample = 16 -OLAP.CNProc.usedCores = [0..63] +OLAP.CNProc.usedCoresInPset = [0..63] #OLAP.CNProc.phaseOnePsets = [0] #OLAP.CNProc.phaseTwoPsets = [0] #OLAP.CNProc.phaseThreePsets = [0] diff --git a/RTCP/Storage/include/Storage/InputThread.h b/RTCP/Storage/include/Storage/InputThread.h index c62d946f2e24ed9081d9de2d7c91f5ed01bad2db..e955829b0e0e55da541a18b16cfa7fd01ac794f1 100644 --- a/RTCP/Storage/include/Storage/InputThread.h +++ b/RTCP/Storage/include/Storage/InputThread.h @@ -58,6 +58,8 @@ class InputThread const unsigned itsSubbandNumber; const unsigned itsOutputNumber; const unsigned itsObservationID; + + volatile bool connecting; }; } // namespace RTCP diff --git a/RTCP/Storage/src/InputThread.cc b/RTCP/Storage/src/InputThread.cc index 80cfd29f63d3fd8870b6469326625188e67639d8..6546666c242ae74f55f75a2f857991fb1abd1d94 100644 --- a/RTCP/Storage/src/InputThread.cc +++ b/RTCP/Storage/src/InputThread.cc @@ -41,7 +41,8 @@ InputThread::InputThread(const Parset *ps, unsigned subbandNumber, unsigned outp itsPS(ps), itsSubbandNumber(subbandNumber), itsOutputNumber(outputNumber), - itsObservationID(ps->observationID()) + itsObservationID(ps->observationID()), + connecting(true) // start at true to avoid race condition when aborting a starting thread { for (unsigned i = 0; i < maxReceiveQueueSize; i ++) { StreamableData *data = dataTemplate->clone(); @@ -58,6 +59,10 @@ InputThread::InputThread(const Parset *ps, unsigned subbandNumber, unsigned outp InputThread::~InputThread() { + if (connecting) { + thread->abort(); + } + delete thread; while (!itsReceiveQueue.empty()) @@ -95,6 +100,8 @@ void InputThread::mainLoop() THROW(StorageException, "unsupported ION->Storage stream type: " << connectionType); } + connecting = false; + // limit reads from NullStream to 10 blocks; otherwise unlimited unsigned increment = nullInput ? 1 : 0; std::auto_ptr<StreamableData> data; diff --git a/RTCP/Storage/src/SubbandWriter.cc b/RTCP/Storage/src/SubbandWriter.cc index 7ed21c74fc43d04bddb8423813d4a8c040bd7474..f4ae300a5c2d3b76ccf331886cec71d010fec50c 100644 --- a/RTCP/Storage/src/SubbandWriter.cc +++ b/RTCP/Storage/src/SubbandWriter.cc @@ -146,16 +146,20 @@ SubbandWriter::SubbandWriter(const Parset *ps, unsigned rank, unsigned size) SubbandWriter::~SubbandWriter() { // wait for all threads to finish - for (unsigned i = 0; i < itsInputThreads.size(); i++ ) { - delete itsInputThreads[i]; - } - itsInputThreads.clear(); + // wait for OutputThreads first, since they will still be reading data + // from the InputThreads for (unsigned i = 0; i < itsOutputThreads.size(); i++ ) { delete itsOutputThreads[i]; } itsOutputThreads.clear(); + for (unsigned i = 0; i < itsInputThreads.size(); i++ ) { + delete itsInputThreads[i]; + } + itsInputThreads.clear(); + + #ifdef USE_MAC_PI delete itsPropertySet;