Skip to content
Snippets Groups Projects
Commit 63ffb733 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #8725: connect stdin of ssh processes to dev null

parent 17af2fb0
Branches
Tags
No related merge requests found
...@@ -248,83 +248,84 @@ class LtaCp: ...@@ -248,83 +248,84 @@ class LtaCp:
estimated_tar_size = 512*(input_datasize / 512) + 3*512 #512byte header, 2*512byte ending, 512byte modulo data estimated_tar_size = 512*(input_datasize / 512) + 3*512 #512byte header, 2*512byte ending, 512byte modulo data
tar_record_size = 10240 # 20 * 512 byte blocks tar_record_size = 10240 # 20 * 512 byte blocks
# start sending remote data, tee to fifo with open(os.devnull, 'r') as devnull:
src_path_parent, src_path_child = os.path.split(self.src_path_data) # start sending remote data, tee to fifo
cmd_remote_data = self.ssh_cmd + ['cd %s && tar c --checkpoint=100 --checkpoint-action="ttyout=checkpoint %%u\\n" -O %s | tee %s | %s %s %s' % (src_path_parent, src_path_parent, src_path_child = os.path.split(self.src_path_data)
src_path_child, cmd_remote_data = self.ssh_cmd + ['cd %s && tar c --checkpoint=100 --checkpoint-action="ttyout=checkpoint %%u\\n" -O %s | tee %s | %s %s %s' % (src_path_parent,
self.remote_data_fifo, src_path_child,
self.remoteNetCatCmd, self.remote_data_fifo,
self.localIPAddress, self.remoteNetCatCmd,
port_data)] self.localIPAddress,
logger.info('ltacp %s: remote starting transfer. executing: %s' % (self.logId, ' '.join(cmd_remote_data))) port_data)]
p_remote_data = Popen(cmd_remote_data, stdout=PIPE, stderr=PIPE) logger.info('ltacp %s: remote starting transfer. executing: %s' % (self.logId, ' '.join(cmd_remote_data)))
self.started_procs[p_remote_data] = cmd_remote_data p_remote_data = Popen(cmd_remote_data, stdin=devnull, stdout=PIPE, stderr=PIPE)
self.started_procs[p_remote_data] = cmd_remote_data
# start computation of checksum on remote fifo stream
cmd_remote_checksum = self.ssh_cmd + ['md5sum %s | %s %s %s' % (self.remote_data_fifo, self.remoteNetCatCmd, self.localIPAddress, port_md5)] # start computation of checksum on remote fifo stream
logger.info('ltacp %s: remote starting computation of md5 checksum. executing: %s' % (self.logId, ' '.join(cmd_remote_checksum))) cmd_remote_checksum = self.ssh_cmd + ['md5sum %s | %s %s %s' % (self.remote_data_fifo, self.remoteNetCatCmd, self.localIPAddress, port_md5)]
p_remote_checksum = Popen(cmd_remote_checksum, stdout=PIPE, stderr=PIPE) logger.info('ltacp %s: remote starting computation of md5 checksum. executing: %s' % (self.logId, ' '.join(cmd_remote_checksum)))
self.started_procs[p_remote_checksum] = cmd_remote_checksum p_remote_checksum = Popen(cmd_remote_checksum, stdin=devnull, stdout=PIPE, stderr=PIPE)
self.started_procs[p_remote_checksum] = cmd_remote_checksum
# timedelta.total_seconds is only available for python >= 2.7
def timedelta_total_seconds(td): # timedelta.total_seconds is only available for python >= 2.7
return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / float(10**6) def timedelta_total_seconds(td):
return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / float(10**6)
# waiting for output, comparing checksums, etc.
logger.info('ltacp %s: transfering...' % self.logId) # waiting for output, comparing checksums, etc.
transfer_start_time = datetime.utcnow() logger.info('ltacp %s: transfering...' % self.logId)
prev_progress_time = datetime.utcnow() transfer_start_time = datetime.utcnow()
prev_bytes_transfered = 0 prev_progress_time = datetime.utcnow()
prev_bytes_transfered = 0
# wait and poll for progress while all processes are runnning
while len([p for p in self.started_procs.keys() if p.poll() is not None]) == 0: # wait and poll for progress while all processes are runnning
try: while len([p for p in self.started_procs.keys() if p.poll() is not None]) == 0:
# read and process tar stdout lines to create progress messages try:
nextline = p_remote_data.stdout.readline() # read and process tar stdout lines to create progress messages
if len(nextline) > 0: nextline = p_remote_data.stdout.readline()
record_nr = int(nextline.split()[-1].strip()) if len(nextline) > 0:
total_bytes_transfered = record_nr * tar_record_size record_nr = int(nextline.split()[-1].strip())
percentage_done = 100.0*total_bytes_transfered/input_datasize total_bytes_transfered = record_nr * tar_record_size
current_progress_time = datetime.utcnow() percentage_done = 100.0*total_bytes_transfered/input_datasize
elapsed_secs_since_start = timedelta_total_seconds(current_progress_time - transfer_start_time) current_progress_time = datetime.utcnow()
elapsed_secs_since_prev = timedelta_total_seconds(current_progress_time - prev_progress_time) elapsed_secs_since_start = timedelta_total_seconds(current_progress_time - transfer_start_time)
if percentage_done > 0 and elapsed_secs_since_start > 0 and elapsed_secs_since_prev > 0: elapsed_secs_since_prev = timedelta_total_seconds(current_progress_time - prev_progress_time)
avg_speed = total_bytes_transfered / elapsed_secs_since_start if percentage_done > 0 and elapsed_secs_since_start > 0 and elapsed_secs_since_prev > 0:
current_bytes_transfered = total_bytes_transfered - prev_bytes_transfered avg_speed = total_bytes_transfered / elapsed_secs_since_start
current_speed = current_bytes_transfered / elapsed_secs_since_prev current_bytes_transfered = total_bytes_transfered - prev_bytes_transfered
if elapsed_secs_since_prev > 60 or current_bytes_transfered > 0.05*input_datasize: current_speed = current_bytes_transfered / elapsed_secs_since_prev
prev_progress_time = current_progress_time if elapsed_secs_since_prev > 60 or current_bytes_transfered > 0.05*input_datasize:
prev_bytes_transfered = total_bytes_transfered prev_progress_time = current_progress_time
percentage_to_go = 100.0 - percentage_done prev_bytes_transfered = total_bytes_transfered
time_to_go = elapsed_secs_since_start * percentage_to_go / percentage_done percentage_to_go = 100.0 - percentage_done
logger.info('ltacp %s: transfered %d bytes, %s, %.1f%% at avgSpeed=%s curSpeed=%s to_go=%s' % (self.logId, time_to_go = elapsed_secs_since_start * percentage_to_go / percentage_done
total_bytes_transfered, logger.info('ltacp %s: transfered %d bytes, %s, %.1f%% at avgSpeed=%s curSpeed=%s to_go=%s' % (self.logId,
humanreadablesize(total_bytes_transfered), total_bytes_transfered,
percentage_done, humanreadablesize(total_bytes_transfered),
humanreadablesize(avg_speed, 'Bps'), percentage_done,
humanreadablesize(current_speed, 'Bps'), humanreadablesize(avg_speed, 'Bps'),
timedelta(seconds=int(round(time_to_go))))) humanreadablesize(current_speed, 'Bps'),
time.sleep(0.25) timedelta(seconds=int(round(time_to_go)))))
except KeyboardInterrupt: time.sleep(0.25)
self.cleanup() except KeyboardInterrupt:
self.cleanup()
logger.info('ltacp %s: waiting for transfer via globus-url-copy to LTA to finish...' % self.logId)
output_data_out = p_data_out.communicate() logger.info('ltacp %s: waiting for transfer via globus-url-copy to LTA to finish...' % self.logId)
if p_data_out.returncode != 0: output_data_out = p_data_out.communicate()
raise LtacpException('ltacp %s: transfer via globus-url-copy to LTA failed: %s' % (self.logId, output_data_out[1])) if p_data_out.returncode != 0:
logger.info('ltacp %s: data transfer via globus-url-copy to LTA complete.' % self.logId) raise LtacpException('ltacp %s: transfer via globus-url-copy to LTA failed: %s' % (self.logId, output_data_out[1]))
logger.info('ltacp %s: data transfer via globus-url-copy to LTA complete.' % self.logId)
logger.info('ltacp %s: waiting for remote data transfer to finish...' % self.logId)
output_remote_data = p_remote_data.communicate() logger.info('ltacp %s: waiting for remote data transfer to finish...' % self.logId)
if p_remote_data.returncode != 0: output_remote_data = p_remote_data.communicate()
raise LtacpException('ltacp %s: Error in remote data transfer: %s' % (self.logId, output_remote_data[1])) if p_remote_data.returncode != 0:
logger.debug('ltacp %s: remote data transfer finished...' % self.logId) raise LtacpException('ltacp %s: Error in remote data transfer: %s' % (self.logId, output_remote_data[1]))
logger.debug('ltacp %s: remote data transfer finished...' % self.logId)
logger.info('ltacp %s: waiting for remote md5 checksum computation to finish...' % self.logId)
output_remote_checksum = p_remote_checksum.communicate() logger.info('ltacp %s: waiting for remote md5 checksum computation to finish...' % self.logId)
if p_remote_checksum.returncode != 0: output_remote_checksum = p_remote_checksum.communicate()
raise LtacpException('ltacp %s: Error in remote md5 checksum computation: %s' % (self.logId, output_remote_checksum[1])) if p_remote_checksum.returncode != 0:
logger.debug('ltacp %s: remote md5 checksum computation finished.' % self.logId) raise LtacpException('ltacp %s: Error in remote md5 checksum computation: %s' % (self.logId, output_remote_checksum[1]))
logger.debug('ltacp %s: remote md5 checksum computation finished.' % self.logId)
logger.debug('ltacp %s: waiting to receive remote md5 checksum...' % self.logId) logger.debug('ltacp %s: waiting to receive remote md5 checksum...' % self.logId)
output_md5_receive = p_md5_receive.communicate() output_md5_receive = p_md5_receive.communicate()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment