From c5f08ce84c8075e4e59b50331062d11d43e09df7 Mon Sep 17 00:00:00 2001
From: Jan David Mol <mol@astron.nl>
Date: Wed, 6 Mar 2019 22:03:44 +0000
Subject: [PATCH] COB-4: 1) improved thread naming & logging 2) set signal
 handler (OMPThread::init) AFTER MPI_Init to avoid having it overwritten 3) do
 not spawn threads for antenna fields we're not sending over MPI

---
 LCS/Common/include/Common/Thread/Thread.h     | 71 ++++++++++++++-----
 RTCP/Cobalt/CoInterface/src/OMPThread.h       | 65 +++++++----------
 RTCP/Cobalt/CoInterface/src/TABTranspose.cc   |  2 +-
 RTCP/Cobalt/CoInterface/test/tOMPThread.cc    | 10 +--
 RTCP/Cobalt/GPUProc/src/CommandThread.cc      |  3 +-
 .../GPUProc/src/Station/StationInput.cc       |  2 +-
 .../GPUProc/src/Station/StationTranspose.cc   |  1 +
 .../GPUProc/src/cuda/Pipelines/Pipeline.cc    |  1 +
 RTCP/Cobalt/GPUProc/src/rtcp.cc               | 57 +++++++++------
 RTCP/Cobalt/InputProc/src/RSPBoards.cc        |  2 +-
 10 files changed, 127 insertions(+), 87 deletions(-)

diff --git a/LCS/Common/include/Common/Thread/Thread.h b/LCS/Common/include/Common/Thread/Thread.h
index b811551264d..e2e6d865ab1 100644
--- a/LCS/Common/include/Common/Thread/Thread.h
+++ b/LCS/Common/include/Common/Thread/Thread.h
@@ -80,6 +80,12 @@ class Thread
     // to finish.
     bool      caughtException();
 
+    // Return the name of the given thread as registered at the OS
+    static std::string getName(const pthread_t &thread = pthread_self());
+
+    // Register this thread at the OS (NOTE: only first 15 characters of "name" are used)
+    static void setMyName(const std::string &name);
+
     class ScopedPriority
     {
     public:
@@ -204,7 +210,7 @@ private:
 
 template <typename T> inline Thread::Thread(T *object, void (T::*method)(), const std::string &name, const std::string &logPrefix, size_t stackSize)
 :
-  logPrefix(logPrefix),
+  logPrefix(logPrefix + "[Thread " + name + "] "),
   name(name),
   caught_exception(false)
 {
@@ -241,7 +247,7 @@ inline Thread::~Thread()
     try {
       throw SystemCallException("pthread_join", retval, THROW_ARGS);
     } catch (Exception &ex) {
-      LOG_ERROR_STR("Exception in destructor: " << ex);
+      LOG_ERROR_STR(logPrefix << "Exception in destructor: " << ex);
     }
 }
 
@@ -302,6 +308,45 @@ inline bool Thread::caughtException()
   return caught_exception;
 }
 
