diff --git a/SAS/DataManagement/DataManagementCommon/path.py b/SAS/DataManagement/DataManagementCommon/path.py
index fa801d79c7fc0dcf3406360836de40eaeff05bdf..8c721c964b8ca50e7d864d5351d8024978629241 100644
--- a/SAS/DataManagement/DataManagementCommon/path.py
+++ b/SAS/DataManagement/DataManagementCommon/path.py
@@ -167,8 +167,8 @@ class PathResolver:
         # get the subdirectories of the given path
         cmd = ['lfs', 'find', '--type', 'd', '--maxdepth', '1', path.rstrip('/')]
         hostname = socket.gethostname()
-        if not 'mgmt0' in hostname:
-            cmd = ['ssh', 'lofarsys@mgmt01.cep4.control.lofar'] + cmd
+        if 'head' not in hostname:
+            cmd = ['ssh', 'lofarsys@head.cep4.control.lofar'] + cmd
         logger.debug(' '.join(cmd))
         proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
         out, err = proc.communicate()
@@ -189,8 +189,8 @@ class PathResolver:
     def pathExists(self, path):
         cmd = ['lfs', 'ls', path]
         hostname = socket.gethostname()
-        if not 'mgmt0' in hostname:
-            cmd = ['ssh', 'lofarsys@mgmt01.cep4.control.lofar'] + cmd
+        if 'head' not in hostname:
+            cmd = ['ssh', 'lofarsys@head.cep4.control.lofar'] + cmd
         logger.debug(' '.join(cmd))
         proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
         out, err = proc.communicate()
diff --git a/SAS/DataManagement/StorageQueryService/cache.py b/SAS/DataManagement/StorageQueryService/cache.py
index 45b61063f80c212cfe989cfdcae17201b022bc45..a8164af644a9902ba326482a13683950f38148dc 100644
--- a/SAS/DataManagement/StorageQueryService/cache.py
+++ b/SAS/DataManagement/StorageQueryService/cache.py
@@ -29,7 +29,7 @@ from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_M
 
 logger = logging.getLogger(__name__)
 
-MAX_CACHE_ENTRY_AGE = datetime.timedelta(hours=8)
+MAX_CACHE_ENTRY_AGE = datetime.timedelta(hours=3*24)
 
 class CacheManager:
     def __init__(self,
@@ -50,7 +50,7 @@ class CacheManager:
         self.otdb_listener = OTDBBusListener(busname=otdb_notification_busname,
                                              subject=otdb_notification_subject,
                                              broker=broker,
-                                             numthreads=2)
+                                             numthreads=1)
 
         self.otdb_listener.onObservationAborted = self.onObservationAborted
         self.otdb_listener.onObservationFinished = self.onObservationFinished
@@ -58,7 +58,7 @@ class CacheManager:
         self.dm_listener = DataManagementBusListener(busname=dm_notification_busname,
                                                      subjects=dm_notification_prefix + '*',
                                                      broker=broker,
-                                                     numthreads=2)
+                                                     numthreads=1)
 
         self.dm_listener.onTaskDeleted = self.onTaskDeleted
 
@@ -99,8 +99,9 @@ class CacheManager:
         try:
             if os.path.exists(self._cache_path):
                 with open(self._cache_path, 'r') as file:
+                    cache_from_disk = eval(file.read().strip()) #slow!
                     with self._cacheLock:
-                        self._cache = eval(file.read().strip())
+                        self._cache = cache_from_disk
                         if not isinstance(self._cache, dict):
                             self._cache = {'path_du_results': {}, 'otdb_id2path': {} }
                         if 'path_du_results' not in self._cache:
@@ -116,7 +117,7 @@ class CacheManager:
     def _writeCacheToDisk(self):
         try:
             # only persist (a subset of) the cache to disk every once in a while.
-            if datetime.datetime.utcnow() - self._last_cache_write_timestamp > datetime.timedelta(minutes=0.2):
+            if datetime.datetime.utcnow() - self._last_cache_write_timestamp > datetime.timedelta(minutes=5):
                 tmp_path = '/tmp/tmp_storagequery_cache.py'
                 cache_str = ''
                 with self._cacheLock:
@@ -170,7 +171,7 @@ class CacheManager:
                 if otdb_id != None:
                     otdb_id2path_cache[otdb_id] = path
 
-            self._writeCacheToDisk()
+        self._writeCacheToDisk()
 
         self._sendDiskUsageChangedNotification(path, du_result['disk_usage'], otdb_id)
 
@@ -199,28 +200,34 @@ class CacheManager:
         try:
             def addSubDirectoriesToCache(directory):
                 depth = self.getDepthToProjectsDir(directory)
+                MAX_SCAN_DEPTH = 2
                 #depth=0 : projects
                 #depth=1 : projects/<project>
                 #depth=2 : projects/<project>/<obs>
                 #depth=3 : projects/<project>/<obs>/<sub_dir>
-                if depth > 3:
+                if depth > MAX_SCAN_DEPTH:
                     return
 
+                add_empty_du_result_to_cache = False
                 with self._cacheLock:
                     path_cache = self._cache['path_du_results']
-                    if not directory in path_cache:
-                        logger.info('tree scan: adding \'%s\' with empty disk_usage to cache which will be du\'ed later', directory)
-                        empty_du_result = {'found': True, 'disk_usage': None, 'path': directory, 'name': directory.split('/')[-1]}
-                        self._updateCache(empty_du_result)
+                    add_empty_du_result_to_cache = directory not in path_cache
 
-                    if directory in path_cache:
-                        # mark cache entry for directory to be updated
-                        path_cache[directory]['needs_update'] = True
+                if add_empty_du_result_to_cache:
+                    logger.info('tree scan: adding \'%s\' with empty disk_usage to cache which will be du\'ed later', directory)
+                    empty_du_result = {'found': True, 'disk_usage': None, 'path': directory, 'name': directory.split('/')[-1]}
+                    self._updateCache(empty_du_result)
+
+                with self._cacheLock:
+                    path_cache = self._cache['path_du_results']
+                    if directory in path_cache:
+                        # mark cache entry for directory to be updated
+                        path_cache[directory]['needs_update'] = True
 
                 if not self._cacheThreadsRunning:
                     return
 
-                if depth < 3:
+                if depth < MAX_SCAN_DEPTH:
                     logger.info('tree scan: scanning \'%s\'', directory)
                     sd_result = self.disk_usage.path_resolver.getSubDirectories(directory)
 
@@ -287,7 +294,7 @@ class CacheManager:
                                 path)
 
                     #do a quick update of the entry by applying the sum of the subdirs to the path's du result...
-                    #this make a best guess immedeiately available...
+                    #this makes a best guess immediately available...
                     self._updatePathCacheEntryToSubDirTotal(path, False)
 
                     #...and in the meantime, do a full update from disk, which might be (really) slow.
@@ -319,21 +326,22 @@ class CacheManager:
             logger.error(str(e))
 
     def _updatePathCacheEntryToSubDirTotal(self, path, force_update=False):
-        sd_result = self.disk_usage.path_resolver.getSubDirectories(path)
+        with self._cacheLock:
+            path_cache_result = self._cache['path_du_results'].get(path)
 
-        if sd_result['found']:
-            subdir_paths = [os.path.join(path, sd) for sd in sd_result['sub_directories']]
+        if path_cache_result:
+            sd_result = self.disk_usage.path_resolver.getSubDirectories(path)
 
-            subdir_du_results = [self.getDiskUsageForPath(sd, force_update=force_update) for sd in subdir_paths]
-            sum_du = sum([du['disk_usage'] for du in subdir_du_results])
+            if sd_result['found']:
+                subdir_paths = [os.path.join(path, sd) for sd in sd_result['sub_directories']]
 
-            with self._cacheLock:
-                if path in self._cache['path_du_results']:
-                    path_result = self._cache['path_du_results'][path]
-                    path_result['disk_usage'] = sum_du
-                    path_result['disk_usage_readable'] = humanreadablesize(sum_du)
-                    path_result['needs_update'] = True
-                    self._updateCache(path_result)
+                subdir_du_results = [self.getDiskUsageForPath(sd, force_update=force_update) for sd in subdir_paths]
+                sum_du = sum([du['disk_usage'] for du in subdir_du_results])
+
+                path_cache_result['disk_usage'] = sum_du
+                path_cache_result['disk_usage_readable'] = humanreadablesize(sum_du)
+                path_cache_result['needs_update'] = True
+                self._updateCache(path_cache_result)
 
     def _updateCEP4CapacitiesInRADB(self):
         try:
@@ -464,10 +472,8 @@ class CacheManager:
                     scratch_path_du_result = self.getDiskUsageForPath(scratch_path, force_update=force_update)
                     path_du_result['scratch_paths'][scratch_path] = scratch_path_du_result
 
-                self._updateCache(path_du_result)
                 return path_du_result
 
-            self._updateCache(path_result)
             return {'found': False, 'path': path_result['path']}
 
     def getDiskUsageForTasks(self, radb_ids=None, mom_ids=None, otdb_ids=None, include_scratch_paths=True, force_update=False):
@@ -491,8 +497,9 @@ class CacheManager:
     def getDiskUsageForPath(self, path, force_update=False):
         logger.info("cache.getDiskUsageForPath('%s', force_update=%s)", path, force_update)
         needs_cache_update = False
-        with self._cacheLock:
-            needs_cache_update |= path not in self._cache['path_du_results']
+        if not force_update:
+            with self._cacheLock:
+                needs_cache_update |= path not in self._cache['path_du_results']
 
         if needs_cache_update or force_update:
             logger.info("cache update needed for %s", path)
diff --git a/SAS/DataManagement/StorageQueryService/rpc.py b/SAS/DataManagement/StorageQueryService/rpc.py
index 7192032deae37ba58db65d1d46a9bbf7f079d97e..e79c158746b2c2f64bea9d258c14e2c89249784e 100644
--- a/SAS/DataManagement/StorageQueryService/rpc.py
+++ b/SAS/DataManagement/StorageQueryService/rpc.py
@@ -12,8 +12,9 @@ logger = logging.getLogger(__name__)
 class StorageQueryRPC(RPCWrapper):
     def __init__(self, busname=DEFAULT_BUSNAME,
                  servicename=DEFAULT_SERVICENAME,
+                 timeout=18000,
                  broker=None):
-        super(StorageQueryRPC, self).__init__(busname, servicename, broker, timeout=18000)
+        super(StorageQueryRPC, self).__init__(busname, servicename, broker, timeout=timeout)
 
     def _convertTimestamps(self, result):
         if isinstance(result, dict):
diff --git a/SAS/DataManagement/StorageQueryService/service.py b/SAS/DataManagement/StorageQueryService/service.py
index e027fec29e2a9ac92b22c044a4824b66dad57ee3..5fc54b287d5e21f20281c404b2c968bbfe5e13b5 100644
--- a/SAS/DataManagement/StorageQueryService/service.py
+++ b/SAS/DataManagement/StorageQueryService/service.py
@@ -67,7 +67,7 @@ def createService(busname=DEFAULT_BUSNAME, servicename=DEFAULT_SERVICENAME, brok
                           busname=busname,
                           broker=broker,
                           use_service_methods=True,
-                          numthreads=8,
+                          numthreads=1,
                           verbose=verbose,
                           handler_args={'mountpoint': mountpoint,
                                         'radb_busname':RADB_BUSNAME,
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/webservice.py b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/webservice.py
index 461f8860c5130c9854abff54e09ac904a6dd4bf0..7be3f99d59fda6da50d64ab33142d99c35197270 100755
--- a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/webservice.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/webservice.py
@@ -1051,7 +1051,7 @@ def main():
     global curpc
     curpc = CleanupRPC(busname=options.cleanup_busname, servicename=options.cleanup_servicename, broker=options.broker)
     global sqrpc
-    sqrpc = StorageQueryRPC(busname=options.storagequery_busname, servicename=options.storagequery_servicename, broker=options.broker)
+    sqrpc = StorageQueryRPC(busname=options.storagequery_busname, servicename=options.storagequery_servicename, timeout=10, broker=options.broker)
     global momqueryrpc
     momqueryrpc = MoMQueryRPC(busname=options.mom_query_busname, servicename=options.mom_query_servicename, timeout=10, broker=options.broker)
     global changeshandler
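
Note on the rpc.py and webservice.py hunks above: the previously hard-coded 18000-second RPC timeout becomes a constructor default that callers may override, which is what allows the web service to pass timeout=10. A minimal, self-contained sketch of that pattern (the RPCWrapper stub here is illustrative only, not the real lofar.messaging base class):

    class RPCWrapper(object):
        # illustrative stub standing in for the real RPCWrapper base class
        def __init__(self, busname, servicename, broker=None, timeout=10):
            self.busname = busname
            self.servicename = servicename
            self.broker = broker
            self.timeout = timeout

    class StorageQueryRPC(RPCWrapper):
        def __init__(self, busname='dummy.bus', servicename='StorageQueryService',
                     timeout=18000, broker=None):
            # forward the caller's timeout instead of hard-coding 18000s,
            # keeping the old 5-hour default for existing callers
            super(StorageQueryRPC, self).__init__(busname, servicename, broker,
                                                  timeout=timeout)

    batch_rpc = StorageQueryRPC()           # keeps the long default: 18000
    web_rpc = StorageQueryRPC(timeout=10)   # interactive caller fails fast: 10
    print('%s %s' % (batch_rpc.timeout, web_rpc.timeout))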
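
Several of the cache.py hunks share one theme: do the slow work (the eval of the on-disk cache, lfs scans, du sums) outside self._cacheLock and hold the lock only long enough to swap the result in. A sketch of that locking pattern under the same assumptions as the diff (threading.Lock, a cache file serialized via repr/eval; class and method names here are hypothetical):

    import threading

    class CacheSketch(object):
        def __init__(self):
            self._cacheLock = threading.Lock()
            self._cache = {}

        def loadFromDisk(self, cache_path):
            with open(cache_path, 'r') as f:
                text = f.read()                   # slow I/O: no lock held
            cache_from_disk = eval(text.strip())  # slow parse: still no lock
            with self._cacheLock:                 # lock held only for the swap
                self._cache = cache_from_disk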