Skip to content
Snippets Groups Projects
Commit c5645b9d authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #8725: poll all processes during transfer for early termination. return byte_count as string

parent 700b6709
No related branches found
No related tags found
No related merge requests found
...@@ -248,7 +248,7 @@ class LtaCp: ...@@ -248,7 +248,7 @@ class LtaCp:
# start sending remote data, tee to fifo # start sending remote data, tee to fifo
src_path_parent, src_path_child = os.path.split(self.src_path_data) src_path_parent, src_path_child = os.path.split(self.src_path_data)
cmd_remote_data = self.ssh_cmd + ['cd %s; tar c --checkpoint=100 --checkpoint-action="ttyout=checkpoint %%u\\n" -O %s | tee %s | %s %s %s' % (src_path_parent, cmd_remote_data = self.ssh_cmd + ['cd %s && tar c --checkpoint=100 --checkpoint-action="ttyout=checkpoint %%u\\n" -O %s | tee %s | %s %s %s' % (src_path_parent,
src_path_child, src_path_child,
self.remote_data_fifo, self.remote_data_fifo,
self.remoteNetCatCmd, self.remoteNetCatCmd,
...@@ -269,12 +269,15 @@ class LtaCp: ...@@ -269,12 +269,15 @@ class LtaCp:
return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / float(10**6) return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / float(10**6)
# waiting for output, comparing checksums, etc. # waiting for output, comparing checksums, etc.
logger.info('ltacp %s: waiting for remote data transfer to finish...' % self.logId) logger.info('ltacp %s: transfering...' % self.logId)
transfer_start_time = datetime.utcnow() transfer_start_time = datetime.utcnow()
prev_progress_time = datetime.utcnow() prev_progress_time = datetime.utcnow()
prev_bytes_transfered = 0 prev_bytes_transfered = 0
while p_remote_data.poll() == None:
# wait and poll for progress while all processes are runnning
while len([p for p in self.started_procs.values() if p.poll() is not None]) == 0
try: try:
# read and process tar stdout lines to create progress messages
nextline = p_remote_data.stdout.readline() nextline = p_remote_data.stdout.readline()
if len(nextline) > 0: if len(nextline) > 0:
record_nr = int(nextline.split()[-1].strip()) record_nr = int(nextline.split()[-1].strip())
...@@ -299,9 +302,17 @@ class LtaCp: ...@@ -299,9 +302,17 @@ class LtaCp:
humanreadablesize(avg_speed, 'Bps'), humanreadablesize(avg_speed, 'Bps'),
humanreadablesize(current_speed, 'Bps'), humanreadablesize(current_speed, 'Bps'),
timedelta(seconds=int(round(time_to_go))))) timedelta(seconds=int(round(time_to_go)))))
time.sleep(0.25)
except KeyboardInterrupt: except KeyboardInterrupt:
self.cleanup() self.cleanup()
logger.info('ltacp %s: waiting for transfer via globus-url-copy to LTA to finish...' % self.logId)
output_data_out = p_data_out.communicate()
if p_data_out.returncode != 0:
raise LtacpException('ltacp %s: transfer via globus-url-copy to LTA failed: %s' % (self.logId, output_data_out[1]))
logger.info('ltacp %s: data transfer via globus-url-copy to LTA complete.' % self.logId)
logger.info('ltacp %s: waiting for remote data transfer to finish...' % self.logId)
output_remote_data = p_remote_data.communicate() output_remote_data = p_remote_data.communicate()
if p_remote_data.returncode != 0: if p_remote_data.returncode != 0:
raise LtacpException('ltacp %s: Error in remote data transfer: %s' % (self.logId, output_remote_data[1])) raise LtacpException('ltacp %s: Error in remote data transfer: %s' % (self.logId, output_remote_data[1]))
...@@ -343,12 +354,6 @@ class LtaCp: ...@@ -343,12 +354,6 @@ class LtaCp:
byte_count = int(output_byte_count[0].split()[0].strip()) byte_count = int(output_byte_count[0].split()[0].strip())
logger.info('ltacp %s: byte count of datastream is %d %s' % (self.logId, byte_count, humanreadablesize(byte_count))) logger.info('ltacp %s: byte count of datastream is %d %s' % (self.logId, byte_count, humanreadablesize(byte_count)))
logger.info('ltacp %s: waiting for transfer via globus-url-copy to LTA to finish...' % self.logId)
output_data_out = p_data_out.communicate()
if p_data_out.returncode != 0:
raise LtacpException('ltacp %s: transfer via globus-url-copy to LTA failed: %s' % (self.logId, output_data_out[1]))
logger.info('ltacp %s: data transfer via globus-url-copy to LTA complete.' % self.logId)
logger.debug('ltacp %s: waiting for local adler32 checksum to complete...' % self.logId) logger.debug('ltacp %s: waiting for local adler32 checksum to complete...' % self.logId)
output_a32_local = p_a32_local.communicate() output_a32_local = p_a32_local.communicate()
if p_a32_local.returncode != 0: if p_a32_local.returncode != 0:
...@@ -380,7 +385,7 @@ class LtaCp: ...@@ -380,7 +385,7 @@ class LtaCp:
self.cleanup() self.cleanup()
logger.info('ltacp %s: successfully completed transfer of %s:%s to %s' % (self.logId, self.src_host, self.src_path_data, self.dst_surl)) logger.info('ltacp %s: successfully completed transfer of %s:%s to %s' % (self.logId, self.src_host, self.src_path_data, self.dst_surl))
return (md5_checksum_local, a32_checksum_local, byte_count) return (md5_checksum_local, a32_checksum_local, str(byte_count))
def _ncListen(self, log_name): def _ncListen(self, log_name):
# pick initial random port for data receiver # pick initial random port for data receiver
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment