Skip to content
Snippets Groups Projects
Commit 33673c26 authored by Jorrit Schaap
Browse files

Task #9607: added force_update flag

parent 227a0070
No related branches found
No related tags found
No related merge requests found
...@@ -360,16 +360,16 @@ class CacheManager: ...@@ -360,16 +360,16 @@ class CacheManager:
# trigger update cache thread # trigger update cache thread
self._continueUpdateCacheThread = True self._continueUpdateCacheThread = True
def getDiskUsageForOTDBId(self, otdb_id, include_scratch_paths=True): def getDiskUsageForOTDBId(self, otdb_id, include_scratch_paths=True, force_update=False):
return self.getDiskUsageForTask(otdb_id=otdb_id, include_scratch_paths=include_scratch_paths) return self.getDiskUsageForTask(otdb_id=otdb_id, include_scratch_paths=include_scratch_paths, force_update=force_update)
def getDiskUsageForMoMId(self, mom_id, include_scratch_paths=True): def getDiskUsageForMoMId(self, mom_id, include_scratch_paths=True, force_update=False):
return self.getDiskUsageForTask(mom_id=mom_id, include_scratch_paths=include_scratch_paths) return self.getDiskUsageForTask(mom_id=mom_id, include_scratch_paths=include_scratch_paths, force_update=force_update)
def getDiskUsageForRADBId(self, radb_id, include_scratch_paths=True): def getDiskUsageForRADBId(self, radb_id, include_scratch_paths=True, force_update=False):
return self.getDiskUsageForTask(radb_id=radb_id, include_scratch_paths=include_scratch_paths) return self.getDiskUsageForTask(radb_id=radb_id, include_scratch_paths=include_scratch_paths, force_update=force_update)
def getDiskUsageForTask(self, radb_id=None, mom_id=None, otdb_id=None, include_scratch_paths=True): def getDiskUsageForTask(self, radb_id=None, mom_id=None, otdb_id=None, include_scratch_paths=True, force_update=False):
logger.info("cache.getDiskUsageForTask(radb_id=%s, mom_id=%s, otdb_id=%s)" % (radb_id, mom_id, otdb_id)) logger.info("cache.getDiskUsageForTask(radb_id=%s, mom_id=%s, otdb_id=%s)" % (radb_id, mom_id, otdb_id))
if otdb_id != None and not include_scratch_paths: if otdb_id != None and not include_scratch_paths:
...@@ -378,39 +378,39 @@ class CacheManager: ...@@ -378,39 +378,39 @@ class CacheManager:
if path: if path:
logger.info('Using path from cache for otdb_id %s %s', otdb_id, path) logger.info('Using path from cache for otdb_id %s %s', otdb_id, path)
return self.getDiskUsageForPath(path) return self.getDiskUsageForPath(path, force_update=force_update)
path_result = self.disk_usage.path_resolver.getPathForTask(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, include_scratch_paths=include_scratch_paths) path_result = self.disk_usage.path_resolver.getPathForTask(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, include_scratch_paths=include_scratch_paths)
if path_result['found']: if path_result['found']:
return self.getDiskUsageForPath(path_result['path']) return self.getDiskUsageForPath(path_result['path'], force_update=force_update)
return {'found': False, 'path': path_result['path']} return {'found': False, 'path': path_result['path']}
def getDiskUsageForTasks(self, radb_ids=None, mom_ids=None, otdb_ids=None, include_scratch_paths=True): def getDiskUsageForTasks(self, radb_ids=None, mom_ids=None, otdb_ids=None, include_scratch_paths=True, force_update=False):
logger.info("cache.getDiskUsageForTask(radb_ids=%s, mom_ids=%s, otdb_ids=%s)" % (radb_ids, mom_ids, otdb_ids)) logger.info("cache.getDiskUsageForTask(radb_ids=%s, mom_ids=%s, otdb_ids=%s)" % (radb_ids, mom_ids, otdb_ids))
result = {'radb_ids': {}, 'mom_ids': {}, 'otdb_ids': {}} result = {'radb_ids': {}, 'mom_ids': {}, 'otdb_ids': {}}
if radb_ids: if radb_ids:
for radb_id in radb_ids: for radb_id in radb_ids:
result['radb_ids'][str(radb_id)] = self.getDiskUsageForTask(radb_id=radb_id, include_scratch_paths=include_scratch_paths) result['radb_ids'][str(radb_id)] = self.getDiskUsageForTask(radb_id=radb_id, include_scratch_paths=include_scratch_paths, force_update=force_update)
if mom_ids: if mom_ids:
for mom_id in mom_ids: for mom_id in mom_ids:
result['mom_ids'][str(mom_id)] = self.getDiskUsageForTask(mom_id=mom_id, include_scratch_paths=include_scratch_paths) result['mom_ids'][str(mom_id)] = self.getDiskUsageForTask(mom_id=mom_id, include_scratch_paths=include_scratch_paths, force_update=force_update)
if otdb_ids: if otdb_ids:
for otdb_id in otdb_ids: for otdb_id in otdb_ids:
result['otdb_ids'][str(otdb_id)] = self.getDiskUsageForTask(otdb_id=otdb_id, include_scratch_paths=include_scratch_paths) result['otdb_ids'][str(otdb_id)] = self.getDiskUsageForTask(otdb_id=otdb_id, include_scratch_paths=include_scratch_paths, force_update=force_update)
return result return result
def getDiskUsageForPath(self, path): def getDiskUsageForPath(self, path, force_update=False):
logger.info("cache.getDiskUsageForPath(%s)", path) logger.info("cache.getDiskUsageForPath(%s)", path)
needs_cache_update = False needs_cache_update = False
with self._cacheLock: with self._cacheLock:
needs_cache_update |= path not in self._cache['path_du_results'] needs_cache_update |= path not in self._cache['path_du_results']
if needs_cache_update: if needs_cache_update or force_update:
logger.info("cache update needed for %s", path) logger.info("cache update needed for %s", path)
result = du_getDiskUsageForPath(path) result = du_getDiskUsageForPath(path)
self._updateCache(result) self._updateCache(result)
...@@ -426,9 +426,9 @@ class CacheManager: ...@@ -426,9 +426,9 @@ class CacheManager:
logger.info('cache.getDiskUsageForPath result: %s' % result) logger.info('cache.getDiskUsageForPath result: %s' % result)
return result return result
def getDiskUsageForTaskAndSubDirectories(self, radb_id=None, mom_id=None, otdb_id=None): def getDiskUsageForTaskAndSubDirectories(self, radb_id=None, mom_id=None, otdb_id=None, force_update=False):
logger.info("cache.getDiskUsageForTaskAndSubDirectories(radb_id=%s, mom_id=%s, otdb_id=%s)" % (radb_id, mom_id, otdb_id)) logger.info("cache.getDiskUsageForTaskAndSubDirectories(radb_id=%s, mom_id=%s, otdb_id=%s)" % (radb_id, mom_id, otdb_id))
task_du_result = self.getDiskUsageForTask(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id) task_du_result = self.getDiskUsageForTask(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, force_update=force_update)
if task_du_result['found']: if task_du_result['found']:
task_sd_result = self.disk_usage.path_resolver.getSubDirectories(task_du_result['path']) task_sd_result = self.disk_usage.path_resolver.getSubDirectories(task_du_result['path'])
...@@ -436,7 +436,7 @@ class CacheManager: ...@@ -436,7 +436,7 @@ class CacheManager:
subdir_paths = [os.path.join(task_du_result['path'],sd) for sd in task_sd_result['sub_directories']] subdir_paths = [os.path.join(task_du_result['path'],sd) for sd in task_sd_result['sub_directories']]
#TODO: potential for parallelization #TODO: potential for parallelization
subdirs_du_result = { sd: self.getDiskUsageForPath(sd) for sd in subdir_paths } subdirs_du_result = { sd: self.getDiskUsageForPath(sd, force_update=force_update) for sd in subdir_paths }
result = {'found':True, 'task_directory': task_du_result, 'sub_directories': subdirs_du_result } result = {'found':True, 'task_directory': task_du_result, 'sub_directories': subdirs_du_result }
logger.info("result for cache.getDiskUsageForTaskAndSubDirectories(radb_id=%s, mom_id=%s, otdb_id=%s): %s", radb_id, mom_id, otdb_id, result) logger.info("result for cache.getDiskUsageForTaskAndSubDirectories(radb_id=%s, mom_id=%s, otdb_id=%s): %s", radb_id, mom_id, otdb_id, result)
...@@ -445,25 +445,25 @@ class CacheManager: ...@@ -445,25 +445,25 @@ class CacheManager:
logger.warn("result for cache.getDiskUsageForTaskAndSubDirectories(radb_id=%s, mom_id=%s, otdb_id=%s): %s", radb_id, mom_id, otdb_id, task_du_result) logger.warn("result for cache.getDiskUsageForTaskAndSubDirectories(radb_id=%s, mom_id=%s, otdb_id=%s): %s", radb_id, mom_id, otdb_id, task_du_result)
return task_du_result return task_du_result
def getDiskUsageForProjectDirAndSubDirectories(self, radb_id=None, mom_id=None, otdb_id=None, project_name=None): def getDiskUsageForProjectDirAndSubDirectories(self, radb_id=None, mom_id=None, otdb_id=None, project_name=None, force_update=False):
logger.info("cache.getDiskUsageForProjectDirAndSubDirectories(radb_id=%s, mom_id=%s, otdb_id=%s)" % (radb_id, mom_id, otdb_id)) logger.info("cache.getDiskUsageForProjectDirAndSubDirectories(radb_id=%s, mom_id=%s, otdb_id=%s)" % (radb_id, mom_id, otdb_id))
path_result = self.disk_usage.path_resolver.getProjectDirAndSubDirectories(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, project_name=project_name) path_result = self.disk_usage.path_resolver.getProjectDirAndSubDirectories(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, project_name=project_name)
if path_result['found']: if path_result['found']:
projectdir_du_result = self.getDiskUsageForPath(path_result['path']) projectdir_du_result = self.getDiskUsageForPath(path_result['path'], force_update=force_update)
subdir_paths = [os.path.join(path_result['path'],sd) for sd in path_result['sub_directories']] subdir_paths = [os.path.join(path_result['path'],sd) for sd in path_result['sub_directories']]
#TODO: potential for parallelization #TODO: potential for parallelization
subdirs_du_result = { sd: self.getDiskUsageForPath(sd) for sd in subdir_paths } subdirs_du_result = { sd: self.getDiskUsageForPath(sd, force_update=force_update) for sd in subdir_paths }
result = {'found':True, 'projectdir': projectdir_du_result, 'sub_directories': subdirs_du_result } result = {'found':True, 'projectdir': projectdir_du_result, 'sub_directories': subdirs_du_result }
logger.info('cache.getDiskUsageForProjectDirAndSubDirectories result: %s' % result) logger.info('cache.getDiskUsageForProjectDirAndSubDirectories result: %s' % result)
return result return result
return path_result return path_result
def getDiskUsageForProjectsDirAndSubDirectories(self): def getDiskUsageForProjectsDirAndSubDirectories(self, force_update=False):
logger.info("cache.getDiskUsageForProjectsDirAndSubDirectories") logger.info("cache.getDiskUsageForProjectsDirAndSubDirectories")
projects_path = self.disk_usage.path_resolver.projects_path projects_path = self.disk_usage.path_resolver.projects_path
projectsdir_du_result = self.getDiskUsageForPath(projects_path) projectsdir_du_result = self.getDiskUsageForPath(projects_path, force_update=force_update)
result = {'found':True, 'projectdir': projectsdir_du_result } result = {'found':True, 'projectdir': projectsdir_du_result }
...@@ -472,7 +472,7 @@ class CacheManager: ...@@ -472,7 +472,7 @@ class CacheManager:
subdir_paths = [os.path.join(projects_path,sd) for sd in project_subdirs_result['sub_directories']] subdir_paths = [os.path.join(projects_path,sd) for sd in project_subdirs_result['sub_directories']]
#TODO: potential for parallelization #TODO: potential for parallelization
subdirs_du_result = { sd: self.getDiskUsageForPath(sd) for sd in subdir_paths } subdirs_du_result = { sd: self.getDiskUsageForPath(sd, force_update=force_update) for sd in subdir_paths }
result['sub_directories'] = subdirs_du_result result['sub_directories'] = subdirs_du_result
logger.info('cache.getDiskUsageForProjectsDirAndSubDirectories result: %s' % result) logger.info('cache.getDiskUsageForProjectsDirAndSubDirectories result: %s' % result)
......
...@@ -27,29 +27,29 @@ class StorageQueryRPC(RPCWrapper): ...@@ -27,29 +27,29 @@ class StorageQueryRPC(RPCWrapper):
def getPathForOTDBId(self, otdb_id): def getPathForOTDBId(self, otdb_id):
return self.rpc('GetPathForOTDBId', otdb_id=otdb_id) return self.rpc('GetPathForOTDBId', otdb_id=otdb_id)
def getDiskUsageForOTDBId(self, otdb_id, include_scratch_paths=True): def getDiskUsageForOTDBId(self, otdb_id, include_scratch_paths=True, force_update=False):
return self._convertTimestamps(self.rpc('GetDiskUsageForOTDBId', otdb_id=otdb_id, include_scratch_paths=include_scratch_paths)) return self._convertTimestamps(self.rpc('GetDiskUsageForOTDBId', otdb_id=otdb_id, include_scratch_paths=include_scratch_paths, force_update=force_update))
def getDiskUsageForMoMId(self, mom_id, include_scratch_paths=True): def getDiskUsageForMoMId(self, mom_id, include_scratch_paths=True, force_update=False):
return self._convertTimestamps(self.rpc('GetDiskUsageForMoMId', mom_id=mom_id, include_scratch_paths=include_scratch_paths)) return self._convertTimestamps(self.rpc('GetDiskUsageForMoMId', mom_id=mom_id, include_scratch_paths=include_scratch_paths, force_update=force_update))
def getDiskUsageForRADBId(self, radb_id, include_scratch_paths=True): def getDiskUsageForRADBId(self, radb_id, include_scratch_paths=True, force_update=False):
return self._convertTimestamps(self.rpc('GetDiskUsageForRADBId', radb_id=radb_id, include_scratch_paths=include_scratch_paths)) return self._convertTimestamps(self.rpc('GetDiskUsageForRADBId', radb_id=radb_id, include_scratch_paths=include_scratch_paths, force_update=force_update))
def getDiskUsageForTask(self, radb_id=None, mom_id=None, otdb_id=None, include_scratch_paths=True): def getDiskUsageForTask(self, radb_id=None, mom_id=None, otdb_id=None, include_scratch_paths=True, force_update=False):
return self._convertTimestamps(self.rpc('GetDiskUsageForTask', radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, include_scratch_paths=include_scratch_paths)) return self._convertTimestamps(self.rpc('GetDiskUsageForTask', radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, include_scratch_paths=include_scratch_paths, force_update=force_update))
def getDiskUsageForTasks(self, radb_ids=None, mom_ids=None, otdb_ids=None, include_scratch_paths=True): def getDiskUsageForTasks(self, radb_ids=None, mom_ids=None, otdb_ids=None, include_scratch_paths=True, force_update=False):
return self._convertTimestamps(self.rpc('GetDiskUsageForTasks', radb_ids=radb_ids, mom_ids=mom_ids, otdb_ids=otdb_ids, include_scratch_paths=include_scratch_paths)) return self._convertTimestamps(self.rpc('GetDiskUsageForTasks', radb_ids=radb_ids, mom_ids=mom_ids, otdb_ids=otdb_ids, include_scratch_paths=include_scratch_paths, force_update=force_update))
def getDiskUsageForTaskAndSubDirectories(self, radb_id=None, mom_id=None, otdb_id=None): def getDiskUsageForTaskAndSubDirectories(self, radb_id=None, mom_id=None, otdb_id=None, force_update=False):
return self._convertTimestamps(self.rpc('GetDiskUsageForTaskAndSubDirectories', radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id)) return self._convertTimestamps(self.rpc('GetDiskUsageForTaskAndSubDirectories', radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, force_update=force_update))
def getDiskUsageForProjectDirAndSubDirectories(self, radb_id=None, mom_id=None, otdb_id=None, project_name=None): def getDiskUsageForProjectDirAndSubDirectories(self, radb_id=None, mom_id=None, otdb_id=None, project_name=None, force_update=False):
return self._convertTimestamps(self.rpc('GetDiskUsageForProjectDirAndSubDirectories', radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, project_name=project_name)) return self._convertTimestamps(self.rpc('GetDiskUsageForProjectDirAndSubDirectories', radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, project_name=project_name, force_update=force_update))
def getDiskUsageForProjectsDirAndSubDirectories(self): def getDiskUsageForProjectsDirAndSubDirectories(self, force_update=False):
return self._convertTimestamps(self.rpc('GetDiskUsageForProjectsDirAndSubDirectories')) return self._convertTimestamps(self.rpc('GetDiskUsageForProjectsDirAndSubDirectories', force_update=force_update))
def main(): def main():
import sys import sys
...@@ -64,6 +64,7 @@ def main(): ...@@ -64,6 +64,7 @@ def main():
parser.add_option('-s', '--subdirs', dest='subdirs', action='store_true', help='get the disk usage of the task and its sub directories for the given otdb_id/mom_id/radb_id') parser.add_option('-s', '--subdirs', dest='subdirs', action='store_true', help='get the disk usage of the task and its sub directories for the given otdb_id/mom_id/radb_id')
parser.add_option('-p', '--project', dest='project', type='string', default=None, help='get the disk usage of the project path and all its sub directories for the given project name') parser.add_option('-p', '--project', dest='project', type='string', default=None, help='get the disk usage of the project path and all its sub directories for the given project name')
parser.add_option('-P', '--projects', dest='projects', action='store_true', help='get the disk usage of the projects path and all its projects sub directories') parser.add_option('-P', '--projects', dest='projects', action='store_true', help='get the disk usage of the projects path and all its projects sub directories')
parser.add_option('-f', '--force_update', dest='force_update', action='store_true', help='force an update of the cache with a new du call')
parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost') parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost')
parser.add_option('--busname', dest='busname', type='string', default=DEFAULT_BUSNAME, help='Name of the bus exchange on the qpid broker, default: %s' % DEFAULT_BUSNAME) parser.add_option('--busname', dest='busname', type='string', default=DEFAULT_BUSNAME, help='Name of the bus exchange on the qpid broker, default: %s' % DEFAULT_BUSNAME)
parser.add_option('--servicename', dest='servicename', type='string', default=DEFAULT_SERVICENAME, help='Name for this service, default: %s' % DEFAULT_SERVICENAME) parser.add_option('--servicename', dest='servicename', type='string', default=DEFAULT_SERVICENAME, help='Name for this service, default: %s' % DEFAULT_SERVICENAME)
...@@ -79,7 +80,7 @@ def main(): ...@@ -79,7 +80,7 @@ def main():
with StorageQueryRPC(busname=options.busname, servicename=options.servicename, broker=options.broker) as rpc: with StorageQueryRPC(busname=options.busname, servicename=options.servicename, broker=options.broker) as rpc:
if options.projects: if options.projects:
result = rpc.getDiskUsageForProjectsDirAndSubDirectories() result = rpc.getDiskUsageForProjectsDirAndSubDirectories(force_update=options.force_update)
if result['found']: if result['found']:
import pprint import pprint
pprint.pprint(result) pprint.pprint(result)
...@@ -87,7 +88,7 @@ def main(): ...@@ -87,7 +88,7 @@ def main():
print result['message'] print result['message']
exit(1) exit(1)
elif options.project: elif options.project:
result = rpc.getDiskUsageForProjectDirAndSubDirectories(otdb_id=options.otdb_id, mom_id=options.mom_id, radb_id=options.radb_id, project_name=options.project) result = rpc.getDiskUsageForProjectDirAndSubDirectories(otdb_id=options.otdb_id, mom_id=options.mom_id, radb_id=options.radb_id, project_name=options.project, force_update=options.force_update)
if result['found']: if result['found']:
import pprint import pprint
pprint.pprint(result) pprint.pprint(result)
...@@ -95,7 +96,7 @@ def main(): ...@@ -95,7 +96,7 @@ def main():
print result['message'] print result['message']
exit(1) exit(1)
elif options.subdirs: elif options.subdirs:
result = rpc.getDiskUsageForTaskAndSubDirectories(otdb_id=options.otdb_id, mom_id=options.mom_id, radb_id=options.radb_id) result = rpc.getDiskUsageForTaskAndSubDirectories(otdb_id=options.otdb_id, mom_id=options.mom_id, radb_id=options.radb_id, force_update=options.force_update)
if result['found']: if result['found']:
import pprint import pprint
pprint.pprint(result) pprint.pprint(result)
...@@ -103,11 +104,11 @@ def main(): ...@@ -103,11 +104,11 @@ def main():
print result['message'] print result['message']
exit(1) exit(1)
else: else:
result = rpc.getDiskUsageForTask(otdb_id=options.otdb_id, mom_id=options.mom_id, radb_id=options.radb_id) result = rpc.getDiskUsageForTask(otdb_id=options.otdb_id, mom_id=options.mom_id, radb_id=options.radb_id, force_update=options.force_update)
if result['found']: if result['found']:
print 'path %s' % result['path'] print 'path %s' % result['path']
print 'disk_usage %s %s' % (result['disk_usage'], result.get('disk_usage_readable')) print 'disk_usage %s %s' % (result.get('disk_usage'), result.get('disk_usage_readable'))
print 'nr_of_files %s' % result['nr_of_files'] print 'nr_of_files %s' % result.get('nr_of_files')
else: else:
print result['message'] print result['message']
exit(1) exit(1)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment