diff --git a/LTA/LTAIngest/ltacp.py b/LTA/LTAIngest/ltacp.py index e3951c8e3c3f1f4cd065e409e27c6e8a9345467a..7da1e2cfd584bdc86ea9ddc2596d88ee2b48e2af 100755 --- a/LTA/LTAIngest/ltacp.py +++ b/LTA/LTAIngest/ltacp.py @@ -13,7 +13,21 @@ import socket import os, sys, getpass import time import random +import math import atexit +from datetime import datetime, timedelta + +# TODO: reuse method from LCS.PyCommon.utils +def humanreadablesize(num, suffix='B', base=1000): + """ converts the given size (number) to a human readable string in powers of 'base'""" + try: + for unit in ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']: + if abs(num) < float(base): + return "%3.1f%s%s" % (num, unit, suffix) + num /= float(base) + return "%.2f%s%s" % (num, 'Y', suffix) + except TypeError: + return str(num) logger = logging.getLogger('Slave') @@ -201,13 +215,32 @@ class LtaCp: if p_remote_mkfifo.returncode != 0: raise LtacpException('ltacp %s: remote fifo creation failed: \nstdout: %s\nstderr: %s' % (self.logId, output_remote_mkfifo[0],output_remote_mkfifo[1])) + # get input datasize + cmd_remote_du = self.ssh_cmd + ['du -b --max-depth=0 %s' % (self.src_path_data,)] + logger.info('ltacp %s: remote getting datasize. executing: %s' % (self.logId, ' '.join(cmd_remote_du))) + p_remote_du = Popen(cmd_remote_du, stdout=PIPE, stderr=PIPE) + self.started_procs[p_remote_du] = cmd_remote_du + + # block until du is finished + output_remote_du = p_remote_du.communicate() + if p_remote_du.returncode != 0: + raise LtacpException('ltacp %s: remote fifo creation failed: \nstdout: %s\nstderr: %s' % (self.logId, + output_remote_du[0], + output_remote_du[1])) + # compute various parameters for progress logging + input_datasize = int(output_remote_du[0].split()[0]) + logger.info('ltacp %s: input datasize: %d bytes, %s' % (self.logId, input_datasize, humanreadablesize(input_datasize))) + estimated_tar_size = 512*(input_datasize / 512) + 3*512 #512byte header, 2*512byte ending, 512byte modulo data + tar_record_size = 10240 # 20 * 512 byte blocks + # start sending remote data, tee to fifo src_path_parent, src_path_child = os.path.split(self.src_path_data) - cmd_remote_data = self.ssh_cmd + ['cd %s; tar c -O %s | tee %s | %s %s %s' % (src_path_parent, - src_path_child, - self.remote_data_fifo, - self.remoteNetCatCmd, - self.localIPAddress, port_data)] + cmd_remote_data = self.ssh_cmd + ['cd %s; tar c --checkpoint=100 --checkpoint-action="ttyout=checkpoint %%u\\n" -O %s | tee %s | %s %s %s' % (src_path_parent, + src_path_child, + self.remote_data_fifo, + self.remoteNetCatCmd, + self.localIPAddress, + port_data)] logger.info('ltacp %s: remote starting transfer. executing: %s' % (self.logId, ' '.join(cmd_remote_data))) p_remote_data = Popen(cmd_remote_data, stdout=PIPE, stderr=PIPE) self.started_procs[p_remote_data] = cmd_remote_data @@ -218,9 +251,44 @@ class LtaCp: p_remote_checksum = Popen(cmd_remote_checksum, stdout=PIPE, stderr=PIPE) self.started_procs[p_remote_checksum] = cmd_remote_checksum + # timedelta.total_seconds is only available for python >= 2.7 + def timedelta_total_seconds(td): + return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / float(10**6) # waiting for output, comparing checksums, etc. logger.info('ltacp %s: waiting for remote data transfer to finish...' % self.logId) + transfer_start_time = datetime.utcnow() + prev_progress_time = datetime.utcnow() + prev_bytes_transfered = 0 + while p_remote_data.poll() == None: + try: + nextline = p_remote_data.stdout.readline() + if len(nextline) > 0: + record_nr = int(nextline.split()[-1].strip()) + total_bytes_transfered = record_nr * tar_record_size + percentage_done = 100.0*total_bytes_transfered/input_datasize + current_progress_time = datetime.utcnow() + elapsed_secs_since_start = timedelta_total_seconds(current_progress_time - transfer_start_time) + elapsed_secs_since_prev = timedelta_total_seconds(current_progress_time - prev_progress_time) + if percentage_done > 0 and elapsed_secs_since_start > 0 and elapsed_secs_since_prev > 0: + avg_speed = total_bytes_transfered / elapsed_secs_since_start + current_bytes_transfered = total_bytes_transfered - prev_bytes_transfered + current_speed = current_bytes_transfered / elapsed_secs_since_prev + if elapsed_secs_since_prev > 30 or current_bytes_transfered > 1e9: + prev_progress_time = current_progress_time + prev_bytes_transfered = total_bytes_transfered + percentage_to_go = 100.0 - percentage_done + time_to_go = elapsed_secs_since_start * percentage_to_go / percentage_done + logger.info('ltacp %s: transfered %d bytes, %s, %.1f%% at avgSpeed=%s curSpeed=%s to_go=%s' % (self.logId, + total_bytes_transfered, + humanreadablesize(total_bytes_transfered), + percentage_done, + humanreadablesize(avg_speed, 'Bps'), + humanreadablesize(current_speed, 'Bps'), + timedelta(seconds=int(round(time_to_go))))) + except KeyboardInterrupt: + self.cleanup() + output_remote_data = p_remote_data.communicate() if p_remote_data.returncode != 0: raise LtacpException('ltacp %s: Error in remote data transfer: %s' % (self.logId, output_remote_data[1])) @@ -260,7 +328,7 @@ class LtaCp: if p_byte_count.returncode != 0: raise LtacpException('ltacp %s: Error while receiving remote md5 checksum: %s' % (self.logId, output_byte_count[1])) byte_count = int(output_byte_count[0].split()[2]) - logger.info('ltacp %s: byte count of datastream is %d' % (self.logId, byte_count)) + logger.info('ltacp %s: byte count of datastream is %d %s' % (self.logId, byte_count, humanreadablesize(byte_count))) logger.debug('ltacp %s: waiting for transfer via globus-url-copy to LTA to finish...' % self.logId) output_data_out = p_data_out.communicate()