diff --git a/LTA/LTAIngest/ltacp.py b/LTA/LTAIngest/ltacp.py index 300d26de8c4848dc9e0230b4ee573b166a08e427..8bfb06fe20ed5a27d1a8608b653fd37d0630da04 100755 --- a/LTA/LTAIngest/ltacp.py +++ b/LTA/LTAIngest/ltacp.py @@ -33,10 +33,11 @@ class LtacpException(Exception): return str(self.value) def getLocalIPAddress(): - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - s.connect(('8.8.8.8', 0)) # connecting to a UDP address doesn't send packets - local_ip_address = s.getsockname()[0] - return local_ip_address + return '10.178.1.2' + #s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + #s.connect(('8.8.8.8', 0)) # connecting to a UDP address doesn't send packets + #local_ip_address = s.getsockname()[0] + #return local_ip_address # converts given srm url of an LTA site into a transport url as needed by gridftp. (Sring replacement based on arcane knowledge.) def convert_surl_to_turl(surl): @@ -48,6 +49,22 @@ def convert_surl_to_turl(surl): turl = turl.replace("srm://srm.target.rug.nl","gsiftp://gridftp02.target.rug.nl/target/gpfs2/lofar/home/srm",1) return turl +def createNetCatCmd(user, host): + '''helper method to determine the proper call syntax for netcat on host''' + + # nc has no version option or other ways to check it's version + # so, just try the variants and pick the first one that does not fail + nc_variants = ['nc --send-only', 'nc -q 0'] + + for nc_variant in nc_variants: + cmd = ['ssh', '-n', '-x', '%s@%s' % (user, host), nc_variant] + p = Popen(cmd, stdout=PIPE, stderr=PIPE) + out, err = p.communicate() + if 'invalid option' not in err: + return nc_variant + + raise LtacpException('could not determine remote netcat version') + class LtaCp: def __init__(self, @@ -75,6 +92,11 @@ class LtaCp: dst_turl = convert_surl_to_turl(self.dst_surl) logger.info('ltacp %s: initiating transfer of %s:%s to %s' % (self.logId, self.src_host, self.src_path_data, self.dst_surl)) + localIPAddress = getLocalIPAddress() + + localNetCatCmd = createNetCatCmd(getpass.getuser(), localIPAddress) + remoteNetCatCmd = createNetCatCmd(self.src_user, self.src_host) + #--- # Server part #--- @@ -87,12 +109,12 @@ class LtaCp: port_data = str(random.randint(49152, 65535)) while True: # start listen for data stream - cmd_data_in = ['nc', '-l', '-q','0', port_data] + cmd_data_in = localNetCatCmd.split(' ') + ['-l', port_data] logger.info('ltacp %s: listening for data. executing: %s' % (self.logId, ' '.join(cmd_data_in))) p_data_in = Popen(cmd_data_in, stdout=PIPE, stderr=PIPE) time.sleep(0.5) - if p_data_in.poll(): + if p_data_in.poll() is not None: # nc returned prematurely, pick another port to listen to o, e = p_data_in.communicate() logger.info('ltacp %s: nc returned prematurely: %s' % (self.logId, e.strip())) @@ -106,12 +128,12 @@ class LtaCp: port_md5 = str(random.randint(49152, 65535)) while True: # start listen for checksums - cmd_md5_receive = ['nc','-l', '-q','0', port_md5] + cmd_md5_receive = localNetCatCmd.split(' ') + ['-l', port_md5] logger.info('ltacp %s: listening for md5 checksums. executing: %s' % (self.logId, ' '.join(cmd_md5_receive))) p_md5_receive = Popen(cmd_md5_receive, stdout=PIPE, stderr=PIPE) time.sleep(0.5) - if p_md5_receive.poll(): + if p_md5_receive.poll() is not None: # nc returned prematurely, pick another port to listen to o, e = p_md5_receive.communicate() logger.info('ltacp %s: nc returned prematurely: %s' % (self.logId, e.strip())) @@ -140,13 +162,13 @@ class LtaCp: # start tee incoming data stream to fifo (pipe stream further for checksum) cmd_tee_data = ['tee', self.local_data_fifo] logger.info('ltacp %s: splitting datastream. executing on stdout of data listener: %s' % (self.logId, ' '.join(cmd_tee_data),)) - p_tee_data = Popen(cmd_tee_data, stdin=p_data_in.stdout, stdout=PIPE) + p_tee_data = Popen(cmd_tee_data, stdin=p_data_in.stdout, stdout=PIPE, stderr=PIPE) self.started_procs[p_tee_data] = cmd_tee_data # start tee incoming data stream to fifo (pipe stream further for checksum) cmd_tee_checksums = ['tee', self.local_adler32_fifo] logger.info('ltacp %s: splitting datastream again. executing on stdout of 1st data tee: %s' % (self.logId, ' '.join(cmd_tee_checksums),)) - p_tee_checksums = Popen(cmd_tee_checksums, stdin=p_tee_data.stdout, stdout=PIPE) + p_tee_checksums = Popen(cmd_tee_checksums, stdin=p_tee_data.stdout, stdout=PIPE, stderr=PIPE) self.started_procs[p_tee_checksums] = cmd_tee_checksums # start computing md5 checksum of incoming data stream @@ -186,10 +208,9 @@ class LtaCp: # 2) send tar stream of data/dir + tee to fifo for 3) # 3) simultaneously to 2), calculate checksum of fifo stream # 4) break fifo - cmd_remote_mkfifo = ['ssh', - '-tt', - '%s@%s' % (self.src_user, self.src_host), - 'mkfifo %s' % (self.remote_data_fifo,)] + self.ssh_cmd = ['ssh', '-tt', '-n', '-x', '-q', '%s@%s' % (self.src_user, self.src_host)] + + cmd_remote_mkfifo = self.ssh_cmd + ['mkfifo %s' % (self.remote_data_fifo,)] logger.info('ltacp %s: remote creating fifo. executing: %s' % (self.logId, ' '.join(cmd_remote_mkfifo))) p_remote_mkfifo = Popen(cmd_remote_mkfifo, stdout=PIPE, stderr=PIPE) self.started_procs[p_remote_mkfifo] = cmd_remote_mkfifo @@ -201,23 +222,17 @@ class LtaCp: # start sending remote data, tee to fifo src_path_parent, src_path_child = os.path.split(self.src_path_data) - cmd_remote_data = ['ssh', - '-tt', - '%s@%s' % (self.src_user, self.src_host), - 'cd %s; tar c -O %s | tee %s | nc --send-only %s %s' % (src_path_parent, - src_path_child, - self.remote_data_fifo, - getLocalIPAddress(), - port_data)] + cmd_remote_data = self.ssh_cmd + ['cd %s; tar c -O %s | tee %s | %s %s %s' % (src_path_parent, + src_path_child, + self.remote_data_fifo, + remoteNetCatCmd, + localIPAddress, port_data)] logger.info('ltacp %s: remote starting transfer. executing: %s' % (self.logId, ' '.join(cmd_remote_data))) p_remote_data = Popen(cmd_remote_data, stdout=PIPE, stderr=PIPE) self.started_procs[p_remote_data] = cmd_remote_data # start computation of checksum on remote fifo stream - cmd_remote_checksum = ['ssh', - '-tt', - '%s@%s' % (self.src_user, self.src_host), - 'md5sum %s | nc --send-only %s %s' % (self.remote_data_fifo, getLocalIPAddress(), port_md5)] + cmd_remote_checksum = self.ssh_cmd + ['md5sum %s | %s %s %s' % (self.remote_data_fifo, remoteNetCatCmd, localIPAddress, port_md5)] logger.info('ltacp %s: remote starting computation of md5 checksum. executing: %s' % (self.logId, ' '.join(cmd_remote_checksum))) p_remote_checksum = Popen(cmd_remote_checksum, stdout=PIPE, stderr=PIPE) self.started_procs[p_remote_checksum] = cmd_remote_checksum @@ -249,8 +264,8 @@ class LtaCp: logger.debug('ltacp %s: computed local md5 checksum.' % self.logId) # compare remote and local md5 checksums - md5_checksum_remote = output_md5_receive[0].split()[0] - md5_checksum_local = output_md5_local[0].split()[0] + md5_checksum_remote = output_md5_receive[0].split(' ')[0] + md5_checksum_local = output_md5_local[0].split(' ')[0] if(md5_checksum_remote != md5_checksum_local): raise LtacpException('md5 checksum reported by client (%s) does not match local checksum of incoming data stream (%s)' % (self.logId, md5_checksum_remote, md5_checksum_local)) logger.info('ltacp %s: remote and local md5 checksums are equal: %s' % (self.logId, md5_checksum_local,)) @@ -297,12 +312,9 @@ class LtaCp: def cleanup(self): logger.debug('ltacp %s: cleaning up' % (self.logId)) - if self.remote_data_fifo: + if hasattr(self, 'remote_data_fifo') and self.remote_data_fifo: '''remove a file (or fifo) on a remote host. Test if file exists before deleting.''' - cmd_remote_rm = ['ssh', - '-tt', - '%s@%s' % (self.src_user, self.src_host), - 'if [ -e "%s" ] ; then rm %s ; fi ;' % (self.remote_data_fifo, self.remote_data_fifo)] + cmd_remote_rm = self.ssh_cmd + ['if [ -e "%s" ] ; then rm %s ; fi ;' % (self.remote_data_fifo, self.remote_data_fifo)] logger.info('ltacp %s: removing remote fifo. executing: %s' % (self.logId, ' '.join(cmd_remote_rm))) p_remote_rm = Popen(cmd_remote_rm, stdout=PIPE, stderr=PIPE) p_remote_rm.communicate() @@ -311,12 +323,12 @@ class LtaCp: self.remote_data_fifo = None # remove local data fifo - if os.path.exists(self.local_data_fifo): + if hasattr(self, 'local_data_fifo') and os.path.exists(self.local_data_fifo): logger.info('ltacp %s: removing local data fifo for globus-url-copy: %s' % (self.logId, self.local_data_fifo)) os.remove(self.local_data_fifo) # remove local data fifo - if os.path.exists(self.local_adler32_fifo): + if hasattr(self, 'local_adler32_fifo') and os.path.exists(self.local_adler32_fifo): logger.info('ltacp %s: removing local data fifo for adler32: %s' % (self.logId, self.local_adler32_fifo)) os.remove(self.local_adler32_fifo)