Skip to content
Snippets Groups Projects
Commit 6168e812 authored by Jorrit Schaap
Browse files

SW-772: parallelized getDiskUsageForTasks. minor logging fixes.

parent 235151c1
No related branches found
No related tags found
1 merge request: !8 "Cobalt2 multithreading fix"
......@@ -7,7 +7,7 @@ TODO: add doc
import logging
import datetime
from time import sleep
from threading import Thread, RLock, Event
from threading import Thread, RLock, Event, current_thread
import os.path
import shutil
from functools import cmp_to_key
......@@ -525,22 +525,30 @@ class CacheManager:
return {'found': False, 'path': path_result['path']}
def getDiskUsageForTasks(self, radb_ids=None, mom_ids=None, otdb_ids=None, include_scratch_paths=True, force_update=False):
    """Collect the disk usage of several tasks, querying them in parallel.

    One ``self.getDiskUsageForTask(...)`` call is made for every id in
    ``radb_ids``, ``mom_ids`` and ``otdb_ids``; the calls are spread over a
    thread pool.

    :param radb_ids: iterable of radb ids, or None/empty for none.
    :param mom_ids: iterable of mom ids, or None/empty for none.
    :param otdb_ids: iterable of otdb ids, or None/empty for none.
    :param include_scratch_paths: passed through to getDiskUsageForTask.
    :param force_update: passed through to getDiskUsageForTask.
    :return: ``{'radb_ids': {...}, 'mom_ids': {...}, 'otdb_ids': {...}}``;
             each inner dict maps ``str(id)`` to the per-task result dict.
             A per-task result is filed under every id kind it reports via
             a non-None 'radb_id' / 'mom_id' / 'otdb_id' entry.
    """
    logger.info("cache.getDiskUsageForTasks(radb_ids=%s, mom_ids=%s, otdb_ids=%s)", radb_ids, mom_ids, otdb_ids)
    tasks_result = {'radb_ids': {}, 'mom_ids': {}, 'otdb_ids': {}}

    # (keyword/result key for a single task, key of its group in tasks_result, requested ids)
    id_kinds = (('radb_id', 'radb_ids', radb_ids),
                ('mom_id', 'mom_ids', mom_ids),
                ('otdb_id', 'otdb_ids', otdb_ids))

    # build one kwargs dict per getDiskUsageForTask call
    shared_kwargs = {'include_scratch_paths': include_scratch_paths, 'force_update': force_update}
    parallel_kwargs = []
    for id_key, _group_key, ids in id_kinds:
        if ids:
            parallel_kwargs += [dict(shared_kwargs, **{id_key: task_id}) for task_id in ids]

    with futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
        results = executor.map(lambda p_kwarg: self.getDiskUsageForTask(**p_kwarg), parallel_kwargs)

        for result in results:
            for id_key, group_key, _ids in id_kinds:
                if result.get(id_key) is not None:
                    # store the individual task result, not the 'results' iterator
                    tasks_result[group_key][str(result[id_key])] = result

    logger.info("cache.getDiskUsageForTasks(radb_ids=%s, mom_ids=%s, otdb_ids=%s) returning: %s", radb_ids, mom_ids, otdb_ids, tasks_result)
    return tasks_result
def getDiskUsageForPaths(self, paths, force_update=False):
with futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
......@@ -567,6 +575,7 @@ class CacheManager:
# no other thread is currently du'ing/updating this path
# so create a threading Event and store it in the dict,
# so other threads can wait for this event.
logger.info("updating the cache for %s current_thread=%s", path, current_thread().name)
path_threading_event = Event()
self._du_threading_events[path] = path_threading_event
......@@ -579,12 +588,13 @@ class CacheManager:
# signal threads waiting for this same path du call
# and do bookkeeping
with self._du_threading_events_lock:
logger.info("signaling other threads that the cache was updated for %s current_thread=%s", path, current_thread().name)
path_threading_event.set()
del self._du_threading_events[path]
else:
logger.info("waiting for du call on other thread that will update the cache for %s", path)
logger.info("thread=%s waiting for du call on other thread that will update the cache for %s", current_thread().name, path)
path_threading_event.wait()
logger.info("another thread just updated the cache for %s", path)
logger.info("thread=%s another thread just updated the cache for %s", current_thread().name, path)
with self._cacheLock:
if path in self._cache['path_du_results']:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment