diff --git a/LTA/LTAIngest/ltacp.py b/LTA/LTAIngest/ltacp.py index 215ac80b39c52e09e9d9ea734636615d01336320..257a0de052643053d3b92f9b3e6dd598a9a1d1bb 100644 --- a/LTA/LTAIngest/ltacp.py +++ b/LTA/LTAIngest/ltacp.py @@ -7,10 +7,21 @@ # Between the remote and local host, md5 checksums are used to ensure integrity of the file, # adler32 is used between localhost and the SRM. - +import logging from subprocess import Popen, PIPE from socket import getfqdn import os, sys, getpass +import time + +log_handler = logging.StreamHandler() +formatter = logging.Formatter('%(asctime)-15s %(levelname)s %(message)s') +formatter.converter = time.gmtime +log_handler.setFormatter(formatter) +logger = logging.getLogger('Slave') +logger.addHandler(log_handler) +logger.setLevel(logging.INFO) + +logger = logging.getLogger('Slave') class LtacpException(Exception): def __init__(self, value): @@ -34,7 +45,7 @@ def convert_surl_to_turl(surl): def removeRemoteFile(user, host, filepath): '''remove a file (or fifo) on a remote host. Test if file exists before deleting.''' cmd_remote_rm = ['ssh %s@%s \'if [ -e "%s" ] ; then rm %s ; fi ;\'' % (user, host, filepath, filepath)] - print 'remote removing file if existing. executing: ', ' '.join(cmd_remote_rm) + logger.debug('ltacp: remote removing file if existing. 
executing: %s' % ' '.join(cmd_remote_rm)) p_remote_rm = Popen(cmd_remote_rm, shell=True, stdout=PIPE, stderr=PIPE) p_remote_rm.communicate() if p_remote_rm.returncode != 0: @@ -52,7 +63,7 @@ def transfer(src_host, ): dst_turl = convert_surl_to_turl(dst_surl) - print 'Initiating transfer of %s:%s to %s' % (src_host, src_path_data, dst_surl) + logger.info('ltacp: initiating transfer of %s:%s to %s' % (src_host, src_path_data, dst_surl)) # default return code code = 0 @@ -71,57 +82,57 @@ def transfer(src_host, #--- # create local fifo to stream data to globus-url-copy - print 'creating data fifo for globus-url-copy: ', local_data_fifo + logger.info('ltacp: creating data fifo for globus-url-copy: %s' % local_data_fifo) if os.path.exists(local_data_fifo): os.remove(local_data_fifo) os.mkfifo(local_data_fifo) # create local fifo to stream data to adler32 local_adler32_fifo = local_data_fifo+'_adler32' - print 'creating data fifo for adler32: ', local_adler32_fifo + logger.info('ltacp: creating data fifo for adler32: %s' % local_adler32_fifo) if os.path.exists(local_adler32_fifo): os.remove(local_adler32_fifo) os.mkfifo(local_adler32_fifo) # start listen for data stream cmd_data_in = ['nc', '-l', '-q','0', port_data] - print 'listening for data. executing: ', ' '.join(cmd_data_in) + logger.info('ltacp: listening for data. executing: %s' % ' '.join(cmd_data_in)) p_data_in = Popen(cmd_data_in, stdout=PIPE) started_procs.append(p_data_in) # start listen for checksums cmd_md5_receive = ['nc','-l', '-q','0', port_md5] - print 'listening for checksums. executing: ', ' '.join(cmd_md5_receive) + logger.info('ltacp: listening for checksums. executing: %s' % ' '.join(cmd_md5_receive)) p_md5_receive = Popen(cmd_md5_receive, stdout=PIPE, stderr=PIPE) started_procs.append(p_md5_receive) # start tee incoming data stream to fifo (pipe stream further for checksum) cmd_tee_data = ['tee', local_data_fifo] - print 'splitting datastream 1. 
executing on stdout of data listener: %s' % (' '.join(cmd_tee_data),) + logger.info('ltacp: splitting datastream. executing on stdout of data listener: %s' % (' '.join(cmd_tee_data),)) p_tee_data = Popen(cmd_tee_data, stdin=p_data_in.stdout, stdout=PIPE) started_procs.append(p_tee_data) # start tee incoming data stream to fifo (pipe stream further for checksum) cmd_tee_checksums = ['tee', local_adler32_fifo] - print 'splitting datastream 2. executing: on stdout of 1st data tee: %s' % (' '.join(cmd_tee_checksums),) + logger.info('ltacp: splitting datastream again. executing: on stdout of 1st data tee: %s' % (' '.join(cmd_tee_checksums),)) p_tee_checksums = Popen(cmd_tee_checksums, stdin=p_tee_data.stdout, stdout=PIPE) started_procs.append(p_tee_checksums) # start computing md5 checksum of incoming data stream cmd_md5_local = ['md5sum'] - print 'computing local md5 checksum. executing on stdout of 2nd data tee: %s' % (' '.join(cmd_md5_local)) + logger.info('ltacp: computing local md5 checksum. executing on stdout of 2nd data tee: %s' % (' '.join(cmd_md5_local))) p_md5_local = Popen(cmd_md5_local, stdin=p_tee_checksums.stdout, stdout=PIPE, stderr=PIPE) started_procs.append(p_md5_local) # start computing adler checksum of incoming data stream cmd_a32_local = ['./md5adler/a32', local_adler32_fifo] - print 'computing local adler32 checksum. executing: ', ' '.join(cmd_a32_local) + logger.info('ltacp: computing local adler32 checksum. executing: %s' % ' '.join(cmd_a32_local)) p_a32_local = Popen(cmd_a32_local, stdout=PIPE, stderr=PIPE) started_procs.append(p_a32_local) # start copy fifo stream to SRM cmd_data_out = ['globus-url-copy', local_data_fifo, dst_turl] - print 'copying data stream into globus-url-copy. executing: ', ' '.join(cmd_data_out) + logger.info('ltacp: copying data stream into globus-url-copy. 
executing: %s' % ' '.join(cmd_data_out)) p_data_out = Popen(cmd_data_out, stdout=PIPE, stderr=PIPE) started_procs.append(p_data_out) @@ -146,7 +157,7 @@ def transfer(src_host, cmd_remote_mkfifo = ['ssh '+src_user+'@'+src_host+ ' \'mkfifo '+remote_data_fifo+ '\''] - print 'remote creating fifo. executing: ', ' '.join(cmd_remote_mkfifo) + logger.info('ltacp: remote creating fifo. executing: %s' % (' '.join(cmd_remote_mkfifo))) p_remote_mkfifo = Popen(cmd_remote_mkfifo, shell=True, stdout=PIPE, stderr=PIPE) started_procs.append(p_remote_mkfifo) @@ -163,7 +174,7 @@ def transfer(src_host, ' \'cd '+src_path_parent+ ' ; tar c -O '+src_path_child+' | tee '+remote_data_fifo+' | nc --send-only '+getfqdn()+' '+port_data+ '\''] - print 'remote starting transfer. executing: ', ' '.join(cmd_remote_data) + logger.info('ltacp: remote starting transfer. executing: %s' % ' '.join(cmd_remote_data)) p_remote_data = Popen(cmd_remote_data, shell=True, stdout=PIPE, stderr=PIPE) started_procs.append(p_remote_data) @@ -171,22 +182,22 @@ def transfer(src_host, cmd_remote_checksum = ['ssh '+src_user+'@'+src_host+ ' \'cat '+remote_data_fifo+' | md5sum | nc --send-only '+getfqdn()+' '+port_md5+ '\''] - print 'remote starting md5 checksum. executing: ', ' '.join(cmd_remote_checksum) + logger.info('ltacp: remote starting md5 checksum. executing: %s' % ' '.join(cmd_remote_checksum)) p_remote_checksum = Popen(cmd_remote_checksum, shell=True, stdout=PIPE, stderr=PIPE) started_procs.append(p_remote_checksum) # waiting for output, comparing checksums, etc. - print "Waiting for transfer to finish..." + logger.debug('ltacp: waiting for transfer to finish...') output_remote_data = p_remote_data.communicate() - print "remote data transfer finished..." + logger.debug('ltacp: remote data transfer finished...') output_remote_checksum = p_remote_checksum.communicate() - print "remote md5 checksum transfer finished." 
+ logger.debug('ltacp: remote md5 checksum transfer finished.') if p_remote_data.returncode == 0 and p_remote_checksum.returncode == 0: - print "Waiting for remote md5 checksum..." + logger.debug('ltacp: waiting for remote md5 checksum...') output_md5_remote = p_md5_receive.communicate() - print "Waiting for local md5 checksum..." + logger.debug('ltacp: waiting for local md5 checksum...') output_md5_local = p_md5_local.communicate() if p_md5_receive.returncode == 0 and p_md5_local.returncode == 0: @@ -194,23 +205,23 @@ def transfer(src_host, md5_checksum_local = output_md5_local[0].split()[0] if(md5_checksum_remote == md5_checksum_local): - print 'remote and local md5 checksums are equal: %s' % (md5_checksum_local,) + logger.info('ltacp: remote and local md5 checksums are equal: %s' % (md5_checksum_local,)) else: raise LtacpException('md5 checksum reported by client (%s) does not match local checksum of incoming data stream (%s)' % (md5_checksum_remote, md5_checksum_local)) - print "Waiting for transfer via globus-url-copy to LTA to finish..." + logger.debug('ltacp: waiting for transfer via globus-url-copy to LTA to finish...') output_data_out = p_data_out.communicate() if p_data_out.returncode == 0: - print "Transfer to LTA complete." + logger.info('ltacp: data transfer to LTA complete.') - print "Waiting for local adler32 checksum to complete..." + logger.debug('ltacp: waiting for local adler32 checksum to complete...') output_a32_local = p_a32_local.communicate() a32_checksum_local = output_a32_local[0].split()[1] if p_a32_local.returncode != 0: raise LtacpException('local adler32 checksum computation failed: '+str(output_a32_local)) - print "Fetching adler32 checksum from LTA..." 
+ logger.debug('ltacp: fetching adler32 checksum from LTA...') srm_a32_checksum = get_srm_a32_checksum(dst_surl) if not srm_a32_checksum: @@ -219,8 +230,8 @@ def transfer(src_host, if(srm_a32_checksum != a32_checksum_local): raise LtacpException('adler32 checksum reported by srm ('+srm_a32_checksum+') does not match original data checksum ('+a32_checksum_local+')') - print "adler32 checksums are equal: ", a32_checksum_local - print "Transfer to LTA completed successfully." + logger.info('ltacp: adler32 checksums are equal: %s' % a32_checksum_local) + logger.info('ltacp: transfer to LTA completed successfully.') else: raise LtacpException('Transfer to SRM failed: '+output_data_out[1]) else: @@ -230,7 +241,7 @@ def transfer(src_host, except LtacpException as e: # Something went wrong - print "! Fatal Error: ", str(e) + logger.error('ltacp: ! Fatal Error: %s' % str(e)) code = 1 # --- @@ -241,23 +252,23 @@ def transfer(src_host, removeRemoteFile(src_user, src_host, remote_data_fifo) # remove local data fifo - print 'removing local data fifo for globus-url-copy: ', local_data_fifo + logger.info('ltacp: removing local data fifo for globus-url-copy: %s' % local_data_fifo) os.remove(local_data_fifo) - print 'waiting for subprocesses to complete...' 
+ logger.info('ltacp: waiting for subprocesses to complete...') # cancel any started process for p in started_procs: if p.poll() == None: p.terminate() - print 'terminated', p.pid + logger.info('ltacp: terminated %s', p.pid) - print 'Successfully completed transfer of %s:%s to %s' % (src_host, src_path_data, dst_surl) + logger.info('ltacp: successfully completed transfer of %s:%s to %s' % (src_host, src_path_data, dst_surl)) return code # execute command and optionally return exit code or output streams def execute(cmd, return_output=False): - print 'executing: ', ' '.join(cmd) + logger.info('ltacp: executing: %s' % ' '.join(cmd)) p_cmd = Popen(cmd, stdout=PIPE, stderr=PIPE) output_cmd = p_cmd.communicate() if return_output: @@ -307,7 +318,7 @@ def create_missing_directories(surl): while parent: code = execute(['srmls', parent]) if code == 0: - print "srmls returned successfully, so this path apparently exists:", parent + logger.info('ltacp: srmls returned successfully, so this path apparently exists: %s' % parent) break; else: parent, child = os.path.split(parent) @@ -318,10 +329,10 @@ def create_missing_directories(surl): parent = parent + '/' + missing.pop() code = execute(['srmmkdir',"-retry_num=0",parent]) if code != 0: - print "failed to create missing directory:",parent + logger.error('ltacp: failed to create missing directory: %s' % parent) return code - print "Successfully created parent directory:", parent + logger.info('ltacp: successfully created parent directory: %s' % parent) return 0