From 33673c267e11e677328beed93e8a8c47bfd792f5 Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Fri, 23 Sep 2016 06:58:21 +0000 Subject: [PATCH] Task #9607: added force_update flag --- .../StorageQueryService/cache.py | 48 +++++++++---------- SAS/DataManagement/StorageQueryService/rpc.py | 45 ++++++++--------- 2 files changed, 47 insertions(+), 46 deletions(-) diff --git a/SAS/DataManagement/StorageQueryService/cache.py b/SAS/DataManagement/StorageQueryService/cache.py index be1626146f3..9e239049540 100644 --- a/SAS/DataManagement/StorageQueryService/cache.py +++ b/SAS/DataManagement/StorageQueryService/cache.py @@ -360,16 +360,16 @@ class CacheManager: # trigger update cache thread self._continueUpdateCacheThread = True - def getDiskUsageForOTDBId(self, otdb_id, include_scratch_paths=True): - return self.getDiskUsageForTask(otdb_id=otdb_id, include_scratch_paths=include_scratch_paths) + def getDiskUsageForOTDBId(self, otdb_id, include_scratch_paths=True, force_update=False): + return self.getDiskUsageForTask(otdb_id=otdb_id, include_scratch_paths=include_scratch_paths, force_update=force_update) - def getDiskUsageForMoMId(self, mom_id, include_scratch_paths=True): - return self.getDiskUsageForTask(mom_id=mom_id, include_scratch_paths=include_scratch_paths) + def getDiskUsageForMoMId(self, mom_id, include_scratch_paths=True, force_update=False): + return self.getDiskUsageForTask(mom_id=mom_id, include_scratch_paths=include_scratch_paths, force_update=force_update) - def getDiskUsageForRADBId(self, radb_id, include_scratch_paths=True): - return self.getDiskUsageForTask(radb_id=radb_id, include_scratch_paths=include_scratch_paths) + def getDiskUsageForRADBId(self, radb_id, include_scratch_paths=True, force_update=False): + return self.getDiskUsageForTask(radb_id=radb_id, include_scratch_paths=include_scratch_paths, force_update=force_update) - def getDiskUsageForTask(self, radb_id=None, mom_id=None, otdb_id=None, include_scratch_paths=True): + def getDiskUsageForTask(self, radb_id=None, mom_id=None, otdb_id=None, include_scratch_paths=True, force_update=False): logger.info("cache.getDiskUsageForTask(radb_id=%s, mom_id=%s, otdb_id=%s)" % (radb_id, mom_id, otdb_id)) if otdb_id != None and not include_scratch_paths: @@ -378,39 +378,39 @@ class CacheManager: if path: logger.info('Using path from cache for otdb_id %s %s', otdb_id, path) - return self.getDiskUsageForPath(path) + return self.getDiskUsageForPath(path, force_update=force_update) path_result = self.disk_usage.path_resolver.getPathForTask(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, include_scratch_paths=include_scratch_paths) if path_result['found']: - return self.getDiskUsageForPath(path_result['path']) + return self.getDiskUsageForPath(path_result['path'], force_update=force_update) return {'found': False, 'path': path_result['path']} - def getDiskUsageForTasks(self, radb_ids=None, mom_ids=None, otdb_ids=None, include_scratch_paths=True): + def getDiskUsageForTasks(self, radb_ids=None, mom_ids=None, otdb_ids=None, include_scratch_paths=True, force_update=False): logger.info("cache.getDiskUsageForTask(radb_ids=%s, mom_ids=%s, otdb_ids=%s)" % (radb_ids, mom_ids, otdb_ids)) result = {'radb_ids': {}, 'mom_ids': {}, 'otdb_ids': {}} if radb_ids: for radb_id in radb_ids: - result['radb_ids'][str(radb_id)] = self.getDiskUsageForTask(radb_id=radb_id, include_scratch_paths=include_scratch_paths) + result['radb_ids'][str(radb_id)] = self.getDiskUsageForTask(radb_id=radb_id, include_scratch_paths=include_scratch_paths, force_update=force_update) if mom_ids: for mom_id in mom_ids: - result['mom_ids'][str(mom_id)] = self.getDiskUsageForTask(mom_id=mom_id, include_scratch_paths=include_scratch_paths) + result['mom_ids'][str(mom_id)] = self.getDiskUsageForTask(mom_id=mom_id, include_scratch_paths=include_scratch_paths, force_update=force_update) if otdb_ids: for otdb_id in otdb_ids: - result['otdb_ids'][str(otdb_id)] = self.getDiskUsageForTask(otdb_id=otdb_id, include_scratch_paths=include_scratch_paths) + result['otdb_ids'][str(otdb_id)] = self.getDiskUsageForTask(otdb_id=otdb_id, include_scratch_paths=include_scratch_paths, force_update=force_update) return result - def getDiskUsageForPath(self, path): + def getDiskUsageForPath(self, path, force_update=False): logger.info("cache.getDiskUsageForPath(%s)", path) needs_cache_update = False with self._cacheLock: needs_cache_update |= path not in self._cache['path_du_results'] - if needs_cache_update: + if needs_cache_update or force_update: logger.info("cache update needed for %s", path) result = du_getDiskUsageForPath(path) self._updateCache(result) @@ -426,9 +426,9 @@ class CacheManager: logger.info('cache.getDiskUsageForPath result: %s' % result) return result - def getDiskUsageForTaskAndSubDirectories(self, radb_id=None, mom_id=None, otdb_id=None): + def getDiskUsageForTaskAndSubDirectories(self, radb_id=None, mom_id=None, otdb_id=None, force_update=False): logger.info("cache.getDiskUsageForTaskAndSubDirectories(radb_id=%s, mom_id=%s, otdb_id=%s)" % (radb_id, mom_id, otdb_id)) - task_du_result = self.getDiskUsageForTask(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id) + task_du_result = self.getDiskUsageForTask(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, force_update=force_update) if task_du_result['found']: task_sd_result = self.disk_usage.path_resolver.getSubDirectories(task_du_result['path']) @@ -436,7 +436,7 @@ class CacheManager: subdir_paths = [os.path.join(task_du_result['path'],sd) for sd in task_sd_result['sub_directories']] #TODO: potential for parallelization - subdirs_du_result = { sd: self.getDiskUsageForPath(sd) for sd in subdir_paths } + subdirs_du_result = { sd: self.getDiskUsageForPath(sd, force_update=force_update) for sd in subdir_paths } result = {'found':True, 'task_directory': task_du_result, 'sub_directories': subdirs_du_result } logger.info("result for cache.getDiskUsageForTaskAndSubDirectories(radb_id=%s, mom_id=%s, otdb_id=%s): %s", radb_id, mom_id, otdb_id, result) @@ -445,25 +445,25 @@ class CacheManager: logger.warn("result for cache.getDiskUsageForTaskAndSubDirectories(radb_id=%s, mom_id=%s, otdb_id=%s): %s", radb_id, mom_id, otdb_id, task_du_result) return task_du_result - def getDiskUsageForProjectDirAndSubDirectories(self, radb_id=None, mom_id=None, otdb_id=None, project_name=None): + def getDiskUsageForProjectDirAndSubDirectories(self, radb_id=None, mom_id=None, otdb_id=None, project_name=None, force_update=False): logger.info("cache.getDiskUsageForProjectDirAndSubDirectories(radb_id=%s, mom_id=%s, otdb_id=%s)" % (radb_id, mom_id, otdb_id)) path_result = self.disk_usage.path_resolver.getProjectDirAndSubDirectories(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, project_name=project_name) if path_result['found']: - projectdir_du_result = self.getDiskUsageForPath(path_result['path']) + projectdir_du_result = self.getDiskUsageForPath(path_result['path'], force_update=force_update) subdir_paths = [os.path.join(path_result['path'],sd) for sd in path_result['sub_directories']] #TODO: potential for parallelization - subdirs_du_result = { sd: self.getDiskUsageForPath(sd) for sd in subdir_paths } + subdirs_du_result = { sd: self.getDiskUsageForPath(sd, force_update=force_update) for sd in subdir_paths } result = {'found':True, 'projectdir': projectdir_du_result, 'sub_directories': subdirs_du_result } logger.info('cache.getDiskUsageForProjectDirAndSubDirectories result: %s' % result) return result return path_result - def getDiskUsageForProjectsDirAndSubDirectories(self): + def getDiskUsageForProjectsDirAndSubDirectories(self, force_update=False): logger.info("cache.getDiskUsageForProjectsDirAndSubDirectories") projects_path = self.disk_usage.path_resolver.projects_path - projectsdir_du_result = self.getDiskUsageForPath(projects_path) + projectsdir_du_result = self.getDiskUsageForPath(projects_path, force_update=force_update) result = {'found':True, 'projectdir': projectsdir_du_result } @@ -472,7 +472,7 @@ class CacheManager: subdir_paths = [os.path.join(projects_path,sd) for sd in project_subdirs_result['sub_directories']] #TODO: potential for parallelization - subdirs_du_result = { sd: self.getDiskUsageForPath(sd) for sd in subdir_paths } + subdirs_du_result = { sd: self.getDiskUsageForPath(sd, force_update=force_update) for sd in subdir_paths } result['sub_directories'] = subdirs_du_result logger.info('cache.getDiskUsageForProjectsDirAndSubDirectories result: %s' % result) diff --git a/SAS/DataManagement/StorageQueryService/rpc.py b/SAS/DataManagement/StorageQueryService/rpc.py index c8f2fc4d3bb..600b3d21d91 100644 --- a/SAS/DataManagement/StorageQueryService/rpc.py +++ b/SAS/DataManagement/StorageQueryService/rpc.py @@ -27,29 +27,29 @@ class StorageQueryRPC(RPCWrapper): def getPathForOTDBId(self, otdb_id): return self.rpc('GetPathForOTDBId', otdb_id=otdb_id) - def getDiskUsageForOTDBId(self, otdb_id, include_scratch_paths=True): - return self._convertTimestamps(self.rpc('GetDiskUsageForOTDBId', otdb_id=otdb_id, include_scratch_paths=include_scratch_paths)) + def getDiskUsageForOTDBId(self, otdb_id, include_scratch_paths=True, force_update=False): + return self._convertTimestamps(self.rpc('GetDiskUsageForOTDBId', otdb_id=otdb_id, include_scratch_paths=include_scratch_paths, force_update=force_update)) - def getDiskUsageForMoMId(self, mom_id, include_scratch_paths=True): - return self._convertTimestamps(self.rpc('GetDiskUsageForMoMId', mom_id=mom_id, include_scratch_paths=include_scratch_paths)) + def getDiskUsageForMoMId(self, mom_id, include_scratch_paths=True, force_update=False): + return self._convertTimestamps(self.rpc('GetDiskUsageForMoMId', mom_id=mom_id, include_scratch_paths=include_scratch_paths, force_update=force_update)) - def getDiskUsageForRADBId(self, radb_id, include_scratch_paths=True): - return self._convertTimestamps(self.rpc('GetDiskUsageForRADBId', radb_id=radb_id, include_scratch_paths=include_scratch_paths)) + def getDiskUsageForRADBId(self, radb_id, include_scratch_paths=True, force_update=False): + return self._convertTimestamps(self.rpc('GetDiskUsageForRADBId', radb_id=radb_id, include_scratch_paths=include_scratch_paths, force_update=force_update)) - def getDiskUsageForTask(self, radb_id=None, mom_id=None, otdb_id=None, include_scratch_paths=True): - return self._convertTimestamps(self.rpc('GetDiskUsageForTask', radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, include_scratch_paths=include_scratch_paths)) + def getDiskUsageForTask(self, radb_id=None, mom_id=None, otdb_id=None, include_scratch_paths=True, force_update=False): + return self._convertTimestamps(self.rpc('GetDiskUsageForTask', radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, include_scratch_paths=include_scratch_paths, force_update=force_update)) - def getDiskUsageForTasks(self, radb_ids=None, mom_ids=None, otdb_ids=None, include_scratch_paths=True): - return self._convertTimestamps(self.rpc('GetDiskUsageForTasks', radb_ids=radb_ids, mom_ids=mom_ids, otdb_ids=otdb_ids, include_scratch_paths=include_scratch_paths)) + def getDiskUsageForTasks(self, radb_ids=None, mom_ids=None, otdb_ids=None, include_scratch_paths=True, force_update=False): + return self._convertTimestamps(self.rpc('GetDiskUsageForTasks', radb_ids=radb_ids, mom_ids=mom_ids, otdb_ids=otdb_ids, include_scratch_paths=include_scratch_paths, force_update=force_update)) - def getDiskUsageForTaskAndSubDirectories(self, radb_id=None, mom_id=None, otdb_id=None): - return self._convertTimestamps(self.rpc('GetDiskUsageForTaskAndSubDirectories', radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id)) + def getDiskUsageForTaskAndSubDirectories(self, radb_id=None, mom_id=None, otdb_id=None, force_update=False): + return self._convertTimestamps(self.rpc('GetDiskUsageForTaskAndSubDirectories', radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, force_update=force_update)) - def getDiskUsageForProjectDirAndSubDirectories(self, radb_id=None, mom_id=None, otdb_id=None, project_name=None): - return self._convertTimestamps(self.rpc('GetDiskUsageForProjectDirAndSubDirectories', radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, project_name=project_name)) + def getDiskUsageForProjectDirAndSubDirectories(self, radb_id=None, mom_id=None, otdb_id=None, project_name=None, force_update=False): + return self._convertTimestamps(self.rpc('GetDiskUsageForProjectDirAndSubDirectories', radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, project_name=project_name, force_update=force_update)) - def getDiskUsageForProjectsDirAndSubDirectories(self): - return self._convertTimestamps(self.rpc('GetDiskUsageForProjectsDirAndSubDirectories')) + def getDiskUsageForProjectsDirAndSubDirectories(self, force_update=False): + return self._convertTimestamps(self.rpc('GetDiskUsageForProjectsDirAndSubDirectories', force_update=force_update)) def main(): import sys @@ -64,6 +64,7 @@ def main(): parser.add_option('-s', '--subdirs', dest='subdirs', action='store_true', help='get the disk usage of the task and its sub directories for the given otdb_id/mom_id/radb_id') parser.add_option('-p', '--project', dest='project', type='string', default=None, help='get the disk usage of the project path and all its sub directories for the given project name') parser.add_option('-P', '--projects', dest='projects', action='store_true', help='get the disk usage of the projects path and all its projects sub directories') + parser.add_option('-f', '--force_update', dest='force_update', action='store_true', help='force an update of the cache with a new du call') parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost') parser.add_option('--busname', dest='busname', type='string', default=DEFAULT_BUSNAME, help='Name of the bus exchange on the qpid broker, default: %s' % DEFAULT_BUSNAME) parser.add_option('--servicename', dest='servicename', type='string', default=DEFAULT_SERVICENAME, help='Name for this service, default: %s' % DEFAULT_SERVICENAME) @@ -79,7 +80,7 @@ def main(): with StorageQueryRPC(busname=options.busname, servicename=options.servicename, broker=options.broker) as rpc: if options.projects: - result = rpc.getDiskUsageForProjectsDirAndSubDirectories() + result = rpc.getDiskUsageForProjectsDirAndSubDirectories(force_update=options.force_update) if result['found']: import pprint pprint.pprint(result) @@ -87,7 +88,7 @@ def main(): print result['message'] exit(1) elif options.project: - result = rpc.getDiskUsageForProjectDirAndSubDirectories(otdb_id=options.otdb_id, mom_id=options.mom_id, radb_id=options.radb_id, project_name=options.project) + result = rpc.getDiskUsageForProjectDirAndSubDirectories(otdb_id=options.otdb_id, mom_id=options.mom_id, radb_id=options.radb_id, project_name=options.project, force_update=options.force_update) if result['found']: import pprint pprint.pprint(result) @@ -95,7 +96,7 @@ def main(): print result['message'] exit(1) elif options.subdirs: - result = rpc.getDiskUsageForTaskAndSubDirectories(otdb_id=options.otdb_id, mom_id=options.mom_id, radb_id=options.radb_id) + result = rpc.getDiskUsageForTaskAndSubDirectories(otdb_id=options.otdb_id, mom_id=options.mom_id, radb_id=options.radb_id, force_update=options.force_update) if result['found']: import pprint pprint.pprint(result) @@ -103,11 +104,11 @@ def main(): print result['message'] exit(1) else: - result = rpc.getDiskUsageForTask(otdb_id=options.otdb_id, mom_id=options.mom_id, radb_id=options.radb_id) + result = rpc.getDiskUsageForTask(otdb_id=options.otdb_id, mom_id=options.mom_id, radb_id=options.radb_id, force_update=options.force_update) if result['found']: print 'path %s' % result['path'] - print 'disk_usage %s %s' % (result['disk_usage'], result.get('disk_usage_readable')) - print 'nr_of_files %s' % result['nr_of_files'] + print 'disk_usage %s %s' % (result.get('disk_usage'), result.get('disk_usage_readable')) + print 'nr_of_files %s' % result.get('nr_of_files') else: print result['message'] exit(1) -- GitLab