+inline void Thread::setMyName(const std::string &name)
+{
+  // Inform the kernel of the thread name (only first 16 characters are used!)
+
+#if defined(_LIBCPP_VERSION)
+  int retval;
+
+  // Set name WITHIN the thread, to avoid race conditions
+  if ((retval = pthread_setname_np(name.substr(0,15).c_str())) != 0)
+    throw SystemCallException("pthread_setname_np", retval, THROW_ARGS);
+#else
+# if defined(_GNU_SOURCE) && __GLIBC_PREREQ(2, 12)
+  int retval;
+
+  // Set name WITHIN the thread, to avoid race conditions
+  if ((retval = pthread_setname_np(pthread_self(), name.substr(0,15).c_str())) != 0)
+    throw SystemCallException("pthread_setname_np", retval, THROW_ARGS);
+# else
+  (void)name;
+# endif
+#endif
+}
+
+inline std::string Thread::getName(const pthread_t &thread)
+{
+#if defined(_GNU_SOURCE) && __GLIBC_PREREQ(2, 12)
+  char cname[1024];
+
+  int retval;
+
+  if ((retval = pthread_getname_np(thread, &cname[0], sizeof cname)) != 0)
+    throw SystemCallException("pthread_getname_np", retval, THROW_ARGS);
+
+  return std::string(cname);
+#else
+  (void)thread;
+  return "<unknown>";
+#endif
+}
 
 template <typename T> inline void Thread::stub(Args<T> *args)
 {
@@ -311,26 +356,13 @@ template <typename T> inline void Thread::stub(Args<T> *args)
 
   ThreadLogger threadLogger;
 
-  LOG_DEBUG_STR(logPrefix << "Thread started");
+  LOG_DEBUG_STR(logPrefix << "Started");
 
   ThreadMap::ScopedRegistration sr(ThreadMap::instance(), args->name);
 
   try {
-#if defined(_LIBCPP_VERSION)
-    int retval;
-
     // Set name WITHIN the thread, to avoid race conditions
-    if ((retval = pthread_setname_np(args->name.substr(0,15).c_str())) != 0)
-      throw SystemCallException("pthread_setname_np", retval, THROW_ARGS);
-#else
-# if defined(_GNU_SOURCE) && __GLIBC_PREREQ(2, 12)
-    int retval;
-
-    // Set name WITHIN the thread, to avoid race conditions
-    if ((retval = pthread_setname_np(pthread_self(), args->name.substr(0,15).c_str())) != 0)
-      throw SystemCallException("pthread_setname_np", retval, THROW_ARGS);
-# endif
-#endif
+    setMyName(args->name);
 
     // allow cancellation from here, to guarantee finished.up()
     started.up();
@@ -355,16 +387,17 @@ template <typename T> inline void Thread::stub(Args<T> *args)
 
     caught_exception = true;
   } catch (...) {
-    LOG_DEBUG_STR(logPrefix << "Thread cancelled");
+    LOG_DEBUG_STR(logPrefix << "Cancelled");
 
     finished.up();
 
+    // MUST rethrow exception caused by thread cancellation, or terminate() will be called
     throw;
   }
 
   finished.up();
 
-  LOG_DEBUG_STR(logPrefix << "Thread stopped");
+  LOG_DEBUG_STR(logPrefix << "Finished");
 }
 
 template <typename T> inline void *Thread::stub(void *arg)
diff --git a/RTCP/Cobalt/CoInterface/src/OMPThread.h b/RTCP/Cobalt/CoInterface/src/OMPThread.h
index a358ffc318d..7134ed33ce2 100644
--- a/RTCP/Cobalt/CoInterface/src/OMPThread.h
+++ b/RTCP/Cobalt/CoInterface/src/OMPThread.h
@@ -32,6 +32,7 @@
 #include <Common/SystemCallException.h>
 #include <Common/Thread/Mutex.h>
 #include <Common/Thread/Condition.h>
+#include <Common/Thread/Thread.h>
 
 #include <CoInterface/SmartPtr.h>
 #include <CoInterface/TimeFuncs.h>
@@ -42,6 +43,8 @@ namespace LOFAR
 {
   namespace Cobalt
   {
+  // forward declare
+  class OMPThreadSet;
 
   /*
    * Represents an OpenMP thread. To use,
@@ -116,10 +119,11 @@ namespace LOFAR
         do {
           sendSIGHUP();
 
+          // Retry in 0.2 seconds
           TimeSpec::inc(deadline, 0.2);
         } while (!wait(deadline));
       } catch(Exception &ex) {
-        LOG_ERROR_STR("Caught exception: " << ex);
+        LOG_ERROR_STR("[Thread] kill: Caught exception: " << ex);
       }
     }
 
@@ -144,50 +148,30 @@ namespace LOFAR
     public:
       ScopedName(const std::string newName)
       :
-        oldName(get())
+        oldName(Thread::getName()),
+        newName(newName)
       {
         rebindCPU(); // hack to do here
 
-        set(newName);
+        Thread::setMyName(newName);
+
+        LOG_DEBUG_STR("[Thread " << newName << "] Registered");
       }
 
       ~ScopedName() {
-        set(oldName);
+        Thread::setMyName(oldName);
+
+        LOG_DEBUG_STR("[Thread " << newName << "] Unregistered");
       }
 
     private:
       const std::string oldName;
-
-      void set(const std::string &name) const {
-#if defined(_GNU_SOURCE) && __GLIBC_PREREQ(2, 12)
-        // Inform the kernel of the thread name (only first 16 characters are used!)
-        int retval;
-
-        if ((retval = pthread_setname_np(pthread_self(), name.substr(0,15).c_str())) != 0)
-          throw SystemCallException("pthread_setname_np", retval, THROW_ARGS);
-#else
-        (void)name;
-#endif
-      }
-
-      std::string get() const {
-#if defined(_GNU_SOURCE) && __GLIBC_PREREQ(2, 12)
-        char cname[1024];
-
-        // Inform the kernel of the thread name (only first 16 characters are used!)
-        int retval;
-
-        if ((retval = pthread_getname_np(pthread_self(), &cname[0], sizeof cname)) != 0)
-          throw SystemCallException("pthread_getname_np", retval, THROW_ARGS);
-
-        return std::string(cname);
-#else
-        return "<unknown>";
-#endif
-      }
+      const std::string newName;
     };
 
   private:
+    friend class OMPThreadSet;
+
     const pthread_t id;
     bool stopped;
 
@@ -234,7 +218,7 @@ namespace LOFAR
   public:
     EXCEPTION_CLASS(CannotStartException, Exception);
 
-    OMPThreadSet(): stopped(false) {}
+    OMPThreadSet(const std::string &name): stopped(false), name(name) {}
 
     class ScopedRun {
     public:
@@ -269,12 +253,16 @@ namespace LOFAR
       for (size_t i = 0; i < threads.size(); ++i) {
         // Give the thread until `abstime' to finish up
         if (threads[i]->wait(abstime)) {
-          LOG_DEBUG_STR("Thread " << i << ": exited normally");
+          // NOTE: This thread died, so we cannot get its name. So don't request it with Thread::getName.
+          LOG_DEBUG_STR("[ThreadSet " << name << " #" << i << "/" << threads.size() << "] killAll: Thread exited normally");
         } else {
+          // NOTE: The thread may have died by now in case of a race condition. We don't count on the tid to be reused in
+          // this timeframe (we'll send a SIGHUP).
+
           // Kill the thread
-          LOG_DEBUG_STR("Thread " << i << ": killing...");
+          LOG_DEBUG_STR("[ThreadSet " << name << " #" << i << "/" << threads.size() << "] killAll: Killing");
           threads[i]->kill();
-          LOG_DEBUG_STR("Thread " << i << ": killed");
+          LOG_DEBUG_STR("[ThreadSet " << name << " #" << i << "/" << threads.size() << "] killAll: Killed");
 
           killed[i] = true;
         }
@@ -289,13 +277,14 @@ namespace LOFAR
 
     std::vector< SmartPtr<OMPThread> > threads;
     bool stopped;
+    const std::string name;
 
     // Add this thread to the set
     OMPThread &registerMe() {
       ScopedLock sl(mutex);
 
       if (stopped)
-        THROW(CannotStartException, "ThreadSet was ordered to stop before this thread started");
+        THROW(CannotStartException, "ThreadSet " << name << " was ordered to stop before this thread started");
 
       SmartPtr<OMPThread> t = new OMPThread;
 
@@ -323,7 +312,7 @@ namespace LOFAR
         }
       }
 
-      THROW(CoInterfaceException, "Unregistering thread that was not registered");
+      THROW(CoInterfaceException, "ThreadSet " << name << ": Unregistering thread that was not registered");
     }
   };
 
diff --git a/RTCP/Cobalt/CoInterface/src/TABTranspose.cc b/RTCP/Cobalt/CoInterface/src/TABTranspose.cc
index e77c3c5f397..ecf72bfc0e2 100644
--- a/RTCP/Cobalt/CoInterface/src/TABTranspose.cc
+++ b/RTCP/Cobalt/CoInterface/src/TABTranspose.cc
@@ -708,7 +708,7 @@ MultiSender::~MultiSender()
 void MultiSender::process( OMPThreadSet *threadSet )
 {
   // We need to register our threads somewhere...
-  OMPThreadSet dummySet;
+  OMPThreadSet dummySet("dummy");
 
   if (!threadSet)
     threadSet = &dummySet;
diff --git a/RTCP/Cobalt/CoInterface/test/tOMPThread.cc b/RTCP/Cobalt/CoInterface/test/tOMPThread.cc
index 1cc4b450f55..0ceffd8837c 100644
--- a/RTCP/Cobalt/CoInterface/test/tOMPThread.cc
+++ b/RTCP/Cobalt/CoInterface/test/tOMPThread.cc
@@ -96,13 +96,13 @@ SUITE(OMPThread) {
 
 SUITE(OMPThreadSet) {
   TEST(EmptySet) {
-    OMPThreadSet set;
+    OMPThreadSet set("EmptySet");
 
     CHECK_EQUAL(0UL, set.killAll());
   }
 
   TEST(KillRunning) {
-    OMPThreadSet set;
+    OMPThreadSet set("KillRunning");
     Semaphore constructed;
 
 #   pragma omp parallel sections num_threads(2)
@@ -127,7 +127,7 @@ SUITE(OMPThreadSet) {
   }
 
   TEST(KillUnstarted) {
-    OMPThreadSet set;
+    OMPThreadSet set("KillUnstarted");
     Semaphore killed;
 
     set.killAll();
@@ -135,7 +135,7 @@ SUITE(OMPThreadSet) {
   }
 
   TEST(KillStopped) {
-    OMPThreadSet set;
+    OMPThreadSet set("KillStopped");
 
     {
       OMPThreadSet::ScopedRun sr(set);
@@ -145,7 +145,7 @@ SUITE(OMPThreadSet) {
   }
 
   TEST(PidReuse) {
-    OMPThreadSet set;
+    OMPThreadSet set("PidReuse");
 
     { OMPThreadSet::ScopedRun sr(set); }
     { OMPThreadSet::ScopedRun sr(set); }
diff --git a/RTCP/Cobalt/GPUProc/src/CommandThread.cc b/RTCP/Cobalt/GPUProc/src/CommandThread.cc
index be0349f946c..5c3f3f4ab1d 100644
--- a/RTCP/Cobalt/GPUProc/src/CommandThread.cc
+++ b/RTCP/Cobalt/GPUProc/src/CommandThread.cc
@@ -34,7 +34,8 @@ namespace LOFAR {
   namespace Cobalt {
     CommandThread::CommandThread(const std::string &streamdesc)
     :
-      streamdesc(streamdesc)
+      streamdesc(streamdesc),
+      readerThread("CommandThread")
     {
     }
 
diff --git a/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc b/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc
index 86a02b8ae6f..0309b8f0898 100644
--- a/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc
+++ b/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc
@@ -601,7 +601,7 @@ namespace LOFAR {
                                      Queue< SmartPtr< MPIData<SampleT> > > &outputQueue,
                                      MACIO::RTmetadata &mdLogger, const string &mdKeyPrefix )
     {
-      OMPThreadSet packetReaderThreads;
+      OMPThreadSet packetReaderThreads("packetReaders");
 
       if (ps.settings.realTime) {
         // Each board has its own pool to reduce lock contention
diff --git a/RTCP/Cobalt/GPUProc/src/Station/StationTranspose.cc b/RTCP/Cobalt/GPUProc/src/Station/StationTranspose.cc
index 5b0fb842850..e9690a34782 100644
--- a/RTCP/Cobalt/GPUProc/src/Station/StationTranspose.cc
+++ b/RTCP/Cobalt/GPUProc/src/Station/StationTranspose.cc
@@ -228,6 +228,7 @@ namespace LOFAR {
           if (subbandDistribution.at(rank).empty())
             continue;
 
+          // TODO: Pull this out of loop into a class member, as this object is constant per rank
           MPISendStation sender(stationIdx, rank, subbandDistribution.at(rank), mpiData.nrSamples);
 
           const size_t offset = subbandOffsets[rank];
diff --git a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc
index b78cd9ac945..dc8b0f64c46 100644
--- a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc
+++ b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc
@@ -138,6 +138,7 @@ namespace LOFAR
       nrSubbandsPerSubbandProc(ceilDiv(subbandIndices.size(), subbandProcs.size())),
       itsMdLogger(mdLogger),
       itsMdKeyPrefix(mdKeyPrefix),
+      outputThreads("Pipeline::outputThreads"),
       mpiPool(pool),
       writePool(subbandIndices.size()),
       factories(ps, nrSubbandsPerSubbandProc),
diff --git a/RTCP/Cobalt/GPUProc/src/rtcp.cc b/RTCP/Cobalt/GPUProc/src/rtcp.cc
index 90ee7386512..1cfd0900968 100644
--- a/RTCP/Cobalt/GPUProc/src/rtcp.cc
+++ b/RTCP/Cobalt/GPUProc/src/rtcp.cc
@@ -281,15 +281,6 @@ int main(int argc, char **argv)
     }
   }
 
-  LOG_INFO("----- Initialising OpenMP");
-
-  // Allow usage of nested omp calls
-  omp_set_nested(true);
-
-  // Allow OpenMP thread registration
-  OMPThread::init();
-  OMPThread::ScopedName sn("main");
-
   LOG_INFO("----- Initialising NUMA MPI NIC bindings");
 
   if(do_numa_bind) {
@@ -385,6 +376,15 @@ int main(int argc, char **argv)
   LOG_INFO("----- Calling MPI_Init_thread");
   mpi.init(argc, argv);
 
+  LOG_INFO("----- Initialising OpenMP");
+
+  // Allow usage of nested omp calls
+  omp_set_nested(true);
+
+  // Allow OpenMP thread registration
+  OMPThread::init(); // Call AFTER MPI_Init, since this sets a signal handler which MPI_Init would override
+  OMPThread::ScopedName sn("main");
+
   // Migrate all moveable and future memory to our socket.
   //
   // NOTE: This restricts the freedom for memory allocations to our socket.
@@ -475,8 +475,7 @@ int main(int argc, char **argv)
        * THE OBSERVATION
        */
 
-      OMPThread::ScopedName sn("stations");
-
+      OMPThread::ScopedName sn("observation");
 
       if (ps.settings.realTime) {
         // Wait just before the obs starts to allocate resources,
@@ -494,26 +493,41 @@ int main(int argc, char **argv)
         {
           OMPThread::ScopedName sn("stations");
 
-          // Read and forward station data over MPI
-          #pragma omp parallel for num_threads(ps.settings.antennaFields.size())
+          // Construct list of stations to receive with this process
+          struct receivingAntennaField {
+            size_t idx;
+            struct StationID id;
+          };
+
+          vector<struct receivingAntennaField> receivingAntennaFields;
+
           for (size_t stat = 0; stat < ps.settings.antennaFields.size(); ++stat) 
           {       
-            OMPThread::ScopedName sn(str(format("%s main") % ps.settings.antennaFields.at(stat).name));
-
-            // Determine if this station should start a pipeline for station..
             const struct StationID stationID(
               StationID::parseFullFieldName(
               ps.settings.antennaFields.at(stat).name));
             const StationNodeAllocation allocation(stationID, ps, mpi.rank(), mpi.size());
 
-            if (!allocation.receivedHere()) 
-            {// Station is not sending from this node, skip          
-              continue;
+            if (allocation.receivedHere()) 
+            {
+              struct receivingAntennaField r;
+
+              r.idx = stat;
+              r.id = stationID;
+              receivingAntennaFields.push_back(r);
             }
+          }
+
+          // Read and forward station data over MPI
+          #pragma omp parallel for num_threads(receivingAntennaFields.size())
+          for (size_t i = 0; i < receivingAntennaFields.size(); i++) 
+          {
+            const struct receivingAntennaField &r = receivingAntennaFields[i];
+            OMPThread::ScopedName sn(str(format("%s main") % r.id.name()));
 
             // For InputProc use the GPUProc mdLogger (same process), but our own key prefix.
             // Since InputProc is inside GPUProc, don't inform the MAC Log Processor.
-            string mdKeyPrefixInputProc = InputProc_PVSSPrefix(ps, stationID.name());
+            string mdKeyPrefixInputProc = InputProc_PVSSPrefix(ps, r.id.name());
             mdKeyPrefixInputProc.push_back('.'); // keys look like: "keyPrefix.subKeyName"
 
             mdLogger.log(mdKeyPrefixInputProc + PN_CSI_OBSERVATION_NAME,
@@ -522,7 +536,7 @@ int main(int argc, char **argv)
             mdLogger.log(mdKeyPrefixInputProc + PN_CSI_CPU,  do_numa_bind ? mynode.cpu : 0);
 
 
-            sendInputToPipeline(ps, stat, subbandDistribution, mpi.rank(),
+            sendInputToPipeline(ps, r.idx, subbandDistribution, mpi.rank(),
                                 mdLogger, mdKeyPrefixInputProc, &stopSwitch);
           }
         }
@@ -549,6 +563,7 @@ int main(int argc, char **argv)
       }
 
       if (mpi.rank() == 0) {
+        LOG_DEBUG_STR("Stopping CommandThread");
         commandThread->stop();
       }
     }
diff --git a/RTCP/Cobalt/InputProc/src/RSPBoards.cc b/RTCP/Cobalt/InputProc/src/RSPBoards.cc
index 855582b8172..60b4e7ac528 100644
--- a/RTCP/Cobalt/InputProc/src/RSPBoards.cc
+++ b/RTCP/Cobalt/InputProc/src/RSPBoards.cc
@@ -45,7 +45,7 @@ namespace LOFAR
     void RSPBoards::process()
     {
       // References to all threads that will need aborting
-      OMPThreadSet threads;
+      OMPThreadSet threads("RSPBoards");
 
       ASSERT(nrBoards > 0);
 
-- 
GitLab