Skip to content
Snippets Groups Projects
Commit 4ee8bbd5 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #8725: use python logger

parent a1452fd9
No related branches found
No related tags found
No related merge requests found
......@@ -7,10 +7,21 @@
# Between the remote and local host, md5 checksums are used to ensure integrity of the file,
# adler32 is used between localhost and the SRM.
import logging
from subprocess import Popen, PIPE
from socket import getfqdn
import os, sys, getpass
import time
log_handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)-15s %(levelname)s %(message)s')
formatter.converter = time.gmtime
log_handler.setFormatter(formatter)
logger = logging.getLogger('Slave')
logger.addHandler(log_handler)
logger.setLevel(logging.INFO)
logger = logging.getLogger('Slave')
class LtacpException(Exception):
def __init__(self, value):
......@@ -34,7 +45,7 @@ def convert_surl_to_turl(surl):
def removeRemoteFile(user, host, filepath):
'''remove a file (or fifo) on a remote host. Test if file exists before deleting.'''
cmd_remote_rm = ['ssh %s@%s \'if [ -e "%s" ] ; then rm %s ; fi ;\'' % (user, host, filepath, filepath)]
print 'remote removing file if existing. executing: ', ' '.join(cmd_remote_rm)
logger.debug('ltacp: remote removing file if existing. executing: %s' % ' '.join(cmd_remote_rm))
p_remote_rm = Popen(cmd_remote_rm, shell=True, stdout=PIPE, stderr=PIPE)
p_remote_rm.communicate()
if p_remote_rm.returncode != 0:
......@@ -52,7 +63,7 @@ def transfer(src_host,
):
dst_turl = convert_surl_to_turl(dst_surl)
print 'Initiating transfer of %s:%s to %s' % (src_host, src_path_data, dst_surl)
logger.info('ltacp: initiating transfer of %s:%s to %s' % (src_host, src_path_data, dst_surl))
# default return code
code = 0
......@@ -71,57 +82,57 @@ def transfer(src_host,
#---
# create local fifo to stream data to globus-url-copy
print 'creating data fifo for globus-url-copy: ', local_data_fifo
logger.info('ltacp: creating data fifo for globus-url-copy: %s' % local_data_fifo)
if os.path.exists(local_data_fifo):
os.remove(local_data_fifo)
os.mkfifo(local_data_fifo)
# create local fifo to stream data to adler32
local_adler32_fifo = local_data_fifo+'_adler32'
print 'creating data fifo for adler32: ', local_adler32_fifo
logger.info('ltacp: creating data fifo for adler32: %s' % local_adler32_fifo)
if os.path.exists(local_adler32_fifo):
os.remove(local_adler32_fifo)
os.mkfifo(local_adler32_fifo)
# start listen for data stream
cmd_data_in = ['nc', '-l', '-q','0', port_data]
print 'listening for data. executing: ', ' '.join(cmd_data_in)
logger.info('ltacp: listening for data. executing: %s' % ' '.join(cmd_data_in))
p_data_in = Popen(cmd_data_in, stdout=PIPE)
started_procs.append(p_data_in)
# start listen for checksums
cmd_md5_receive = ['nc','-l', '-q','0', port_md5]
print 'listening for checksums. executing: ', ' '.join(cmd_md5_receive)
logger.info('ltacp: listening for checksums. executing: %s' % ' '.join(cmd_md5_receive))
p_md5_receive = Popen(cmd_md5_receive, stdout=PIPE, stderr=PIPE)
started_procs.append(p_md5_receive)
# start tee incoming data stream to fifo (pipe stream further for checksum)
cmd_tee_data = ['tee', local_data_fifo]
print 'splitting datastream 1. executing on stdout of data listener: %s' % (' '.join(cmd_tee_data),)
logger.info('ltacp: splitting datastream. executing on stdout of data listener: %s' % (' '.join(cmd_tee_data),))
p_tee_data = Popen(cmd_tee_data, stdin=p_data_in.stdout, stdout=PIPE)
started_procs.append(p_tee_data)
# start tee incoming data stream to fifo (pipe stream further for checksum)
cmd_tee_checksums = ['tee', local_adler32_fifo]
print 'splitting datastream 2. executing: on stdout of 1st data tee: %s' % (' '.join(cmd_tee_checksums),)
logger.info('ltacp: splitting datastream again. executing: on stdout of 1st data tee: %s' % (' '.join(cmd_tee_checksums),))
p_tee_checksums = Popen(cmd_tee_checksums, stdin=p_tee_data.stdout, stdout=PIPE)
started_procs.append(p_tee_checksums)
# start computing md5 checksum of incoming data stream
cmd_md5_local = ['md5sum']
print 'computing local md5 checksum. executing on stdout of 2nd data tee: %s' % (' '.join(cmd_md5_local))
logger.info('ltacp: computing local md5 checksum. executing on stdout of 2nd data tee: %s' % (' '.join(cmd_md5_local)))
p_md5_local = Popen(cmd_md5_local, stdin=p_tee_checksums.stdout, stdout=PIPE, stderr=PIPE)
started_procs.append(p_md5_local)
# start computing adler checksum of incoming data stream
cmd_a32_local = ['./md5adler/a32', local_adler32_fifo]
print 'computing local adler32 checksum. executing: ', ' '.join(cmd_a32_local)
logger.info('ltacp: computing local adler32 checksum. executing: %s' % ' '.join(cmd_a32_local))
p_a32_local = Popen(cmd_a32_local, stdout=PIPE, stderr=PIPE)
started_procs.append(p_a32_local)
# start copy fifo stream to SRM
cmd_data_out = ['globus-url-copy', local_data_fifo, dst_turl]
print 'copying data stream into globus-url-copy. executing: ', ' '.join(cmd_data_out)
logger.info('ltacp: copying data stream into globus-url-copy. executing: %s' % ' '.join(cmd_data_out))
p_data_out = Popen(cmd_data_out, stdout=PIPE, stderr=PIPE)
started_procs.append(p_data_out)
......@@ -146,7 +157,7 @@ def transfer(src_host,
cmd_remote_mkfifo = ['ssh '+src_user+'@'+src_host+
' \'mkfifo '+remote_data_fifo+
'\'']
print 'remote creating fifo. executing: ', ' '.join(cmd_remote_mkfifo)
logger.info('ltacp: remote creating fifo. executing: %s' % (' '.join(cmd_remote_mkfifo)))
p_remote_mkfifo = Popen(cmd_remote_mkfifo, shell=True, stdout=PIPE, stderr=PIPE)
started_procs.append(p_remote_mkfifo)
......@@ -163,7 +174,7 @@ def transfer(src_host,
' \'cd '+src_path_parent+
' ; tar c -O '+src_path_child+' | tee '+remote_data_fifo+' | nc --send-only '+getfqdn()+' '+port_data+
'\'']
print 'remote starting transfer. executing: ', ' '.join(cmd_remote_data)
logger.info('ltacp: remote starting transfer. executing: %s' % ' '.join(cmd_remote_data))
p_remote_data = Popen(cmd_remote_data, shell=True, stdout=PIPE, stderr=PIPE)
started_procs.append(p_remote_data)
......@@ -171,22 +182,22 @@ def transfer(src_host,
cmd_remote_checksum = ['ssh '+src_user+'@'+src_host+
' \'cat '+remote_data_fifo+' | md5sum | nc --send-only '+getfqdn()+' '+port_md5+
'\'']
print 'remote starting md5 checksum. executing: ', ' '.join(cmd_remote_checksum)
logger.info('ltacp: remote starting md5 checksum. executing: %s' % ' '.join(cmd_remote_checksum))
p_remote_checksum = Popen(cmd_remote_checksum, shell=True, stdout=PIPE, stderr=PIPE)
started_procs.append(p_remote_checksum)
# waiting for output, comparing checksums, etc.
print "Waiting for transfer to finish..."
logger.debug('ltacp: waiting for transfer to finish...')
output_remote_data = p_remote_data.communicate()
print "remote data transfer finished..."
logger.debug('ltacp: remote data transfer finished...')
output_remote_checksum = p_remote_checksum.communicate()
print "remote md5 checksum transfer finished."
logger.debug('ltacp: remote md5 checksum transfer finished.')
if p_remote_data.returncode == 0 and p_remote_checksum.returncode == 0:
print "Waiting for remote md5 checksum..."
logger.debug('ltacp: waiting for remote md5 checksum...')
output_md5_remote = p_md5_receive.communicate()
print "Waiting for local md5 checksum..."
logger.debug('ltacp: waiting for local md5 checksum...')
output_md5_local = p_md5_local.communicate()
if p_md5_receive.returncode == 0 and p_md5_local.returncode == 0:
......@@ -194,23 +205,23 @@ def transfer(src_host,
md5_checksum_local = output_md5_local[0].split()[0]
if(md5_checksum_remote == md5_checksum_local):
print 'remote and local md5 checksums are equal: %s' % (md5_checksum_local,)
logger.info('ltacp: remote and local md5 checksums are equal: %s' % (md5_checksum_local,))
else:
raise LtacpException('md5 checksum reported by client (%s) does not match local checksum of incoming data stream (%s)' % (md5_checksum_remote, md5_checksum_local))
print "Waiting for transfer via globus-url-copy to LTA to finish..."
logger.debug('ltacp: waiting for transfer via globus-url-copy to LTA to finish...')
output_data_out = p_data_out.communicate()
if p_data_out.returncode == 0:
print "Transfer to LTA complete."
logger.info('ltacp: data transfer to LTA complete.')
print "Waiting for local adler32 checksum to complete..."
logger.debug('ltacp: waiting for local adler32 checksum to complete...')
output_a32_local = p_a32_local.communicate()
a32_checksum_local = output_a32_local[0].split()[1]
if p_a32_local.returncode != 0:
raise LtacpException('local adler32 checksum computation failed: '+str(output_a32_local))
print "Fetching adler32 checksum from LTA..."
logger.debug('ltacp: fetching adler32 checksum from LTA...')
srm_a32_checksum = get_srm_a32_checksum(dst_surl)
if not srm_a32_checksum:
......@@ -219,8 +230,8 @@ def transfer(src_host,
if(srm_a32_checksum != a32_checksum_local):
raise LtacpException('adler32 checksum reported by srm ('+srm_a32_checksum+') does not match original data checksum ('+a32_checksum_local+')')
print "adler32 checksums are equal: ", a32_checksum_local
print "Transfer to LTA completed successfully."
logger.info('ltacp: adler32 checksums are equal: %s' % a32_checksum_local)
logger.info('ltacp: transfer to LTA completed successfully.')
else:
raise LtacpException('Transfer to SRM failed: '+output_data_out[1])
else:
......@@ -230,7 +241,7 @@ def transfer(src_host,
except LtacpException as e:
# Something went wrong
print "! Fatal Error: ", str(e)
logger.error('ltacp: ! Fatal Error: %s' % str(e))
code = 1
# ---
......@@ -241,23 +252,23 @@ def transfer(src_host,
removeRemoteFile(src_user, src_host, remote_data_fifo)
# remove local data fifo
print 'removing local data fifo for globus-url-copy: ', local_data_fifo
logger.info('ltacp: removing local data fifo for globus-url-copy: %s' % local_data_fifo)
os.remove(local_data_fifo)
print 'waiting for subprocesses to complete...'
logger.info('ltacp: waiting for subprocesses to complete...')
# cancel any started process
for p in started_procs:
if p.poll() == None:
p.terminate()
print 'terminated', p.pid
logger.info('ltacp: terminated', p.pid)
print 'Successfully completed transfer of %s:%s to %s' % (src_host, src_path_data, dst_surl)
logger.info('ltacp: successfully completed transfer of %s:%s to %s' % (src_host, src_path_data, dst_surl))
return code
# execute command and optionally return exit code or output streams
def execute(cmd, return_output=False):
print 'executing: ', ' '.join(cmd)
logger.info('ltacp: executing: %s' % ' '.join(cmd))
p_cmd = Popen(cmd, stdout=PIPE, stderr=PIPE)
output_cmd = p_cmd.communicate()
if return_output:
......@@ -307,7 +318,7 @@ def create_missing_directories(surl):
while parent:
code = execute(['srmls', parent])
if code == 0:
print "srmls returned successfully, so this path apparently exists:", parent
logger.info('ltacp: srmls returned successfully, so this path apparently exists: %s' % parent)
break;
else:
parent, child = os.path.split(parent)
......@@ -318,10 +329,10 @@ def create_missing_directories(surl):
parent = parent + '/' + missing.pop()
code = execute(['srmmkdir',"-retry_num=0",parent])
if code != 0:
print "failed to create missing directory:",parent
logger.info('ltacp: failed to create missing directory: %s' % parent)
return code
print "Successfully created parent directory:", parent
logger.info('ltacp: successfully created parent directory: %s' % parent)
return 0
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment