diff --git a/SAS/TMSS/backend/services/copy_service/copy_service.py b/SAS/TMSS/backend/services/copy_service/copy_service.py
index 4e8754fb7c7a4b5d43321fe22e72ac79c6e42cf7..5819024ff51605784e99e9288d5c0f5f3df84449 100755
--- a/SAS/TMSS/backend/services/copy_service/copy_service.py
+++ b/SAS/TMSS/backend/services/copy_service/copy_service.py
@@ -27,7 +27,7 @@ from lofar.sas.tmss.client.tmssbuslistener import *
 from lofar.sas.tmss.client.tmss_http_rest_client import TMSSsession
 from subprocess import call
 from lofar.common.cep4_utils import *
-
+from lofar.common.subprocess_utils import check_output_returning_strings
 
 class TMSSCopyServiceEventMessageHandler(TMSSEventMessageHandler):
     '''
@@ -35,6 +35,7 @@ class TMSSCopyServiceEventMessageHandler(TMSSEventMessageHandler):
     def __init__(self, tmss_client_credentials_id: str="TMSSClient"):
         super().__init__()
         self.tmss_client = TMSSsession.create_from_dbcreds_for_ldap(tmss_client_credentials_id)
+        self._last_df_check_timestamp = datetime.min
 
     def start_handling(self):
         self.tmss_client.open()
@@ -42,6 +43,14 @@ class TMSSCopyServiceEventMessageHandler(TMSSEventMessageHandler):
     def stop_handling(self):
         self.tmss_client.close()
 
+    def before_receive_message(self):
+        '''Run the rate-limited (once per 30 seconds) poll of the queued copy subtasks from this hook.'''
+        # use 1-sec event loop to poll queued subtasks (rate limited at 30sec)
+        super().before_receive_message()
+        if datetime.utcnow() - self._last_df_check_timestamp > timedelta(seconds=30):
+            self._last_df_check_timestamp = datetime.utcnow()
+            self.check_and_run_queued_copy_subtask_if_enough_disk_space()
+
     def onSubTaskStatusChanged(self, id: int, status:str):
         if status in ('scheduled', 'queued'):
             subtask = self.tmss_client.get_subtask(id)
@@ -49,11 +58,10 @@ class TMSSCopyServiceEventMessageHandler(TMSSEventMessageHandler):
                 if status == 'scheduled':
                     self.queue_copy_subtask(subtask)
                 elif status == 'queued':
-                    self.run_copy_subtask(subtask)
+                    self.run_copy_subtask_if_enough_disk_space(subtask)
             else:
                 logger.info("skipping subtask id=%s status=%s type=%s", subtask['id'], subtask['state_value'], subtask['subtask_type'])
 
-
     def queue_copy_subtask(self, subtask):
         if subtask['subtask_type'] != 'copy':
             return
@@ -62,6 +70,78 @@ class TMSSCopyServiceEventMessageHandler(TMSSEventMessageHandler):
         self.tmss_client.set_subtask_status(subtask['id'], 'queueing')
         self.tmss_client.set_subtask_status(subtask['id'], 'queued')
 
+    def check_and_run_queued_copy_subtask_if_enough_disk_space(self):
+        '''Try to start each queued copy subtask; a failure for one subtask does not block the others.'''
+        subtasks = self.tmss_client.get_subtasks(subtask_type='copy', state='queued')
+        for subtask in subtasks:
+            try:
+                self.run_copy_subtask_if_enough_disk_space(subtask)
+            except Exception:
+                # a failing df/ssh/REST call for one subtask must not keep the other queued subtasks waiting
+                logger.exception("could not check and run queued copy-subtask id=%s", subtask['id'])
+
+    def run_copy_subtask_if_enough_disk_space(self, subtask):
+        '''Start the given queued copy subtask only if its destination has enough free disk space.
+
+        The free space of the top-level directory of the destination is determined with 'df'
+        (wrapped in an ssh call for a remote host) and compared to the summed sizes of the
+        input dataproducts of the subtask. If there is not enough free space, then the subtask
+        is not started and the reason is stored in its error_reason.
+
+        :param subtask: the subtask as dict. It is ignored when it is not a queued copy subtask.
+        '''
+        if subtask['subtask_type'] != 'copy':
+            return
+        if subtask['state_value'] != 'queued':
+            return
+
+        # determine destination host and root_dir. The destination is either 'host:path' or just 'path'
+        destination = subtask['specifications_doc']['destination']
+        dst_host, separator, dst_path = destination.partition(':')
+        if not separator:
+            dst_host, dst_path = '', destination
+
+        # use the top-level dir of the path as argument for df.
+        # (str.lstrip removes a set of characters, not a prefix, so it cannot be used to cut off 'host:')
+        dst_path_parts = [part for part in dst_path.split('/') if part]
+        dst_root_dir = '/' + dst_path_parts[0] if dst_path_parts else '/'
+
+        # strip unneeded localhost to prevent unneeded ssh calls
+        if dst_host in ('localhost', '127.0.0.1'):
+            dst_host = ''
+
+        # -P: always one line per filesystem. -B1: report the sizes in bytes instead of in 1K-blocks
+        df_cmd = ['df', '-P', '-B1', dst_root_dir]
+        if dst_host:
+            df_cmd = wrap_command_in_ssh_call(df_cmd, dst_host)
+
+        logger.info("checking free disk space for copy-subtask id=%s, executing: %s", subtask['id'], ' '.join(df_cmd))
+
+        # run df cmd, and parse output for total free disk space.
+        # the last line holds the values in the columns: Filesystem, blocks, Used, Available, Capacity, Mounted on
+        df_result = check_output_returning_strings(df_cmd)
+        df_result_line = df_result.strip().splitlines()[-1]
+        df_result_line_parts = df_result_line.split()  # split on any run of whitespace, the columns are padded
+        df_bytes = int(df_result_line_parts[3])
+
+        # NOTE(review): this assumes that the dataproduct sizes are in bytes - confirm. A size of None counts as 0.
+        input_dp_sizes = self.tmss_client.get_url_as_json_object(subtask['url'].rstrip('/')+'/input_dataproducts?fields=size')
+        total_size = sum(x['size'] or 0 for x in input_dp_sizes)
+
+        if df_bytes > total_size:
+            logger.info("enough free disk space available for copy-subtask id=%s destination=%s df=%d needed=%d", subtask['id'], destination, df_bytes, total_size)
+
+            # clear previously set "not enough free disk space available" error_reason if set
+            if subtask.get('error_reason'):
+                self.tmss_client.do_request_and_get_result_as_string('PATCH', subtask['url'], {'error_reason': None})
+
+            # run it
+            self.run_copy_subtask(subtask)
+        else:
+            msg = "not enough free disk space available to start copy-subtask id=%s df=%d needed=%d" % (subtask['id'], df_bytes, total_size)
+            logger.info(msg)
+            self.tmss_client.do_request_and_get_result_as_string('PATCH', subtask['url'], {'error_reason': msg})
+
 
     def run_copy_subtask(self, subtask):
         if subtask['subtask_type'] != 'copy':
@@ -79,6 +159,7 @@ class TMSSCopyServiceEventMessageHandler(TMSSEventMessageHandler):
         # cache to reduce (ssh) mkdir calls. Only create parent dirs once.
         _created_dir_cache = set()
 
+        # ToDo: maybe parallelize this? Are multiple parallel rsync's faster?
         for input_dataproduct in self.tmss_client.get_subtask_input_dataproducts(subtask['id']):
             # fetch producing subtask id and cluster name for cache if needed
             if input_dataproduct['producer'] not in _cache:
@@ -109,12 +190,12 @@ class TMSSCopyServiceEventMessageHandler(TMSSEventMessageHandler):
                 if dst_host:
                     mkdir_cmd = wrap_command_in_ssh_call(mkdir_cmd, dst_host)
 
-                logger.info("creating parent destination dir if needed for copy subtask id=%s, executing: %s",subtask['id'], ' '.join(mkdir_cmd))
+                logger.info("creating parent destination dir if needed for copy-subtask id=%s, executing: %s",subtask['id'], ' '.join(mkdir_cmd))
 
                 if call(mkdir_cmd) == 0:
                     _created_dir_cache.add(dst_parent_dir_path)
                 else:
-                    msg = "could not create parent destination dir '%s' for copy subtask id=%s" % (dst_parent_dir_path, subtask['id'])
+                    msg = "could not create parent destination dir '%s' for copy-subtask id=%s" % (dst_parent_dir_path, subtask['id'])
                     logger.error(msg)
                     self.tmss_client.set_subtask_status(subtask['id'], 'error', error_reason=msg)
                     return
@@ -127,12 +208,12 @@ class TMSSCopyServiceEventMessageHandler(TMSSEventMessageHandler):
             if cluster_name.lower() == 'cep4':
                 cmd = wrap_command_in_cep4_available_node_with_lowest_load_ssh_call(cmd, via_head=True)
 
-            logger.info("copying dataproduct id=%s for copy subtask id=%s, executing: %s", input_dataproduct['id'], subtask['id'], ' '.join(cmd))
+            logger.info("copying dataproduct id=%s for copy-subtask id=%s, executing: %s", input_dataproduct['id'], subtask['id'], ' '.join(cmd))
 
             if call(cmd) == 0:
-                logger.info("copied dataproduct id=%s for copy subtask id=%s to '%s'", input_dataproduct['id'], subtask['id'], dst_parent_dir_path)
+                logger.info("copied dataproduct id=%s for copy-subtask id=%s to '%s'", input_dataproduct['id'], subtask['id'], dst_parent_dir_path)
             else:
-                msg = "could not copy dataproduct id=%s for copy subtask id=%s to '%s'" % (input_dataproduct['id'], subtask['id'], dst_parent_dir_path)
+                msg = "could not copy dataproduct id=%s for copy-subtask id=%s to '%s'" % (input_dataproduct['id'], subtask['id'], dst_parent_dir_path)
                 logger.error(msg)
                 self.tmss_client.set_subtask_status(subtask['id'], 'error', error_reason=msg)
                 return