Commit 0d3fb877 authored by Jorrit Schaap

SW-772: used python3's concurrent.futures to parallelize slow loops

parent 0d574628
1 merge request: !8 Cobalt2 multithreading fix
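The hunks below replace sequential per-path disk-usage loops with a thread pool. As background, the pattern being applied is roughly the following minimal sketch (the slow_lookup helper and the example paths are invented stand-ins, not part of this commit):

import os
from concurrent import futures

def slow_lookup(path):
    # stand-in for an expensive per-path query (e.g. a du scan)
    return {'path': path, 'disk_usage': 42}

paths = ['/data/a', '/data/b', '/data/c']

# one worker thread per core; executor.map() yields results in input order
with futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
    results = executor.map(slow_lookup, paths)
    print({result['path']: result for result in results})

Threads (rather than processes) are a reasonable fit here because the per-path work is presumably I/O-bound, so the GIL is not the bottleneck.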
@@ -11,6 +11,7 @@ from threading import Thread, RLock
 import os.path
 import shutil
 from functools import cmp_to_key
+from concurrent import futures
 from lofar.messaging import EventMessage, ToBus, DEFAULT_BROKER, DEFAULT_BUSNAME
 from lofar.common.util import humanreadablesize
@@ -136,7 +137,9 @@ class CacheManager:
             tmp_path = '/tmp/tmp_storagequery_cache.py'
             with open(tmp_path, 'w') as file:
                 file.write(cache_str)
-            os.makedirs(os.path.dirname(self._cache_path), exist_ok=True)
+            dir_path = os.path.dirname(self._cache_path)
+            if dir_path:
+                os.makedirs(dir_path, exist_ok=True)
             shutil.move(tmp_path, self._cache_path)
             self._last_cache_write_timestamp = datetime.datetime.utcnow()
         except Exception as e:
@@ -527,6 +530,12 @@ class CacheManager:
         return result

+    def getDiskUsageForPaths(self, paths, force_update=False):
+        with futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
+            parallel_args = [(path, force_update) for path in paths]
+            results = executor.map(lambda p_arg: self.getDiskUsageForPath(*p_arg), parallel_args)
+            return { result['path']:result for result in results }
+
     def getDiskUsageForPath(self, path, force_update=False):
         logger.info("cache.getDiskUsageForPath('%s', force_update=%s)", path, force_update)
         needs_cache_update = False
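A note on the new getDiskUsageForPaths() above (this is stdlib behaviour, not specific to this codebase): executor.map() yields results lazily and re-raises a worker's exception when its result is consumed, so a failure for any single path aborts the whole dict comprehension. A minimal sketch with an invented failing lookup:

from concurrent import futures

def lookup(path):
    if path == '/bad':
        raise IOError('lookup failed for %s' % path)  # simulated failure
    return {'path': path}

with futures.ThreadPoolExecutor(max_workers=2) as executor:
    try:
        results = {r['path']: r for r in executor.map(lookup, ['/ok', '/bad'])}
    except IOError as e:
        print('whole batch aborted: %s' % e)  # re-raised at consumption time

Keying the result dict on result['path'] also assumes getDiskUsageForPath() echoes the queried path back in its result, which the callers below rely on when popping the parent path out of the combined dict.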
@@ -559,26 +568,31 @@ class CacheManager:
         if task_sd_result['found']:
             subdir_paths = [os.path.join(task_du_result['path'],sd) for sd in task_sd_result['sub_directories']]
-            #TODO: potential for parallelization
-            subdirs_du_result = { sd: self.getDiskUsageForPath(sd, force_update=force_update) for sd in subdir_paths }
+            subdirs_du_result = self.getDiskUsageForPaths(subdir_paths, force_update=force_update)
             result = {'found':True, 'task_directory': task_du_result, 'sub_directories': subdirs_du_result }
             logger.info("result for cache.getDiskUsageForTaskAndSubDirectories(radb_id=%s, mom_id=%s, otdb_id=%s): %s", radb_id, mom_id, otdb_id, result)
             return result

-        logger.warn("result for cache.getDiskUsageForTaskAndSubDirectories(radb_id=%s, mom_id=%s, otdb_id=%s): %s", radb_id, mom_id, otdb_id, task_du_result)
+        logger.warning("result for cache.getDiskUsageForTaskAndSubDirectories(radb_id=%s, mom_id=%s, otdb_id=%s): %s", radb_id, mom_id, otdb_id, task_du_result)
         return task_du_result

     def getDiskUsageForProjectDirAndSubDirectories(self, radb_id=None, mom_id=None, otdb_id=None, project_name=None, force_update=False):
         logger.info("cache.getDiskUsageForProjectDirAndSubDirectories(radb_id=%s, mom_id=%s, otdb_id=%s)" % (radb_id, mom_id, otdb_id))
         path_result = self.disk_usage.path_resolver.getProjectDirAndSubDirectories(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, project_name=project_name)
         if path_result['found']:
-            projectdir_du_result = self.getDiskUsageForPath(path_result['path'], force_update=force_update)
-            subdir_paths = [os.path.join(path_result['path'],sd) for sd in path_result['sub_directories']]
-            #TODO: potential for parallelization
-            subdirs_du_result = { sd: self.getDiskUsageForPath(sd, force_update=force_update) for sd in subdir_paths }
+            projectdir_path = path_result['path']
+            subdir_paths = [os.path.join(path_result['path'], sd) for sd in path_result['sub_directories']]
+
+            # get all du's in parallel over all paths
+            paths = [projectdir_path] + subdir_paths
+            paths_du_result = self.getDiskUsageForPaths(paths, force_update=force_update)
+
+            # split into project and subdir
+            projectdir_du_result = paths_du_result.pop(projectdir_path)
+            subdirs_du_result = paths_du_result
+
+            # create total result dict
             result = {'found':True, 'projectdir': projectdir_du_result, 'sub_directories': subdirs_du_result }
             logger.info('cache.getDiskUsageForProjectDirAndSubDirectories result: %s' % result)
             return result
@@ -588,23 +602,25 @@ class CacheManager:
     def getDiskUsageForProjectsDirAndSubDirectories(self, force_update=False):
         logger.info("cache.getDiskUsageForProjectsDirAndSubDirectories")
         projects_path = self.disk_usage.path_resolver.projects_path
-        projectsdir_du_result = self.getDiskUsageForPath(projects_path, force_update=force_update)
-
-        result = {'found':True, 'projectdir': projectsdir_du_result }
-
-        project_subdirs_result = self.disk_usage.path_resolver.getSubDirectories(projects_path)
-        if project_subdirs_result['found']:
-            subdir_paths = [os.path.join(projects_path,sd) for sd in project_subdirs_result['sub_directories']]
-            #TODO: potential for parallelization
-            subdirs_du_result = { sd: self.getDiskUsageForPath(sd, force_update=force_update) for sd in subdir_paths }
-            result['sub_directories'] = subdirs_du_result
+        project_subdirs_result = self.disk_usage.path_resolver.getSubDirectories(projects_path)
+        subdir_paths = [os.path.join(projects_path,sd) for sd in project_subdirs_result['sub_directories']] if project_subdirs_result['found'] else []
+
+        # get all du's in parallel over all paths
+        paths = [projects_path] + subdir_paths
+        paths_du_result = self.getDiskUsageForPaths(paths, force_update=force_update)
+
+        # split into project and subdir
+        projectsdir_du_result = paths_du_result.pop(projects_path)
+        subdirs_du_result = paths_du_result
+
+        # create total result dict
+        result = {'found':True, 'projectdir': projectsdir_du_result, 'sub_directories': subdirs_du_result }

         logger.info('cache.getDiskUsageForProjectsDirAndSubDirectories result: %s' % result)
         return result

 if __name__ == '__main__':
-    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
+    logging.basicConfig(format='%(asctime)s %(levelname)s %(threadName)s %(message)s', level=logging.INFO)

     with CacheManager() as cm:
         waitForInterrupt()
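The logging change in the last hunk makes the new parallelism observable: with %(threadName)s in the format, each log line shows which pool thread emitted it (the interpreter's default worker names look like ThreadPoolExecutor-0_0, ThreadPoolExecutor-0_1, ...). A small self-contained illustration of the effect:

import logging, os
from concurrent import futures

logging.basicConfig(format='%(asctime)s %(levelname)s %(threadName)s %(message)s', level=logging.INFO)

def work(path):
    logging.info("scanning %s", path)  # log line carries the pool thread's name
    return path

with futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
    list(executor.map(work, ['/a', '/b', '/c']))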