Skip to content
Snippets Groups Projects
Commit 092829ea authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-2829: split into handling queued and scheduled events

parent 0d6b24a6
No related branches found
No related tags found
1 merge request: !1326 TMSS-2829
......@@ -43,86 +43,107 @@ class TMSSCopyServiceEventMessageHandler(TMSSEventMessageHandler):
self.tmss_client.close()
def onSubTaskStatusChanged(self, id: int, status: str):
    """Handle a TMSS subtask status-changed event.

    Copy subtasks are advanced through their lifecycle in two separate steps
    (TMSS-2829): on 'scheduled' the subtask is queued via queue_copy_subtask,
    and on 'queued' the actual copying is done via run_copy_subtask.
    Events for any other status, or for non-copy subtasks, are ignored.

    :param id: id of the subtask whose status changed
    :param status: the new status value of the subtask
    """
    # only act on the two statuses we handle, so we don't do a REST call per event
    if status in ('scheduled', 'queued'):
        subtask = self.tmss_client.get_subtask(id)
        if subtask['subtask_type'] == 'copy':
            if status == 'scheduled':
                self.queue_copy_subtask(subtask)
            else:  # status == 'queued', guaranteed by the outer membership test
                self.run_copy_subtask(subtask)
        else:
            logger.info("skipping subtask id=%s status=%s type=%s", subtask['id'], subtask['state_value'], subtask['subtask_type'])
def queue_copy_subtask(self, subtask):
    """Advance a scheduled copy subtask to the 'queued' state.

    Non-copy subtasks and subtasks not in the 'scheduled' state are left untouched.

    :param subtask: the subtask as a json-dict, as fetched from TMSS
    """
    is_copy_subtask = subtask['subtask_type'] == 'copy'
    is_scheduled = subtask['state_value'] == 'scheduled'
    if is_copy_subtask and is_scheduled:
        for new_status in ('queueing', 'queued'):
            self.tmss_client.set_subtask_status(subtask['id'], new_status)
def run_copy_subtask(self, subtask):
    """Execute a queued copy subtask: copy each input dataproduct to its destination.

    For each input dataproduct the destination path is determined (either the
    managed transformed output dataproduct's filepath, or a path under the
    specified destination, optionally grouped into an 'L<subtask_id>' dir per
    producing subtask), the destination parent directory is created (over ssh
    when the destination has a remote host), and the file is copied with rsync
    (wrapped in a cep4 ssh call when the producing cluster is cep4).
    The subtask status is advanced starting/started ... finishing/finished,
    or set to 'error' with a reason on the first failure or exception.

    :param subtask: the subtask as a json-dict, as fetched from TMSS
    """
    if subtask['subtask_type'] != 'copy':
        return

    if subtask['state_value'] != 'queued':
        return

    try:
        self.tmss_client.set_subtask_status(subtask['id'], 'starting')
        self.tmss_client.set_subtask_status(subtask['id'], 'started')

        # cache to reduce rest-calls
        # maps producer url to dict with producing subtask id and cluster name
        _cache = {}

        # cache to reduce (ssh) mkdir calls. Only create parent dirs once.
        _created_dir_cache = set()

        for input_dataproduct in self.tmss_client.get_subtask_input_dataproducts(subtask['id']):
            # fetch producing subtask id and cluster name for cache if needed
            if input_dataproduct['producer'] not in _cache:
                producer = self.tmss_client.get_url_as_json_object(input_dataproduct['producer'])
                filesystem = self.tmss_client.get_url_as_json_object(producer['filesystem'])
                cluster = self.tmss_client.get_url_as_json_object(filesystem['cluster'])
                _cache[input_dataproduct['producer']] = {'producing_subtask_id': producer['subtask_id'], 'cluster_name': cluster['name']}

            if subtask['specifications_doc'].get('managed_output', False):
                output_dataproduct = self.tmss_client.get_subtask_transformed_output_dataproduct(subtask['id'], input_dataproduct['id'])
                output_dp_path = output_dataproduct['filepath']
            else:
                output_dp_path = os.path.join(subtask['specifications_doc']['destination'].rstrip('/'),
                                              ('L' + str(_cache[input_dataproduct['producer']]['producing_subtask_id'])) if subtask['specifications_doc'].get('group_by_id', True) else '',
                                              input_dataproduct['filename'])

            # split into host and path parts.
            # use partition instead of lstrip/rstrip: lstrip/rstrip remove a *set of
            # characters*, not an exact prefix/suffix, which mangles hostnames such as
            # 'lofar.host.com' (leading 'lo' would be stripped by lstrip('localhost:'))
            if ':' in output_dp_path:
                dst_host, _, dst_path = output_dp_path.partition(':')
            else:
                dst_host, dst_path = '', output_dp_path

            # strip unneeded localhost to prevent under-the-hood ssh wrapping in rsync, and to prevent unneeded ssh calls
            if dst_host in ('localhost', '127.0.0.1'):
                dst_host = ''

            dst_parent_dir_path = os.path.dirname(dst_path)

            if dst_parent_dir_path not in _created_dir_cache:
                # create dst_parent_dir_path directories if needed
                mkdir_cmd = ['mkdir', '-p', dst_parent_dir_path]
                if dst_host:
                    mkdir_cmd = wrap_command_in_ssh_call(mkdir_cmd, dst_host)

                logger.info("creating parent destination dir if needed for copy subtask id=%s, executing: %s", subtask['id'], ' '.join(mkdir_cmd))

                if call(mkdir_cmd) == 0:
                    _created_dir_cache.add(dst_parent_dir_path)
                else:
                    msg = "could not create parent destination dir '%s' for copy subtask id=%s" % (dst_parent_dir_path, subtask['id'])
                    logger.error(msg)
                    self.tmss_client.set_subtask_status(subtask['id'], 'error', error_reason=msg)
                    return

            # prepare the actual copy command.
            # NOTE(review): the rsync destination deliberately has no host part, as in the
            # original code — remote destinations seem to be reached via the ssh-wrapped
            # mkdir plus cep4-wrapped rsync only; confirm this is intended for non-cep4 remote hosts.
            cmd = ['rsync', '-a', input_dataproduct['filepath'].rstrip('/'), dst_parent_dir_path]

            # wrap in cep4 ssh call if cep4
            cluster_name = _cache[input_dataproduct['producer']]['cluster_name']
            if cluster_name.lower() == 'cep4':
                cmd = wrap_command_in_cep4_available_node_with_lowest_load_ssh_call(cmd, via_head=True)

            logger.info("copying dataproduct id=%s for copy subtask id=%s, executing: %s", input_dataproduct['id'], subtask['id'], ' '.join(cmd))

            if call(cmd) == 0:
                logger.info("copied dataproduct id=%s for copy subtask id=%s to '%s'", input_dataproduct['id'], subtask['id'], dst_parent_dir_path)
            else:
                msg = "could not copy dataproduct id=%s for copy subtask id=%s to '%s'" % (input_dataproduct['id'], subtask['id'], dst_parent_dir_path)
                logger.error(msg)
                self.tmss_client.set_subtask_status(subtask['id'], 'error', error_reason=msg)
                return

        self.tmss_client.set_subtask_status(subtask['id'], 'finishing')
        self.tmss_client.set_subtask_status(subtask['id'], 'finished')
    except Exception as e:
        logger.error(e)
        self.tmss_client.set_subtask_status(subtask['id'], 'error', error_reason=str(e))
def create_copy_service(exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER, tmss_client_credentials_id: str="TMSSClient"):
return TMSSBusListener(handler_type=TMSSCopyServiceEventMessageHandler,
handler_kwargs={'tmss_client_credentials_id': tmss_client_credentials_id},
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment