From 6168e8122964e83af73db14974629eeb54f2057d Mon Sep 17 00:00:00 2001
From: Jorrit Schaap <schaap@astron.nl>
Date: Thu, 4 Jul 2019 16:25:05 +0200
Subject: [PATCH] SW-772: parallelized getDiskUsageForTasks. minor logging
 fixes.

---
 .../StorageQueryService/cache.py              | 46 +++++++++++--------
 1 file changed, 28 insertions(+), 18 deletions(-)

diff --git a/SAS/DataManagement/StorageQueryService/cache.py b/SAS/DataManagement/StorageQueryService/cache.py
index a796298f828..6201dec3ff8 100644
--- a/SAS/DataManagement/StorageQueryService/cache.py
+++ b/SAS/DataManagement/StorageQueryService/cache.py
@@ -7,7 +7,7 @@ TODO: add doc
 import logging
 import datetime
 from time import sleep
-from threading import Thread, RLock, Event
+from threading import Thread, RLock, Event, current_thread
 import os.path
 import shutil
 from functools import cmp_to_key
@@ -525,22 +525,30 @@ class CacheManager:
         return {'found': False, 'path': path_result['path']}
 
     def getDiskUsageForTasks(self, radb_ids=None, mom_ids=None, otdb_ids=None, include_scratch_paths=True, force_update=False):
-        logger.info("cache.getDiskUsageForTask(radb_ids=%s, mom_ids=%s, otdb_ids=%s)" % (radb_ids, mom_ids, otdb_ids))
-        result = {'radb_ids': {}, 'mom_ids': {}, 'otdb_ids': {}}
+        logger.info("cache.getDiskUsageForTasks(radb_ids=%s, mom_ids=%s, otdb_ids=%s)" % (radb_ids, mom_ids, otdb_ids))
+        tasks_result = {'radb_ids': {}, 'mom_ids': {}, 'otdb_ids': {}}
 
-        if radb_ids:
-            for radb_id in radb_ids:
-                result['radb_ids'][str(radb_id)] = self.getDiskUsageForTask(radb_id=radb_id, include_scratch_paths=include_scratch_paths, force_update=force_update)
-
-        if mom_ids:
-            for mom_id in mom_ids:
-                result['mom_ids'][str(mom_id)] = self.getDiskUsageForTask(mom_id=mom_id, include_scratch_paths=include_scratch_paths, force_update=force_update)
-
-        if otdb_ids:
-            for otdb_id in otdb_ids:
-                result['otdb_ids'][str(otdb_id)] = self.getDiskUsageForTask(otdb_id=otdb_id, include_scratch_paths=include_scratch_paths, force_update=force_update)
-
-        return result
+        with futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
+            parallel_kwargs = []
+            if radb_ids:
+                parallel_kwargs += [{'radb_id':radb_id, 'include_scratch_paths': include_scratch_paths, 'force_update':force_update} for radb_id in radb_ids]
+            if mom_ids:
+                parallel_kwargs += [{'mom_id':mom_id, 'include_scratch_paths': include_scratch_paths, 'force_update':force_update} for mom_id in mom_ids]
+            if otdb_ids:
+                parallel_kwargs += [{'otdb_id':otdb_id, 'include_scratch_paths': include_scratch_paths, 'force_update':force_update} for otdb_id in otdb_ids]
+            results = executor.map(lambda p_kwarg: self.getDiskUsageForTask(**p_kwarg), parallel_kwargs)
+
+            for result in results:
+                if result.get('radb_id') is not None:
+                    tasks_result['radb_ids'][str(result['radb_id'])] = result
+                if result.get('mom_id') is not None:
+                    tasks_result['mom_ids'][str(result['mom_id'])] = result
+                if result.get('otdb_id') is not None:
+                    tasks_result['otdb_ids'][str(result['otdb_id'])] = result
+
+        logger.info("cache.getDiskUsageForTasks(radb_ids=%s, mom_ids=%s, otdb_ids=%s) returning: %s" % (radb_ids, mom_ids, otdb_ids, tasks_result))
+
+        return tasks_result
 
     def getDiskUsageForPaths(self, paths, force_update=False):
         with futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
@@ -567,6 +575,7 @@ class CacheManager:
                     # no other thread is currently du'ing/updating this path
                     # so create a threading Event and store it in the dict,
                     # so other threads can wait for this event.
+                    logger.info("updating the cache for %s current_thread=%s", path, current_thread().name)
                     path_threading_event = Event()
                     self._du_threading_events[path] = path_threading_event
 
@@ -579,12 +588,13 @@ class CacheManager:
                 # signal threads waiting for this same path du call
                 # and do bookkeeping
                 with self._du_threading_events_lock:
+                    logger.info("signaling other threads that the cache was updated for %s current_thread=%s", path, current_thread().name)
                     path_threading_event.set()
                     del self._du_threading_events[path]
             else:
-                logger.info("waiting for du call on other thread that will update the cache for %s", path)
+                logger.info("thread=%s waiting for du call on other thread that will update the cache for %s", current_thread().name, path)
                 path_threading_event.wait()
-                logger.info("another thread just updated the cache for %s", path)
+                logger.info("thread=%s another thread just updated the cache for %s", current_thread().name, path)
 
         with self._cacheLock:
             if path in self._cache['path_du_results']:
-- 
GitLab