From c5645b9d20ad688153531f9a6561ca0d5feaade7 Mon Sep 17 00:00:00 2001
From: Jorrit Schaap <schaap@astron.nl>
Date: Tue, 12 Jan 2016 08:08:51 +0000
Subject: [PATCH] Task #8725: poll all processes during transfer for early
 termination. return byte_count as string

---
 LTA/LTAIngest/ltacp.py | 25 +++++++++++++++----------
 1 file changed, 15 insertions(+), 10 deletions(-)

diff --git a/LTA/LTAIngest/ltacp.py b/LTA/LTAIngest/ltacp.py
index 38255a0ba40..6560d2e17e8 100755
--- a/LTA/LTAIngest/ltacp.py
+++ b/LTA/LTAIngest/ltacp.py
@@ -248,7 +248,7 @@ class LtaCp:
 
             # start sending remote data, tee to fifo
             src_path_parent, src_path_child = os.path.split(self.src_path_data)
-            cmd_remote_data = self.ssh_cmd + ['cd %s; tar c --checkpoint=100 --checkpoint-action="ttyout=checkpoint %%u\\n" -O %s | tee %s | %s %s %s' % (src_path_parent,
+            cmd_remote_data = self.ssh_cmd + ['cd %s && tar c --checkpoint=100 --checkpoint-action="ttyout=checkpoint %%u\\n" -O %s | tee %s | %s %s %s' % (src_path_parent,
                    src_path_child,
                    self.remote_data_fifo,
                    self.remoteNetCatCmd,
@@ -269,12 +269,15 @@ class LtaCp:
                 return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / float(10**6)
 
             # waiting for output, comparing checksums, etc.
-            logger.info('ltacp %s: waiting for remote data transfer to finish...' % self.logId)
+            logger.info('ltacp %s: transfering...' % self.logId)
             transfer_start_time = datetime.utcnow()
             prev_progress_time = datetime.utcnow()
             prev_bytes_transfered = 0
-            while p_remote_data.poll() == None:
+
+            # wait and poll for progress while all processes are runnning
+            while len([p for p in self.started_procs.values() if p.poll() is not None]) == 0
                 try:
+                    # read and process tar stdout lines to create progress messages
                     nextline = p_remote_data.stdout.readline()
                     if len(nextline) > 0:
                         record_nr = int(nextline.split()[-1].strip())
@@ -299,9 +302,17 @@ class LtaCp:
                                                                                                         humanreadablesize(avg_speed, 'Bps'),
                                                                                                         humanreadablesize(current_speed, 'Bps'),
                                                                                                         timedelta(seconds=int(round(time_to_go)))))
+                time.sleep(0.25)
                 except KeyboardInterrupt:
                     self.cleanup()
 
+            logger.info('ltacp %s: waiting for transfer via globus-url-copy to LTA to finish...' % self.logId)
+            output_data_out = p_data_out.communicate()
+            if p_data_out.returncode != 0:
+                raise LtacpException('ltacp %s: transfer via globus-url-copy to LTA failed: %s' % (self.logId, output_data_out[1]))
+            logger.info('ltacp %s: data transfer via globus-url-copy to LTA complete.' % self.logId)
+
+            logger.info('ltacp %s: waiting for remote data transfer to finish...' % self.logId)
             output_remote_data = p_remote_data.communicate()
             if p_remote_data.returncode != 0:
                 raise LtacpException('ltacp %s: Error in remote data transfer: %s' % (self.logId, output_remote_data[1]))
@@ -343,12 +354,6 @@ class LtaCp:
             byte_count = int(output_byte_count[0].split()[0].strip())
             logger.info('ltacp %s: byte count of datastream is %d %s' % (self.logId, byte_count, humanreadablesize(byte_count)))
 
-            logger.info('ltacp %s: waiting for transfer via globus-url-copy to LTA to finish...' % self.logId)
-            output_data_out = p_data_out.communicate()
-            if p_data_out.returncode != 0:
-                raise LtacpException('ltacp %s: transfer via globus-url-copy to LTA failed: %s' % (self.logId, output_data_out[1]))
-            logger.info('ltacp %s: data transfer via globus-url-copy to LTA complete.' % self.logId)
-
             logger.debug('ltacp %s: waiting for local adler32 checksum to complete...' % self.logId)
             output_a32_local = p_a32_local.communicate()
             if p_a32_local.returncode != 0:
@@ -380,7 +385,7 @@ class LtaCp:
             self.cleanup()
 
         logger.info('ltacp %s: successfully completed transfer of %s:%s to %s' % (self.logId, self.src_host, self.src_path_data, self.dst_surl))
-        return (md5_checksum_local, a32_checksum_local, byte_count)
+        return (md5_checksum_local, a32_checksum_local, str(byte_count))
 
     def _ncListen(self, log_name):
         # pick initial random port for data receiver
-- 
GitLab