From da4739e29258e7c33018e42cc780ebb06a7d6dd8 Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Mon, 1 Mar 2021 12:28:04 +0100 Subject: [PATCH] TMSS-261: made PathResolver TMSS-aware: you can now get (sub)task path given a tmss subtask id. TODO: remove all otdb/mom code paths once we rely fully on TMSS --- .../DataManagementCommon/CMakeLists.txt | 2 +- .../DataManagementCommon/getPathForTask | 0 .../DataManagementCommon/path.py | 117 ++++++++++++------ 3 files changed, 80 insertions(+), 39 deletions(-) mode change 100644 => 100755 SAS/DataManagement/DataManagementCommon/getPathForTask diff --git a/SAS/DataManagement/DataManagementCommon/CMakeLists.txt b/SAS/DataManagement/DataManagementCommon/CMakeLists.txt index 5c160faa9b1..5e0c0554e1e 100644 --- a/SAS/DataManagement/DataManagementCommon/CMakeLists.txt +++ b/SAS/DataManagement/DataManagementCommon/CMakeLists.txt @@ -1,6 +1,6 @@ # $Id$ -lofar_package(DataManagementCommon 1.0 DEPENDS PyMessaging ResourceAssignmentService MoMQueryServiceClient) +lofar_package(DataManagementCommon 1.0 DEPENDS PyMessaging ResourceAssignmentService MoMQueryServiceClient TMSSClient) lofar_find_package(Python 3.4 REQUIRED) include(PythonInstall) diff --git a/SAS/DataManagement/DataManagementCommon/getPathForTask b/SAS/DataManagement/DataManagementCommon/getPathForTask old mode 100644 new mode 100755 diff --git a/SAS/DataManagement/DataManagementCommon/path.py b/SAS/DataManagement/DataManagementCommon/path.py index 36c15d93513..6bdcae38744 100644 --- a/SAS/DataManagement/DataManagementCommon/path.py +++ b/SAS/DataManagement/DataManagementCommon/path.py @@ -17,6 +17,7 @@ from lofar.sas.datamanagement.common.config import CEP4_DATA_MOUNTPOINT from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC +from lofar.sas.tmss.client.tmss_http_rest_client import TMSSsession logger = logging.getLogger(__name__) @@ -24,7 +25,8 @@ class PathResolver: def __init__(self, mountpoint=CEP4_DATA_MOUNTPOINT, exchange=DEFAULT_BUSNAME, - broker=DEFAULT_BROKER): + broker=DEFAULT_BROKER, + tmss_dbcreds_id: str=None): self.mountpoint = mountpoint self.projects_path = os.path.join(self.mountpoint, 'projects' if isProductionEnvironment() else 'test-projects') @@ -33,14 +35,17 @@ class PathResolver: self.radbrpc = RADBRPC.create(exchange=exchange, broker=broker) self.momrpc = MoMQueryRPC.create(exchange=exchange, broker=broker) + self._tmss_client = TMSSsession.create_from_dbcreds_for_ldap(tmss_dbcreds_id) def open(self): self.radbrpc.open() self.momrpc.open() + self._tmss_client.open() def close(self): self.radbrpc.close() self.momrpc.close() + self._tmss_client.close() def __enter__(self): self.open() @@ -61,47 +66,76 @@ class PathResolver: logger.debug("Get path for otdb_id %s" % (otdb_id,)) return self.getPathForTask(otdb_id=otdb_id) - def getPathForTask(self, radb_id=None, mom_id=None, otdb_id=None, include_scratch_paths=True): - logger.info("getPathForTask(radb_id=%s, mom_id=%s, otdb_id=%s)", radb_id, mom_id, otdb_id) - '''get the path for a task for either the given radb_id, or for the given mom_id, or for the given otdb_id''' - result = self._getProjectPathAndDetails(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id) + def getPathForTMSSId(self, tmss_id): + logger.debug("Get path for tmss_id %s" % (tmss_id,)) + return self.getPathForTask(tmss_id=tmss_id) + + def getPathForTask(self, radb_id=None, mom_id=None, otdb_id=None, tmss_id=None, include_scratch_paths=True): + logger.info("getPathForTask(radb_id=%s, mom_id=%s, otdb_id=%s, tmss_id=%s)", radb_id, mom_id, otdb_id, tmss_id) + '''get the path for a task for either the given radb_id, or for the given mom_id, or for the given otdb_id, or for the given tmss_id''' + result = self._getProjectPathAndDetails(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, tmss_id=tmss_id) if result['found']: project_path = result['path'] - task = result['task'] - task_data_path = os.path.join(project_path, 'L%s' % task['otdb_id']) - logger.info("constructed path '%s' for otdb_id=%s mom_id=%s radb_id=%s" % (task_data_path, task['otdb_id'], task['mom_id'], task['id'])) - path_result = {'found': True, 'message': '', 'path': task_data_path, - 'radb_id': task.get('id'), 'mom_id': task.get('mom_id'), 'otdb_id': task.get('otdb_id')} + if 'task' in result: + task = result['task'] + task_data_path = os.path.join(project_path, 'L%s' % task['otdb_id']) + elif tmss_id is not None: + task_data_path = os.path.join(project_path, 'L%s' % tmss_id) + else: + task_data_path = None - if include_scratch_paths and task['type'] == 'pipeline': - path_result['scratch_paths'] = [] + path_result = {'found': task_data_path is not None, 'message': '', 'path': task_data_path, + 'radb_id': radb_id, 'mom_id': mom_id, 'otdb_id': otdb_id, 'tmss_id': tmss_id} - scratch_path = os.path.join(self.scratch_path, 'Observation%s' % task['otdb_id']) - share_path = os.path.join(self.share_path, 'Observation%s' % task['otdb_id']) - logger.info("Checking scratch paths %s %s for otdb_id=%s mom_id=%s radb_id=%s" % (scratch_path, share_path, task['otdb_id'], task['mom_id'], task['id'])) + logger.info("constructed path '%s' for otdb_id=%s mom_id=%s radb_id=%s tmss_id=%s" % (task_data_path, otdb_id, mom_id, radb_id, tmss_id)) + + if include_scratch_paths: + path_result['scratch_paths'] = [] - if self.pathExists(scratch_path): - path_result['scratch_paths'].append(scratch_path) + if 'task' in result and task['type'] == 'pipeline': + task = result['task'] + path_result['scratch_paths'].append(os.path.join(self.scratch_path, 'Observation%s' % task['otdb_id'])) + path_result['scratch_paths'].append(os.path.join(self.share_path, 'Observation%s' % task['otdb_id'])) + elif tmss_id is not None: + subtask = self._tmss_client.get_subtask(tmss_id) + if subtask['subtask_type'].lower() == 'pipeline': + path_result['scratch_paths'].append(os.path.join(self.scratch_path, 'Observation%s' % tmss_id)) + path_result['scratch_paths'].append(os.path.join(self.share_path, 'Observation%s' % tmss_id)) - if self.pathExists(share_path): - path_result['scratch_paths'].append(share_path) + logger.info("Checking scratch paths %s for otdb_id=%s mom_id=%s radb_id=%s tmss_id=%s" % (path_result['scratch_paths'], otdb_id, mom_id, radb_id, tmss_id)) + path_result['scratch_paths'] = [path for path in path_result['scratch_paths'] if self.pathExists(path)] - logger.info("result for getPathForTask(radb_id=%s, mom_id=%s, otdb_id=%s): %s", radb_id, mom_id, otdb_id, path_result) + logger.info("result for getPathForTask(radb_id=%s, mom_id=%s, otdb_id=%s, tmss_id=%s): %s", radb_id, mom_id, otdb_id, tmss_id, path_result) return path_result result = {'found': False, 'message': result.get('message', ''), 'path': '', - 'radb_id': radb_id, 'mom_id': mom_id, 'otdb_id': otdb_id} - logger.warn("result for getPathForTask(radb_id=%s, mom_id=%s, otdb_id=%s): %s", radb_id, mom_id, otdb_id, result) + 'radb_id': radb_id, 'mom_id': mom_id, 'otdb_id': otdb_id, 'tmss_id': tmss_id} + logger.warning("result for getPathForTask(radb_id=%s, mom_id=%s, otdb_id=%s, tmss_id=%s): %s", radb_id, mom_id, otdb_id, tmss_id, result) return result - def _getProjectPathAndDetails(self, radb_id=None, mom_id=None, otdb_id=None): - '''get the project path and details of a task for either the given radb_id, or for the given mom_id, or for the given otdb_id''' - ids = [radb_id, mom_id, otdb_id] + def _getProjectPathAndDetails(self, radb_id=None, mom_id=None, otdb_id=None, tmss_id=None): + '''get the project path and details of a task for either the given radb_id, or for the given mom_id, or for the given otdb_id, or for the given tmss_id''' + ids = [radb_id, mom_id, otdb_id, tmss_id] validIds = [x for x in ids if x != None and isinstance(x, int)] if len(validIds) != 1: - raise KeyError("Provide one and only one id: radb_id=%s, mom_id=%s, otdb_id=%s" % (radb_id, mom_id, otdb_id)) + raise KeyError("Provide one and only one id: radb_id=%s, mom_id=%s, otdb_id=%s, tmss_id=%s" % (radb_id, mom_id, otdb_id, tmss_id)) + + if tmss_id is not None: + output_dataproducts = self._tmss_client.get_subtask_output_dataproducts(tmss_id) + directories = set([dp['directory'] for dp in output_dataproducts]) + subtask_dir_name = 'L%s' % (tmss_id,) + # extract the project path + project_paths = [dir[:dir.find(subtask_dir_name)] for dir in directories] + + if len(project_paths) != 1: + message = "Could not determine project path for tmss_id=%s" % (tmss_id,) + logger.error(message) + return {'found': False, 'message': message, 'path': None} + + project_path = project_paths[0] + return {'found': True, 'path': project_path} task = self.radbrpc.getTask(id=radb_id, mom_id=mom_id, otdb_id=otdb_id) @@ -125,22 +159,24 @@ class PathResolver: project_path = os.path.join(self.projects_path, "_".join(project_name.split())) return {'found': True, 'path': project_path, 'mom_details':mom_details, 'task':task} - def getProjectPath(self, radb_id=None, mom_id=None, otdb_id=None): - result = self._getProjectPathAndDetails(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id) + def getProjectPath(self, radb_id=None, mom_id=None, otdb_id=None, tmss_id=None): + result = self._getProjectPathAndDetails(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, tmss_id=tmss_id) if result['found']: - del result['mom_details'] - del result['task'] + if 'mom_details' in result: + del result['mom_details'] + if 'task' in result: + del result['task'] return result - def getProjectDirAndSubDirectories(self, radb_id=None, mom_id=None, otdb_id=None, project_name=None): + def getProjectDirAndSubDirectories(self, radb_id=None, mom_id=None, otdb_id=None, tmss_id=None, project_name=None): '''get the project directory and its subdirectories of either the project_name, or the task's project for either the given radb_id, or for the given mom_id, or for the given otdb_id''' if project_name: project_path = os.path.join(self.projects_path, "_".join(project_name.split())) return self.getSubDirectories(project_path) - result = self.getProjectPath(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id) + result = self.getProjectPath(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, tmss_id=tmss_id) if result['found']: return self.getSubDirectories(result['path']) return result @@ -154,8 +190,11 @@ class PathResolver: def getSubDirectoriesForRADBId(self, radb_id): return self.getSubDirectoriesForTask(radb_id=radb_id) - def getSubDirectoriesForTask(self, radb_id=None, mom_id=None, otdb_id=None): - result = self.getPathForTask(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id) + def getSubDirectoriesForTMSSId(self, tmss_id): + return self.getSubDirectoriesForTask(tmss_id=tmss_id) + + def getSubDirectoriesForTask(self, radb_id=None, mom_id=None, otdb_id=None, tmss_id=None): + result = self.getPathForTask(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, tmss_id=tmss_id) if result['found']: return self.getSubDirectories(result['path']) return result @@ -214,13 +253,15 @@ def main(): parser.add_option('-o', '--otdb_id', dest='otdb_id', type='int', default=None, help='otdb_id of task to get the path for') parser.add_option('-m', '--mom_id', dest='mom_id', type='int', default=None, help='mom_id of task to get the path for') parser.add_option('-r', '--radb_id', dest='radb_id', type='int', default=None, help='radb_id of task to get the path for') + parser.add_option('-t', '--tmss_id', dest='tmss_id', type='int', default=None, help='tmss_id of the TMSS subtask to get the path for') parser.add_option('-q', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the broker, default: localhost') parser.add_option("--mountpoint", dest="mountpoint", type="string", default=CEP4_DATA_MOUNTPOINT, help="path of local cep4 mount point, default: %default") parser.add_option("--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME, help="Name of the exchange on which the services listen, default: %default") + parser.add_option('-R', '--rest_credentials', dest='rest_credentials', type='string', default='TMSSClient', help='TMSS django REST API credentials name, default: %default') parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging') (options, args) = parser.parse_args() - if not (options.otdb_id or options.mom_id or options.radb_id): + if not (options.otdb_id or options.mom_id or options.radb_id or options.tmss_id): parser.print_help() exit(1) @@ -230,7 +271,7 @@ def main(): with PathResolver(exchange=options.exchange, broker=options.broker) as path_resolver: if options.path: - result = path_resolver.getPathForTask(otdb_id=options.otdb_id, mom_id=options.mom_id, radb_id=options.radb_id) + result = path_resolver.getPathForTask(otdb_id=options.otdb_id, mom_id=options.mom_id, radb_id=options.radb_id, tmss_id=options.tmss_id) if result['found']: print("path: %s" % (result['path'])) else: @@ -238,7 +279,7 @@ def main(): exit(1) if options.project: - result = path_resolver.getProjectDirAndSubDirectories(otdb_id=options.otdb_id, mom_id=options.mom_id, radb_id=options.radb_id) + result = path_resolver.getProjectDirAndSubDirectories(otdb_id=options.otdb_id, mom_id=options.mom_id, radb_id=options.radb_id, tmss_id=options.tmss_id) if result['found']: print("projectpath: %s" % (result['path'])) print("subdirectories: %s" % (' '.join(result['sub_directories']))) @@ -247,7 +288,7 @@ def main(): exit(1) if options.subdirs: - result = path_resolver.getSubDirectoriesForTask(otdb_id=options.otdb_id, mom_id=options.mom_id, radb_id=options.radb_id) + result = path_resolver.getSubDirectoriesForTask(otdb_id=options.otdb_id, mom_id=options.mom_id, radb_id=options.radb_id, tmss_id=options.tmss_id) if result['found']: print("path: %s" % (result['path'])) print("subdirectories: %s" % (' '.join(result['sub_directories']))) -- GitLab