diff --git a/SAS/DataManagement/StorageQueryService/cache.py b/SAS/DataManagement/StorageQueryService/cache.py index 35f0105c927e2f28f7fb07977b7358833d2ec098..529fbd2cedbe38dcd32e5df7a318bf988847a2c1 100644 --- a/SAS/DataManagement/StorageQueryService/cache.py +++ b/SAS/DataManagement/StorageQueryService/cache.py @@ -142,7 +142,8 @@ class CacheManager: self._sendDiskUsageChangedNotification(path, 0, otdb_id) if otdb_id != None and otdb_id in otdb_id2path_cache: - otdb_id2path_cache[otdb_id] = None + logger.info('removing otdb_id->path mapping %s:\'%s\' from cache', otdb_id, path) + del otdb_id2path_cache[otdb_id] self._writeCacheToDisk() @@ -155,6 +156,18 @@ class CacheManager: if path in path_cache: path_cache[path]['needs_update'] = True + def getDiskUsagesForAllOtdbIds(self, force_update=False): + otdb_ids = [] + with self._cacheLock: + otdb_id2path_cache = self._cache['otdb_id2path'] + otdb_ids = otdb_id2path_cache.keys() + + result = {} + for otdb_id in otdb_ids: + result[otdb_id] = self.getDiskUsageForOTDBId(otdb_id, force_update) + + return result + def _scanProjectsTree(self): try: def addSubDirectoriesToCache(directory): @@ -377,6 +390,23 @@ class CacheManager: return self.getDiskUsageForPath(path, force_update=force_update) path_result = self.disk_usage.path_resolver.getPathForTask(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, include_scratch_paths=include_scratch_paths) + + if otdb_id != None and not path_result['found']: + logger.info('getDiskUsageForTask otdb_id=%s path_result:%s', otdb_id, path_result) + + #update cache, entries for this otdb_id are wiped + #construct du_result dict with proper keys/values which can delete the entries in the cache + del_du_result = { 'found':False, 'otdb_id':otdb_id, 'message':'no such file or directory' } + + with self._cacheLock: + otdb_id2path_cache = self._cache['otdb_id2path'] + if otdb_id in otdb_id2path_cache: + del_du_result['path'] = otdb_id2path_cache[otdb_id] + + logger.info('del_du_result: %s', del_du_result) + + self._updateCache(del_du_result) + if path_result['found']: path_du_result = self.getDiskUsageForPath(path_result['path'], force_update=force_update) diff --git a/SAS/DataManagement/StorageQueryService/rpc.py b/SAS/DataManagement/StorageQueryService/rpc.py index 5b4889c3299ef00510c68aa6b7485b22493b1ffd..7192032deae37ba58db65d1d46a9bbf7f079d97e 100644 --- a/SAS/DataManagement/StorageQueryService/rpc.py +++ b/SAS/DataManagement/StorageQueryService/rpc.py @@ -4,7 +4,8 @@ import logging import qpid from lofar.messaging.RPC import RPC, RPCException, RPCWrapper from lofar.sas.datamanagement.storagequery.config import DEFAULT_BUSNAME, DEFAULT_SERVICENAME -from lofar.common.util import humanreadablesize +from lofar.common.util import humanreadablesize, convertStringDigitKeysToInt +from pprint import pprint logger = logging.getLogger(__name__) @@ -54,6 +55,9 @@ class StorageQueryRPC(RPCWrapper): def getDiskUsageForPath(self, path, force_update=False): return self.rpc('GetDiskUsageForPath', path=path, force_update=force_update) + def getDiskUsagesForAllOtdbIds(self, force_update=False): + return convertStringDigitKeysToInt(self.rpc('GetDiskUsagesForAllOtdbIds', force_update=force_update)) + def main(): import sys from optparse import OptionParser @@ -68,6 +72,7 @@ def main(): parser.add_option('-p', '--project', dest='project', type='string', default=None, help='get the disk usage of the project path and all its sub directories for the given project name') parser.add_option('-P', '--projects', dest='projects', action='store_true', help='get the disk usage of the projects path and all its projects sub directories') parser.add_option('-d', '--dir', dest='dir_path', type='string', default=None, help='get the disk usage of the given directory path') + parser.add_option('-a', '--all', dest='all', action='store_true', help='get disk usage for all otdb ids currently on disk') parser.add_option('-f', '--force_update', dest='force_update', action='store_true', help='force an update of the cache with a new du call') parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost') parser.add_option('--busname', dest='busname', type='string', default=DEFAULT_BUSNAME, help='Name of the bus exchange on the qpid broker, default: %s' % DEFAULT_BUSNAME) @@ -75,7 +80,7 @@ def main(): parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging') (options, args) = parser.parse_args() - if not (options.otdb_id or options.mom_id or options.radb_id or options.project or options.projects or options.dir_path): + if not (options.otdb_id or options.mom_id or options.radb_id or options.project or options.projects or options.dir_path or options.all): parser.print_help() exit(1) @@ -86,35 +91,34 @@ def main(): if options.projects: result = rpc.getDiskUsageForProjectsDirAndSubDirectories(force_update=options.force_update) if result['found']: - import pprint - pprint.pprint(result) + pprint(result) else: print result['message'] exit(1) elif options.project: result = rpc.getDiskUsageForProjectDirAndSubDirectories(otdb_id=options.otdb_id, mom_id=options.mom_id, radb_id=options.radb_id, project_name=options.project, force_update=options.force_update) if result['found']: - import pprint - pprint.pprint(result) + pprint(result) else: print result['message'] exit(1) elif options.subdirs: result = rpc.getDiskUsageForTaskAndSubDirectories(otdb_id=options.otdb_id, mom_id=options.mom_id, radb_id=options.radb_id, force_update=options.force_update) if result['found']: - import pprint - pprint.pprint(result) + pprint(result) else: print result['message'] exit(1) elif options.dir_path: result = rpc.getDiskUsageForPath(path=options.dir_path, force_update=options.force_update) if result['found']: - import pprint - pprint.pprint(result) + pprint(result) else: print result['message'] exit(1) + elif options.all: + result = rpc.getDiskUsagesForAllOtdbIds(force_update=options.force_update) + pprint(result) else: result = rpc.getDiskUsageForTask(otdb_id=options.otdb_id, mom_id=options.mom_id, radb_id=options.radb_id, force_update=options.force_update) if result['found']: diff --git a/SAS/DataManagement/StorageQueryService/service.py b/SAS/DataManagement/StorageQueryService/service.py index bcab783c824f09a433d9e3471bedc532011697fd..2a6e3a115f1bf6a27285f1bb4281b6062dd19661 100644 --- a/SAS/DataManagement/StorageQueryService/service.py +++ b/SAS/DataManagement/StorageQueryService/service.py @@ -9,7 +9,7 @@ from optparse import OptionParser from lofar.messaging import Service from lofar.messaging.Service import MessageHandlerInterface from lofar.messaging import setQpidLogLevel -from lofar.common.util import waitForInterrupt +from lofar.common.util import waitForInterrupt, convertIntKeysToString from lofar.messaging.Service import MessageHandlerInterface from lofar.sas.datamanagement.common.config import CEP4_DATA_MOUNTPOINT @@ -33,7 +33,7 @@ class StorageQueryHandler(MessageHandlerInterface): **kwargs): self.cache = kwargs.pop('cache_manager') - + super(StorageQueryHandler, self).__init__(**kwargs) self.disk_usage = DiskUsage(mountpoint=mountpoint, radb_busname=radb_busname, @@ -51,8 +51,11 @@ class StorageQueryHandler(MessageHandlerInterface): 'GetDiskUsageForTaskAndSubDirectories': self.cache.getDiskUsageForTaskAndSubDirectories, 'GetDiskUsageForProjectDirAndSubDirectories': self.cache.getDiskUsageForProjectDirAndSubDirectories, 'GetDiskUsageForProjectsDirAndSubDirectories': self.cache.getDiskUsageForProjectsDirAndSubDirectories, - 'GetDiskUsageForPath': self.cache.getDiskUsageForPath} + 'GetDiskUsageForPath': self.cache.getDiskUsageForPath, + 'GetDiskUsagesForAllOtdbIds': self.getDiskUsagesForAllOtdbIds} + def getDiskUsagesForAllOtdbIds(self, force_update=False): + return convertIntKeysToString(self.cache.getDiskUsagesForAllOtdbIds(force_update)) def createService(busname=DEFAULT_BUSNAME, servicename=DEFAULT_SERVICENAME, broker=None, mountpoint=CEP4_DATA_MOUNTPOINT, verbose=False, @@ -78,7 +81,7 @@ def main(): # make sure we run in UTC timezone import os os.environ['TZ'] = 'UTC' - + # Check the invocation arguments parser = OptionParser("%prog [options]", description='runs the storagequery service')