Skip to content
Snippets Groups Projects
Commit f1d5d13f authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-2829: check/poll for enough available disk space in queued state before...

TMSS-2829: check/poll for enough available disk space in queued state before starting the copy action
parent 092829ea
No related branches found
No related tags found
1 merge request!1326TMSS-2829
......@@ -27,7 +27,7 @@ from lofar.sas.tmss.client.tmssbuslistener import *
from lofar.sas.tmss.client.tmss_http_rest_client import TMSSsession
from subprocess import call
from lofar.common.cep4_utils import *
from lofar.common.subprocess_utils import check_output_returning_strings
class TMSSCopyServiceEventMessageHandler(TMSSEventMessageHandler):
'''
......@@ -35,6 +35,7 @@ class TMSSCopyServiceEventMessageHandler(TMSSEventMessageHandler):
def __init__(self, tmss_client_credentials_id: str="TMSSClient"):
    """Create the handler and its TMSS REST client (credentials are looked up via the given id)."""
    super().__init__()
    # datetime.min guarantees the very first event-loop pass triggers a disk-space check
    self._last_df_check_timestamp = datetime.min
    self.tmss_client = TMSSsession.create_from_dbcreds_for_ldap(tmss_client_credentials_id)
def start_handling(self):
    # open the TMSS REST session once at service startup; closed again in stop_handling
    self.tmss_client.open()
......@@ -42,6 +43,13 @@ class TMSSCopyServiceEventMessageHandler(TMSSEventMessageHandler):
def stop_handling(self):
    # release the TMSS REST session opened in start_handling
    self.tmss_client.close()
def before_receive_message(self):
    """Hook called on every (1-sec) event-loop pass; piggy-back a rate-limited
    poll for queued copy-subtasks on it (at most one poll per 30 seconds)."""
    super().before_receive_message()
    elapsed = datetime.utcnow() - self._last_df_check_timestamp
    if elapsed <= timedelta(seconds=30):
        return  # rate limit not yet expired; nothing to do on this pass
    self._last_df_check_timestamp = datetime.utcnow()
    self.check_and_run_queued_copy_subtask_if_enough_disk_space()
def onSubTaskStatusChanged(self, id: int, status: str):
    """React to subtask state changes: queue copy-subtasks when they become
    'scheduled', and start them (disk-space permitting) when they become 'queued'.
    All other states and subtask types are ignored."""
    # Note: the original diff residue showed both the old call (run_copy_subtask)
    # and the new call (run_copy_subtask_if_enough_disk_space); only the new one is kept.
    if status in ('scheduled', 'queued'):
        subtask = self.tmss_client.get_subtask(id)
        # NOTE(review): one context line is hidden by the diff hunk boundary here;
        # the 'else' / skip-log below references subtask_type, so it presumably
        # belongs to this subtask_type guard — confirm against the full file.
        if subtask['subtask_type'] == 'copy':
            if status == 'scheduled':
                self.queue_copy_subtask(subtask)
            elif status == 'queued':
                # only start copying when the destination has enough free disk space
                self.run_copy_subtask_if_enough_disk_space(subtask)
        else:
            logger.info("skipping subtask id=%s status=%s type=%s", subtask['id'], subtask['state_value'], subtask['subtask_type'])
def queue_copy_subtask(self, subtask):
if subtask['subtask_type'] != 'copy':
return
......@@ -62,6 +69,52 @@ class TMSSCopyServiceEventMessageHandler(TMSSEventMessageHandler):
self.tmss_client.set_subtask_status(subtask['id'], 'queueing')
self.tmss_client.set_subtask_status(subtask['id'], 'queued')
def check_and_run_queued_copy_subtask_if_enough_disk_space(self):
    """Poll TMSS for all currently queued copy-subtasks and start each one whose
    destination filesystem has enough free space."""
    for queued_subtask in self.tmss_client.get_subtasks(subtask_type='copy', state='queued'):
        self.run_copy_subtask_if_enough_disk_space(queued_subtask)
def run_copy_subtask_if_enough_disk_space(self, subtask):
    """Run the given queued copy-subtask, but only when its destination filesystem
    has more free space than the total size of the subtask's input dataproducts.

    When there is not enough space the subtask is left in 'queued' state with its
    error_reason set, so it is retried on the next 30-sec poll.

    :param subtask: subtask dict as returned by the TMSS REST API; must be a
                    'copy' subtask in state 'queued', otherwise this is a no-op.
    """
    if subtask['subtask_type'] != 'copy':
        return
    if subtask['state_value'] != 'queued':
        return

    # determine destination host and root_dir from a '<host>:<path>' destination
    # (the host part is optional). str.partition is used instead of str.lstrip:
    # lstrip strips a *character set*, not a prefix, and would mangle the path.
    destination = subtask['specifications_doc']['destination']
    dst_host, sep, dst_path = destination.partition(':')
    if not sep:
        dst_host, dst_path = '', destination
    dst_root_dir = '/' + dst_path.split('/')[1]

    # a local destination needs no ssh call
    # (bug fix: the previous lstrip('localhost:') stripped characters, turning
    # e.g. 'lofar-node' into 'far-node' and '10.0.0.5' into '5')
    if dst_host in ('localhost', '127.0.0.1'):
        dst_host = ''

    df_cmd = ['df', dst_root_dir]
    if dst_host:
        df_cmd = wrap_command_in_ssh_call(df_cmd, dst_host)
    logger.info("checking free disk space for copy-subtask id=%s, executing: %s", subtask['id'], ' '.join(df_cmd))

    # run df cmd, and parse its output for the available disk space.
    # df prints a header plus one line per filesystem; take the last line,
    # 4th whitespace-separated column ('Available').
    df_result = check_output_returning_strings(df_cmd)
    df_result_line = df_result.splitlines()[-1]
    # split() (not split(' ')) so the column padding does not yield empty items
    df_result_line_parts = df_result_line.split()
    # df reports 1 KiB blocks by default; convert to bytes before comparing to dataproduct sizes
    df_bytes = 1024 * int(df_result_line_parts[3])

    # total size (in bytes) of all input dataproducts that are about to be copied
    input_dp_sizes = self.tmss_client.get_url_as_json_object(subtask['url'].rstrip('/')+'/input_dataproducts?fields=size')
    total_size = sum(x['size'] for x in input_dp_sizes)

    if df_bytes > total_size:
        logger.info("enough free disk space available for copy-subtask id=%s destination=%s df=%d needed=%d", subtask['id'], destination, df_bytes, total_size)
        # clear previously set "not enough free disk space available" error_reason if set
        if subtask.get('error_reason'):
            self.tmss_client.do_request_and_get_result_as_string('PATCH', subtask['url'], {'error_reason': None})
        # run it
        self.run_copy_subtask(subtask)
    else:
        msg = "not enough free disk space available to start copy-subtask id=%s df=%d needed=%d" % (subtask['id'], df_bytes, total_size)
        logger.info(msg)
        # leave the subtask queued; record why it was not started
        self.tmss_client.do_request_and_get_result_as_string('PATCH', subtask['url'], {'error_reason': msg})
def run_copy_subtask(self, subtask):
if subtask['subtask_type'] != 'copy':
......@@ -79,6 +132,7 @@ class TMSSCopyServiceEventMessageHandler(TMSSEventMessageHandler):
# cache to reduce (ssh) mkdir calls. Only create parent dirs once.
_created_dir_cache = set()
# ToDo: maybe parallelize this? Are multiple parallel rsync's faster?
for input_dataproduct in self.tmss_client.get_subtask_input_dataproducts(subtask['id']):
# fetch producing subtask id and cluster name for cache if needed
if input_dataproduct['producer'] not in _cache:
......@@ -109,12 +163,12 @@ class TMSSCopyServiceEventMessageHandler(TMSSEventMessageHandler):
if dst_host:
mkdir_cmd = wrap_command_in_ssh_call(mkdir_cmd, dst_host)
logger.info("creating parent destination dir if needed for copy subtask id=%s, executing: %s",subtask['id'], ' '.join(mkdir_cmd))
logger.info("creating parent destination dir if needed for copy-subtask id=%s, executing: %s",subtask['id'], ' '.join(mkdir_cmd))
if call(mkdir_cmd) == 0:
_created_dir_cache.add(dst_parent_dir_path)
else:
msg = "could not create parent destination dir '%s' for copy subtask id=%s" % (dst_parent_dir_path, subtask['id'])
msg = "could not create parent destination dir '%s' for copy-subtask id=%s" % (dst_parent_dir_path, subtask['id'])
logger.error(msg)
self.tmss_client.set_subtask_status(subtask['id'], 'error', error_reason=msg)
return
......@@ -127,12 +181,12 @@ class TMSSCopyServiceEventMessageHandler(TMSSEventMessageHandler):
if cluster_name.lower() == 'cep4':
cmd = wrap_command_in_cep4_available_node_with_lowest_load_ssh_call(cmd, via_head=True)
logger.info("copying dataproduct id=%s for copy subtask id=%s, executing: %s", input_dataproduct['id'], subtask['id'], ' '.join(cmd))
logger.info("copying dataproduct id=%s for copy-subtask id=%s, executing: %s", input_dataproduct['id'], subtask['id'], ' '.join(cmd))
if call(cmd) == 0:
logger.info("copied dataproduct id=%s for copy subtask id=%s to '%s'", input_dataproduct['id'], subtask['id'], dst_parent_dir_path)
logger.info("copied dataproduct id=%s for copy-subtask id=%s to '%s'", input_dataproduct['id'], subtask['id'], dst_parent_dir_path)
else:
msg = "could not copy dataproduct id=%s for copy subtask id=%s to '%s'" % (input_dataproduct['id'], subtask['id'], dst_parent_dir_path)
msg = "could not copy dataproduct id=%s for copy-subtask id=%s to '%s'" % (input_dataproduct['id'], subtask['id'], dst_parent_dir_path)
logger.error(msg)
self.tmss_client.set_subtask_status(subtask['id'], 'error', error_reason=msg)
return
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment