Skip to content
Snippets Groups Projects
Commit 5daf8208 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-2829: fixed creation of parent dir

parent 7c45fee9
No related branches found
No related tags found
No related merge requests found
......@@ -63,8 +63,8 @@ class TMSSCopyServiceEventMessageHandler(TMSSEventMessageHandler):
# cache to reduce rest-calls, maps producer_id to tuple of producing subtask id and cluster name
_cache = {}
# cache to reduce ssh/mkdir calls
_created_destination_dirs = set()
# cache to reduce (ssh) mkdir calls. Only create parent dirs once.
_created_dir_cache = set()
for input_dataproduct in self.tmss_client.get_subtask_input_dataproducts(subtask['id']):
# fetch producing subtask id and cluster name for cache if needed
......@@ -82,34 +82,33 @@ class TMSSCopyServiceEventMessageHandler(TMSSEventMessageHandler):
('L'+str(_cache[input_dataproduct['producer']]['producing_subtask_id'])) if subtask['specifications_doc'].get('group_by_id', True) else '',
input_dataproduct['filename'])
# strip unneeded localhost as destination host to prevent under-the-hood ssh wrapping in rsync, and to prevent unneed ssh calls
destination = destination.lstrip('localhost:').lstrip('127.0.0.1:')
# split in host & path
dst = subtask['specifications_doc']['destination']
dst_host = dst[:dst.find(':')] if ':' in dst else ''
dst_parent_dir_path = destination.lstrip(dst_host+':').rstrip(input_dataproduct['filename'])
# create dst_parent_dir_path directories if needed
if dst_parent_dir_path not in _created_destination_dirs:
cmd = ['mkdir', '-p', dst_parent_dir_path]
if dst_parent_dir_path not in _created_dir_cache:
# create dst_parent_dir_path directories if needed, prepend them to the cmd
mkdir_cmd = ['mkdir', '-p', dst_parent_dir_path]
if dst_host:
cmd = wrap_command_in_ssh_call(cmd, dst_host)
mkdir_cmd = wrap_command_in_ssh_call(mkdir_cmd, dst_host)
logger.info("creating destination directory if needed copy subtask id=%s, executing: %s", subtask['id'], ' '.join(cmd))
logger.info("creating parent destination dir if needed for copy subtask id=%s, executing: %s", subtask['id'], ' '.join(mkdir_cmd))
if call(cmd) == 0:
logger.info("created destination directory '%s' for copy subtask id=%s", destination, subtask['id'])
_created_destination_dirs.add(dst_parent_dir_path)
if call(mkdir_cmd) == 0:
_created_dir_cache.add(dst_parent_dir_path)
else:
msg = "could not create destination directory '%s' for copy subtask id=%s" %(destination, subtask['id'])
msg = "could not create parent destination dir '%s' for copy subtask id=%s" % (dst_parent_dir_path, subtask['id'])
logger.error(msg)
self.tmss_client.set_subtask_status(id, 'error', error_reason=msg)
self.tmss_client.set_subtask_status(subtask['id'], 'error', error_reason=msg)
return
# strip unneeded localhost as destination host to prevent under-the-hood ssh wrapping in rsync, and to prevent unneed ssh calls
destination = destination.lstrip('localhost:').lstrip('127.0.0.1:')
# prepare the actual copy command
cmd = ['rsync', '-a', input_dataproduct['filepath'].rstrip('/'), dst_parent_dir_path]
cmd = ['rsync', '-a', input_dataproduct['filepath'].rstrip('/'), destination]
# wrap in cep4 ssh call if cep4
cluster_name = _cache[input_dataproduct['producer']]['cluster_name']
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment