Skip to content
Snippets Groups Projects
Commit 6b9ec390 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-2829: create destination parent dir(s) if needed

parent ab0038e3
Branches
Tags
No related merge requests found
......@@ -50,9 +50,10 @@ class TMSSCopyServiceEventMessageHandler(TMSSEventMessageHandler):
self.tmss_client.set_subtask_status(id, 'starting')
self.tmss_client.set_subtask_status(id, 'started')
# cache to reduced rest-calls
# maps producer_id to tuple of producing subtask id and cluster name
# cache to reduce rest-calls, maps producer_id to tuple of producing subtask id and cluster name
_cache = {}
# cache to reduce ssh/mkdir calls
_created_destination_dirs = set()
for input_dataproduct in self.tmss_client.get_subtask_input_dataproducts(subtask['id']):
# fetch producing subtask id and cluster name for cache if needed
......@@ -64,17 +65,39 @@ class TMSSCopyServiceEventMessageHandler(TMSSEventMessageHandler):
if subtask['specifications_doc'].get('managed_output', False):
output_dataproduct = self.tmss_client.get_subtask_transformed_output_dataproduct(subtask['id'], input_dataproduct['id'])
destination_path = output_dataproduct['filepath']
destination = output_dataproduct['filepath']
else:
destination_path = os.path.join(subtask['specifications_doc']['destination'].rstrip('/'),
destination = os.path.join(subtask['specifications_doc']['destination'].rstrip('/'),
('L'+str(_cache[input_dataproduct['producer']]['producing_subtask_id'])) if subtask['specifications_doc'].get('group_by_id', True) else '',
input_dataproduct['filename'])
# strip localhost as destination host to save on ssh wrapping
destination_path = destination_path.lstrip('localhost:').lstrip('127.0.0.1:')
# strip unneeded localhost as destination host to prevent under-the-hood ssh wrapping in rsync, and to prevent unneeded ssh calls
destination = destination.lstrip('localhost:').lstrip('127.0.0.1:')
# split in host & path
dst_host = destination[:destination.find(':')] if ':' in destination else ''
dst_parent_dir_path = destination.lstrip(dst_host+':').rstrip(input_dataproduct['filename'])
# create dst_parent_dir_path directories if needed
if dst_parent_dir_path not in _created_destination_dirs:
cmd = ['mkdir', '-p', dst_parent_dir_path]
if dst_host:
cmd = wrap_command_in_ssh_call(cmd, dst_host)
logger.info("creating destination directory if needed copy subtask id=%s, executing: %s", subtask['id'], ' '.join(cmd))
if call(cmd) == 0:
logger.info("created destination directory '%s' for copy subtask id=%s", destination, subtask['id'])
_created_destination_dirs.add(dst_parent_dir_path)
else:
msg = "could not create destination directory '%s' for copy subtask id=%s" %(destination, subtask['id'])
logger.error(msg)
self.tmss_client.set_subtask_status(id, 'error', error_reason=msg)
return
# prepare the actual copy command
cmd = ['rsync', '-a', '--mkpath', input_dataproduct['filepath'], destination_path]
cmd = ['rsync', '-a', input_dataproduct['filepath'], destination]
# wrap in cep4 ssh call if cep4
cluster_name = _cache[input_dataproduct['producer']]['cluster_name']
......@@ -84,9 +107,9 @@ class TMSSCopyServiceEventMessageHandler(TMSSEventMessageHandler):
logger.info("copying dataproduct id=%s for copy subtask id=%s, executing: %s", input_dataproduct['id'], subtask['id'], ' '.join(cmd))
if call(cmd) == 0:
logger.info("copied dataproduct id=%s for copy subtask id=%s to '%s'", input_dataproduct['id'], subtask['id'], destination_path)
logger.info("copied dataproduct id=%s for copy subtask id=%s to '%s'", input_dataproduct['id'], subtask['id'], destination)
else:
msg = "could not copy dataproduct id=%s for copy subtask id=%s to '%s'" % (input_dataproduct['id'], subtask['id'], destination_path)
msg = "could not copy dataproduct id=%s for copy subtask id=%s to '%s'" % (input_dataproduct['id'], subtask['id'], destination)
logger.error(msg)
self.tmss_client.set_subtask_status(id, 'error', error_reason=msg)
return
......
......@@ -68,12 +68,20 @@ class TestCopyService(unittest.TestCase):
def mocked_wrap_command_in_cep4_head_node_ssh_call(cmd, *args, **kwargs):
    # Test stand-in for the ssh wrapper: log the command and hand it back
    # untouched, so it is not wrapped in ssh. Extra positional and keyword
    # arguments are accepted but ignored.
    joined_cmd = ' '.join(cmd)
    logger.info('mocked_wrap_command_in_cep4_head_node_ssh_call returning original command (without ssh): %s', joined_cmd)
    return cmd
wrap_command_in_cep4_head_node_ssh_call_patcher = mock.patch('lofar.sas.tmss.services.copy_service.wrap_command_in_cep4_available_node_with_lowest_load_ssh_call')
self.addCleanup(wrap_command_in_cep4_head_node_ssh_call_patcher.stop)
self.wrap_command_in_cep4_head_node_ssh_call_mock = wrap_command_in_cep4_head_node_ssh_call_patcher.start()
self.wrap_command_in_cep4_head_node_ssh_call_mock.side_effect = mocked_wrap_command_in_cep4_head_node_ssh_call
def mocked_wrap_command_in_ssh_call(cmd, *args, **kwargs):
    # Test stand-in for wrap_command_in_ssh_call: log the command and return
    # it as-is instead of wrapping it in ssh. Any extra arguments are
    # accepted but ignored.
    joined_cmd = ' '.join(cmd)
    logger.info('wrap_command_in_ssh_call returning original command (without ssh): %s', joined_cmd)
    return cmd
mocked_wrap_command_in_ssh_call_patcher = mock.patch('lofar.sas.tmss.services.copy_service.wrap_command_in_ssh_call')
self.addCleanup(mocked_wrap_command_in_ssh_call_patcher.stop)
self.mocked_wrap_command_in_ssh_call_patcher_mock = mocked_wrap_command_in_ssh_call_patcher.start()
self.mocked_wrap_command_in_ssh_call_patcher_mock.side_effect = mocked_wrap_command_in_ssh_call
def test_copy_managed_and_unmanaged(self):
from lofar.sas.tmss.tmss.tmssapp.models import SchedulingUnitObservingStrategyTemplate, SchedulingSet, SubtaskType
from lofar.sas.tmss.tmss.tmssapp.tasks import create_scheduling_unit_draft_from_observing_strategy_template, create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit_draft, create_cleanuptask_for_scheduling_unit_blueprint
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment