# NOTE(review): SOURCE is a unified diff mangled onto a few physical lines.
# Below is the reconstructed post-patch code: three methods of
# TMSSCopyServiceEventMessageHandler (the class header lies outside this view,
# so the methods are reconstructed as plain defs taking `self`).
# The trailing create_copy_service() definition runs past the visible chunk
# and is therefore not reconstructed here.

def onSubTaskStatusChanged(self, id: int, status: str):
    """Handle a subtask status-change event.

    'scheduled' copy subtasks are advanced to 'queued'; 'queued' copy
    subtasks are executed. All other statuses and subtask types are ignored.

    :param id: TMSS subtask id of the subtask whose status changed.
    :param status: the new status value.
    """
    if status not in ('scheduled', 'queued'):
        return

    subtask = self.tmss_client.get_subtask(id)
    if subtask['subtask_type'] == 'copy':
        if status == 'scheduled':
            self.queue_copy_subtask(subtask)
        elif status == 'queued':
            self.run_copy_subtask(subtask)
    else:
        logger.info("skipping subtask id=%s status=%s type=%s", subtask['id'], subtask['state_value'], subtask['subtask_type'])


def queue_copy_subtask(self, subtask):
    """Advance a 'scheduled' copy subtask to 'queued' (via the intermediate 'queueing' state).

    No-op for non-copy subtasks or subtasks not in the 'scheduled' state.
    """
    if subtask['subtask_type'] != 'copy':
        return
    if subtask['state_value'] != 'scheduled':
        return
    self.tmss_client.set_subtask_status(subtask['id'], 'queueing')
    self.tmss_client.set_subtask_status(subtask['id'], 'queued')


def run_copy_subtask(self, subtask):
    """Execute a 'queued' copy subtask: rsync each input dataproduct to its destination.

    Parent destination directories are created first (over ssh when the
    destination specifies a remote host). On any failure the subtask is set
    to 'error' with a reason; on success it is set to 'finishing'/'finished'.
    No-op for non-copy subtasks or subtasks not in the 'queued' state.
    """
    if subtask['subtask_type'] != 'copy':
        return
    if subtask['state_value'] != 'queued':
        return

    try:
        self.tmss_client.set_subtask_status(subtask['id'], 'starting')
        self.tmss_client.set_subtask_status(subtask['id'], 'started')

        # cache to reduce rest-calls:
        # maps producer url to producing subtask id and cluster name
        producer_cache = {}
        # cache to reduce (ssh) mkdir calls: only create each parent dir once
        created_dirs = set()

        for input_dataproduct in self.tmss_client.get_subtask_input_dataproducts(subtask['id']):
            # fetch producing subtask id and cluster name for cache if needed
            producer_url = input_dataproduct['producer']
            if producer_url not in producer_cache:
                producer = self.tmss_client.get_url_as_json_object(producer_url)
                filesystem = self.tmss_client.get_url_as_json_object(producer['filesystem'])
                cluster = self.tmss_client.get_url_as_json_object(filesystem['cluster'])
                producer_cache[producer_url] = {'producing_subtask_id': producer['subtask_id'],
                                                'cluster_name': cluster['name']}

            if subtask['specifications_doc'].get('managed_output', False):
                output_dataproduct = self.tmss_client.get_subtask_transformed_output_dataproduct(subtask['id'], input_dataproduct['id'])
                output_dp_path = output_dataproduct['filepath']
            else:
                # optionally group output under an 'L<producing_subtask_id>' subdir
                group_dir = ('L' + str(producer_cache[producer_url]['producing_subtask_id'])) if subtask['specifications_doc'].get('group_by_id', True) else ''
                output_dp_path = os.path.join(subtask['specifications_doc']['destination'].rstrip('/'),
                                              group_dir,
                                              input_dataproduct['filename'])

            # split into host and path.
            # BUGFIX: the previous code used str.lstrip/str.rstrip with multi-char
            # arguments; those strip *character sets*, not prefixes/suffixes, and
            # could silently mangle hosts and paths (e.g. 'lofar-x'.lstrip('localhost:')
            # -> 'far-x', and rstrip(filename) eating trailing chars of the dir).
            if ':' in output_dp_path:
                dst_host, _, dst_path = output_dp_path.partition(':')
            else:
                dst_host, dst_path = '', output_dp_path

            # strip unneeded localhost to prevent under-the-hood ssh wrapping in
            # rsync, and to prevent unneeded ssh calls
            if dst_host in ('localhost', '127.0.0.1'):
                dst_host = ''

            dst_parent_dir_path = os.path.dirname(dst_path)

            if dst_parent_dir_path not in created_dirs:
                # create dst_parent_dir_path directories if needed
                mkdir_cmd = ['mkdir', '-p', dst_parent_dir_path]
                if dst_host:
                    mkdir_cmd = wrap_command_in_ssh_call(mkdir_cmd, dst_host)

                logger.info("creating parent destination dir if needed for copy subtask id=%s, executing: %s", subtask['id'], ' '.join(mkdir_cmd))

                if call(mkdir_cmd) == 0:
                    created_dirs.add(dst_parent_dir_path)
                else:
                    msg = "could not create parent destination dir '%s' for copy subtask id=%s" % (dst_parent_dir_path, subtask['id'])
                    logger.error(msg)
                    self.tmss_client.set_subtask_status(subtask['id'], 'error', error_reason=msg)
                    return

            # prepare the actual copy command.
            # NOTE(review): dst_host is not part of the rsync destination, so for a
            # remote dst_host the data is rsync'ed to a local path while only the
            # mkdir ran remotely — confirm this is the intended behavior.
            cmd = ['rsync', '-a', input_dataproduct['filepath'].rstrip('/'), dst_parent_dir_path]

            # wrap in cep4 ssh call if the data lives on cep4
            cluster_name = producer_cache[producer_url]['cluster_name']
            if cluster_name.lower() == 'cep4':
                cmd = wrap_command_in_cep4_available_node_with_lowest_load_ssh_call(cmd, via_head=True)

            logger.info("copying dataproduct id=%s for copy subtask id=%s, executing: %s", input_dataproduct['id'], subtask['id'], ' '.join(cmd))

            if call(cmd) == 0:
                logger.info("copied dataproduct id=%s for copy subtask id=%s to '%s'", input_dataproduct['id'], subtask['id'], dst_parent_dir_path)
            else:
                msg = "could not copy dataproduct id=%s for copy subtask id=%s to '%s'" % (input_dataproduct['id'], subtask['id'], dst_parent_dir_path)
                logger.error(msg)
                self.tmss_client.set_subtask_status(subtask['id'], 'error', error_reason=msg)
                return

        self.tmss_client.set_subtask_status(subtask['id'], 'finishing')
        self.tmss_client.set_subtask_status(subtask['id'], 'finished')
    except Exception as e:
        # log with traceback, then report the error state back to TMSS
        logger.exception(e)
        self.tmss_client.set_subtask_status(subtask['id'], 'error', error_reason=str(e))