Skip to content
Snippets Groups Projects
Commit 0d6b24a6 authored by Jorrit Schaap
Browse files

TMSS-2829: minor fixes

parent 192731a3
No related branches found
No related tags found
1 merge request: !1326 TMSS-2829
......@@ -53,6 +53,8 @@ class TMSSCopyServiceEventMessageHandler(TMSSEventMessageHandler):
# cache to reduce REST calls
# maps producer_id to a dict holding the 'producing_subtask_id' and the 'cluster_name'
_cache = {}
# cache to reduce (ssh) mkdir calls. Only create parent dirs once.
_created_dir_cache = set()
for input_dataproduct in self.tmss_client.get_subtask_input_dataproducts(subtask['id']):
# fetch producing subtask id and cluster name for cache if needed
......@@ -64,17 +66,38 @@ class TMSSCopyServiceEventMessageHandler(TMSSEventMessageHandler):
if subtask['specifications_doc'].get('managed_output', False):
output_dataproduct = self.tmss_client.get_subtask_transformed_output_dataproduct(subtask['id'], input_dataproduct['id'])
destination_path = output_dataproduct['filepath']
output_dp_path = output_dataproduct['filepath']
else:
destination_path = os.path.join(subtask['specifications_doc']['destination'].rstrip('/'),
('L'+str(_cache[input_dataproduct['producer']]['producing_subtask_id'])) if subtask['specifications_doc'].get('group_by_id', True) else '',
input_dataproduct['filename'])
# strip localhost as destination host to save on ssh wrapping
destination_path = destination_path.lstrip('localhost:').lstrip('127.0.0.1:')
output_dp_path = os.path.join(subtask['specifications_doc']['destination'].rstrip('/'),
('L' + str(_cache[input_dataproduct['producer']]['producing_subtask_id'])) if subtask['specifications_doc'].get('group_by_id', True) else '',
input_dataproduct['filename'])
# split in host & parent_dir_path
dst_host = output_dp_path[:output_dp_path.find(':')] if ':' in output_dp_path else ''
dst_parent_dir_path = output_dp_path.lstrip(dst_host + ':').rstrip(input_dataproduct['filename'])
# strip unneeded localhost to prevent under-the-hood ssh wrapping in rsync, and to prevent unneeded ssh calls
dst_host = dst_host.lstrip('localhost:').lstrip('127.0.0.1:')
dst_parent_dir_path = dst_parent_dir_path.lstrip('localhost:').lstrip('127.0.0.1:')
if dst_parent_dir_path not in _created_dir_cache:
# create the dst_parent_dir_path directories if needed, using a separate mkdir call (ssh-wrapped for a remote host) that runs before the copy
mkdir_cmd = ['mkdir', '-p', dst_parent_dir_path]
if dst_host:
mkdir_cmd = wrap_command_in_ssh_call(mkdir_cmd, dst_host)
logger.info("creating parent destination dir if needed for copy subtask id=%s, executing: %s",subtask['id'], ' '.join(mkdir_cmd))
if call(mkdir_cmd) == 0:
_created_dir_cache.add(dst_parent_dir_path)
else:
msg = "could not create parent destination dir '%s' for copy subtask id=%s" % (dst_parent_dir_path, subtask['id'])
logger.error(msg)
self.tmss_client.set_subtask_status(subtask['id'], 'error', error_reason=msg)
return
# prepare the actual copy command
cmd = ['rsync', '-a', '--mkpath', input_dataproduct['filepath'], destination_path]
cmd = ['rsync', '-a', input_dataproduct['filepath'].rstrip('/'), dst_parent_dir_path]
# wrap in cep4 ssh call if cep4
cluster_name = _cache[input_dataproduct['producer']]['cluster_name']
......@@ -84,19 +107,20 @@ class TMSSCopyServiceEventMessageHandler(TMSSEventMessageHandler):
logger.info("copying dataproduct id=%s for copy subtask id=%s, executing: %s", input_dataproduct['id'], subtask['id'], ' '.join(cmd))
if call(cmd) == 0:
logger.info("copied dataproduct id=%s for copy subtask id=%s to '%s'", input_dataproduct['id'], subtask['id'], destination_path)
logger.info("copied dataproduct id=%s for copy subtask id=%s to '%s'", input_dataproduct['id'], subtask['id'], dst_parent_dir_path)
else:
msg = "could not copy dataproduct id=%s for copy subtask id=%s to '%s'" % (input_dataproduct['id'], subtask['id'], destination_path)
msg = "could not copy dataproduct id=%s for copy subtask id=%s to '%s'" % (input_dataproduct['id'], subtask['id'], dst_parent_dir_path)
logger.error(msg)
self.tmss_client.set_subtask_status(id, 'error', error_reason=msg)
self.tmss_client.set_subtask_status(subtask['id'], 'error', error_reason=msg)
return
self.tmss_client.set_subtask_status(id, 'finishing')
self.tmss_client.set_subtask_status(id, 'finished')
self.tmss_client.set_subtask_status(subtask['id'], 'finishing')
self.tmss_client.set_subtask_status(subtask['id'], 'finished')
except Exception as e:
logger.error(e)
self.tmss_client.set_subtask_status(id, 'error', error_reason=str(e))
self.tmss_client.set_subtask_status(subtask['id'], 'error', error_reason=str(e))
else:
logger.info("skipping subtask id=%s status=%s type=%s", subtask['id'], subtask['state_value'], subtask['subtask_type'])
def create_copy_service(exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER, tmss_client_credentials_id: str="TMSSClient"):
......
......@@ -40,7 +40,7 @@ class TestCopyService(unittest.TestCase):
'''
@classmethod
def setUpClass(cls) -> None:
cls.TEST_UUID = uuid.uuid1()
cls.TEST_UUID = uuid.uuid4().hex[:8]
cls.tmp_exchange = TemporaryExchange("%s_%s" % (cls.__name__, cls.TEST_UUID))
cls.tmp_exchange.open()
......@@ -85,13 +85,55 @@ class TestCopyService(unittest.TestCase):
from lofar.sas.tmss.services.copy_service import create_copy_service
service = create_copy_service(exchange=self.tmp_exchange.address, tmss_client_credentials_id=self.tmss_test_env.client_credentials.dbcreds_id)
with BusListenerJanitor(service):
# Here's a summary of a subset of the relevant loglines showing the correct and expected rsync commands.
# NB: rsync (and GNU/Linux in general) is very sensitive to trailing slashes in paths.
#
# ----- group_by_id=True managed_output=True -----
# writing some small 1KB files in test dataproduct dir for subtask id=2000000 /tmp/my_project_f036c41d-bbed-4f65-8abe-43f403a8e3ad/L2000000/uv/L2000000_SAP000_SB000_uv.MS
# writing some small 1KB files in test dataproduct dir for subtask id=2000000 /tmp/my_project_f036c41d-bbed-4f65-8abe-43f403a8e3ad/L2000000/uv/L2000000_SAP000_SB001_uv.MS
# writing some small 1KB files in test dataproduct dir for subtask id=2000000 /tmp/my_project_f036c41d-bbed-4f65-8abe-43f403a8e3ad/L2000000/uv/L2000000_SAP000_SB002_uv.MS
# writing some small 1KB files in test dataproduct dir for subtask id=2000000 /tmp/my_project_f036c41d-bbed-4f65-8abe-43f403a8e3ad/L2000000/uv/L2000000_SAP000_SB003_uv.MS
# copying dataproduct id=1 for copy subtask id=2000001, executing: rsync -a /tmp/my_project_f036c41d-bbed-4f65-8abe-43f403a8e3ad/L2000000/uv/L2000000_SAP000_SB000_uv.MS /tmp/t_copy_service_e41a6703/L2000000/
# copying dataproduct id=2 for copy subtask id=2000001, executing: rsync -a /tmp/my_project_f036c41d-bbed-4f65-8abe-43f403a8e3ad/L2000000/uv/L2000000_SAP000_SB001_uv.MS /tmp/t_copy_service_e41a6703/L2000000/
# copying dataproduct id=3 for copy subtask id=2000001, executing: rsync -a /tmp/my_project_f036c41d-bbed-4f65-8abe-43f403a8e3ad/L2000000/uv/L2000000_SAP000_SB002_uv.MS /tmp/t_copy_service_e41a6703/L2000000/
# copying dataproduct id=4 for copy subtask id=2000001, executing: rsync -a /tmp/my_project_f036c41d-bbed-4f65-8abe-43f403a8e3ad/L2000000/uv/L2000000_SAP000_SB003_uv.MS /tmp/t_copy_service_e41a6703/L2000000/
#
# ----- group_by_id=True managed_output=False -----
# writing some small 1KB files in test dataproduct dir for subtask id=2000002 /tmp/my_project_da0fd8a7-46ea-4730-822f-f89552c2357b/L2000002/uv/L2000002_SAP000_SB000_uv.MS
# writing some small 1KB files in test dataproduct dir for subtask id=2000002 /tmp/my_project_da0fd8a7-46ea-4730-822f-f89552c2357b/L2000002/uv/L2000002_SAP000_SB001_uv.MS
# writing some small 1KB files in test dataproduct dir for subtask id=2000002 /tmp/my_project_da0fd8a7-46ea-4730-822f-f89552c2357b/L2000002/uv/L2000002_SAP000_SB002_uv.MS
# writing some small 1KB files in test dataproduct dir for subtask id=2000002 /tmp/my_project_da0fd8a7-46ea-4730-822f-f89552c2357b/L2000002/uv/L2000002_SAP000_SB003_uv.MS
# copying dataproduct id=9 for copy subtask id=2000003, executing: rsync -a /tmp/my_project_da0fd8a7-46ea-4730-822f-f89552c2357b/L2000002/uv/L2000002_SAP000_SB000_uv.MS /tmp/t_copy_service_e41a6703/L2000002/
# copying dataproduct id=10 for copy subtask id=2000003, executing: rsync -a /tmp/my_project_da0fd8a7-46ea-4730-822f-f89552c2357b/L2000002/uv/L2000002_SAP000_SB001_uv.MS /tmp/t_copy_service_e41a6703/L2000002/
# copying dataproduct id=11 for copy subtask id=2000003, executing: rsync -a /tmp/my_project_da0fd8a7-46ea-4730-822f-f89552c2357b/L2000002/uv/L2000002_SAP000_SB002_uv.MS /tmp/t_copy_service_e41a6703/L2000002/
# copying dataproduct id=12 for copy subtask id=2000003, executing: rsync -a /tmp/my_project_da0fd8a7-46ea-4730-822f-f89552c2357b/L2000002/uv/L2000002_SAP000_SB003_uv.MS /tmp/t_copy_service_e41a6703/L2000002/
#
# ----- group_by_id=False managed_output=True -----
# writing some small 1KB files in test dataproduct dir for subtask id=2000004 /tmp/my_project_ecd89cd4-2592-46f0-a613-64e7836d34a8/L2000004/uv/L2000004_SAP000_SB000_uv.MS
# writing some small 1KB files in test dataproduct dir for subtask id=2000004 /tmp/my_project_ecd89cd4-2592-46f0-a613-64e7836d34a8/L2000004/uv/L2000004_SAP000_SB001_uv.MS
# writing some small 1KB files in test dataproduct dir for subtask id=2000004 /tmp/my_project_ecd89cd4-2592-46f0-a613-64e7836d34a8/L2000004/uv/L2000004_SAP000_SB002_uv.MS
# writing some small 1KB files in test dataproduct dir for subtask id=2000004 /tmp/my_project_ecd89cd4-2592-46f0-a613-64e7836d34a8/L2000004/uv/L2000004_SAP000_SB003_uv.MS
# copying dataproduct id=13 for copy subtask id=2000005, executing: rsync -a /tmp/my_project_ecd89cd4-2592-46f0-a613-64e7836d34a8/L2000004/uv/L2000004_SAP000_SB000_uv.MS /tmp/t_copy_service_e41a6703/
# copying dataproduct id=14 for copy subtask id=2000005, executing: rsync -a /tmp/my_project_ecd89cd4-2592-46f0-a613-64e7836d34a8/L2000004/uv/L2000004_SAP000_SB001_uv.MS /tmp/t_copy_service_e41a6703/
# copying dataproduct id=15 for copy subtask id=2000005, executing: rsync -a /tmp/my_project_ecd89cd4-2592-46f0-a613-64e7836d34a8/L2000004/uv/L2000004_SAP000_SB002_uv.MS /tmp/t_copy_service_e41a6703/
# copying dataproduct id=16 for copy subtask id=2000005, executing: rsync -a /tmp/my_project_ecd89cd4-2592-46f0-a613-64e7836d34a8/L2000004/uv/L2000004_SAP000_SB003_uv.MS /tmp/t_copy_service_e41a6703/
#
# ----- group_by_id=False managed_output=False -----
# writing some small 1KB files in test dataproduct dir for subtask id=2000006 /tmp/my_project_dc2923f3-b31f-4a26-a99a-eb72447b1f48/L2000006/uv/L2000006_SAP000_SB000_uv.MS
# writing some small 1KB files in test dataproduct dir for subtask id=2000006 /tmp/my_project_dc2923f3-b31f-4a26-a99a-eb72447b1f48/L2000006/uv/L2000006_SAP000_SB001_uv.MS
# writing some small 1KB files in test dataproduct dir for subtask id=2000006 /tmp/my_project_dc2923f3-b31f-4a26-a99a-eb72447b1f48/L2000006/uv/L2000006_SAP000_SB002_uv.MS
# writing some small 1KB files in test dataproduct dir for subtask id=2000006 /tmp/my_project_dc2923f3-b31f-4a26-a99a-eb72447b1f48/L2000006/uv/L2000006_SAP000_SB003_uv.MS
# copying dataproduct id=21 for copy subtask id=2000007, executing: rsync -a /tmp/my_project_dc2923f3-b31f-4a26-a99a-eb72447b1f48/L2000006/uv/L2000006_SAP000_SB000_uv.MS /tmp/t_copy_service_e41a6703/
# copying dataproduct id=22 for copy subtask id=2000007, executing: rsync -a /tmp/my_project_dc2923f3-b31f-4a26-a99a-eb72447b1f48/L2000006/uv/L2000006_SAP000_SB001_uv.MS /tmp/t_copy_service_e41a6703/
# copying dataproduct id=23 for copy subtask id=2000007, executing: rsync -a /tmp/my_project_dc2923f3-b31f-4a26-a99a-eb72447b1f48/L2000006/uv/L2000006_SAP000_SB002_uv.MS /tmp/t_copy_service_e41a6703/
# copying dataproduct id=24 for copy subtask id=2000007, executing: rsync -a /tmp/my_project_dc2923f3-b31f-4a26-a99a-eb72447b1f48/L2000006/uv/L2000006_SAP000_SB003_uv.MS /tmp/t_copy_service_e41a6703/
for group_by_id in [True, False]:
for managed_output in [True, False]:
self.tmss_test_env.delete_scheduling_unit_drafts_cascade()
self.tmss_test_env.delete_reservations_cascade()
logger.info('\n\n\n----- group_by_id=%s managed_output=%s -----', group_by_id, managed_output)
DESTINATION_DIR = '/tmp/t_copy_service_%s' % uuid.uuid4().hex[:8]
DESTINATION_DIR = '/tmp/t_copy_service_%s' % (self.TEST_UUID,)
strategy_template = SchedulingUnitObservingStrategyTemplate.get_version_or_latest(name="Simple Observation")
strategy_template.template['tasks']['Observation']['specifications_doc']['station_configuration']['SAPs'][0]['subbands'] = [0, 1, 2, 3]
......@@ -137,17 +179,19 @@ class TestCopyService(unittest.TestCase):
try:
# create some output files, as if the observation ran
for output_dp in obs_subtask.output_dataproducts.all():
os.makedirs(output_dp.directory, exist_ok=True)
logger.info('writing 1KB test dataproduct for subtask id=%s %s', obs_subtask.id, output_dp.filepath)
with open(output_dp.filepath, 'w') as file:
file.write(1024 * 'a')
# the dataproduct itself is a directory containing multiple files
os.makedirs(output_dp.filepath, exist_ok=True)
logger.info('writing some small 1KB files in test dataproduct dir for subtask id=%s %s', obs_subtask.id, output_dp.filepath)
for i in range(3):
with open(os.path.join(output_dp.filepath, "%d.txt"%(i,)), 'w') as file:
file.write(1024 * 'a')
# "finish" the observation.
obs_subtask = set_subtask_state_following_allowed_transitions(obs_subtask, 'finished')
# wait until the copy_subtask was picked up by the service
copy_subtask = scheduling_unit_blueprint.subtasks.filter(specifications_template__type__value=SubtaskType.Choices.COPY.value).first()
copy_subtask = wait_for_subtask_status(copy_subtask, 'finished')
copy_subtask = wait_for_subtask_status(copy_subtask, 'finished', timeout=120)
# check the output, was it copied?
dest_dir = os.path.join(DESTINATION_DIR, ('L' + str(obs_subtask.id)) if group_by_id else '')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment