Skip to content
Snippets Groups Projects
Commit 8d121a08 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #8725: logging, flow, error handling, etc

parent 07fc9768
No related branches found
No related tags found
No related merge requests found
...@@ -218,9 +218,10 @@ def transfer(src_host, ...@@ -218,9 +218,10 @@ def transfer(src_host,
# waiting for output, comparing checksums, etc. # waiting for output, comparing checksums, etc.
logger.info('ltacp %s: waiting for transfer to finish...' % src_filename) logger.info('ltacp %s: waiting for remote data transfer to finish...' % src_filename)
output_remote_data = p_remote_data.communicate() output_remote_data = p_remote_data.communicate()
logger.debug('ltacp %s: remote data transfer finished...' % src_filename) logger.debug('ltacp %s: remote data transfer finished...' % src_filename)
logger.info('ltacp %s: waiting for remote md5 checksum transfer to finish...' % src_filename)
output_remote_checksum = p_remote_checksum.communicate() output_remote_checksum = p_remote_checksum.communicate()
logger.debug('ltacp %s: remote md5 checksum transfer finished.' % src_filename) logger.debug('ltacp %s: remote md5 checksum transfer finished.' % src_filename)
...@@ -286,46 +287,49 @@ def transfer(src_host, ...@@ -286,46 +287,49 @@ def transfer(src_host,
logger.info('ltacp %s: removing local data fifo for globus-url-copy: %s' % (src_filename, local_data_fifo)) logger.info('ltacp %s: removing local data fifo for globus-url-copy: %s' % (src_filename, local_data_fifo))
os.remove(local_data_fifo) os.remove(local_data_fifo)
logger.debug('ltacp %s: waiting for subprocesses to complete...' % src_filename)
# cancel any started hanging process, as they should all be finished by now # cancel any started hanging process, as they should all be finished by now
for p,cl in started_procs.items(): hanging_procs = dict((p, cl) for (p, cl) in started_procs.items() if p.poll() == None)
if p.poll() == None:
if len(hanging_procs):
logger.warning('ltacp %s: terminating %d hanging subprocesses...' % (src_filename, len(hanging_procs)))
for p,cl in hanging_procs.items():
logger.warning('ltacp %s: terminated hanging process pid=%d cmdline=%s' % (src_filename, p.pid, cl)) logger.warning('ltacp %s: terminated hanging process pid=%d cmdline=%s' % (src_filename, p.pid, cl))
p.terminate() p.terminate()
logger.info('ltacp %s: terminated %d hanging subprocesses...' % (src_filename, len(hanging_procs)))
logger.info('ltacp %s: successfully completed transfer of %s:%s to %s' % (src_filename, src_host, src_path_data, dst_surl)) logger.info('ltacp %s: successfully completed transfer of %s:%s to %s' % (src_filename, src_host, src_path_data, dst_surl))
return (md5_checksum_local, a32_checksum_local) return (md5_checksum_local, a32_checksum_local)
# execute command and optionally return exit code or output streams # execute command and return (stdout, stderr, returncode) tuple
def execute(cmd, return_output=False): def execute(cmd):
logger.info('executing: %s' % ' '.join(cmd)) logger.info('executing: %s' % ' '.join(cmd))
p_cmd = Popen(cmd, stdout=PIPE, stderr=PIPE) p_cmd = Popen(cmd, stdout=PIPE, stderr=PIPE)
output_cmd = p_cmd.communicate() stdout, stderr = p_cmd.communicate()
if return_output: return (stdout, stderr, p_cmd.returncode)
return output_cmd
else:
return p_cmd.returncode
# remove file from srm # remove file from srm
def srmrm(surl): def srmrm(surl):
return execute(['srmrm', surl]) return execute(['srmrm', surl])[2]
# remove (empty) directory from srm # remove (empty) directory from srm
def srmrmdir(surl): def srmrmdir(surl):
return execute(['srmrmdir', surl]) return execute(['srmrmdir', surl])[2]
# remove file from srm # remove file from srm
def srmll(surl): def srmll(surl):
return execute(['srmls', '-l', surl], return_output=True) return execute(['srmls', '-l', surl])
# get checksum from srm via srmls # get checksum from srm via srmls
def get_srm_a32_checksum(surl): def get_srm_a32_checksum(surl):
output = srmll(surl)[0] output, errors, code = srmll(surl)
if code != 0:
return False
if not 'Checksum type:' in output: if not 'Checksum type:' in output:
return False return False
...@@ -348,7 +352,7 @@ def create_missing_directories(surl): ...@@ -348,7 +352,7 @@ def create_missing_directories(surl):
# determine missing dirs # determine missing dirs
while parent: while parent:
code = execute(['srmls', parent]) code = execute(['srmls', parent])[2]
if code == 0: if code == 0:
logger.info('ltacp %s: srmls returned successfully, so this path apparently exists: %s' % parent) logger.info('ltacp %s: srmls returned successfully, so this path apparently exists: %s' % parent)
break; break;
...@@ -359,7 +363,7 @@ def create_missing_directories(surl): ...@@ -359,7 +363,7 @@ def create_missing_directories(surl):
# recreate missing dirs # recreate missing dirs
while len(missing) > 0: while len(missing) > 0:
parent = parent + '/' + missing.pop() parent = parent + '/' + missing.pop()
code = execute(['srmmkdir',"-retry_num=0",parent]) code = execute(['srmmkdir',"-retry_num=0",parent])[2]
if code != 0: if code != 0:
logger.info('ltacp %s: failed to create missing directory: %s' % parent) logger.info('ltacp %s: failed to create missing directory: %s' % parent)
return code return code
...@@ -372,6 +376,10 @@ def create_missing_directories(surl): ...@@ -372,6 +376,10 @@ def create_missing_directories(surl):
# usage: ltacp.py <remote-host> <remote-path> <surl> # usage: ltacp.py <remote-host> <remote-path> <surl>
if __name__ == '__main__': if __name__ == '__main__':
if len(sys.argv) < 4:
print 'example: ./ltacp.py 10.196.232.11 /home/users/ingest/1M.txt srm://lofar-srm.fz-juelich.de:8443/pnfs/fz-juelich.de/data/lofar/ops/test/eor/1M.txt'
sys.exit()
# transfer test: # transfer test:
transfer(sys.argv[1], sys.argv[2], sys.argv[3], 'ingest') transfer(sys.argv[1], sys.argv[2], sys.argv[3], 'ingest')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment