Skip to content
Snippets Groups Projects
Commit c48a43a0 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-448: various minor speed improvements.

parent 5b5e18ce
Branches
Tags
No related merge requests found
......@@ -167,8 +167,8 @@ class PathResolver:
# get the subdirectories of the given path
cmd = ['lfs', 'find', '--type', 'd', '--maxdepth', '1', path.rstrip('/')]
hostname = socket.gethostname()
if not 'mgmt0' in hostname:
cmd = ['ssh', 'lofarsys@mgmt01.cep4.control.lofar'] + cmd
if not 'head' in hostname:
cmd = ['ssh', 'lofarsys@head.cep4.control.lofar'] + cmd
logger.debug(' '.join(cmd))
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = proc.communicate()
......@@ -189,8 +189,8 @@ class PathResolver:
def pathExists(self, path):
cmd = ['lfs', 'ls', path]
hostname = socket.gethostname()
if not 'mgmt0' in hostname:
cmd = ['ssh', 'lofarsys@mgmt01.cep4.control.lofar'] + cmd
if not 'head' in hostname:
cmd = ['ssh', 'lofarsys@head.cep4.control.lofar'] + cmd
logger.debug(' '.join(cmd))
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = proc.communicate()
......
......@@ -29,7 +29,7 @@ from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_M
logger = logging.getLogger(__name__)
MAX_CACHE_ENTRY_AGE = datetime.timedelta(hours=8)
MAX_CACHE_ENTRY_AGE = datetime.timedelta(hours=3*24)
class CacheManager:
def __init__(self,
......@@ -50,7 +50,7 @@ class CacheManager:
self.otdb_listener = OTDBBusListener(busname=otdb_notification_busname,
subject=otdb_notification_subject,
broker=broker,
numthreads=2)
numthreads=1)
self.otdb_listener.onObservationAborted = self.onObservationAborted
self.otdb_listener.onObservationFinished = self.onObservationFinished
......@@ -58,7 +58,7 @@ class CacheManager:
self.dm_listener = DataManagementBusListener(busname=dm_notification_busname,
subjects=dm_notification_prefix + '*',
broker=broker,
numthreads=2)
numthreads=1)
self.dm_listener.onTaskDeleted = self.onTaskDeleted
......@@ -99,8 +99,9 @@ class CacheManager:
try:
if os.path.exists(self._cache_path):
with open(self._cache_path, 'r') as file:
cache_from_disk = eval(file.read().strip()) #slow!
with self._cacheLock:
self._cache = eval(file.read().strip())
self._cache = cache_from_disk
if not isinstance(self._cache, dict):
self._cache = {'path_du_results': {}, 'otdb_id2path': {} }
if 'path_du_results' not in self._cache:
......@@ -116,7 +117,7 @@ class CacheManager:
def _writeCacheToDisk(self):
try:
# only persist (a subset of) the cache to disk every once in a while.
if datetime.datetime.utcnow() - self._last_cache_write_timestamp > datetime.timedelta(minutes=0.2):
if datetime.datetime.utcnow() - self._last_cache_write_timestamp > datetime.timedelta(minutes=5):
tmp_path = '/tmp/tmp_storagequery_cache.py'
cache_str = ''
with self._cacheLock:
......@@ -170,7 +171,7 @@ class CacheManager:
if otdb_id != None:
otdb_id2path_cache[otdb_id] = path
self._writeCacheToDisk()
self._writeCacheToDisk()
self._sendDiskUsageChangedNotification(path, du_result['disk_usage'], otdb_id)
......@@ -199,28 +200,34 @@ class CacheManager:
try:
def addSubDirectoriesToCache(directory):
depth = self.getDepthToProjectsDir(directory)
MAX_SCAN_DEPTH=2
#depth=0 : projects
#depth=1 : projects/<project>
#depth=2 : projects/<project>/<obs>
#depth=3 : projects/<project>/<obs>/<sub_dir>
if depth > 3:
if depth > MAX_SCAN_DEPTH:
return
add_empty_du_result_to_cache = False
with self._cacheLock:
path_cache = self._cache['path_du_results']
if not directory in path_cache:
logger.info('tree scan: adding \'%s\' with empty disk_usage to cache which will be du\'ed later', directory)
empty_du_result = {'found': True, 'disk_usage': None, 'path': directory, 'name': directory.split('/')[-1]}
self._updateCache(empty_du_result)
add_empty_du_result_to_cache = not directory in path_cache
if directory in path_cache:
# mark cache entry for directory to be updated
path_cache[directory]['needs_update'] = True
if add_empty_du_result_to_cache:
logger.info('tree scan: adding \'%s\' with empty disk_usage to cache which will be du\'ed later', directory)
empty_du_result = {'found': True, 'disk_usage': None, 'path': directory, 'name': directory.split('/')[-1]}
self._updateCache(empty_du_result)
with self._cacheLock:
path_cache = self._cache['path_du_results']
if directory in path_cache:
# mark cache entry for directory to be updated
path_cache[directory]['needs_update'] = True
if not self._cacheThreadsRunning:
return
if depth < 3:
if depth < MAX_SCAN_DEPTH:
logger.info('tree scan: scanning \'%s\'', directory)
sd_result = self.disk_usage.path_resolver.getSubDirectories(directory)
......@@ -287,7 +294,7 @@ class CacheManager:
path)
#do a quick update of the entry by applying the sum of the subdirs to the path's du result...
#this make a best guess immedeiately available...
#this make a best guess immediately available...
self._updatePathCacheEntryToSubDirTotal(path, False)
#...and in the mean time, du a full update from disk, which might be (really) slow.
......@@ -319,21 +326,22 @@ class CacheManager:
logger.error(str(e))
def _updatePathCacheEntryToSubDirTotal(self, path, force_update=False):
sd_result = self.disk_usage.path_resolver.getSubDirectories(path)
with self._cacheLock:
path_cache_result = self._cache['path_du_results'].get(path)
if sd_result['found']:
subdir_paths = [os.path.join(path, sd) for sd in sd_result['sub_directories']]
if path_cache_result:
sd_result = self.disk_usage.path_resolver.getSubDirectories(path)
subdir_du_results = [self.getDiskUsageForPath(sd, force_update=force_update) for sd in subdir_paths]
sum_du = sum([du['disk_usage'] for du in subdir_du_results])
if sd_result['found']:
subdir_paths = [os.path.join(path, sd) for sd in sd_result['sub_directories']]
with self._cacheLock:
if path in self._cache['path_du_results']:
path_result = self._cache['path_du_results'][path]
path_result['disk_usage'] = sum_du
path_result['disk_usage_readable'] = humanreadablesize(sum_du)
path_result['needs_update'] = True
self._updateCache(path_result)
subdir_du_results = [self.getDiskUsageForPath(sd, force_update=force_update) for sd in subdir_paths]
sum_du = sum([du['disk_usage'] for du in subdir_du_results])
path_cache_result['disk_usage'] = sum_du
path_cache_result['disk_usage_readable'] = humanreadablesize(sum_du)
path_cache_result['needs_update'] = True
self._updateCache(path_cache_result)
def _updateCEP4CapacitiesInRADB(self):
try:
......@@ -464,10 +472,8 @@ class CacheManager:
scratch_path_du_result = self.getDiskUsageForPath(scratch_path, force_update=force_update)
path_du_result['scratch_paths'][scratch_path] = scratch_path_du_result
self._updateCache(path_du_result)
return path_du_result
self._updateCache(path_result)
return {'found': False, 'path': path_result['path']}
def getDiskUsageForTasks(self, radb_ids=None, mom_ids=None, otdb_ids=None, include_scratch_paths=True, force_update=False):
......@@ -491,8 +497,9 @@ class CacheManager:
def getDiskUsageForPath(self, path, force_update=False):
logger.info("cache.getDiskUsageForPath('%s', force_update=%s)", path, force_update)
needs_cache_update = False
with self._cacheLock:
needs_cache_update |= path not in self._cache['path_du_results']
if not force_update:
with self._cacheLock:
needs_cache_update |= path not in self._cache['path_du_results']
if needs_cache_update or force_update:
logger.info("cache update needed for %s", path)
......
......@@ -12,8 +12,9 @@ logger = logging.getLogger(__name__)
class StorageQueryRPC(RPCWrapper):
def __init__(self, busname=DEFAULT_BUSNAME,
servicename=DEFAULT_SERVICENAME,
timeout=18000,
broker=None):
super(StorageQueryRPC, self).__init__(busname, servicename, broker, timeout=18000)
super(StorageQueryRPC, self).__init__(busname, servicename, broker, timeout=timeout)
def _convertTimestamps(self, result):
if isinstance(result, dict):
......
......@@ -67,7 +67,7 @@ def createService(busname=DEFAULT_BUSNAME, servicename=DEFAULT_SERVICENAME, brok
busname=busname,
broker=broker,
use_service_methods=True,
numthreads=8,
numthreads=1,
verbose=verbose,
handler_args={'mountpoint': mountpoint,
'radb_busname':RADB_BUSNAME,
......
......@@ -1051,7 +1051,7 @@ def main():
global curpc
curpc = CleanupRPC(busname=options.cleanup_busname, servicename=options.cleanup_servicename, broker=options.broker)
global sqrpc
sqrpc = StorageQueryRPC(busname=options.storagequery_busname, servicename=options.storagequery_servicename, broker=options.broker)
sqrpc = StorageQueryRPC(busname=options.storagequery_busname, servicename=options.storagequery_servicename, timeout=10, broker=options.broker)
global momqueryrpc
momqueryrpc = MoMQueryRPC(busname=options.mom_query_busname, servicename=options.mom_query_servicename, timeout=10, broker=options.broker)
global changeshandler
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment