Skip to content
Snippets Groups Projects
Commit 1d89147c authored by Jorrit Schaap
Browse files

Task #8725: start ssh with -tt option for proper termination of remote...

Task #8725: start ssh with the -tt option for proper termination of remote processes. Do not run subprocesses in a shell
parent 9654e2cb
No related branches found
No related tags found
No related merge requests found
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
import logging import logging
from subprocess import Popen, PIPE from subprocess import Popen, PIPE
from socket import getfqdn import socket
import os, sys, getpass import os, sys, getpass
import time import time
import random import random
...@@ -32,8 +32,11 @@ class LtacpException(Exception): ...@@ -32,8 +32,11 @@ class LtacpException(Exception):
def __str__(self): def __str__(self):
return str(self.value) return str(self.value)
def getfqdn(): def getLocalIPAddress():
return "10.178.1.2" s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(('8.8.8.8', 0)) # connecting to a UDP address doesn't send packets
local_ip_address = s.getsockname()[0]
return local_ip_address
# converts given srm url of an LTA site into a transport url as needed by gridftp. (String replacement based on arcane knowledge.) # converts given srm url of an LTA site into a transport url as needed by gridftp. (String replacement based on arcane knowledge.)
def convert_surl_to_turl(surl): def convert_surl_to_turl(surl):
...@@ -118,8 +121,8 @@ class LtaCp: ...@@ -118,8 +121,8 @@ class LtaCp:
break break
# create fifo paths # create fifo paths
self.local_data_fifo = '/tmp/ltacp_datapipe_'+self.src_host+'_'+port_data self.local_data_fifo = '/tmp/ltacp_datapipe_%s_%s' % (self.src_host, port_data)
self.remote_data_fifo = '/tmp/ltacp_md5_receivepipe_'+port_md5 self.remote_data_fifo = '/tmp/ltacp_md5_receivepipe_%s' % (port_md5,)
# create local fifo to stream data to globus-url-copy # create local fifo to stream data to globus-url-copy
logger.info('ltacp %s: creating data fifo for globus-url-copy: %s' % (self.logId, self.local_data_fifo)) logger.info('ltacp %s: creating data fifo for globus-url-copy: %s' % (self.logId, self.local_data_fifo))
...@@ -128,7 +131,7 @@ class LtaCp: ...@@ -128,7 +131,7 @@ class LtaCp:
os.mkfifo(self.local_data_fifo) os.mkfifo(self.local_data_fifo)
# create local fifo to stream data to adler32 # create local fifo to stream data to adler32
self.local_adler32_fifo = self.local_data_fifo+'_adler32' self.local_adler32_fifo = '%s_adler32' % (self.local_data_fifo,)
logger.info('ltacp %s: creating data fifo for adler32: %s' % (self.logId, self.local_adler32_fifo)) logger.info('ltacp %s: creating data fifo for adler32: %s' % (self.logId, self.local_adler32_fifo))
if os.path.exists(self.local_adler32_fifo): if os.path.exists(self.local_adler32_fifo):
os.remove(self.local_adler32_fifo) os.remove(self.local_adler32_fifo)
...@@ -183,34 +186,40 @@ class LtaCp: ...@@ -183,34 +186,40 @@ class LtaCp:
# 2) send tar stream of data/dir + tee to fifo for 3) # 2) send tar stream of data/dir + tee to fifo for 3)
# 3) simultaneously to 2), calculate checksum of fifo stream # 3) simultaneously to 2), calculate checksum of fifo stream
# 4) break fifo # 4) break fifo
cmd_remote_mkfifo = ['ssh '+self.src_user+'@'+self.src_host+ cmd_remote_mkfifo = ['ssh',
' \'mkfifo '+self.remote_data_fifo+ '-tt',
'\''] '%s@%s' % (self.src_user, self.src_host),
'mkfifo %s' % (self.remote_data_fifo,)]
logger.info('ltacp %s: remote creating fifo. executing: %s' % (self.logId, ' '.join(cmd_remote_mkfifo))) logger.info('ltacp %s: remote creating fifo. executing: %s' % (self.logId, ' '.join(cmd_remote_mkfifo)))
p_remote_mkfifo = Popen(cmd_remote_mkfifo, shell=True, stdout=PIPE, stderr=PIPE) p_remote_mkfifo = Popen(cmd_remote_mkfifo, stdout=PIPE, stderr=PIPE)
self.started_procs[p_remote_mkfifo] = cmd_remote_mkfifo self.started_procs[p_remote_mkfifo] = cmd_remote_mkfifo
# block until fifo is created # block until fifo is created
output_remote_mkfifo = p_remote_mkfifo.communicate() output_remote_mkfifo = p_remote_mkfifo.communicate()
if p_remote_mkfifo.returncode != 0: if p_remote_mkfifo.returncode != 0:
raise LtacpException('Remote fifo creation failed: '+output_remote_mkfifo[1]) raise LtacpException('ltacp %s: remote fifo creation failed: \nstdout: %s\nstderr: %s' % (self.logId, output_remote_mkfifo[0],output_remote_mkfifo[1]))
# start sending remote data, tee to fifo # start sending remote data, tee to fifo
src_path_parent, src_path_child = os.path.split(self.src_path_data) src_path_parent, src_path_child = os.path.split(self.src_path_data)
cmd_remote_data = ['ssh '+self.src_user+'@'+self.src_host+ cmd_remote_data = ['ssh',
' \'cd '+src_path_parent+ '-tt',
' ; tar c -O '+src_path_child+' | tee '+self.remote_data_fifo+' | nc --send-only '+getfqdn()+' '+port_data+ '%s@%s' % (self.src_user, self.src_host),
'\''] 'cd %s; tar c -O %s | tee %s | nc --send-only %s %s' % (src_path_parent,
src_path_child,
self.remote_data_fifo,
getLocalIPAddress(),
port_data)]
logger.info('ltacp %s: remote starting transfer. executing: %s' % (self.logId, ' '.join(cmd_remote_data))) logger.info('ltacp %s: remote starting transfer. executing: %s' % (self.logId, ' '.join(cmd_remote_data)))
p_remote_data = Popen(cmd_remote_data, shell=True, stdout=PIPE, stderr=PIPE) p_remote_data = Popen(cmd_remote_data, stdout=PIPE, stderr=PIPE)
self.started_procs[p_remote_data] = cmd_remote_data self.started_procs[p_remote_data] = cmd_remote_data
# start computation of checksum on remote fifo stream # start computation of checksum on remote fifo stream
cmd_remote_checksum = ['ssh '+self.src_user+'@'+self.src_host+ cmd_remote_checksum = ['ssh',
' \'cat '+self.remote_data_fifo+' | md5sum | nc --send-only '+getfqdn()+' '+port_md5+ '-tt',
'\''] '%s@%s' % (self.src_user, self.src_host),
'md5sum %s | nc --send-only %s %s' % (self.remote_data_fifo, getLocalIPAddress(), port_md5)]
logger.info('ltacp %s: remote starting computation of md5 checksum. executing: %s' % (self.logId, ' '.join(cmd_remote_checksum))) logger.info('ltacp %s: remote starting computation of md5 checksum. executing: %s' % (self.logId, ' '.join(cmd_remote_checksum)))
p_remote_checksum = Popen(cmd_remote_checksum, shell=True, stdout=PIPE, stderr=PIPE) p_remote_checksum = Popen(cmd_remote_checksum, stdout=PIPE, stderr=PIPE)
self.started_procs[p_remote_checksum] = cmd_remote_checksum self.started_procs[p_remote_checksum] = cmd_remote_checksum
...@@ -255,7 +264,7 @@ class LtaCp: ...@@ -255,7 +264,7 @@ class LtaCp:
logger.debug('ltacp %s: waiting for local adler32 checksum to complete...' % self.logId) logger.debug('ltacp %s: waiting for local adler32 checksum to complete...' % self.logId)
output_a32_local = p_a32_local.communicate() output_a32_local = p_a32_local.communicate()
if p_a32_local.returncode != 0: if p_a32_local.returncode != 0:
raise LtacpException('local adler32 checksum computation failed: '+str(output_a32_local)) raise LtacpException('ltacp %s: local adler32 checksum computation failed: %s' (self.logId, str(output_a32_local)))
logger.debug('ltacp %s: finished computation of local adler32 checksum' % self.logId) logger.debug('ltacp %s: finished computation of local adler32 checksum' % self.logId)
a32_checksum_local = output_a32_local[0].split()[1] a32_checksum_local = output_a32_local[0].split()[1]
...@@ -290,12 +299,15 @@ class LtaCp: ...@@ -290,12 +299,15 @@ class LtaCp:
if self.remote_data_fifo: if self.remote_data_fifo:
'''remove a file (or fifo) on a remote host. Test if file exists before deleting.''' '''remove a file (or fifo) on a remote host. Test if file exists before deleting.'''
cmd_remote_rm = ['ssh %s@%s \'if [ -e "%s" ] ; then rm %s ; fi ;\'' % (self.src_user, self.src_host, self.remote_data_fifo, self.remote_data_fifo)] cmd_remote_rm = ['ssh',
logger.info('ltacp %s: remote removing file if existing. executing: %s' % (self.logId, ' '.join(cmd_remote_rm))) '-tt',
p_remote_rm = Popen(cmd_remote_rm, shell=True, stdout=PIPE, stderr=PIPE) '%s@%s' % (self.src_user, self.src_host),
'if [ -e "%s" ] ; then rm %s ; fi ;' % (self.remote_data_fifo, self.remote_data_fifo)]
logger.info('ltacp %s: removing remote fifo. executing: %s' % (self.logId, ' '.join(cmd_remote_rm)))
p_remote_rm = Popen(cmd_remote_rm, stdout=PIPE, stderr=PIPE)
p_remote_rm.communicate() p_remote_rm.communicate()
if p_remote_rm.returncode != 0: if p_remote_rm.returncode != 0:
logger.error("Could not remove remote file %s@%s:%s\n%s" % (self.src_user, self.src_host, self.remote_data_fifo, p_remote_rm.stderr)) logger.error("Could not remove remote fifo %s@%s:%s\n%s" % (self.src_user, self.src_host, self.remote_data_fifo, p_remote_rm.stderr))
self.remote_data_fifo = None self.remote_data_fifo = None
# remove local data fifo # remove local data fifo
...@@ -319,7 +331,9 @@ class LtaCp: ...@@ -319,7 +331,9 @@ class LtaCp:
if len(running_procs): if len(running_procs):
logger.warning('ltacp %s: terminating %d running subprocesses...' % (self.logId, len(running_procs))) logger.warning('ltacp %s: terminating %d running subprocesses...' % (self.logId, len(running_procs)))
for p,cl in running_procs.items(): for p,cl in running_procs.items():
logger.warning('ltacp %s: terminated running process pid=%d cmdline=%s' % (self.logId, p.pid, cl)) if isinstance(cl, list):
cl = ' '.join(cl)
logger.warning('ltacp %s: terminated running process pid=%d cmdline: %s' % (self.logId, p.pid, cl))
p.terminate() p.terminate()
logger.info('ltacp %s: terminated %d running subprocesses...' % (self.logId, len(running_procs))) logger.info('ltacp %s: terminated %d running subprocesses...' % (self.logId, len(running_procs)))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment