Skip to content
Snippets Groups Projects
Commit 08cebff9 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #8725: added progress and eta logging

parent dca33075
Branches
Tags
Loading
......@@ -13,7 +13,21 @@ import socket
import os, sys, getpass
import time
import random
import math
import atexit
from datetime import datetime, timedelta
# TODO: reuse method from LCS.PyCommon.utils
def humanreadablesize(num, suffix='B', base=1000):
""" converts the given size (number) to a human readable string in powers of 'base'"""
try:
for unit in ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']:
if abs(num) < float(base):
return "%3.1f%s%s" % (num, unit, suffix)
num /= float(base)
return "%.2f%s%s" % (num, 'Y', suffix)
except TypeError:
return str(num)
logger = logging.getLogger('Slave')
......@@ -201,13 +215,32 @@ class LtaCp:
if p_remote_mkfifo.returncode != 0:
raise LtacpException('ltacp %s: remote fifo creation failed: \nstdout: %s\nstderr: %s' % (self.logId, output_remote_mkfifo[0],output_remote_mkfifo[1]))
# get input datasize
cmd_remote_du = self.ssh_cmd + ['du -b --max-depth=0 %s' % (self.src_path_data,)]
logger.info('ltacp %s: remote getting datasize. executing: %s' % (self.logId, ' '.join(cmd_remote_du)))
p_remote_du = Popen(cmd_remote_du, stdout=PIPE, stderr=PIPE)
self.started_procs[p_remote_du] = cmd_remote_du
# block until du is finished
output_remote_du = p_remote_du.communicate()
if p_remote_du.returncode != 0:
raise LtacpException('ltacp %s: remote fifo creation failed: \nstdout: %s\nstderr: %s' % (self.logId,
output_remote_du[0],
output_remote_du[1]))
# compute various parameters for progress logging
input_datasize = int(output_remote_du[0].split()[0])
logger.info('ltacp %s: input datasize: %d bytes, %s' % (self.logId, input_datasize, humanreadablesize(input_datasize)))
estimated_tar_size = 512*(input_datasize / 512) + 3*512 #512byte header, 2*512byte ending, 512byte modulo data
tar_record_size = 10240 # 20 * 512 byte blocks
# start sending remote data, tee to fifo
src_path_parent, src_path_child = os.path.split(self.src_path_data)
cmd_remote_data = self.ssh_cmd + ['cd %s; tar c -O %s | tee %s | %s %s %s' % (src_path_parent,
src_path_child,
self.remote_data_fifo,
self.remoteNetCatCmd,
self.localIPAddress, port_data)]
cmd_remote_data = self.ssh_cmd + ['cd %s; tar c --checkpoint=100 --checkpoint-action="ttyout=checkpoint %%u\\n" -O %s | tee %s | %s %s %s' % (src_path_parent,
src_path_child,
self.remote_data_fifo,
self.remoteNetCatCmd,
self.localIPAddress,
port_data)]
logger.info('ltacp %s: remote starting transfer. executing: %s' % (self.logId, ' '.join(cmd_remote_data)))
p_remote_data = Popen(cmd_remote_data, stdout=PIPE, stderr=PIPE)
self.started_procs[p_remote_data] = cmd_remote_data
......@@ -218,9 +251,44 @@ class LtaCp:
p_remote_checksum = Popen(cmd_remote_checksum, stdout=PIPE, stderr=PIPE)
self.started_procs[p_remote_checksum] = cmd_remote_checksum
# timedelta.total_seconds is only available for python >= 2.7
def timedelta_total_seconds(td):
return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / float(10**6)
# waiting for output, comparing checksums, etc.
logger.info('ltacp %s: waiting for remote data transfer to finish...' % self.logId)
transfer_start_time = datetime.utcnow()
prev_progress_time = datetime.utcnow()
prev_bytes_transfered = 0
while p_remote_data.poll() == None:
try:
nextline = p_remote_data.stdout.readline()
if len(nextline) > 0:
record_nr = int(nextline.split()[-1].strip())
total_bytes_transfered = record_nr * tar_record_size
percentage_done = 100.0*total_bytes_transfered/input_datasize
current_progress_time = datetime.utcnow()
elapsed_secs_since_start = timedelta_total_seconds(current_progress_time - transfer_start_time)
elapsed_secs_since_prev = timedelta_total_seconds(current_progress_time - prev_progress_time)
if percentage_done > 0 and elapsed_secs_since_start > 0 and elapsed_secs_since_prev > 0:
avg_speed = total_bytes_transfered / elapsed_secs_since_start
current_bytes_transfered = total_bytes_transfered - prev_bytes_transfered
current_speed = current_bytes_transfered / elapsed_secs_since_prev
if elapsed_secs_since_prev > 30 or current_bytes_transfered > 1e9:
prev_progress_time = current_progress_time
prev_bytes_transfered = total_bytes_transfered
percentage_to_go = 100.0 - percentage_done
time_to_go = elapsed_secs_since_start * percentage_to_go / percentage_done
logger.info('ltacp %s: transfered %d bytes, %s, %.1f%% at avgSpeed=%s curSpeed=%s to_go=%s' % (self.logId,
total_bytes_transfered,
humanreadablesize(total_bytes_transfered),
percentage_done,
humanreadablesize(avg_speed, 'Bps'),
humanreadablesize(current_speed, 'Bps'),
timedelta(seconds=int(round(time_to_go)))))
except KeyboardInterrupt:
self.cleanup()
output_remote_data = p_remote_data.communicate()
if p_remote_data.returncode != 0:
raise LtacpException('ltacp %s: Error in remote data transfer: %s' % (self.logId, output_remote_data[1]))
......@@ -260,7 +328,7 @@ class LtaCp:
if p_byte_count.returncode != 0:
raise LtacpException('ltacp %s: Error while receiving remote md5 checksum: %s' % (self.logId, output_byte_count[1]))
byte_count = int(output_byte_count[0].split()[2])
logger.info('ltacp %s: byte count of datastream is %d' % (self.logId, byte_count))
logger.info('ltacp %s: byte count of datastream is %d %s' % (self.logId, byte_count, humanreadablesize(byte_count)))
logger.debug('ltacp %s: waiting for transfer via globus-url-copy to LTA to finish...' % self.logId)
output_data_out = p_data_out.communicate()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment