diff --git a/LTA/LTAIngest/ltacp.py b/LTA/LTAIngest/ltacp.py index 38255a0ba40d00cb6dd51c7853932188de810d8a..6560d2e17e8c3fb2abd943208da2ba88fe765539 100755 --- a/LTA/LTAIngest/ltacp.py +++ b/LTA/LTAIngest/ltacp.py @@ -248,7 +248,7 @@ class LtaCp: # start sending remote data, tee to fifo src_path_parent, src_path_child = os.path.split(self.src_path_data) - cmd_remote_data = self.ssh_cmd + ['cd %s; tar c --checkpoint=100 --checkpoint-action="ttyout=checkpoint %%u\\n" -O %s | tee %s | %s %s %s' % (src_path_parent, + cmd_remote_data = self.ssh_cmd + ['cd %s && tar c --checkpoint=100 --checkpoint-action="ttyout=checkpoint %%u\\n" -O %s | tee %s | %s %s %s' % (src_path_parent, src_path_child, self.remote_data_fifo, self.remoteNetCatCmd, @@ -269,12 +269,15 @@ class LtaCp: return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / float(10**6) # waiting for output, comparing checksums, etc. - logger.info('ltacp %s: waiting for remote data transfer to finish...' % self.logId) + logger.info('ltacp %s: transfering...' % self.logId) transfer_start_time = datetime.utcnow() prev_progress_time = datetime.utcnow() prev_bytes_transfered = 0 - while p_remote_data.poll() == None: + + # wait and poll for progress while all processes are runnning + while len([p for p in self.started_procs.values() if p.poll() is not None]) == 0 try: + # read and process tar stdout lines to create progress messages nextline = p_remote_data.stdout.readline() if len(nextline) > 0: record_nr = int(nextline.split()[-1].strip()) @@ -299,9 +302,17 @@ class LtaCp: humanreadablesize(avg_speed, 'Bps'), humanreadablesize(current_speed, 'Bps'), timedelta(seconds=int(round(time_to_go))))) + time.sleep(0.25) except KeyboardInterrupt: self.cleanup() + logger.info('ltacp %s: waiting for transfer via globus-url-copy to LTA to finish...' % self.logId) + output_data_out = p_data_out.communicate() + if p_data_out.returncode != 0: + raise LtacpException('ltacp %s: transfer via globus-url-copy to LTA failed: %s' % (self.logId, output_data_out[1])) + logger.info('ltacp %s: data transfer via globus-url-copy to LTA complete.' % self.logId) + + logger.info('ltacp %s: waiting for remote data transfer to finish...' % self.logId) output_remote_data = p_remote_data.communicate() if p_remote_data.returncode != 0: raise LtacpException('ltacp %s: Error in remote data transfer: %s' % (self.logId, output_remote_data[1])) @@ -343,12 +354,6 @@ class LtaCp: byte_count = int(output_byte_count[0].split()[0].strip()) logger.info('ltacp %s: byte count of datastream is %d %s' % (self.logId, byte_count, humanreadablesize(byte_count))) - logger.info('ltacp %s: waiting for transfer via globus-url-copy to LTA to finish...' % self.logId) - output_data_out = p_data_out.communicate() - if p_data_out.returncode != 0: - raise LtacpException('ltacp %s: transfer via globus-url-copy to LTA failed: %s' % (self.logId, output_data_out[1])) - logger.info('ltacp %s: data transfer via globus-url-copy to LTA complete.' % self.logId) - logger.debug('ltacp %s: waiting for local adler32 checksum to complete...' % self.logId) output_a32_local = p_a32_local.communicate() if p_a32_local.returncode != 0: @@ -380,7 +385,7 @@ class LtaCp: self.cleanup() logger.info('ltacp %s: successfully completed transfer of %s:%s to %s' % (self.logId, self.src_host, self.src_path_data, self.dst_surl)) - return (md5_checksum_local, a32_checksum_local, byte_count) + return (md5_checksum_local, a32_checksum_local, str(byte_count)) def _ncListen(self, log_name): # pick initial random port for data receiver