From 9e11d4408773207e2505737f25d1d38eadc803d2 Mon Sep 17 00:00:00 2001
From: Jan David Mol <mol@astron.nl>
Date: Fri, 29 Jun 2012 11:26:52 +0000
Subject: [PATCH] Task #3284: Use libssh2 to establish SSH connections BG/P ->
 Storage

---
 .gitattributes                              |   6 +
 CMake/FindLibssh2.cmake                     |  48 ++
 CMake/variants/variants.bgfen0              |   1 +
 RTCP/IONProc/CMakeLists.txt                 |   1 +
 RTCP/IONProc/src/CMakeLists.txt             |   1 +
 RTCP/IONProc/src/ION_main.cc                |  16 +
 RTCP/IONProc/src/Job.cc                     | 271 ++---------
 RTCP/IONProc/src/Job.h                      |  13 +-
 RTCP/IONProc/src/SSH.cc                     | 510 ++++++++++++++++++++
 RTCP/IONProc/src/SSH.h                      |  81 ++++
 RTCP/IONProc/test/CMakeLists.txt            |   1 +
 RTCP/IONProc/test/tDelayCompensation.sh     |   1 -
 RTCP/IONProc/test/tSSH.cc                   |  64 +++
 RTCP/IONProc/test/tSSH.sh                   |   2 +
 RTCP/IONProc/test/tSSH.stdout               |   2 +
 RTCP/Interface/include/Interface/SmartPtr.h |   5 +
 lofar_config.h.cmake                        |   3 +
 17 files changed, 797 insertions(+), 229 deletions(-)
 create mode 100644 CMake/FindLibssh2.cmake
 create mode 100644 RTCP/IONProc/src/SSH.cc
 create mode 100644 RTCP/IONProc/src/SSH.h
 create mode 100644 RTCP/IONProc/test/tSSH.cc
 create mode 100755 RTCP/IONProc/test/tSSH.sh
 create mode 100644 RTCP/IONProc/test/tSSH.stdout

diff --git a/.gitattributes b/.gitattributes
index ab2f444ff0a..708b0d790e2 100644
--- a/.gitattributes
+++ b/.gitattributes
@@ -1892,6 +1892,7 @@ CEP/PyBDSM/test/tbdsm_process_image.py -text
 CMake/FindAskapSoft.cmake -text
 CMake/FindCasarest.cmake -text
 CMake/FindJNI.cmake -text
+CMake/FindLibssh2.cmake -text
 CMake/FindValgrind.cmake -text
 CMake/TODO -text
 CMake/testscripts/assay -text
@@ -3465,6 +3466,8 @@ RTCP/IONProc/src/JobQueue.cc -text
 RTCP/IONProc/src/JobQueue.h -text
 RTCP/IONProc/src/PLCClient.cc -text
 RTCP/IONProc/src/PLCClient.h -text
+RTCP/IONProc/src/SSH.cc -text
+RTCP/IONProc/src/SSH.h -text
 RTCP/IONProc/src/StreamMultiplexer.cc -text
 RTCP/IONProc/src/StreamMultiplexer.h -text
 RTCP/IONProc/src/generateDelays.cc -text
@@ -3473,6 +3476,9 @@ RTCP/IONProc/test/RTCP.parset -text
 RTCP/IONProc/test/tDelayCompensation.cc -text
 RTCP/IONProc/test/tDelayCompensation.parset -text
 RTCP/IONProc/test/tDelayCompensation.sh -text
+RTCP/IONProc/test/tSSH.cc -text
+RTCP/IONProc/test/tSSH.sh -text
+RTCP/IONProc/test/tSSH.stdout -text
 RTCP/Interface/include/Interface/BeamCoordinates.h -text
 RTCP/Interface/include/Interface/BeamFormedData.h -text
 RTCP/Interface/include/Interface/DataFactory.h -text
diff --git a/CMake/FindLibssh2.cmake b/CMake/FindLibssh2.cmake
new file mode 100644
index 00000000000..65d4d9d0eb7
--- /dev/null
+++ b/CMake/FindLibssh2.cmake
@@ -0,0 +1,48 @@
+# - Try to find libssh2.
+# Variables used by this module:
+#  LIBSSH2_ROOT_DIR     - LIBSSH2 root directory
+# Variables defined by this module:
+#  LIBSSH2_FOUND        - system has LIBSSH2
+#  LIBSSH2_INCLUDE_DIR  - the LIBSSH2 include directory (cached)
+#  LIBSSH2_INCLUDE_DIRS - the LIBSSH2 include directories
+#                       (identical to LIBSSH2_INCLUDE_DIR)
+#  LIBSSH2_LIBRARY      - the LIBSSH2 library (cached)
+#  LIBSSH2_LIBRARIES    - the LIBSSH2 libraries
+#                       (identical to LIBSSH2_LIBRARY)
+
+# Copyright (C) 2009
+# ASTRON (Netherlands Institute for Radio Astronomy)
+# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
+#
+# This file is part of the LOFAR software suite.
+# The LOFAR software suite is free software: you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# The LOFAR software suite is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
+#
+# $Id: FindLIBSSH2.cmake 21317 2012-06-26 14:22:47Z mol $
+
+if(NOT LIBSSH2_FOUND)
+
+  find_path(LIBSSH2_INCLUDE_DIR libssh2.h
+    HINTS ${LIBSSH2_ROOT_DIR} PATH_SUFFIXES include)
+  find_library(LIBSSH2_LIBRARY ssh2
+    HINTS ${LIBSSH2_ROOT_DIR} PATH_SUFFIXES lib)
+  mark_as_advanced(LIBSSH2_INCLUDE_DIR LIBSSH2_LIBRARY)
+
+  include(FindPackageHandleStandardArgs)
+  find_package_handle_standard_args(LIBSSH2 DEFAULT_MSG 
+    LIBSSH2_LIBRARY LIBSSH2_INCLUDE_DIR)
+
+  set(LIBSSH2_INCLUDE_DIRS ${LIBSSH2_INCLUDE_DIR})
+  set(LIBSSH2_LIBRARIES ${LIBSSH2_LIBRARY})
+
+endif(NOT LIBSSH2_FOUND)
diff --git a/CMake/variants/variants.bgfen0 b/CMake/variants/variants.bgfen0
index aa22b5cb82b..ee11ffe170a 100644
--- a/CMake/variants/variants.bgfen0
+++ b/CMake/variants/variants.bgfen0
@@ -24,6 +24,7 @@ set(CASACORE_MAKE_REQUIRED_EXTERNALS_OPTIONAL ON)
 set(MASS_ROOT_DIR /opt/mass)
 set(FFTW2_ROOT_DIR /globalhome/lofarsystem/packages/fftw-2.1.5-single-precision)
 set(BOOST_ROOT_DIR /globalhome/lofarsystem/packages/root/bgp_ion/boost)
+set(LIBSSH2_ROOT_DIR /globalhome/lofarsystem/packages/root/bgp_ion/libssh2)
 
 set(BLITZ_ROOT_DIR /globalhome/lofarsystem/packages/root/bgfen/blitz)
 set(LOG4CPLUS_ROOT_DIR /globalhome/lofarsystem/packages/root/bgfen/log4cplus)
diff --git a/RTCP/IONProc/CMakeLists.txt b/RTCP/IONProc/CMakeLists.txt
index 83802cae86c..80e705e18b5 100644
--- a/RTCP/IONProc/CMakeLists.txt
+++ b/RTCP/IONProc/CMakeLists.txt
@@ -5,6 +5,7 @@ lofar_package(IONProc 1.0
 
 include(LofarFindPackage)
 lofar_find_package(Boost REQUIRED)
+lofar_find_package(Libssh2)
 lofar_find_package(Valgrind)
 lofar_find_package(Casacore COMPONENTS measures REQUIRED)
 
diff --git a/RTCP/IONProc/src/CMakeLists.txt b/RTCP/IONProc/src/CMakeLists.txt
index a125b51a4d9..9d60c9665b1 100644
--- a/RTCP/IONProc/src/CMakeLists.txt
+++ b/RTCP/IONProc/src/CMakeLists.txt
@@ -32,6 +32,7 @@ lofar_add_library(ionproc
   ReaderWriterSynchronization.cc
   Scheduling.cc
   StreamMultiplexer.cc
+  SSH.cc
   FCNP_ServerStream.cc)
 
 lofar_add_bin_program(versionionproc versionionproc.cc)
diff --git a/RTCP/IONProc/src/ION_main.cc b/RTCP/IONProc/src/ION_main.cc
index c5ae24384e3..772f122fb58 100644
--- a/RTCP/IONProc/src/ION_main.cc
+++ b/RTCP/IONProc/src/ION_main.cc
@@ -49,6 +49,10 @@
 #include <sys/types.h>
 #include <sys/mman.h>
 
+#ifdef HAVE_LIBSSH2
+#include <libssh2.h>
+#endif
+
 #include <boost/format.hpp>
 
 #if defined HAVE_MPI
@@ -418,6 +422,14 @@ int main(int argc, char **argv)
       exit(1);
     }
 #endif
+
+#ifdef HAVE_LIBSSH2
+  int rc = libssh2_init(0);
+  if (rc) {
+    std::cerr << "libssh2 init failed: " << rc << std::endl;
+    exit(1);
+  }
+#endif  
   
 #if defined HAVE_BGP
   INIT_LOGGER_WITH_SYSINFO(str(boost::format("IONProc@%02d") % myPsetNumber));
@@ -436,6 +448,10 @@ int main(int argc, char **argv)
 
   master_thread();
 
+#ifdef HAVE_LIBSSH2
+  libssh2_exit();
+#endif
+
 #if defined HAVE_MPI
   MPI_Finalize();
 #endif
diff --git a/RTCP/IONProc/src/Job.cc b/RTCP/IONProc/src/Job.cc
index 9ed29ff1795..f2e92648347 100644
--- a/RTCP/IONProc/src/Job.cc
+++ b/RTCP/IONProc/src/Job.cc
@@ -37,11 +37,7 @@
 #include <Stream/SocketStream.h>
 #include <Stream/PortBroker.h>
 
-#include <sys/types.h>
-#include <sys/wait.h>
 #include <unistd.h>
-#include <fcntl.h>
-#include <signal.h>
 #include <time.h>
 
 #include <boost/format.hpp>
@@ -198,216 +194,6 @@ template <typename T> void Job::broadcast(T &value)
 }
 
 
-static void exitwitherror( const char *errorstr )
-{
-  // can't cast to (void) since gcc won't allow that as a method to drop the result
-  int ignoreResult;
-
-  ignoreResult = write(STDERR_FILENO, errorstr, strlen(errorstr)+1);
-
-  // use _exit instead of exit to avoid calling atexit handlers in both
-  // the master and the child process.
-  _exit(1);
-}
-
-void Job::StorageProcess::execSSH(const char *sshKey, const char *userName, const char *hostName, const char *executable, const char *rank, const char *cwd, const char *isBigEndian)
-{
-  // DO NOT DO ANY CALL THAT GRABS A LOCK, since the lock may be held by a
-  // thread that is no longer part of our address space
-
-  // use write() for output since the Logger uses a mutex, and printf also holds locks
-
-  // Prevent cancellation due to race conditions. A cancellation can still be pending for this JobThread, in which case one of the system calls
-  // below triggers it. If this thread/process can be cancelled, there will be multiple processes running, leading to all kinds of Bad Things.
-  Cancellation::disable();
-
-  // close all file descriptors other than stdin/out/err, which might have been openend by
-  // other threads at the time of fork(). We brute force over all possible fds, most of which will be invalid.
-  for (int f = sysconf(_SC_OPEN_MAX); f > 2; --f)
-    (void)close(f);
-
-  // create a valid stdin from which can be read (a blocking fd created by pipe() won't suffice anymore for since at least OpenSSH 5.8)
-  // rationale: this forked process inherits stdin from the parent process, which is unusable because IONProc is started in the background
-  // and routed through mpirun as well. Also, it is shared by all forked processes. Nevertheless, we want Storage to be able to determine
-  // when to shut down based on whether stdin is open. So we create a new stdin.
-  int devzero = open("/dev/zero", O_RDONLY);
-
-  if (devzero < 0)
-    exitwitherror("cannot open /dev/zero\n");
-
-  if (close(0) < 0)
-    exitwitherror("cannot close stdin\n");
-
-  if (dup(devzero) < 0)
-    exitwitherror("cannot dup /dev/zero into stdin\n");
-
-  if (close(devzero) < 0)
-    exitwitherror("cannot close /dev/zero\n");
-
-  if (execl("/usr/bin/ssh",
-    "ssh",
-    "-q",
-    "-i", sshKey,
-    "-c", "blowfish",
-    "-o", "StrictHostKeyChecking=no",
-    "-o", "UserKnownHostsFile=/dev/null",
-    "-o", "ServerAliveInterval=30",
-    "-l", userName,
-    hostName,
-
-    "cd", cwd, "&&",
-#if defined USE_VALGRIND
-    "valgrind", "--leak-check=full",
-#endif
-    executable,
-
-    boost::lexical_cast<std::string>(itsParset.observationID()).c_str(),
-    rank,
-    isBigEndian,
-
-    static_cast<char *>(0)
-  ) < 0)
-    exitwitherror("execl failed\n");
-
-  exitwitherror("execl succeeded but did return\n");
-}
-
-
-void Job::StorageProcess::forkSSH(const char *sshKey, const char *userName, const char *hostName, const char *executable, const char *rank, const char *cwd, const char *isBigEndian)
-{
-  LOG_INFO_STR(itsLogPrefix << " starting");
-
-  LOG_DEBUG_STR(itsLogPrefix << "child will exec "
-    "\"/usr/bin/ssh "
-    "-q "
-    "-i " << sshKey << " "
-    "-c blowfish "
-    "-o StrictHostKeyChecking=no "
-    "-o UserKnownHostsFile=/dev/null "
-    "-o ServerAliveInterval=30 "
-    "-l " << userName << " "
-    << hostName << " "
-    "cd " << cwd << " && "
-#if defined USE_VALGRIND
-    "valgrind " "--leak-check=full "
-#endif
-    << executable << " " << rank << " " << isBigEndian << " "
-    "\""
-  );
-
-  switch (itsPID = fork()) {
-    case -1 : throw SystemCallException("fork", errno, THROW_ARGS);
-
-    case  0 : execSSH(sshKey, userName, hostName, executable, rank, cwd, isBigEndian);
-  }
-}
-
-
-void Job::StorageProcess::joinSSH(unsigned &timeout)
-{
-  if (itsPID != 0) {
-    int status;
-
-    // always try at least one waitpid(). if child has not exited, optionally
-    // sleep and try again.
-    for (;;) {
-      pid_t ret;
-
-      if ((ret = waitpid(itsPID, &status, WNOHANG)) == (pid_t)-1) {
-        int error = errno;
-
-        if (error == EINTR) {
-          LOG_DEBUG_STR(itsLogPrefix << " waitpid() was interrupted -- retrying");
-          continue;
-        }
-
-        // error
-        LOG_WARN_STR(itsLogPrefix << " waitpid() failed with errno " << error);
-        return;
-      } else if (ret == 0) {
-        // child still running
-        if (timeout == 0) {
-          break;
-        }
-
-        timeout--;
-        sleep(1);
-      } else {
-        // child exited
-        if (WIFSIGNALED(status) != 0)
-          LOG_WARN_STR(itsLogPrefix << "SSH was killed by signal " << WTERMSIG(status));
-        else if (WEXITSTATUS(status) != 0) {
-          const char *explanation;
-
-          switch (WEXITSTATUS(status)) {
-            default:
-              explanation = "??";
-              break;
-
-            case 255:
-              explanation = "Network or authentication error";
-              break;
-            case 127:
-              explanation = "BASH: command/library not found";
-              break;
-            case 126:
-              explanation = "BASH: command found but could not be executed (wrong architecture?)";
-              break;
-
-            case 128 + SIGHUP:
-              explanation = "killed by SIGHUP";
-              break;
-            case 128 + SIGINT:
-              explanation = "killed by SIGINT (Ctrl-C)";
-              break;
-            case 128 + SIGQUIT:
-              explanation = "killed by SIGQUIT";
-              break;
-            case 128 + SIGILL:
-              explanation = "illegal instruction";
-              break;
-            case 128 + SIGABRT:
-              explanation = "killed by SIGABRT";
-              break;
-            case 128 + SIGKILL:
-              explanation = "killed by SIGKILL";
-              break;
-            case 128 + SIGSEGV:
-              explanation = "segmentation fault";
-              break;
-            case 128 + SIGPIPE:
-              explanation = "broken pipe";
-              break;
-            case 128 + SIGALRM:
-              explanation = "killed by SIGALRM";
-              break;
-            case 128 + SIGTERM:
-              explanation = "killed by SIGTERM";
-              break;
-          }
-
-          LOG_ERROR_STR(itsLogPrefix << " exited with exit code " << WEXITSTATUS(status) << " (" << explanation << ")" );
-        } else
-          LOG_INFO_STR(itsLogPrefix << " terminated normally");
-
-        return;  
-      }
-    }
-
-    // child did not exit within the given timeout
-
-    LOG_WARN_STR(itsLogPrefix << " sending SIGTERM");
-    kill(itsPID, SIGTERM);
-
-    if (waitpid(itsPID, &status, 0) == -1) {
-      LOG_WARN_STR(itsLogPrefix << " waitpid() failed");
-    }
-
-    LOG_WARN_STR(itsLogPrefix << " terminated after sending SIGTERM");
-  }
-}
-
-
 Job::StorageProcess::StorageProcess( const Parset &parset, const string &logPrefix, int rank, const string &hostname )
 :
   itsParset(parset),
@@ -436,20 +222,47 @@ void Job::StorageProcess::start()
   if (getcwd(cwd, sizeof cwd) == 0)
     throw SystemCallException("getcwd", errno, THROW_ARGS);
 
-  forkSSH(sshKey.c_str(),
-          userName.c_str(),
-          itsHostname.c_str(),
-          executable.c_str(),
-          boost::lexical_cast<std::string>(itsRank).c_str(),
-          cwd,
+#ifdef HAVE_LIBSSH2
+  std::string commandLine = str(boost::format("cd %s && %s%s %u %d %u")
+    % cwd
+#if defined USE_VALGRIND
+    % "valgrind --leak-check=full "
+#else
+    % ""
+#endif
+    % executable
+    % itsParset.observationID()
+    % itsRank
+#if defined WORDS_BIGENDIAN
+    % 1
+#else
+    % 0
+#endif
+  );
+
+  itsSSHconnection = new SSHconnection(itsLogPrefix, itsHostname, commandLine, userName, sshKey);
+  itsSSHconnection->start();
+#else
+
+#warn Using fork/exec for SSH processes to Storage
+  const char *commandLine[] = {
+    "cd ", cwd, " && ",
+#if defined USE_VALGRIND
+    "valgrind " "--leak-check=full "
+#endif
+    executable.c_str(),
+    boost::lexical_cast<std::string>(itsRank).c_str(),
 #if defined WORDS_BIGENDIAN
-          "1"
+    "1",
 #else
-          "0"
+    "0",
 #endif
-          );
+    0
+  };
+  itsPID = forkSSH(itsLogPrefix, itsHostname.c_str(), commandLine, userName.c_str(), sshKey.c_str());
 
   // client process won't reach this point
+#endif
 
   itsThread = new Thread(this, &Job::StorageProcess::controlThread, itsLogPrefix + "[ControlThread] ", 65535);
 }
@@ -457,7 +270,17 @@ void Job::StorageProcess::start()
 
 void Job::StorageProcess::stop(unsigned &timeout)
 {
-  joinSSH(timeout);
+#ifdef HAVE_LIBSSH2
+  // TODO: update timeout
+  struct timespec deadline;
+
+  deadline.tv_sec  = time(0) + timeout;
+  deadline.tv_nsec = 0;
+
+  itsSSHconnection->stop(deadline);
+#else
+  joinSSH(itsLogPrefix, itsPID, timeout);
+#endif  
 }
 
 
diff --git a/RTCP/IONProc/src/Job.h b/RTCP/IONProc/src/Job.h
index 56e44e3f284..b8235a53a6b 100644
--- a/RTCP/IONProc/src/Job.h
+++ b/RTCP/IONProc/src/Job.h
@@ -36,6 +36,10 @@
 #include <Common/Thread/Thread.h>
 #include <PLCClient.h>
 
+#ifdef HAVE_LIBSSH2
+#include <SSH.h>
+#endif
+
 #include <sys/time.h>
 
 #include <vector>
@@ -100,9 +104,11 @@ class Job : public PLCRunnable
     private:
       void                               controlThread();
 
-      void			         execSSH(const char *sshKey, const char *userName, const char *hostName, const char *executable, const char *rank, const char *cwd, const char *isBigEndian);
-      void			         forkSSH(const char *sshKey, const char *userName, const char *hostName, const char *executable, const char *rank, const char *cwd, const char *isBigEndian);
-      void				 joinSSH(unsigned &timeout);
+#ifdef HAVE_LIBSSH2
+      SmartPtr<SSHconnection>            itsSSHconnection;
+#else      
+      int itsPID;
+#endif
 
       const Parset &itsParset;
       const std::string itsLogPrefix;
@@ -110,7 +116,6 @@ class Job : public PLCRunnable
       const int itsRank;
       const std::string itsHostname;
 
-      int itsPID;
       SmartPtr<Thread> itsThread;
     };
 
diff --git a/RTCP/IONProc/src/SSH.cc b/RTCP/IONProc/src/SSH.cc
new file mode 100644
index 00000000000..f95556d47f9
--- /dev/null
+++ b/RTCP/IONProc/src/SSH.cc
@@ -0,0 +1,510 @@
+//#  SSH.cc: setup an SSH connection using libssh2
+//#
+//#  Copyright (C) 2012
+//#  ASTRON (Netherlands Foundation for Research in Astronomy)
+//#  P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
+//#
+//#  This program is free software; you can redistribute it and/or modify
+//#  it under the terms of the GNU General Public License as published by
+//#  the Free Software Foundation; either version 2 of the License, or
+//#  (at your option) any later version.
+//#
+//#  This program is distributed in the hope that it will be useful,
+//#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+//#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+//#  GNU General Public License for more details.
+//#
+//#  You should have received a copy of the GNU General Public License
+//#  along with this program; if not, write to the Free Software
+//#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+//#
+//#  $Id: SSH.cc 18226 2012-06-09 12:56:47Z mol $
+
+//# Always #include <lofar_config.h> first!
+#include <lofar_config.h>
+#include <SSH.h>
+
+#include <Common/Thread/Cancellation.h>
+#include <Common/LofarLogger.h>
+#include <sys/wait.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <signal.h>
+#include <fcntl.h>
+#include <vector>
+
+#ifdef HAVE_LIBSSH2
+#include <sstream>
+#include <sys/select.h>
+#include <Common/lofar_string.h>
+#include <Stream/SocketStream.h>
+#endif
+
+using namespace std;
+
+namespace LOFAR {
+namespace RTCP {
+
+#ifdef HAVE_LIBSSH2
+  
+SSHconnection::SSHconnection(const string &logPrefix, const string &hostname, const string &commandline, const string &username, const string &sshkey)
+:
+  itsLogPrefix(logPrefix),
+  itsHostName(hostname),
+  itsCommandLine(commandline),
+  itsUserName(username),
+  itsSSHKey(sshkey)
+{
+}
+
+void SSHconnection::start()
+{
+  itsThread = new Thread(this, &SSHconnection::commThread, itsLogPrefix + "[SSH Thread] ", 65536);
+}
+
+void SSHconnection::stop( const struct timespec &deadline )
+{
+  if (!itsThread->wait(deadline)) {
+    itsThread->cancel();
+
+    itsThread->wait();
+  }
+}
+
+void SSHconnection::free_session( LIBSSH2_SESSION *session )
+{
+  if (!session)
+    return;
+
+  libssh2_session_disconnect(session, "Normal Shutdown, Thank you for playing");
+  libssh2_session_free(session);
+}
+
+void SSHconnection::free_channel( LIBSSH2_CHANNEL *channel )
+{
+  if (!channel)
+    return;
+
+  libssh2_channel_free(channel);
+}
+
+bool SSHconnection::open_session( FileDescriptorBasedStream &sock )
+{
+  int rc;
+
+  /* Create a session instance */
+  session = libssh2_session_init();
+  if (!session) {
+    LOG_ERROR_STR( itsLogPrefix << "Cannot create SSH session object" );
+    return false;
+  }
+
+  /* tell libssh2 we want it all done non-blocking */
+  libssh2_session_set_blocking(session, 0);
+
+  /* ... start it up. This will trade welcome banners, exchange keys,
+   * and setup crypto, compression, and MAC layers
+   */
+  while ((rc = libssh2_session_handshake(session, sock.fd)) ==
+         LIBSSH2_ERROR_EAGAIN) {
+    waitsocket(sock);
+  }
+
+  if (rc) {
+    LOG_ERROR_STR( itsLogPrefix << "Failure establishing SSH session: " << rc);
+    return false;
+  }
+
+  /* Authenticate by public key */
+  while ((rc = libssh2_userauth_publickey_fromfile(session,
+                      itsUserName.c_str(), // remote username
+                      NULL,                // public key filename
+                      itsSSHKey.c_str(),   // private key filename
+                      NULL                 // password
+                      )) ==
+         LIBSSH2_ERROR_EAGAIN) {
+    waitsocket(sock);
+  }
+
+  if (rc) {
+    LOG_ERROR_STR( itsLogPrefix << "Authentication by public key failed: " << rc);
+    return false;
+  }
+
+  return true;
+}
+
+bool SSHconnection::open_channel( FileDescriptorBasedStream &sock )
+{
+  /* Exec non-blocking on the remote host */
+  while( (channel = libssh2_channel_open_session(session)) == NULL &&
+         libssh2_session_last_error(session,NULL,NULL,0) ==
+         LIBSSH2_ERROR_EAGAIN )
+  {
+    waitsocket(sock);
+  }
+
+  if (!channel)
+  {
+    LOG_ERROR_STR( itsLogPrefix << "Could not set up SSH channel" );
+    return false;
+  }
+
+  return true;
+}
+
+bool SSHconnection::close_channel( FileDescriptorBasedStream &sock )
+{
+  int rc;
+
+  while( (rc = libssh2_channel_close(channel)) == LIBSSH2_ERROR_EAGAIN ) {
+    waitsocket(sock);
+  }
+
+  return true;
+}
+
+bool SSHconnection::waitsocket( FileDescriptorBasedStream &sock )
+{
+  struct timeval timeout;
+  int rc;
+  fd_set fd;
+  fd_set *writefd = NULL;
+  fd_set *readfd = NULL;
+  int dir;
+
+  timeout.tv_sec = 10;
+  timeout.tv_usec = 0;
+
+  FD_ZERO(&fd);
+
+  FD_SET(sock.fd, &fd);
+
+  /* now make sure we wait in the correct direction */
+  dir = libssh2_session_block_directions(session);
+
+  if(dir & LIBSSH2_SESSION_BLOCK_INBOUND)
+      readfd = &fd;
+
+  if(dir & LIBSSH2_SESSION_BLOCK_OUTBOUND)
+      writefd = &fd;
+
+  {
+    Cancellation::enable();
+
+    // select() is a cancellation point
+    rc = ::select(sock.fd + 1, readfd, writefd, NULL, &timeout);
+
+    Cancellation::disable();
+  }   
+
+  return rc > 0;
+}
+
+void SSHconnection::commThread()
+{
+  SocketStream sock( itsHostName, 22, SocketStream::TCP, SocketStream::Client );
+
+  LOG_DEBUG_STR( itsLogPrefix << "Connected" );
+
+  stringstream buffer;
+  int rc;
+  int exitcode;
+  char *exitsignal=(char *)"none";
+
+  /* Prevent cancellation from here on -- we manually insert cancellation points to avoid
+     screwing up libssh2's internal administration. */
+  Cancellation::disable();
+  Cancellation::point();
+
+  if (!open_session(sock))
+    return;
+
+  if (!open_channel(sock))
+    return;
+
+  while( (rc = libssh2_channel_exec(channel, itsCommandLine.c_str())) ==
+         LIBSSH2_ERROR_EAGAIN )
+  {
+    waitsocket(sock);
+  }
+
+  if (rc)
+  {
+    LOG_ERROR_STR( itsLogPrefix << "Failure starting remote command: " << rc);
+    return;
+  }
+
+  LOG_DEBUG_STR( itsLogPrefix << "Remote command started, waiting for output" );
+
+  /* Session I/O */
+  for( ;; )
+  {
+    /* loop until we block */
+    do {
+      char data[0x4000];
+
+      rc = libssh2_channel_read( channel, data, sizeof data );
+      if( rc > 0 )
+      {
+        buffer.write( data, rc );
+
+        /* extract and log lines */
+        for( ;; )
+        {
+          Cancellation::point();
+
+          buffer.getline( data, sizeof data );
+
+          if (buffer.fail()) {
+            // no line found
+            buffer.clear();
+            break;
+          }
+
+          // TODO: Use logger somehow (we'd duplicate the prefix if we just use LOG_* macros..)
+          cout << data << endl;
+        }
+      } else {
+        if( rc < 0 && rc != LIBSSH2_ERROR_EAGAIN ) {
+          /* no need to output this for the EAGAIN case */
+          LOG_ERROR_STR( itsLogPrefix << "libssh2_channel_read returned " << rc);
+        }   
+      }
+    } while( rc > 0 );
+
+    /* this is due to blocking that would occur otherwise so we loop on
+       this condition */
+    if( rc == LIBSSH2_ERROR_EAGAIN )
+    {
+      waitsocket(sock);
+    } else {
+      /* EOF */
+      break;
+    }    
+  }
+
+  LOG_DEBUG_STR( itsLogPrefix << "Disconnecting" );
+
+  close_channel(sock);
+
+  if (rc == 0)
+  {
+    exitcode = libssh2_channel_get_exit_status( channel );
+    libssh2_channel_get_exit_signal(channel, &exitsignal,
+                                    NULL, NULL, NULL, NULL, NULL);
+  } else {
+    exitcode = 127;
+  }
+
+  if (exitsignal) {
+    LOG_ERROR_STR(itsLogPrefix << "SSH was killed by signal " << exitsignal);
+  } else if(exitcode > 0) {
+    const char *explanation = "";
+
+    LOG_ERROR_STR(itsLogPrefix << "Exited with exit code " << exitcode << " (" << explanation << ")" );
+  } else {
+    LOG_INFO_STR(itsLogPrefix << "Terminated normally");
+  }
+}
+
+#endif
+
+static void exitwitherror( const char *errorstr )
+{
+  // can't cast to (void) since gcc won't allow that as a method to drop the result
+  int ignoreResult;
+
+  ignoreResult = write(STDERR_FILENO, errorstr, strlen(errorstr)+1);
+
+  // use _exit instead of exit to avoid calling atexit handlers in both
+  // the master and the child process.
+  _exit(1);
+}
+
+static void execSSH(const char * const sshParams[])
+{
+  // DO NOT DO ANY CALL THAT GRABS A LOCK, since the lock may be held by a
+  // thread that is no longer part of our address space
+
+  // use write() for output since the Logger uses a mutex, and printf also holds locks
+
+  // Prevent cancellation due to race conditions. A cancellation can still be pending for this JobThread, in which case one of the system calls
+  // below triggers it. If this thread/process can be cancelled, there will be multiple processes running, leading to all kinds of Bad Things.
+  Cancellation::disable();
+
+  // close all file descriptors other than stdin/out/err, which might have been openend by
+  // other threads at the time of fork(). We brute force over all possible fds, most of which will be invalid.
+  for (int f = sysconf(_SC_OPEN_MAX); f > 2; --f)
+    (void)close(f);
+
+  // create a valid stdin from which can be read (a blocking fd created by pipe() won't suffice anymore for since at least OpenSSH 5.8)
+  // rationale: this forked process inherits stdin from the parent process, which is unusable because IONProc is started in the background
+  // and routed through mpirun as well. Also, it is shared by all forked processes. Nevertheless, we want Storage to be able to determine
+  // when to shut down based on whether stdin is open. So we create a new stdin.
+  int devzero = open("/dev/zero", O_RDONLY);
+
+  if (devzero < 0)
+    exitwitherror("cannot open /dev/zero\n");
+
+  if (close(0) < 0)
+    exitwitherror("cannot close stdin\n");
+
+  if (dup(devzero) < 0)
+    exitwitherror("cannot dup /dev/zero into stdin\n");
+
+  if (close(devzero) < 0)
+    exitwitherror("cannot close /dev/zero\n");
+
+  if (execv("/usr/bin/ssh", const_cast<char * const *>(sshParams)) < 0)
+    exitwitherror("execv failed\n");
+
+  exitwitherror("execv succeeded but did return\n");
+}
+
+
+pid_t forkSSH(const std::string &logPrefix, const char *hostName, char * const extraParams[], const char *userName, const char *sshKey)
+{
+  pid_t pid;
+
+  LOG_INFO_STR(logPrefix << "Starting");
+
+  vector<const char*> sshParams;
+
+  const char * const defaultParams[] = {
+    "ssh",
+    "-q",
+    "-i", sshKey,
+    "-c", "blowfish",
+    "-o", "StrictHostKeyChecking=no",
+    "-o", "UserKnownHostsFile=/dev/null",
+    "-o", "ServerAliveInterval=30",
+    "-l", userName,
+    hostName,
+    0 };
+
+  for( const char * const *p = defaultParams; *p != 0; p++ )
+    sshParams.push_back(*p);
+
+  for( const char * const *p = extraParams; *p != 0; p++ )
+    sshParams.push_back(*p);
+
+  sshParams.push_back(0);
+
+  switch (pid = fork()) {
+    case -1 : throw SystemCallException("fork", errno, THROW_ARGS);
+
+    case  0 : execSSH(&sshParams[0]);
+  }
+
+  return pid;
+}
+
+
+void joinSSH(const std::string &logPrefix, pid_t pid, unsigned &timeout)
+{
+  if (pid != 0) {
+    int status;
+
+    // always try at least one waitpid(). if child has not exited, optionally
+    // sleep and try again.
+    for (;;) {
+      pid_t ret;
+
+      if ((ret = waitpid(pid, &status, WNOHANG)) == (pid_t)-1) {
+        int error = errno;
+
+        if (error == EINTR) {
+          LOG_DEBUG_STR(logPrefix << " waitpid() was interrupted -- retrying");
+          continue;
+        }
+
+        // error
+        LOG_WARN_STR(logPrefix << " waitpid() failed with errno " << error);
+        return;
+      } else if (ret == 0) {
+        // child still running
+        if (timeout == 0) {
+          break;
+        }
+
+        timeout--;
+        sleep(1);
+      } else {
+        // child exited
+        if (WIFSIGNALED(status) != 0)
+          LOG_WARN_STR(logPrefix << "SSH was killed by signal " << WTERMSIG(status));
+        else if (WEXITSTATUS(status) != 0) {
+          const char *explanation;
+
+          switch (WEXITSTATUS(status)) {
+            default:
+              explanation = "??";
+              break;
+
+            case 255:
+              explanation = "Network or authentication error";
+              break;
+            case 127:
+              explanation = "BASH: command/library not found";
+              break;
+            case 126:
+              explanation = "BASH: command found but could not be executed (wrong architecture?)";
+              break;
+
+            case 128 + SIGHUP:
+              explanation = "killed by SIGHUP";
+              break;
+            case 128 + SIGINT:
+              explanation = "killed by SIGINT (Ctrl-C)";
+              break;
+            case 128 + SIGQUIT:
+              explanation = "killed by SIGQUIT";
+              break;
+            case 128 + SIGILL:
+              explanation = "illegal instruction";
+              break;
+            case 128 + SIGABRT:
+              explanation = "killed by SIGABRT";
+              break;
+            case 128 + SIGKILL:
+              explanation = "killed by SIGKILL";
+              break;
+            case 128 + SIGSEGV:
+              explanation = "segmentation fault";
+              break;
+            case 128 + SIGPIPE:
+              explanation = "broken pipe";
+              break;
+            case 128 + SIGALRM:
+              explanation = "killed by SIGALRM";
+              break;
+            case 128 + SIGTERM:
+              explanation = "killed by SIGTERM";
+              break;
+          }
+
+          LOG_ERROR_STR(logPrefix << " exited with exit code " << WEXITSTATUS(status) << " (" << explanation << ")" );
+        } else
+          LOG_INFO_STR(logPrefix << " terminated normally");
+
+        return;  
+      }
+    }
+
+    // child did not exit within the given timeout
+
+    LOG_WARN_STR(logPrefix << " sending SIGTERM");
+    kill(pid, SIGTERM);
+
+    if (waitpid(pid, &status, 0) == -1) {
+      LOG_WARN_STR(logPrefix << " waitpid() failed");
+    }
+
+    LOG_WARN_STR(logPrefix << " terminated after sending SIGTERM");
+  }
+}
+
+} // namespace RTCP
+} // namespace LOFAR
+
diff --git a/RTCP/IONProc/src/SSH.h b/RTCP/IONProc/src/SSH.h
new file mode 100644
index 00000000000..ba7d8dea2f7
--- /dev/null
+++ b/RTCP/IONProc/src/SSH.h
@@ -0,0 +1,81 @@
+//#  SSH.h
+//#
+//#  P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
+//#
+//#  This program is free software; you can redistribute it and/or modify
+//#  it under the terms of the GNU General Public License as published by
+//#  the Free Software Foundation; either version 2 of the License, or
+//#  (at your option) any later version.
+//#
+//#  This program is distributed in the hope that it will be useful,
+//#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+//#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+//#  GNU General Public License for more details.
+//#
+//#  You should have received a copy of the GNU General Public License
+//#  along with this program; if not, write to the Free Software
+//#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+//#
+//#  $Id: SSH.h 15296 2010-03-24 10:19:41Z mol $
+
+
+//# Never #include <config.h> or #include <lofar_config.h> in a header file!
+
+#ifndef LOFAR_RTCP_SSH_H
+#define LOFAR_RTCP_SSH_H
+
+#include <string>
+
+#ifdef HAVE_LIBSSH2
+#include <Common/Thread/Thread.h>
+#include <Stream/FileDescriptorBasedStream.h>
+#include <libssh2.h>
+#include <Interface/SmartPtr.h>
+#endif
+
+namespace LOFAR {
+namespace RTCP {
+
+#ifdef HAVE_LIBSSH2
+
+class SSHconnection {
+public:
+  SSHconnection(const string &logPrefix, const string &hostname, const string &commandline, const string &username, const string &sshkey);
+
+  void start();
+  void stop( const struct timespec &deadline );
+
+private:
+  const string itsLogPrefix;
+  const string itsHostName;
+  const string itsCommandLine;
+  const string itsUserName;
+  const string itsSSHKey;
+
+  SmartPtr<Thread> itsThread;
+
+  static void free_session( LIBSSH2_SESSION *session );
+  static void free_channel( LIBSSH2_CHANNEL *channel );
+
+  SmartPtr<LIBSSH2_SESSION, SmartPtrFreeFunc<LIBSSH2_SESSION, free_session> > session;
+  SmartPtr<LIBSSH2_CHANNEL, SmartPtrFreeFunc<LIBSSH2_CHANNEL, free_channel> > channel;
+
+  bool waitsocket( FileDescriptorBasedStream &sock );
+
+  bool open_session( FileDescriptorBasedStream &sock );
+  bool open_channel( FileDescriptorBasedStream &sock );
+  bool close_channel( FileDescriptorBasedStream &sock );
+
+  void commThread();
+};
+
+#endif
+
+pid_t forkSSH(const std::string &logPrefix, const char *hostName, char * const extraParams[], const char *userName, const char *sshKey);
+void joinSSH(const std::string &logPrefix, pid_t pid, unsigned &timeout);
+
+} // namespace RTCP
+} // namespace LOFAR
+
+
+#endif
diff --git a/RTCP/IONProc/test/CMakeLists.txt b/RTCP/IONProc/test/CMakeLists.txt
index 45880e81d75..14beaac6c09 100644
--- a/RTCP/IONProc/test/CMakeLists.txt
+++ b/RTCP/IONProc/test/CMakeLists.txt
@@ -6,3 +6,4 @@ include(LofarCTest)
 include_directories(${PACKAGE_SOURCE_DIR}/src)
 
 lofar_add_test(tDelayCompensation tDelayCompensation.cc)
+lofar_add_test(tSSH tSSH.cc)
diff --git a/RTCP/IONProc/test/tDelayCompensation.sh b/RTCP/IONProc/test/tDelayCompensation.sh
index 7e4850dc328..583bf9f3695 100755
--- a/RTCP/IONProc/test/tDelayCompensation.sh
+++ b/RTCP/IONProc/test/tDelayCompensation.sh
@@ -1,3 +1,2 @@
-
 #!/bin/sh
 ./runctest.sh tDelayCompensation 2>&1 > tDelayCompensation.log
diff --git a/RTCP/IONProc/test/tSSH.cc b/RTCP/IONProc/test/tSSH.cc
new file mode 100644
index 00000000000..26de94bbd1b
--- /dev/null
+++ b/RTCP/IONProc/test/tSSH.cc
@@ -0,0 +1,64 @@
+#include <lofar_config.h>
+
+#include <SSH.h>
+#include <unistd.h>
+#include <time.h>
+#include <cstdlib>
+#include <cstdio>
+#include <Stream/SocketStream.h>
+#include <Common/LofarLogger.h>
+
+// some useful environment variables
+char *USER;
+char *HOME;
+
+// the existence of $HOME/.ssh/id_rsa is assumed,
+// as well as the fact that it can be used to
+// authenticate on localhost.
+char privkey[1024];
+
+using namespace LOFAR;
+using namespace RTCP;
+
+
+void test_SSHconnection() {
+  SSHconnection ssh("", "localhost", "echo SSHconnection success", USER, privkey);
+
+  ssh.start();
+
+  struct timespec ts;
+  ts.tv_sec = time(0) + 10;
+  ts.tv_nsec = 0;
+
+  ssh.stop(ts);
+
+}
+
+
+void test_forkExec() {
+  pid_t pid;
+
+  char * const params[] = {
+    "echo",
+    "forkExec success",
+    0
+  };
+  
+  pid = forkSSH("", "localhost", params, USER, privkey);
+
+  unsigned timeout = 10;
+  joinSSH("", pid, timeout);
+}
+
+int main() {
+  INIT_LOGGER( "tSSH" );
+
+  USER = getenv("USER");
+  HOME = getenv("HOME");
+  snprintf(privkey, sizeof privkey, "%s/.ssh/id_rsa", HOME);
+
+  test_SSHconnection();
+  test_forkExec();
+
+  return 0;
+}
diff --git a/RTCP/IONProc/test/tSSH.sh b/RTCP/IONProc/test/tSSH.sh
new file mode 100755
index 00000000000..9a1c8b302b5
--- /dev/null
+++ b/RTCP/IONProc/test/tSSH.sh
@@ -0,0 +1,2 @@
+#!/bin/sh
+./runctest.sh tSSH 2>&1 > tSSH.log
diff --git a/RTCP/IONProc/test/tSSH.stdout b/RTCP/IONProc/test/tSSH.stdout
new file mode 100644
index 00000000000..61fb2a75903
--- /dev/null
+++ b/RTCP/IONProc/test/tSSH.stdout
@@ -0,0 +1,2 @@
+SSHconnection success
+forkExec success
diff --git a/RTCP/Interface/include/Interface/SmartPtr.h b/RTCP/Interface/include/Interface/SmartPtr.h
index 6113f1f3026..85e79a75cfe 100644
--- a/RTCP/Interface/include/Interface/SmartPtr.h
+++ b/RTCP/Interface/include/Interface/SmartPtr.h
@@ -72,6 +72,11 @@ public:
   static void free( T *ptr ) { ::free(ptr); }
 };
 
+template <typename T, void (*F)(T*) > class SmartPtrFreeFunc {
+public:
+  static void free( T *ptr ) { F(ptr); }
+};
+
 template <typename T, class D> inline SmartPtr<T,D>::SmartPtr(T *orig)
 :
   ptr(orig)
diff --git a/lofar_config.h.cmake b/lofar_config.h.cmake
index 2303c2e747f..bc66715589f 100644
--- a/lofar_config.h.cmake
+++ b/lofar_config.h.cmake
@@ -120,6 +120,9 @@
 /* Define if LAM is installed */
 #cmakedefine HAVE_LAM 1
 
+/* Define if libssh2 is installed */
+#cmakedefine HAVE_LIBSSH2 1
+
 /* Define if LOG4CPLUS is installed */
 #cmakedefine HAVE_LOG4CPLUS 1
 
-- 
GitLab