diff --git a/.gitattributes b/.gitattributes index 3cccd62b1b4a4a9d0cc451113d674d6daa5577b4..31ec76c29fa06278253be1605fa31c6e42aac54e 100644 --- a/.gitattributes +++ b/.gitattributes @@ -1892,6 +1892,7 @@ CEP/PyBDSM/test/tbdsm_process_image.py -text CMake/FindAskapSoft.cmake -text CMake/FindCasarest.cmake -text CMake/FindJNI.cmake -text +CMake/FindLibssh2.cmake -text CMake/FindValgrind.cmake -text CMake/TODO -text CMake/testscripts/assay -text @@ -3459,6 +3460,8 @@ RTCP/IONProc/src/JobQueue.cc -text RTCP/IONProc/src/JobQueue.h -text RTCP/IONProc/src/PLCClient.cc -text RTCP/IONProc/src/PLCClient.h -text +RTCP/IONProc/src/SSH.cc -text +RTCP/IONProc/src/SSH.h -text RTCP/IONProc/src/StreamMultiplexer.cc -text RTCP/IONProc/src/StreamMultiplexer.h -text RTCP/IONProc/src/generateDelays.cc -text @@ -3467,6 +3470,9 @@ RTCP/IONProc/test/RTCP.parset -text RTCP/IONProc/test/tDelayCompensation.cc -text RTCP/IONProc/test/tDelayCompensation.parset -text RTCP/IONProc/test/tDelayCompensation.sh -text +RTCP/IONProc/test/tSSH.cc -text +RTCP/IONProc/test/tSSH.sh -text +RTCP/IONProc/test/tSSH.stdout -text RTCP/Interface/include/Interface/BeamCoordinates.h -text RTCP/Interface/include/Interface/BeamFormedData.h -text RTCP/Interface/include/Interface/DataFactory.h -text diff --git a/CMake/FindLibssh2.cmake b/CMake/FindLibssh2.cmake new file mode 100644 index 0000000000000000000000000000000000000000..65d4d9d0eb7450b08e654cca84962082b6b4caa3 --- /dev/null +++ b/CMake/FindLibssh2.cmake @@ -0,0 +1,48 @@ +# - Try to find libssh2. +# Variables used by this module: +# LIBSSH2_ROOT_DIR - LIBSSH2 root directory +# Variables defined by this module: +# LIBSSH2_FOUND - system has LIBSSH2 +# LIBSSH2_INCLUDE_DIR - the LIBSSH2 include directory (cached) +# LIBSSH2_INCLUDE_DIRS - the LIBSSH2 include directories +# (identical to LIBSSH2_INCLUDE_DIR) +# LIBSSH2_LIBRARY - the LIBSSH2 library (cached) +# LIBSSH2_LIBRARIES - the LIBSSH2 libraries +# (identical to LIBSSH2_LIBRARY) + +# Copyright (C) 2009 +# ASTRON (Netherlands Institute for Radio Astronomy) +# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. +# +# $Id: FindLIBSSH2.cmake 21317 2012-06-26 14:22:47Z mol $ + +if(NOT LIBSSH2_FOUND) + + find_path(LIBSSH2_INCLUDE_DIR libssh2.h + HINTS ${LIBSSH2_ROOT_DIR} PATH_SUFFIXES include) + find_library(LIBSSH2_LIBRARY ssh2 + HINTS ${LIBSSH2_ROOT_DIR} PATH_SUFFIXES lib) + mark_as_advanced(LIBSSH2_INCLUDE_DIR LIBSSH2_LIBRARY) + + include(FindPackageHandleStandardArgs) + find_package_handle_standard_args(LIBSSH2 DEFAULT_MSG + LIBSSH2_LIBRARY LIBSSH2_INCLUDE_DIR) + + set(LIBSSH2_INCLUDE_DIRS ${LIBSSH2_INCLUDE_DIR}) + set(LIBSSH2_LIBRARIES ${LIBSSH2_LIBRARY}) + +endif(NOT LIBSSH2_FOUND) diff --git a/CMake/variants/variants.bgfen0 b/CMake/variants/variants.bgfen0 index aa22b5cb82bd1b8a07a32c8313deb3f8c3e9bc8b..ee11ffe170a55854ff0d0877bc22ca98d0be77a0 100644 --- a/CMake/variants/variants.bgfen0 +++ b/CMake/variants/variants.bgfen0 @@ -24,6 +24,7 @@ set(CASACORE_MAKE_REQUIRED_EXTERNALS_OPTIONAL ON) set(MASS_ROOT_DIR /opt/mass) set(FFTW2_ROOT_DIR /globalhome/lofarsystem/packages/fftw-2.1.5-single-precision) set(BOOST_ROOT_DIR /globalhome/lofarsystem/packages/root/bgp_ion/boost) +set(LIBSSH2_ROOT_DIR /globalhome/lofarsystem/packages/root/bgp_ion/libssh2) set(BLITZ_ROOT_DIR /globalhome/lofarsystem/packages/root/bgfen/blitz) set(LOG4CPLUS_ROOT_DIR /globalhome/lofarsystem/packages/root/bgfen/log4cplus) diff --git a/RTCP/CNProc/src/BeamFormer.cc b/RTCP/CNProc/src/BeamFormer.cc index 3403d80f33d1f3a818a37da685fd0cb0b00ec24b..f5c0e1e6e3fa865ba03e0467fe061f97855542cd 100644 --- a/RTCP/CNProc/src/BeamFormer.cc +++ b/RTCP/CNProc/src/BeamFormer.cc @@ -593,7 +593,10 @@ void BeamFormer::computeFlysEye(const SampleData<> *in, SampleData<> *out, unsig unsigned src = itsValidStations[beam][0]; // copy station src to dest - out->flags[beam] = in->flags[src]; + for (unsigned ch = 0; ch < itsNrChannels; ch++) { + out->flags[beam][ch] = in->flags[ch][src]; + } + for (unsigned ch = 0; ch < itsNrChannels; ch ++) memcpy(out->samples[beam][ch].origin(), in->samples[ch][src].origin(), diff --git a/RTCP/CNProc/src/CN_Processing.cc b/RTCP/CNProc/src/CN_Processing.cc index 82d68a748771b137e10a168d2d9cc4be35a2a5f9..114af29a1147f2b5de78dc663263a284bd1c3af3 100644 --- a/RTCP/CNProc/src/CN_Processing.cc +++ b/RTCP/CNProc/src/CN_Processing.cc @@ -688,7 +688,7 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::filter() #endif if (itsFakeInputData) - FakeData(itsParset).fill(itsFilteredData); + FakeData(itsParset).fill(itsFilteredData, *itsCurrentSubband); } diff --git a/RTCP/IONProc/CMakeLists.txt b/RTCP/IONProc/CMakeLists.txt index 83802cae86c6c20b7ee4f34e2ae33d1e3644a476..80e705e18b524895175c253d744d35d1b42d049f 100644 --- a/RTCP/IONProc/CMakeLists.txt +++ b/RTCP/IONProc/CMakeLists.txt @@ -5,6 +5,7 @@ lofar_package(IONProc 1.0 include(LofarFindPackage) lofar_find_package(Boost REQUIRED) +lofar_find_package(Libssh2) lofar_find_package(Valgrind) lofar_find_package(Casacore COMPONENTS measures REQUIRED) diff --git a/RTCP/IONProc/src/CMakeLists.txt b/RTCP/IONProc/src/CMakeLists.txt index a125b51a4d9aecfa2c7b1b1238139c25d7ec0fa6..9d60c9665b1e97d68ff543b49829d419f1dfa67a 100644 --- a/RTCP/IONProc/src/CMakeLists.txt +++ b/RTCP/IONProc/src/CMakeLists.txt @@ -32,6 +32,7 @@ lofar_add_library(ionproc ReaderWriterSynchronization.cc Scheduling.cc StreamMultiplexer.cc + SSH.cc FCNP_ServerStream.cc) lofar_add_bin_program(versionionproc versionionproc.cc) diff --git a/RTCP/IONProc/src/ION_main.cc b/RTCP/IONProc/src/ION_main.cc index c5ae24384e3bbeb1457ccb744323ca09e0ab59a7..772f122fb5800475172fbd606c01d5100db287df 100644 --- a/RTCP/IONProc/src/ION_main.cc +++ b/RTCP/IONProc/src/ION_main.cc @@ -49,6 +49,10 @@ #include <sys/types.h> #include <sys/mman.h> +#ifdef HAVE_LIBSSH2 +#include <libssh2.h> +#endif + #include <boost/format.hpp> #if defined HAVE_MPI @@ -418,6 +422,14 @@ int main(int argc, char **argv) exit(1); } #endif + +#ifdef HAVE_LIBSSH2 + int rc = libssh2_init(0); + if (rc) { + std::cerr << "libssh2 init failed: " << rc << std::endl; + exit(1); + } +#endif #if defined HAVE_BGP INIT_LOGGER_WITH_SYSINFO(str(boost::format("IONProc@%02d") % myPsetNumber)); @@ -436,6 +448,10 @@ int main(int argc, char **argv) master_thread(); +#ifdef HAVE_LIBSSH2 + libssh2_exit(); +#endif + #if defined HAVE_MPI MPI_Finalize(); #endif diff --git a/RTCP/IONProc/src/Job.cc b/RTCP/IONProc/src/Job.cc index 9ed29ff1795d624ba61b945d605c031a2bf2f713..f2e92648347bb9025ffc68b64ab9a0ed16d28169 100644 --- a/RTCP/IONProc/src/Job.cc +++ b/RTCP/IONProc/src/Job.cc @@ -37,11 +37,7 @@ #include <Stream/SocketStream.h> #include <Stream/PortBroker.h> -#include <sys/types.h> -#include <sys/wait.h> #include <unistd.h> -#include <fcntl.h> -#include <signal.h> #include <time.h> #include <boost/format.hpp> @@ -198,216 +194,6 @@ template <typename T> void Job::broadcast(T &value) } -static void exitwitherror( const char *errorstr ) -{ - // can't cast to (void) since gcc won't allow that as a method to drop the result - int ignoreResult; - - ignoreResult = write(STDERR_FILENO, errorstr, strlen(errorstr)+1); - - // use _exit instead of exit to avoid calling atexit handlers in both - // the master and the child process. - _exit(1); -} - -void Job::StorageProcess::execSSH(const char *sshKey, const char *userName, const char *hostName, const char *executable, const char *rank, const char *cwd, const char *isBigEndian) -{ - // DO NOT DO ANY CALL THAT GRABS A LOCK, since the lock may be held by a - // thread that is no longer part of our address space - - // use write() for output since the Logger uses a mutex, and printf also holds locks - - // Prevent cancellation due to race conditions. A cancellation can still be pending for this JobThread, in which case one of the system calls - // below triggers it. If this thread/process can be cancelled, there will be multiple processes running, leading to all kinds of Bad Things. - Cancellation::disable(); - - // close all file descriptors other than stdin/out/err, which might have been openend by - // other threads at the time of fork(). We brute force over all possible fds, most of which will be invalid. - for (int f = sysconf(_SC_OPEN_MAX); f > 2; --f) - (void)close(f); - - // create a valid stdin from which can be read (a blocking fd created by pipe() won't suffice anymore for since at least OpenSSH 5.8) - // rationale: this forked process inherits stdin from the parent process, which is unusable because IONProc is started in the background - // and routed through mpirun as well. Also, it is shared by all forked processes. Nevertheless, we want Storage to be able to determine - // when to shut down based on whether stdin is open. So we create a new stdin. - int devzero = open("/dev/zero", O_RDONLY); - - if (devzero < 0) - exitwitherror("cannot open /dev/zero\n"); - - if (close(0) < 0) - exitwitherror("cannot close stdin\n"); - - if (dup(devzero) < 0) - exitwitherror("cannot dup /dev/zero into stdin\n"); - - if (close(devzero) < 0) - exitwitherror("cannot close /dev/zero\n"); - - if (execl("/usr/bin/ssh", - "ssh", - "-q", - "-i", sshKey, - "-c", "blowfish", - "-o", "StrictHostKeyChecking=no", - "-o", "UserKnownHostsFile=/dev/null", - "-o", "ServerAliveInterval=30", - "-l", userName, - hostName, - - "cd", cwd, "&&", -#if defined USE_VALGRIND - "valgrind", "--leak-check=full", -#endif - executable, - - boost::lexical_cast<std::string>(itsParset.observationID()).c_str(), - rank, - isBigEndian, - - static_cast<char *>(0) - ) < 0) - exitwitherror("execl failed\n"); - - exitwitherror("execl succeeded but did return\n"); -} - - -void Job::StorageProcess::forkSSH(const char *sshKey, const char *userName, const char *hostName, const char *executable, const char *rank, const char *cwd, const char *isBigEndian) -{ - LOG_INFO_STR(itsLogPrefix << " starting"); - - LOG_DEBUG_STR(itsLogPrefix << "child will exec " - "\"/usr/bin/ssh " - "-q " - "-i " << sshKey << " " - "-c blowfish " - "-o StrictHostKeyChecking=no " - "-o UserKnownHostsFile=/dev/null " - "-o ServerAliveInterval=30 " - "-l " << userName << " " - << hostName << " " - "cd " << cwd << " && " -#if defined USE_VALGRIND - "valgrind " "--leak-check=full " -#endif - << executable << " " << rank << " " << isBigEndian << " " - "\"" - ); - - switch (itsPID = fork()) { - case -1 : throw SystemCallException("fork", errno, THROW_ARGS); - - case 0 : execSSH(sshKey, userName, hostName, executable, rank, cwd, isBigEndian); - } -} - - -void Job::StorageProcess::joinSSH(unsigned &timeout) -{ - if (itsPID != 0) { - int status; - - // always try at least one waitpid(). if child has not exited, optionally - // sleep and try again. - for (;;) { - pid_t ret; - - if ((ret = waitpid(itsPID, &status, WNOHANG)) == (pid_t)-1) { - int error = errno; - - if (error == EINTR) { - LOG_DEBUG_STR(itsLogPrefix << " waitpid() was interrupted -- retrying"); - continue; - } - - // error - LOG_WARN_STR(itsLogPrefix << " waitpid() failed with errno " << error); - return; - } else if (ret == 0) { - // child still running - if (timeout == 0) { - break; - } - - timeout--; - sleep(1); - } else { - // child exited - if (WIFSIGNALED(status) != 0) - LOG_WARN_STR(itsLogPrefix << "SSH was killed by signal " << WTERMSIG(status)); - else if (WEXITSTATUS(status) != 0) { - const char *explanation; - - switch (WEXITSTATUS(status)) { - default: - explanation = "??"; - break; - - case 255: - explanation = "Network or authentication error"; - break; - case 127: - explanation = "BASH: command/library not found"; - break; - case 126: - explanation = "BASH: command found but could not be executed (wrong architecture?)"; - break; - - case 128 + SIGHUP: - explanation = "killed by SIGHUP"; - break; - case 128 + SIGINT: - explanation = "killed by SIGINT (Ctrl-C)"; - break; - case 128 + SIGQUIT: - explanation = "killed by SIGQUIT"; - break; - case 128 + SIGILL: - explanation = "illegal instruction"; - break; - case 128 + SIGABRT: - explanation = "killed by SIGABRT"; - break; - case 128 + SIGKILL: - explanation = "killed by SIGKILL"; - break; - case 128 + SIGSEGV: - explanation = "segmentation fault"; - break; - case 128 + SIGPIPE: - explanation = "broken pipe"; - break; - case 128 + SIGALRM: - explanation = "killed by SIGALRM"; - break; - case 128 + SIGTERM: - explanation = "killed by SIGTERM"; - break; - } - - LOG_ERROR_STR(itsLogPrefix << " exited with exit code " << WEXITSTATUS(status) << " (" << explanation << ")" ); - } else - LOG_INFO_STR(itsLogPrefix << " terminated normally"); - - return; - } - } - - // child did not exit within the given timeout - - LOG_WARN_STR(itsLogPrefix << " sending SIGTERM"); - kill(itsPID, SIGTERM); - - if (waitpid(itsPID, &status, 0) == -1) { - LOG_WARN_STR(itsLogPrefix << " waitpid() failed"); - } - - LOG_WARN_STR(itsLogPrefix << " terminated after sending SIGTERM"); - } -} - - Job::StorageProcess::StorageProcess( const Parset &parset, const string &logPrefix, int rank, const string &hostname ) : itsParset(parset), @@ -436,20 +222,47 @@ void Job::StorageProcess::start() if (getcwd(cwd, sizeof cwd) == 0) throw SystemCallException("getcwd", errno, THROW_ARGS); - forkSSH(sshKey.c_str(), - userName.c_str(), - itsHostname.c_str(), - executable.c_str(), - boost::lexical_cast<std::string>(itsRank).c_str(), - cwd, +#ifdef HAVE_LIBSSH2 + std::string commandLine = str(boost::format("cd %s && %s%s %u %d %u") + % cwd +#if defined USE_VALGRIND + % "valgrind --leak-check=full " +#else + % "" +#endif + % executable + % itsParset.observationID() + % itsRank +#if defined WORDS_BIGENDIAN + % 1 +#else + % 0 +#endif + ); + + itsSSHconnection = new SSHconnection(itsLogPrefix, itsHostname, commandLine, userName, sshKey); + itsSSHconnection->start(); +#else + +#warn Using fork/exec for SSH processes to Storage + const char *commandLine[] = { + "cd ", cwd, " && ", +#if defined USE_VALGRIND + "valgrind " "--leak-check=full " +#endif + executable.c_str(), + boost::lexical_cast<std::string>(itsRank).c_str(), #if defined WORDS_BIGENDIAN - "1" + "1", #else - "0" + "0", #endif - ); + 0 + }; + itsPID = forkSSH(itsLogPrefix, itsHostname.c_str(), commandLine, userName.c_str(), sshKey.c_str()); // client process won't reach this point +#endif itsThread = new Thread(this, &Job::StorageProcess::controlThread, itsLogPrefix + "[ControlThread] ", 65535); } @@ -457,7 +270,17 @@ void Job::StorageProcess::start() void Job::StorageProcess::stop(unsigned &timeout) { - joinSSH(timeout); +#ifdef HAVE_LIBSSH2 + // TODO: update timeout + struct timespec deadline; + + deadline.tv_sec = time(0) + timeout; + deadline.tv_nsec = 0; + + itsSSHconnection->stop(deadline); +#else + joinSSH(itsLogPrefix, itsPID, timeout); +#endif } diff --git a/RTCP/IONProc/src/Job.h b/RTCP/IONProc/src/Job.h index 56e44e3f28474baa9fcb1f2a0e909519f383b0d9..b8235a53a6b914efe909ec9d0ec72e3456bb60f3 100644 --- a/RTCP/IONProc/src/Job.h +++ b/RTCP/IONProc/src/Job.h @@ -36,6 +36,10 @@ #include <Common/Thread/Thread.h> #include <PLCClient.h> +#ifdef HAVE_LIBSSH2 +#include <SSH.h> +#endif + #include <sys/time.h> #include <vector> @@ -100,9 +104,11 @@ class Job : public PLCRunnable private: void controlThread(); - void execSSH(const char *sshKey, const char *userName, const char *hostName, const char *executable, const char *rank, const char *cwd, const char *isBigEndian); - void forkSSH(const char *sshKey, const char *userName, const char *hostName, const char *executable, const char *rank, const char *cwd, const char *isBigEndian); - void joinSSH(unsigned &timeout); +#ifdef HAVE_LIBSSH2 + SmartPtr<SSHconnection> itsSSHconnection; +#else + int itsPID; +#endif const Parset &itsParset; const std::string itsLogPrefix; @@ -110,7 +116,6 @@ class Job : public PLCRunnable const int itsRank; const std::string itsHostname; - int itsPID; SmartPtr<Thread> itsThread; }; diff --git a/RTCP/IONProc/src/SSH.cc b/RTCP/IONProc/src/SSH.cc new file mode 100644 index 0000000000000000000000000000000000000000..f95556d47f9dd75418e5d8da8fe79838a777f35a --- /dev/null +++ b/RTCP/IONProc/src/SSH.cc @@ -0,0 +1,510 @@ +//# SSH.cc: setup an SSH connection using libssh2 +//# +//# Copyright (C) 2012 +//# ASTRON (Netherlands Foundation for Research in Astronomy) +//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl +//# +//# This program is free software; you can redistribute it and/or modify +//# it under the terms of the GNU General Public License as published by +//# the Free Software Foundation; either version 2 of the License, or +//# (at your option) any later version. +//# +//# This program is distributed in the hope that it will be useful, +//# but WITHOUT ANY WARRANTY; without even the implied warranty of +//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//# GNU General Public License for more details. +//# +//# You should have received a copy of the GNU General Public License +//# along with this program; if not, write to the Free Software +//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +//# +//# $Id: SSH.cc 18226 2012-06-09 12:56:47Z mol $ + +//# Always #include <lofar_config.h> first! +#include <lofar_config.h> +#include <SSH.h> + +#include <Common/Thread/Cancellation.h> +#include <Common/LofarLogger.h> +#include <sys/wait.h> +#include <sys/types.h> +#include <unistd.h> +#include <signal.h> +#include <fcntl.h> +#include <vector> + +#ifdef HAVE_LIBSSH2 +#include <sstream> +#include <sys/select.h> +#include <Common/lofar_string.h> +#include <Stream/SocketStream.h> +#endif + +using namespace std; + +namespace LOFAR { +namespace RTCP { + +#ifdef HAVE_LIBSSH2 + +SSHconnection::SSHconnection(const string &logPrefix, const string &hostname, const string &commandline, const string &username, const string &sshkey) +: + itsLogPrefix(logPrefix), + itsHostName(hostname), + itsCommandLine(commandline), + itsUserName(username), + itsSSHKey(sshkey) +{ +} + +void SSHconnection::start() +{ + itsThread = new Thread(this, &SSHconnection::commThread, itsLogPrefix + "[SSH Thread] ", 65536); +} + +void SSHconnection::stop( const struct timespec &deadline ) +{ + if (!itsThread->wait(deadline)) { + itsThread->cancel(); + + itsThread->wait(); + } +} + +void SSHconnection::free_session( LIBSSH2_SESSION *session ) +{ + if (!session) + return; + + libssh2_session_disconnect(session, "Normal Shutdown, Thank you for playing"); + libssh2_session_free(session); +} + +void SSHconnection::free_channel( LIBSSH2_CHANNEL *channel ) +{ + if (!channel) + return; + + libssh2_channel_free(channel); +} + +bool SSHconnection::open_session( FileDescriptorBasedStream &sock ) +{ + int rc; + + /* Create a session instance */ + session = libssh2_session_init(); + if (!session) { + LOG_ERROR_STR( itsLogPrefix << "Cannot create SSH session object" ); + return false; + } + + /* tell libssh2 we want it all done non-blocking */ + libssh2_session_set_blocking(session, 0); + + /* ... start it up. This will trade welcome banners, exchange keys, + * and setup crypto, compression, and MAC layers + */ + while ((rc = libssh2_session_handshake(session, sock.fd)) == + LIBSSH2_ERROR_EAGAIN) { + waitsocket(sock); + } + + if (rc) { + LOG_ERROR_STR( itsLogPrefix << "Failure establishing SSH session: " << rc); + return false; + } + + /* Authenticate by public key */ + while ((rc = libssh2_userauth_publickey_fromfile(session, + itsUserName.c_str(), // remote username + NULL, // public key filename + itsSSHKey.c_str(), // private key filename + NULL // password + )) == + LIBSSH2_ERROR_EAGAIN) { + waitsocket(sock); + } + + if (rc) { + LOG_ERROR_STR( itsLogPrefix << "Authentication by public key failed: " << rc); + return false; + } + + return true; +} + +bool SSHconnection::open_channel( FileDescriptorBasedStream &sock ) +{ + /* Exec non-blocking on the remote host */ + while( (channel = libssh2_channel_open_session(session)) == NULL && + libssh2_session_last_error(session,NULL,NULL,0) == + LIBSSH2_ERROR_EAGAIN ) + { + waitsocket(sock); + } + + if (!channel) + { + LOG_ERROR_STR( itsLogPrefix << "Could not set up SSH channel" ); + return false; + } + + return true; +} + +bool SSHconnection::close_channel( FileDescriptorBasedStream &sock ) +{ + int rc; + + while( (rc = libssh2_channel_close(channel)) == LIBSSH2_ERROR_EAGAIN ) { + waitsocket(sock); + } + + return true; +} + +bool SSHconnection::waitsocket( FileDescriptorBasedStream &sock ) +{ + struct timeval timeout; + int rc; + fd_set fd; + fd_set *writefd = NULL; + fd_set *readfd = NULL; + int dir; + + timeout.tv_sec = 10; + timeout.tv_usec = 0; + + FD_ZERO(&fd); + + FD_SET(sock.fd, &fd); + + /* now make sure we wait in the correct direction */ + dir = libssh2_session_block_directions(session); + + if(dir & LIBSSH2_SESSION_BLOCK_INBOUND) + readfd = &fd; + + if(dir & LIBSSH2_SESSION_BLOCK_OUTBOUND) + writefd = &fd; + + { + Cancellation::enable(); + + // select() is a cancellation point + rc = ::select(sock.fd + 1, readfd, writefd, NULL, &timeout); + + Cancellation::disable(); + } + + return rc > 0; +} + +void SSHconnection::commThread() +{ + SocketStream sock( itsHostName, 22, SocketStream::TCP, SocketStream::Client ); + + LOG_DEBUG_STR( itsLogPrefix << "Connected" ); + + stringstream buffer; + int rc; + int exitcode; + char *exitsignal=(char *)"none"; + + /* Prevent cancellation from here on -- we manually insert cancellation points to avoid + screwing up libssh2's internal administration. */ + Cancellation::disable(); + Cancellation::point(); + + if (!open_session(sock)) + return; + + if (!open_channel(sock)) + return; + + while( (rc = libssh2_channel_exec(channel, itsCommandLine.c_str())) == + LIBSSH2_ERROR_EAGAIN ) + { + waitsocket(sock); + } + + if (rc) + { + LOG_ERROR_STR( itsLogPrefix << "Failure starting remote command: " << rc); + return; + } + + LOG_DEBUG_STR( itsLogPrefix << "Remote command started, waiting for output" ); + + /* Session I/O */ + for( ;; ) + { + /* loop until we block */ + do { + char data[0x4000]; + + rc = libssh2_channel_read( channel, data, sizeof data ); + if( rc > 0 ) + { + buffer.write( data, rc ); + + /* extract and log lines */ + for( ;; ) + { + Cancellation::point(); + + buffer.getline( data, sizeof data ); + + if (buffer.fail()) { + // no line found + buffer.clear(); + break; + } + + // TODO: Use logger somehow (we'd duplicate the prefix if we just use LOG_* macros..) + cout << data << endl; + } + } else { + if( rc < 0 && rc != LIBSSH2_ERROR_EAGAIN ) { + /* no need to output this for the EAGAIN case */ + LOG_ERROR_STR( itsLogPrefix << "libssh2_channel_read returned " << rc); + } + } + } while( rc > 0 ); + + /* this is due to blocking that would occur otherwise so we loop on + this condition */ + if( rc == LIBSSH2_ERROR_EAGAIN ) + { + waitsocket(sock); + } else { + /* EOF */ + break; + } + } + + LOG_DEBUG_STR( itsLogPrefix << "Disconnecting" ); + + close_channel(sock); + + if (rc == 0) + { + exitcode = libssh2_channel_get_exit_status( channel ); + libssh2_channel_get_exit_signal(channel, &exitsignal, + NULL, NULL, NULL, NULL, NULL); + } else { + exitcode = 127; + } + + if (exitsignal) { + LOG_ERROR_STR(itsLogPrefix << "SSH was killed by signal " << exitsignal); + } else if(exitcode > 0) { + const char *explanation = ""; + + LOG_ERROR_STR(itsLogPrefix << "Exited with exit code " << exitcode << " (" << explanation << ")" ); + } else { + LOG_INFO_STR(itsLogPrefix << "Terminated normally"); + } +} + +#endif + +static void exitwitherror( const char *errorstr ) +{ + // can't cast to (void) since gcc won't allow that as a method to drop the result + int ignoreResult; + + ignoreResult = write(STDERR_FILENO, errorstr, strlen(errorstr)+1); + + // use _exit instead of exit to avoid calling atexit handlers in both + // the master and the child process. + _exit(1); +} + +static void execSSH(const char * const sshParams[]) +{ + // DO NOT DO ANY CALL THAT GRABS A LOCK, since the lock may be held by a + // thread that is no longer part of our address space + + // use write() for output since the Logger uses a mutex, and printf also holds locks + + // Prevent cancellation due to race conditions. A cancellation can still be pending for this JobThread, in which case one of the system calls + // below triggers it. If this thread/process can be cancelled, there will be multiple processes running, leading to all kinds of Bad Things. + Cancellation::disable(); + + // close all file descriptors other than stdin/out/err, which might have been openend by + // other threads at the time of fork(). We brute force over all possible fds, most of which will be invalid. + for (int f = sysconf(_SC_OPEN_MAX); f > 2; --f) + (void)close(f); + + // create a valid stdin from which can be read (a blocking fd created by pipe() won't suffice anymore for since at least OpenSSH 5.8) + // rationale: this forked process inherits stdin from the parent process, which is unusable because IONProc is started in the background + // and routed through mpirun as well. Also, it is shared by all forked processes. Nevertheless, we want Storage to be able to determine + // when to shut down based on whether stdin is open. So we create a new stdin. + int devzero = open("/dev/zero", O_RDONLY); + + if (devzero < 0) + exitwitherror("cannot open /dev/zero\n"); + + if (close(0) < 0) + exitwitherror("cannot close stdin\n"); + + if (dup(devzero) < 0) + exitwitherror("cannot dup /dev/zero into stdin\n"); + + if (close(devzero) < 0) + exitwitherror("cannot close /dev/zero\n"); + + if (execv("/usr/bin/ssh", const_cast<char * const *>(sshParams)) < 0) + exitwitherror("execv failed\n"); + + exitwitherror("execv succeeded but did return\n"); +} + + +pid_t forkSSH(const std::string &logPrefix, const char *hostName, char * const extraParams[], const char *userName, const char *sshKey) +{ + pid_t pid; + + LOG_INFO_STR(logPrefix << "Starting"); + + vector<const char*> sshParams; + + const char * const defaultParams[] = { + "ssh", + "-q", + "-i", sshKey, + "-c", "blowfish", + "-o", "StrictHostKeyChecking=no", + "-o", "UserKnownHostsFile=/dev/null", + "-o", "ServerAliveInterval=30", + "-l", userName, + hostName, + 0 }; + + for( const char * const *p = defaultParams; *p != 0; p++ ) + sshParams.push_back(*p); + + for( const char * const *p = extraParams; *p != 0; p++ ) + sshParams.push_back(*p); + + sshParams.push_back(0); + + switch (pid = fork()) { + case -1 : throw SystemCallException("fork", errno, THROW_ARGS); + + case 0 : execSSH(&sshParams[0]); + } + + return pid; +} + + +void joinSSH(const std::string &logPrefix, pid_t pid, unsigned &timeout) +{ + if (pid != 0) { + int status; + + // always try at least one waitpid(). if child has not exited, optionally + // sleep and try again. + for (;;) { + pid_t ret; + + if ((ret = waitpid(pid, &status, WNOHANG)) == (pid_t)-1) { + int error = errno; + + if (error == EINTR) { + LOG_DEBUG_STR(logPrefix << " waitpid() was interrupted -- retrying"); + continue; + } + + // error + LOG_WARN_STR(logPrefix << " waitpid() failed with errno " << error); + return; + } else if (ret == 0) { + // child still running + if (timeout == 0) { + break; + } + + timeout--; + sleep(1); + } else { + // child exited + if (WIFSIGNALED(status) != 0) + LOG_WARN_STR(logPrefix << "SSH was killed by signal " << WTERMSIG(status)); + else if (WEXITSTATUS(status) != 0) { + const char *explanation; + + switch (WEXITSTATUS(status)) { + default: + explanation = "??"; + break; + + case 255: + explanation = "Network or authentication error"; + break; + case 127: + explanation = "BASH: command/library not found"; + break; + case 126: + explanation = "BASH: command found but could not be executed (wrong architecture?)"; + break; + + case 128 + SIGHUP: + explanation = "killed by SIGHUP"; + break; + case 128 + SIGINT: + explanation = "killed by SIGINT (Ctrl-C)"; + break; + case 128 + SIGQUIT: + explanation = "killed by SIGQUIT"; + break; + case 128 + SIGILL: + explanation = "illegal instruction"; + break; + case 128 + SIGABRT: + explanation = "killed by SIGABRT"; + break; + case 128 + SIGKILL: + explanation = "killed by SIGKILL"; + break; + case 128 + SIGSEGV: + explanation = "segmentation fault"; + break; + case 128 + SIGPIPE: + explanation = "broken pipe"; + break; + case 128 + SIGALRM: + explanation = "killed by SIGALRM"; + break; + case 128 + SIGTERM: + explanation = "killed by SIGTERM"; + break; + } + + LOG_ERROR_STR(logPrefix << " exited with exit code " << WEXITSTATUS(status) << " (" << explanation << ")" ); + } else + LOG_INFO_STR(logPrefix << " terminated normally"); + + return; + } + } + + // child did not exit within the given timeout + + LOG_WARN_STR(logPrefix << " sending SIGTERM"); + kill(pid, SIGTERM); + + if (waitpid(pid, &status, 0) == -1) { + LOG_WARN_STR(logPrefix << " waitpid() failed"); + } + + LOG_WARN_STR(logPrefix << " terminated after sending SIGTERM"); + } +} + +} // namespace RTCP +} // namespace LOFAR + diff --git a/RTCP/IONProc/src/SSH.h b/RTCP/IONProc/src/SSH.h new file mode 100644 index 0000000000000000000000000000000000000000..ba7d8dea2f75c3d82007aceb337ac594cf4a1870 --- /dev/null +++ b/RTCP/IONProc/src/SSH.h @@ -0,0 +1,81 @@ +//# SSH.h +//# +//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl +//# +//# This program is free software; you can redistribute it and/or modify +//# it under the terms of the GNU General Public License as published by +//# the Free Software Foundation; either version 2 of the License, or +//# (at your option) any later version. +//# +//# This program is distributed in the hope that it will be useful, +//# but WITHOUT ANY WARRANTY; without even the implied warranty of +//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//# GNU General Public License for more details. +//# +//# You should have received a copy of the GNU General Public License +//# along with this program; if not, write to the Free Software +//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +//# +//# $Id: SSH.h 15296 2010-03-24 10:19:41Z mol $ + + +//# Never #include <config.h> or #include <lofar_config.h> in a header file! + +#ifndef LOFAR_RTCP_SSH_H +#define LOFAR_RTCP_SSH_H + +#include <string> + +#ifdef HAVE_LIBSSH2 +#include <Common/Thread/Thread.h> +#include <Stream/FileDescriptorBasedStream.h> +#include <libssh2.h> +#include <Interface/SmartPtr.h> +#endif + +namespace LOFAR { +namespace RTCP { + +#ifdef HAVE_LIBSSH2 + +class SSHconnection { +public: + SSHconnection(const string &logPrefix, const string &hostname, const string &commandline, const string &username, const string &sshkey); + + void start(); + void stop( const struct timespec &deadline ); + +private: + const string itsLogPrefix; + const string itsHostName; + const string itsCommandLine; + const string itsUserName; + const string itsSSHKey; + + SmartPtr<Thread> itsThread; + + static void free_session( LIBSSH2_SESSION *session ); + static void free_channel( LIBSSH2_CHANNEL *channel ); + + SmartPtr<LIBSSH2_SESSION, SmartPtrFreeFunc<LIBSSH2_SESSION, free_session> > session; + SmartPtr<LIBSSH2_CHANNEL, SmartPtrFreeFunc<LIBSSH2_CHANNEL, free_channel> > channel; + + bool waitsocket( FileDescriptorBasedStream &sock ); + + bool open_session( FileDescriptorBasedStream &sock ); + bool open_channel( FileDescriptorBasedStream &sock ); + bool close_channel( FileDescriptorBasedStream &sock ); + + void commThread(); +}; + +#endif + +pid_t forkSSH(const std::string &logPrefix, const char *hostName, char * const extraParams[], const char *userName, const char *sshKey); +void joinSSH(const std::string &logPrefix, pid_t pid, unsigned &timeout); + +} // namespace RTCP +} // namespace LOFAR + + +#endif diff --git a/RTCP/IONProc/test/CMakeLists.txt b/RTCP/IONProc/test/CMakeLists.txt index 45880e81d754957a460aadc53b1168c3b34fbedf..14beaac6c093928066bee009967f3a8968dd75ea 100644 --- a/RTCP/IONProc/test/CMakeLists.txt +++ b/RTCP/IONProc/test/CMakeLists.txt @@ -6,3 +6,4 @@ include(LofarCTest) include_directories(${PACKAGE_SOURCE_DIR}/src) lofar_add_test(tDelayCompensation tDelayCompensation.cc) +lofar_add_test(tSSH tSSH.cc) diff --git a/RTCP/IONProc/test/tDelayCompensation.sh b/RTCP/IONProc/test/tDelayCompensation.sh index 7e4850dc32878d907c55c121f5db0fdbc43877f6..583bf9f3695f5a3d810a38ffe3da10f3c1c43cb6 100755 --- a/RTCP/IONProc/test/tDelayCompensation.sh +++ b/RTCP/IONProc/test/tDelayCompensation.sh @@ -1,3 +1,2 @@ - #!/bin/sh ./runctest.sh tDelayCompensation 2>&1 > tDelayCompensation.log diff --git a/RTCP/IONProc/test/tSSH.cc b/RTCP/IONProc/test/tSSH.cc new file mode 100644 index 0000000000000000000000000000000000000000..26de94bbd1ba8cfec82d9b523d3540f35f32239b --- /dev/null +++ b/RTCP/IONProc/test/tSSH.cc @@ -0,0 +1,64 @@ +#include <lofar_config.h> + +#include <SSH.h> +#include <unistd.h> +#include <time.h> +#include <cstdlib> +#include <cstdio> +#include <Stream/SocketStream.h> +#include <Common/LofarLogger.h> + +// some useful environment variables +char *USER; +char *HOME; + +// the existence of $HOME/.ssh/id_rsa is assumed, +// as well as the fact that it can be used to +// authenticate on localhost. +char privkey[1024]; + +using namespace LOFAR; +using namespace RTCP; + + +void test_SSHconnection() { + SSHconnection ssh("", "localhost", "echo SSHconnection success", USER, privkey); + + ssh.start(); + + struct timespec ts; + ts.tv_sec = time(0) + 10; + ts.tv_nsec = 0; + + ssh.stop(ts); + +} + + +void test_forkExec() { + pid_t pid; + + char * const params[] = { + "echo", + "forkExec success", + 0 + }; + + pid = forkSSH("", "localhost", params, USER, privkey); + + unsigned timeout = 10; + joinSSH("", pid, timeout); +} + +int main() { + INIT_LOGGER( "tSSH" ); + + USER = getenv("USER"); + HOME = getenv("HOME"); + snprintf(privkey, sizeof privkey, "%s/.ssh/id_rsa", HOME); + + test_SSHconnection(); + test_forkExec(); + + return 0; +} diff --git a/RTCP/IONProc/test/tSSH.sh b/RTCP/IONProc/test/tSSH.sh new file mode 100755 index 0000000000000000000000000000000000000000..9a1c8b302b55fbc8be21fea61e0265a25072012a --- /dev/null +++ b/RTCP/IONProc/test/tSSH.sh @@ -0,0 +1,2 @@ +#!/bin/sh +./runctest.sh tSSH 2>&1 > tSSH.log diff --git a/RTCP/IONProc/test/tSSH.stdout b/RTCP/IONProc/test/tSSH.stdout new file mode 100644 index 0000000000000000000000000000000000000000..61fb2a75903ab38f9e9b0ecc41da17dc2f318793 --- /dev/null +++ b/RTCP/IONProc/test/tSSH.stdout @@ -0,0 +1,2 @@ +SSHconnection success +forkExec success diff --git a/RTCP/Interface/include/Interface/FakeData.h b/RTCP/Interface/include/Interface/FakeData.h index 9185ef13b0fda3fcc8ec82f5678d1629fca8b54c..3efd59db0d481ebae9cdde719ee3cbc286f78896 100644 --- a/RTCP/Interface/include/Interface/FakeData.h +++ b/RTCP/Interface/include/Interface/FakeData.h @@ -15,7 +15,7 @@ class FakeData { public: FakeData( const Parset &parset ): itsParset(parset) {} - void fill( FilteredData *data ) const; + void fill( FilteredData *data, unsigned subband ) const; void check( const FilteredData *data ) const; void check( const FinalBeamFormedData *data, unsigned pol ) const; @@ -40,13 +40,14 @@ template<> bool FakeData::equal( const fcomplex a, const fcomplex b ) const { return equal(real(a), real(b)) && equal(imag(a), imag(b)); } -void FakeData::fill( FilteredData *data ) const +void FakeData::fill( FilteredData *data, unsigned subband ) const { for (unsigned s = 0; s < itsParset.nrStations(); s++) { for (unsigned c = 0; c < itsParset.nrChannelsPerSubband(); c++) { for (unsigned t = 0; t < itsParset.CNintegrationSteps(); t++) { - data->samples[c][s][t][0] = makefcomplex(1 * t, 2 * t); - data->samples[c][s][t][1] = makefcomplex(3 * t, 5 * t); + const float base = 1000 * subband; + data->samples[c][s][t][0] = makefcomplex(base + 1 * t, base + 2 * t); + data->samples[c][s][t][1] = makefcomplex(base + 3 * t, base + 5 * t); } data->flags[c][s].reset(); } diff --git a/RTCP/Interface/include/Interface/SmartPtr.h b/RTCP/Interface/include/Interface/SmartPtr.h index 6113f1f30261c8d8a6c78619500d0fa542d0bf39..85e79a75cfe2bf8b0bf8f72e87d3162a635c3d46 100644 --- a/RTCP/Interface/include/Interface/SmartPtr.h +++ b/RTCP/Interface/include/Interface/SmartPtr.h @@ -72,6 +72,11 @@ public: static void free( T *ptr ) { ::free(ptr); } }; +template <typename T, void (*F)(T*) > class SmartPtrFreeFunc { +public: + static void free( T *ptr ) { F(ptr); } +}; + template <typename T, class D> inline SmartPtr<T,D>::SmartPtr(T *orig) : ptr(orig) diff --git a/lofar_config.h.cmake b/lofar_config.h.cmake index 2303c2e747f42200271004bc9059324ae69088d5..bc66715589fb890db06f5b1da9045001c9a40314 100644 --- a/lofar_config.h.cmake +++ b/lofar_config.h.cmake @@ -120,6 +120,9 @@ /* Define if LAM is installed */ #cmakedefine HAVE_LAM 1 +/* Define if libssh2 is installed */ +#cmakedefine HAVE_LIBSSH2 1 + /* Define if LOG4CPLUS is installed */ #cmakedefine HAVE_LOG4CPLUS 1