From 63ffb733f0d11c17ebcc75b55e06e403e19e9b6b Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Wed, 20 Jan 2016 08:52:36 +0000 Subject: [PATCH] Task #8725: connect stdin of ssh processes to dev null --- LTA/LTAIngest/ltacp.py | 155 +++++++++++++++++++++-------------------- 1 file changed, 78 insertions(+), 77 deletions(-) diff --git a/LTA/LTAIngest/ltacp.py b/LTA/LTAIngest/ltacp.py index 762ef5ffc95..b4bb42cfa97 100755 --- a/LTA/LTAIngest/ltacp.py +++ b/LTA/LTAIngest/ltacp.py @@ -248,83 +248,84 @@ class LtaCp: estimated_tar_size = 512*(input_datasize / 512) + 3*512 #512byte header, 2*512byte ending, 512byte modulo data tar_record_size = 10240 # 20 * 512 byte blocks - # start sending remote data, tee to fifo - src_path_parent, src_path_child = os.path.split(self.src_path_data) - cmd_remote_data = self.ssh_cmd + ['cd %s && tar c --checkpoint=100 --checkpoint-action="ttyout=checkpoint %%u\\n" -O %s | tee %s | %s %s %s' % (src_path_parent, - src_path_child, - self.remote_data_fifo, - self.remoteNetCatCmd, - self.localIPAddress, - port_data)] - logger.info('ltacp %s: remote starting transfer. executing: %s' % (self.logId, ' '.join(cmd_remote_data))) - p_remote_data = Popen(cmd_remote_data, stdout=PIPE, stderr=PIPE) - self.started_procs[p_remote_data] = cmd_remote_data - - # start computation of checksum on remote fifo stream - cmd_remote_checksum = self.ssh_cmd + ['md5sum %s | %s %s %s' % (self.remote_data_fifo, self.remoteNetCatCmd, self.localIPAddress, port_md5)] - logger.info('ltacp %s: remote starting computation of md5 checksum. executing: %s' % (self.logId, ' '.join(cmd_remote_checksum))) - p_remote_checksum = Popen(cmd_remote_checksum, stdout=PIPE, stderr=PIPE) - self.started_procs[p_remote_checksum] = cmd_remote_checksum - - # timedelta.total_seconds is only available for python >= 2.7 - def timedelta_total_seconds(td): - return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / float(10**6) - - # waiting for output, comparing checksums, etc. - logger.info('ltacp %s: transfering...' % self.logId) - transfer_start_time = datetime.utcnow() - prev_progress_time = datetime.utcnow() - prev_bytes_transfered = 0 - - # wait and poll for progress while all processes are runnning - while len([p for p in self.started_procs.keys() if p.poll() is not None]) == 0: - try: - # read and process tar stdout lines to create progress messages - nextline = p_remote_data.stdout.readline() - if len(nextline) > 0: - record_nr = int(nextline.split()[-1].strip()) - total_bytes_transfered = record_nr * tar_record_size - percentage_done = 100.0*total_bytes_transfered/input_datasize - current_progress_time = datetime.utcnow() - elapsed_secs_since_start = timedelta_total_seconds(current_progress_time - transfer_start_time) - elapsed_secs_since_prev = timedelta_total_seconds(current_progress_time - prev_progress_time) - if percentage_done > 0 and elapsed_secs_since_start > 0 and elapsed_secs_since_prev > 0: - avg_speed = total_bytes_transfered / elapsed_secs_since_start - current_bytes_transfered = total_bytes_transfered - prev_bytes_transfered - current_speed = current_bytes_transfered / elapsed_secs_since_prev - if elapsed_secs_since_prev > 60 or current_bytes_transfered > 0.05*input_datasize: - prev_progress_time = current_progress_time - prev_bytes_transfered = total_bytes_transfered - percentage_to_go = 100.0 - percentage_done - time_to_go = elapsed_secs_since_start * percentage_to_go / percentage_done - logger.info('ltacp %s: transfered %d bytes, %s, %.1f%% at avgSpeed=%s curSpeed=%s to_go=%s' % (self.logId, - total_bytes_transfered, - humanreadablesize(total_bytes_transfered), - percentage_done, - humanreadablesize(avg_speed, 'Bps'), - humanreadablesize(current_speed, 'Bps'), - timedelta(seconds=int(round(time_to_go))))) - time.sleep(0.25) - except KeyboardInterrupt: - self.cleanup() - - logger.info('ltacp %s: waiting for transfer via globus-url-copy to LTA to finish...' % self.logId) - output_data_out = p_data_out.communicate() - if p_data_out.returncode != 0: - raise LtacpException('ltacp %s: transfer via globus-url-copy to LTA failed: %s' % (self.logId, output_data_out[1])) - logger.info('ltacp %s: data transfer via globus-url-copy to LTA complete.' % self.logId) - - logger.info('ltacp %s: waiting for remote data transfer to finish...' % self.logId) - output_remote_data = p_remote_data.communicate() - if p_remote_data.returncode != 0: - raise LtacpException('ltacp %s: Error in remote data transfer: %s' % (self.logId, output_remote_data[1])) - logger.debug('ltacp %s: remote data transfer finished...' % self.logId) - - logger.info('ltacp %s: waiting for remote md5 checksum computation to finish...' % self.logId) - output_remote_checksum = p_remote_checksum.communicate() - if p_remote_checksum.returncode != 0: - raise LtacpException('ltacp %s: Error in remote md5 checksum computation: %s' % (self.logId, output_remote_checksum[1])) - logger.debug('ltacp %s: remote md5 checksum computation finished.' % self.logId) + with open(os.devnull, 'r') as devnull: + # start sending remote data, tee to fifo + src_path_parent, src_path_child = os.path.split(self.src_path_data) + cmd_remote_data = self.ssh_cmd + ['cd %s && tar c --checkpoint=100 --checkpoint-action="ttyout=checkpoint %%u\\n" -O %s | tee %s | %s %s %s' % (src_path_parent, + src_path_child, + self.remote_data_fifo, + self.remoteNetCatCmd, + self.localIPAddress, + port_data)] + logger.info('ltacp %s: remote starting transfer. executing: %s' % (self.logId, ' '.join(cmd_remote_data))) + p_remote_data = Popen(cmd_remote_data, stdin=devnull, stdout=PIPE, stderr=PIPE) + self.started_procs[p_remote_data] = cmd_remote_data + + # start computation of checksum on remote fifo stream + cmd_remote_checksum = self.ssh_cmd + ['md5sum %s | %s %s %s' % (self.remote_data_fifo, self.remoteNetCatCmd, self.localIPAddress, port_md5)] + logger.info('ltacp %s: remote starting computation of md5 checksum. executing: %s' % (self.logId, ' '.join(cmd_remote_checksum))) + p_remote_checksum = Popen(cmd_remote_checksum, stdin=devnull, stdout=PIPE, stderr=PIPE) + self.started_procs[p_remote_checksum] = cmd_remote_checksum + + # timedelta.total_seconds is only available for python >= 2.7 + def timedelta_total_seconds(td): + return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / float(10**6) + + # waiting for output, comparing checksums, etc. + logger.info('ltacp %s: transfering...' % self.logId) + transfer_start_time = datetime.utcnow() + prev_progress_time = datetime.utcnow() + prev_bytes_transfered = 0 + + # wait and poll for progress while all processes are runnning + while len([p for p in self.started_procs.keys() if p.poll() is not None]) == 0: + try: + # read and process tar stdout lines to create progress messages + nextline = p_remote_data.stdout.readline() + if len(nextline) > 0: + record_nr = int(nextline.split()[-1].strip()) + total_bytes_transfered = record_nr * tar_record_size + percentage_done = 100.0*total_bytes_transfered/input_datasize + current_progress_time = datetime.utcnow() + elapsed_secs_since_start = timedelta_total_seconds(current_progress_time - transfer_start_time) + elapsed_secs_since_prev = timedelta_total_seconds(current_progress_time - prev_progress_time) + if percentage_done > 0 and elapsed_secs_since_start > 0 and elapsed_secs_since_prev > 0: + avg_speed = total_bytes_transfered / elapsed_secs_since_start + current_bytes_transfered = total_bytes_transfered - prev_bytes_transfered + current_speed = current_bytes_transfered / elapsed_secs_since_prev + if elapsed_secs_since_prev > 60 or current_bytes_transfered > 0.05*input_datasize: + prev_progress_time = current_progress_time + prev_bytes_transfered = total_bytes_transfered + percentage_to_go = 100.0 - percentage_done + time_to_go = elapsed_secs_since_start * percentage_to_go / percentage_done + logger.info('ltacp %s: transfered %d bytes, %s, %.1f%% at avgSpeed=%s curSpeed=%s to_go=%s' % (self.logId, + total_bytes_transfered, + humanreadablesize(total_bytes_transfered), + percentage_done, + humanreadablesize(avg_speed, 'Bps'), + humanreadablesize(current_speed, 'Bps'), + timedelta(seconds=int(round(time_to_go))))) + time.sleep(0.25) + except KeyboardInterrupt: + self.cleanup() + + logger.info('ltacp %s: waiting for transfer via globus-url-copy to LTA to finish...' % self.logId) + output_data_out = p_data_out.communicate() + if p_data_out.returncode != 0: + raise LtacpException('ltacp %s: transfer via globus-url-copy to LTA failed: %s' % (self.logId, output_data_out[1])) + logger.info('ltacp %s: data transfer via globus-url-copy to LTA complete.' % self.logId) + + logger.info('ltacp %s: waiting for remote data transfer to finish...' % self.logId) + output_remote_data = p_remote_data.communicate() + if p_remote_data.returncode != 0: + raise LtacpException('ltacp %s: Error in remote data transfer: %s' % (self.logId, output_remote_data[1])) + logger.debug('ltacp %s: remote data transfer finished...' % self.logId) + + logger.info('ltacp %s: waiting for remote md5 checksum computation to finish...' % self.logId) + output_remote_checksum = p_remote_checksum.communicate() + if p_remote_checksum.returncode != 0: + raise LtacpException('ltacp %s: Error in remote md5 checksum computation: %s' % (self.logId, output_remote_checksum[1])) + logger.debug('ltacp %s: remote md5 checksum computation finished.' % self.logId) logger.debug('ltacp %s: waiting to receive remote md5 checksum...' % self.logId) output_md5_receive = p_md5_receive.communicate() -- GitLab