#!/usr/bin/env python3
# $Id$
'''
Cache manager for the LOFAR storagequery service.

It maintains an in-memory cache of disk usage (du) results for the CEP4
projects tree, persists a subset of that cache to disk, keeps it up to date
via a background tree scan and by listening for OTDB and DataManagement
events, and answers disk usage queries per path, task or project.
'''
import logging
import datetime
import shutil
from time import sleep
from threading import Thread, RLock, Event
import os.path
from functools import cmp_to_key
from concurrent import futures

from lofar.messaging import EventMessage, ToBus, DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.common.util import humanreadablesize
from lofar.common.datetimeutils import format_timedelta
from lofar.sas.datamanagement.storagequery.diskusage import getDiskUsageForPath as du_getDiskUsageForPath
from lofar.sas.datamanagement.storagequery.diskusage import getOTDBIdFromPath
from lofar.sas.datamanagement.storagequery.diskusage import DiskUsage
from lofar.sas.datamanagement.common.datamanagementbuslistener import DataManagementBusListener, DataManagementEventMessageHandler
from lofar.sas.otdb.OTDBBusListener import OTDBBusListener, OTDBEventMessageHandler
from lofar.common.util import waitForInterrupt
from lofar.sas.datamanagement.common.config import CEP4_DATA_MOUNTPOINT
from lofar.sas.datamanagement.common.config import DEFAULT_DM_NOTIFICATION_PREFIX

logger = logging.getLogger(__name__)

MAX_CACHE_ENTRY_AGE = datetime.timedelta(hours=3*24)

# max depth (relative to the projects dir) to which _scanProjectsTree descends;
# the value 3 is an assumption, chosen to match the
# projects/<project>/<obs>/<sub_dir> depth overview in _scanProjectsTree.
MAX_SCAN_DEPTH = 3


class _CacheManagerOTDBEventMessageHandler(OTDBEventMessageHandler):
    def __init__(self, cache_manager: 'CacheManager'):
        self.cache_manager = cache_manager

    def onObservationAborted(self, treeId, modificationTime):
        self.cache_manager.onObservationAborted(treeId, modificationTime)

    def onObservationFinished(self, treeId, modificationTime):
        self.cache_manager.onObservationFinished(treeId, modificationTime)


class _CacheManagerDataManagementEventMessageHandler(DataManagementEventMessageHandler):
    def __init__(self, cache_manager: 'CacheManager'):
        self.cache_manager = cache_manager

    def onTaskDeleted(self, otdb_id, deleted, paths, message=''):
        self.cache_manager.onTaskDeleted(otdb_id, deleted, paths, message)


class CacheManager:
    def __init__(self,
                 cache_path='.du_cache.py',
                 mountpoint=CEP4_DATA_MOUNTPOINT,
                 exchange=DEFAULT_BUSNAME,
                 broker=DEFAULT_BROKER):
        self._cache_path = cache_path

        self.otdb_listener = OTDBBusListener(_CacheManagerOTDBEventMessageHandler,
                                             handler_kwargs={'cache_manager': self},
                                             exchange=exchange,
                                             broker=broker)

        self.dm_listener = DataManagementBusListener(_CacheManagerDataManagementEventMessageHandler,
                                                     handler_kwargs={'cache_manager': self},
                                                     exchange=exchange,
                                                     broker=broker)

        self.event_bus = ToBus(exchange=exchange, broker=broker)

        self._scanProjectsTreeThread = None
        self._updateCacheThread = None
        self._cacheThreadsRunning = False

        self._cacheLock = RLock()
        self._cache = {'path_du_results': {}, 'otdb_id2path': {}}

        self._last_cache_write_timestamp = datetime.datetime(1970, 1, 1)
        self._readCacheFromDisk()

        # dict to hold threading Events per path to prevent expensive multiple du calls for each path
        self._du_threading_events = {}
        self._du_threading_events_lock = RLock()

        self.disk_usage = DiskUsage(mountpoint=mountpoint,
                                    exchange=exchange,
                                    broker=broker)

    def _sendDiskUsageChangedNotification(self, path, disk_usage, otdb_id=None):
        try:
            msg = EventMessage(subject='%s.DiskUsageChanged' % DEFAULT_DM_NOTIFICATION_PREFIX,
                               content={'path': path,
                                        'disk_usage': disk_usage,
                                        'disk_usage_readable': humanreadablesize(disk_usage),
                                        'otdb_id': otdb_id})

            logger.info('Sending notification with subject %s to %s: %s', msg.subject, self.event_bus.exchange, msg.content)
            self.event_bus.send(msg)
        except Exception as e:
            logger.error(str(e))
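
    # An illustrative DiskUsageChanged notification (all values made up):
    #   subject: '<DEFAULT_DM_NOTIFICATION_PREFIX>.DiskUsageChanged'
    #   content: {'path': '/data/projects/LC9_001/L123456',
    #             'disk_usage': 1234567890,
    #             'disk_usage_readable': humanreadablesize(1234567890),
    #             'otdb_id': 123456}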

    def _readCacheFromDisk(self):
        # maybe this cache on disk is slow, if so, revert to proper db solution
        try:
            if os.path.exists(self._cache_path):
                with open(self._cache_path, 'r') as file:
                    cache_from_disk = eval(file.read().strip())  # slow!
                    with self._cacheLock:
                        self._cache = cache_from_disk
                        if not isinstance(self._cache, dict):
                            self._cache = {'path_du_results': {}, 'otdb_id2path': {}}
                        if 'path_du_results' not in self._cache:
                            self._cache['path_du_results'] = {}
                        if 'otdb_id2path' not in self._cache:
                            self._cache['otdb_id2path'] = {}
        except Exception as e:
            logger.error("Error while reading in du cache: %s", e)
            with self._cacheLock:
                self._cache = {'path_du_results': {}, 'otdb_id2path': {}}

    def _writeCacheToDisk(self):
        try:
            # only persist (a subset of) the cache to disk every once in a while.
            if datetime.datetime.utcnow() - self._last_cache_write_timestamp > datetime.timedelta(minutes=5):
                cache_str = ''
                with self._cacheLock:
                    # Take a subset of the entire cache:
                    # only the path_du_results for paths at project level (like /data/projects, /data/projects/LC9_001).
                    # Do not store path_du_results for deeper levels on disk, because that makes the disk read/write too slow,
                    # and the deeper levels can be obtained via rbh-du calls quite fast anyway.
                    # Furthermore, once a deeper-level du result is stored in the memory cache, it is also available there for fast lookup.
                    # We just don't store these deep levels on disk.
                    sub_cache = {path: du_result for path, du_result in list(self._cache['path_du_results'].items())
                                 if self.getDepthToProjectsDir(path) <= 1 and du_result.get('found')}

                    # persist in the same {'path_du_results': ..., 'otdb_id2path': ...}
                    # layout that _readCacheFromDisk expects to eval.
                    cache_str = str({'path_du_results': sub_cache,
                                     'otdb_id2path': self._cache['otdb_id2path']})

                tmp_path = '/tmp/tmp_storagequery_cache.py'
                with open(tmp_path, 'w') as file:
                    file.write(cache_str)

                dir_path = os.path.dirname(self._cache_path)
                if dir_path:
                    os.makedirs(dir_path, exist_ok=True)
                shutil.move(tmp_path, self._cache_path)

                self._last_cache_write_timestamp = datetime.datetime.utcnow()
        except Exception as e:
            logger.error("Error while writing du cache: %s", e)

    def _updateCache(self, du_result):
        if 'path' not in du_result:
            return

        path = du_result['path']
        otdb_id = du_result.get('otdb_id')

        with self._cacheLock:
            path_cache = self._cache['path_du_results']
            otdb_id2path_cache = self._cache['otdb_id2path']

            if otdb_id is None:
                # try to look up the otdb id in the path cache
                if path in path_cache:
                    otdb_id = path_cache[path].get('otdb_id')

                # if still None, try to get the id from the path
                if otdb_id is None:
                    otdb_id = getOTDBIdFromPath(path)

            if path not in path_cache or path_cache[path]['disk_usage'] != du_result['disk_usage']:
                # update the cache entry, even when no du result found,
                # because that will save disk queries next time.
                logger.info('updating cache entry: %s', du_result)
                path_cache[path] = du_result

            if otdb_id is not None:
                otdb_id2path_cache[otdb_id] = path

            if not du_result['found']:
                # even when the du for the path is not found,
                # keep a copy in the cache for fast lookup by clients.
                # Make sure the size is 0.
                du_result['disk_usage'] = 0
                du_result['disk_usage_readable'] = humanreadablesize(0)

            path_cache[path]['cache_timestamp'] = datetime.datetime.utcnow()
            path_cache[path]['needs_update'] = False

        # persist the cache (throttled to at most once per 5min inside _writeCacheToDisk)
        self._writeCacheToDisk()

        self._sendDiskUsageChangedNotification(path, du_result['disk_usage'], otdb_id)

    def _invalidateCacheEntryForPath(self, path):
        with self._cacheLock:
            path_cache = self._cache['path_du_results']
            if path in path_cache:
                path_cache[path]['needs_update'] = True

    def getOtdbIdsFoundOnDisk(self):
        with self._cacheLock:
            otdb_id2path_cache = self._cache['otdb_id2path']
            return sorted(list(otdb_id2path_cache.keys()))

    def getDiskUsagesForAllOtdbIds(self, force_update=False):
        otdb_ids = self.getOtdbIdsFoundOnDisk()

        result = {}
        for otdb_id in otdb_ids:
            result[otdb_id] = self.getDiskUsageForOTDBId(otdb_id, force_update=force_update)
        return result

    def getDepthToProjectsDir(self, path):
        return len(path.replace(self.disk_usage.path_resolver.projects_path, '').strip('/').split('/'))
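
    # Worked examples for getDepthToProjectsDir, assuming projects_path='/data/projects'
    # (note that ''.split('/') yields [''], so the projects dir itself and a
    # direct project dir both count as depth 1):
    #   '/data/projects'                     -> 1
    #   '/data/projects/LC9_001'             -> 1
    #   '/data/projects/LC9_001/L123456'     -> 2
    #   '/data/projects/LC9_001/L123456/uv'  -> 3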

    def _scanProjectsTree(self):
        try:
            def addSubDirectoriesToCache(directory):
                depth = self.getDepthToProjectsDir(directory)
                # depth=0 : projects
                # depth=1 : projects/<project>
                # depth=2 : projects/<project>/<obs>
                # depth=3 : projects/<project>/<obs>/<sub_dir>
                if depth > MAX_SCAN_DEPTH:
                    return

                add_empty_du_result_to_cache = False
                with self._cacheLock:
                    path_cache = self._cache['path_du_results']
                    add_empty_du_result_to_cache = directory not in path_cache

                if add_empty_du_result_to_cache:
                    logger.info('tree scan: adding \'%s\' with empty disk_usage to cache which will be du\'ed later', directory)
                    empty_du_result = {'found': True, 'disk_usage': None, 'path': directory, 'name': directory.split('/')[-1]}
                    self._updateCache(empty_du_result)

                with self._cacheLock:
                    path_cache = self._cache['path_du_results']
                    if directory in path_cache:
                        # mark cache entry for directory to be updated
                        path_cache[directory]['needs_update'] = True

                if not self._cacheThreadsRunning:
                    return

                if depth < MAX_SCAN_DEPTH:
                    logger.info('tree scan: scanning \'%s\'', directory)
                    sd_result = self.disk_usage.path_resolver.getSubDirectories(directory)

                    if sd_result['found']:
                        subdir_paths = [os.path.join(directory, sd) for sd in sd_result['sub_directories']]
                        for subdir_path in subdir_paths:
                            # recurse
                            addSubDirectoriesToCache(subdir_path)

            addSubDirectoriesToCache(self.disk_usage.path_resolver.projects_path)
            logger.info('tree scan complete')
        except Exception as e:
            logger.error(str(e))

    def _updateOldEntriesInCache(self):
        logger.info('starting updating old cache entries')

        while self._cacheThreadsRunning:
            try:
                now = datetime.datetime.utcnow()
                with self._cacheLock:
                    path_cache = self._cache['path_du_results']
                    old_entries = [cache_entry for cache_entry in list(path_cache.values())
                                   if now - cache_entry['cache_timestamp'] > MAX_CACHE_ENTRY_AGE]
                    needs_update_entries = [cache_entry for cache_entry in list(path_cache.values())
                                            if cache_entry.get('needs_update', False)]

                updateable_entries = old_entries + needs_update_entries

                logger.info('%s old cache entries need to be updated, #age:%s #needs_update:%s',
                            len(updateable_entries),
                            len(old_entries),
                            len(needs_update_entries))

                if updateable_entries:
                    # sort them oldest to newest, 'needs_update' paths first
                    def compareFunc(entry1, entry2):
                        if entry1.get('needs_update') and not entry2.get('needs_update'):
                            return -1
                        if not entry1.get('needs_update') and entry2.get('needs_update'):
                            return 1

                        depth1 = self.getDepthToProjectsDir(entry1['path'])
                        depth2 = self.getDepthToProjectsDir(entry2['path'])
                        if depth1 != depth2:
                            # lower level dirs are sorted in front of higher level dirs
                            return depth2 - depth1

                        if entry1['cache_timestamp'] < entry2['cache_timestamp']:
                            return -1
                        if entry1['cache_timestamp'] > entry2['cache_timestamp']:
                            return 1
                        return 0

                    updateable_entries = sorted(updateable_entries, key=cmp_to_key(compareFunc))
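                    # Illustrative resulting order: entries flagged needs_update come first,
                    # then deeper paths before shallower ones, then oldest cache_timestamp first.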

                    cacheUpdateStart = datetime.datetime.utcnow()

                    # do a quick update of each entry by applying the sum of the subdirs to the path's du result...
                    # this makes a best guess immediately available...
                    for cache_entry in updateable_entries:
                        try:
                            path = cache_entry.get('path')
                            if path:
                                self._updatePathCacheEntryToSubDirTotal(path, False)
                        except Exception as e:
                            logger.error(str(e))

                    for i, cache_entry in enumerate(updateable_entries):
                        try:
                            # it might be that the cache_entry was already updated via another way,
                            # so only update it if still too old or flagged needs_update
                            now = datetime.datetime.utcnow()
                            if now - cache_entry['cache_timestamp'] > MAX_CACHE_ENTRY_AGE or cache_entry.get('needs_update', False):
                                path = cache_entry.get('path')
                                if path:
                                    logger.info('_updateOldEntriesInCache: examining entry %s/%s. timestamp:%s age:%s needs_update:%s path: \'%s\'',
                                                i,
                                                len(updateable_entries),
                                                cache_entry['cache_timestamp'],
                                                format_timedelta(now - cache_entry['cache_timestamp']),
                                                cache_entry.get('needs_update', False),
                                                path)

                                    # do a full update from disk, which might be (really) slow.
                                    result = du_getDiskUsageForPath(path)
                                    logger.debug('trying to update old entry in cache: %s', result)
                                    self._updateCache(result)
                        except Exception as e:
                            logger.error(str(e))

                        if not self._cacheThreadsRunning:
                            logger.info('exiting _updateCacheThread')
                            return

                        if datetime.datetime.utcnow() - cacheUpdateStart > datetime.timedelta(minutes=10):
                            # break out of the cache update loop if the full update takes more than 10min;
                            # next loop we'll start with the oldest cache entries again
                            logger.info('skipping remaining %s old cache entries updates, they will be updated next time', len(updateable_entries)-i)
                            break

                # update the CEP4 capacities in the RADB once in a while...
                self._updateCEP4CapacitiesInRADB()

                # sleep for a while (or stop if requested)
                for _ in range(60):
                    sleep(1)
                    if not self._cacheThreadsRunning:
                        logger.info('exiting _updateCacheThread')
                        return

            except Exception as e:
                logger.exception(str(e))

    def _updatePathCacheEntryToSubDirTotal(self, path, force_update=False):
        with self._cacheLock:
            path_cache_result = self._cache['path_du_results'].get(path)

            if path_cache_result:
                path_depth = path.count('/')
                all_dirs = list(self._cache['path_du_results'].keys())
                subdir_paths = [sdp for sdp in all_dirs
                                if sdp.startswith(path) and sdp.count('/') == path_depth+1]

                subdir_du_results = self.getDiskUsageForPaths(subdir_paths, force_update=force_update).values()
                valid_subdir_du_results = [du for du in subdir_du_results if du.get('disk_usage')]
                sum_du = sum([du['disk_usage'] for du in valid_subdir_du_results])

                if sum_du > 0:
                    logger.info('_updatePathCacheEntryToSubDirTotal: setting disk usage for %s to sum of %s known cached subdirs of %s',
                                path, len(valid_subdir_du_results), humanreadablesize(sum_du))
                    path_cache_result['disk_usage'] = sum_du
                    path_cache_result['disk_usage_readable'] = humanreadablesize(sum_du)
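
    # For example (made-up sizes): if the cache holds subdirs
    # '/data/projects/LC9_001/L1' (2GB) and '/data/projects/LC9_001/L2' (3GB),
    # the quick estimate above sets '/data/projects/LC9_001' to 5GB until the
    # next full du refresh.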

    def _updateCEP4CapacitiesInRADB(self):
        try:
            df_result = self.disk_usage.getDiskFreeSpace()
            if df_result['found']:
                # get the total used space, and update the resource availability in the radb
                radbrpc = self.disk_usage.path_resolver.radbrpc
                storage_resources = radbrpc.getResources(resource_types='storage', include_availability=True)
                cep4_storage_resource = next(x for x in storage_resources if 'CEP4' in x['name'])

                total_capacity = df_result.get('disk_size')
                used_capacity = df_result.get('disk_usage')
                available_capacity = df_result.get('disk_free')

                logger.info('updating capacities for resource \'%s\' (id=%s) in the RADB: total=%s, used=%s, available=%s',
                            cep4_storage_resource['name'],
                            cep4_storage_resource['id'],
                            humanreadablesize(total_capacity),
                            humanreadablesize(used_capacity),
                            humanreadablesize(available_capacity))

                radbrpc.updateResourceAvailability(cep4_storage_resource['id'],
                                                   available_capacity=available_capacity,
                                                   total_capacity=total_capacity)
        except Exception as e:
            logger.error('_updateCEP4CapacitiesInRADB: %s', e)

    def open(self):
        self.disk_usage.open()
        self.event_bus.open()

        self._cacheThreadsRunning = True

        self._updateCacheThread = Thread(target=self._updateOldEntriesInCache)
        self._updateCacheThread.daemon = True
        self._updateCacheThread.start()

        self._scanProjectsTreeThread = Thread(target=self._scanProjectsTree)
        self._scanProjectsTreeThread.daemon = True
        self._scanProjectsTreeThread.start()

        self.otdb_listener.start_listening()
        self.dm_listener.start_listening()

    def close(self):
        self.otdb_listener.stop_listening()
        self.dm_listener.stop_listening()

        self._cacheThreadsRunning = False
        self._updateCacheThread.join()
        self._scanProjectsTreeThread.join()

        self.event_bus.close()
        self.disk_usage.close()

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def onObservationFinished(self, otdb_id, modificationTime):
        self._onDiskActivityForOTDBId(otdb_id)

    def onObservationAborted(self, otdb_id, modificationTime):
        self._onDiskActivityForOTDBId(otdb_id)

    def onTaskDeleted(self, otdb_id, deleted, paths, message=''):
        self._onDiskActivityForOTDBId(otdb_id)

    def _onDiskActivityForOTDBId(self, otdb_id):
        result = self.disk_usage.getDiskUsageForOTDBId(otdb_id)
        self._updateCache(result)

        task_path = result.get('path')
        projects_path = self.disk_usage.path_resolver.projects_path

        # update all paths up the tree up to the projects_path
        # update the resource availability in the radb as well
        path = task_path
        while path:
            parent_path = '/'.join(path.split('/')[:-1])

            if projects_path.startswith(parent_path) and len(parent_path) < len(projects_path):
                break

            logger.info('invalidating cache entry for %s because disk usage for task %s in %s changed', parent_path, otdb_id, task_path)
            self._invalidateCacheEntryForPath(parent_path)

            path = parent_path
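
    # For example (made-up path): after disk activity for task 123456 at
    # '/data/projects/LC9_001/L123456', the loop above invalidates
    # '/data/projects/LC9_001' and '/data/projects', and stops once the next
    # parent ('/data') is a proper prefix of projects_path.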

    def getDiskUsageForOTDBId(self, otdb_id, include_scratch_paths=True, force_update=False):
        return self.getDiskUsageForTask(otdb_id=otdb_id, include_scratch_paths=include_scratch_paths, force_update=force_update)

    def getDiskUsageForMoMId(self, mom_id, include_scratch_paths=True, force_update=False):
        return self.getDiskUsageForTask(mom_id=mom_id, include_scratch_paths=include_scratch_paths, force_update=force_update)

    def getDiskUsageForRADBId(self, radb_id, include_scratch_paths=True, force_update=False):
        return self.getDiskUsageForTask(radb_id=radb_id, include_scratch_paths=include_scratch_paths, force_update=force_update)

    def getDiskUsageForTask(self, radb_id=None, mom_id=None, otdb_id=None, include_scratch_paths=True, force_update=False):
        logger.info("cache.getDiskUsageForTask(radb_id=%s, mom_id=%s, otdb_id=%s, include_scratch_paths=%s, force_update=%s)",
                    radb_id, mom_id, otdb_id, include_scratch_paths, force_update)

        if otdb_id is not None and not include_scratch_paths:
            with self._cacheLock:
                path = self._cache['otdb_id2path'].get(otdb_id)
                if path:
                    logger.info('Using path from cache for otdb_id %s %s', otdb_id, path)
                    return self.getDiskUsageForPath(path, force_update=force_update)

        logger.info("cache.getDiskUsageForTask could not find path in cache, determining path...")

        path_result = self.disk_usage.path_resolver.getPathForTask(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, include_scratch_paths=include_scratch_paths)
        if path_result['found']:
            task_path = path_result['path']
            scratch_paths = path_result.get('scratch_paths', [])

            # get all du's in parallel over all paths
            paths = [task_path] + scratch_paths
            paths_du_result = self.getDiskUsageForPaths(paths, force_update=force_update)

            # split into the task dir result and the scratch dir results
            path_du_result = paths_du_result.pop(task_path)
            scratch_du_result = paths_du_result

            task_du_result = dict(path_du_result)

            # fill in the ids if available, or None
            for id in ['radb_id', 'otdb_id', 'mom_id']:
                task_du_result[id] = path_result.get(id)

            if scratch_du_result:
                task_du_result['scratch_paths'] = scratch_du_result

            return task_du_result

        return {'found': False, 'path': path_result['path']}

    def getDiskUsageForTasks(self, radb_ids=None, mom_ids=None, otdb_ids=None, include_scratch_paths=True, force_update=False):
        logger.info("cache.getDiskUsageForTasks(radb_ids=%s, mom_ids=%s, otdb_ids=%s)" % (radb_ids, mom_ids, otdb_ids))
        result = {'radb_ids': {}, 'mom_ids': {}, 'otdb_ids': {}}

        if radb_ids:
            for radb_id in radb_ids:
                result['radb_ids'][str(radb_id)] = self.getDiskUsageForTask(radb_id=radb_id, include_scratch_paths=include_scratch_paths, force_update=force_update)

        if mom_ids:
            for mom_id in mom_ids:
                result['mom_ids'][str(mom_id)] = self.getDiskUsageForTask(mom_id=mom_id, include_scratch_paths=include_scratch_paths, force_update=force_update)

        if otdb_ids:
            for otdb_id in otdb_ids:
                result['otdb_ids'][str(otdb_id)] = self.getDiskUsageForTask(otdb_id=otdb_id, include_scratch_paths=include_scratch_paths, force_update=force_update)

        return result

    def getDiskUsageForPaths(self, paths, force_update=False):
        with futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
            parallel_args = [(path, force_update) for path in paths]
            results = executor.map(lambda p_arg: self.getDiskUsageForPath(*p_arg), parallel_args)
            return {result['path']: result for result in results}

    def getDiskUsageForPath(self, path, force_update=False):
        logger.info("cache.getDiskUsageForPath('%s', force_update=%s)", path, force_update)

        needs_cache_update = False
        if not force_update:
            with self._cacheLock:
                needs_cache_update |= path not in self._cache['path_du_results']

        if needs_cache_update or force_update:
            logger.info("cache update needed for %s", path)
            # check if some other thread is already doing a du call for this path...
            with self._du_threading_events_lock:
                path_threading_event = self._du_threading_events.get(path)
                need_to_do_du_call = path_threading_event is None

                if need_to_do_du_call:
                    # no other thread is currently du'ing/updating this path,
                    # so create a threading Event and store it in the dict,
                    # so other threads can wait for this event.
                    path_threading_event = Event()
                    self._du_threading_events[path] = path_threading_event

            if need_to_do_du_call:
                # no other thread is currently du'ing/updating this path,
                # so we need to do it here.
                result = du_getDiskUsageForPath(path)
                self._updateCache(result)

                # signal threads waiting for this same path du call,
                # and do bookkeeping
                with self._du_threading_events_lock:
                    path_threading_event.set()
                    del self._du_threading_events[path]
            else:
                logger.info("waiting for du call on other thread that will update the cache for %s", path)
                path_threading_event.wait()
                logger.info("another thread just updated the cache for %s", path)

        with self._cacheLock:
            if path in self._cache['path_du_results']:
                result = self._cache['path_du_results'][path]
            else:
                result = {'found': False, 'path': path, 'message': 'unknown error'}
                if not self.disk_usage.path_resolver.pathExists(path):
                    result['message'] = 'No such path: %s' % path

        result['disk_usage_readable'] = humanreadablesize(result.get('disk_usage', 0))

        logger.info('cache.getDiskUsageForPath(\'%s\') result: %s', path, result)
        return result
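
    # Note: the Event bookkeeping above is a per-path 'single flight' pattern:
    # the first caller runs the expensive du scan and publishes the result via
    # the cache; concurrent callers for the same path block on the Event
    # instead of spawning duplicate du scans.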

    def getDiskUsageForTaskAndSubDirectories(self, radb_id=None, mom_id=None, otdb_id=None, force_update=False):
        logger.info("cache.getDiskUsageForTaskAndSubDirectories(radb_id=%s, mom_id=%s, otdb_id=%s)" % (radb_id, mom_id, otdb_id))
        task_du_result = self.getDiskUsageForTask(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, force_update=force_update)

        if task_du_result['found']:
            task_sd_result = self.disk_usage.path_resolver.getSubDirectories(task_du_result['path'])

            if task_sd_result['found']:
                subdir_paths = [os.path.join(task_du_result['path'], sd) for sd in task_sd_result['sub_directories']]
                subdirs_du_result = self.getDiskUsageForPaths(subdir_paths, force_update=force_update)
                result = {'found': True, 'task_directory': task_du_result, 'sub_directories': subdirs_du_result}
                logger.info("result for cache.getDiskUsageForTaskAndSubDirectories(radb_id=%s, mom_id=%s, otdb_id=%s): %s", radb_id, mom_id, otdb_id, result)
                return result

        logger.warning("result for cache.getDiskUsageForTaskAndSubDirectories(radb_id=%s, mom_id=%s, otdb_id=%s): %s", radb_id, mom_id, otdb_id, task_du_result)
        return task_du_result

    def getDiskUsageForProjectDirAndSubDirectories(self, radb_id=None, mom_id=None, otdb_id=None, project_name=None, force_update=False):
        logger.info("cache.getDiskUsageForProjectDirAndSubDirectories(radb_id=%s, mom_id=%s, otdb_id=%s, project_name=%s)" % (radb_id, mom_id, otdb_id, project_name))
        path_result = self.disk_usage.path_resolver.getProjectDirAndSubDirectories(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, project_name=project_name)
        if path_result['found']:
            projectdir_path = path_result['path']
            subdir_paths = [os.path.join(path_result['path'], sd) for sd in path_result['sub_directories']]

            # get all du's in parallel over all paths
            paths = [projectdir_path] + subdir_paths
            paths_du_result = self.getDiskUsageForPaths(paths, force_update=force_update)

            # split into project dir and subdir results
            projectdir_du_result = paths_du_result.pop(projectdir_path)
            subdirs_du_result = paths_du_result

            # create total result dict
            result = {'found': True, 'projectdir': projectdir_du_result, 'sub_directories': subdirs_du_result}
            logger.info('cache.getDiskUsageForProjectDirAndSubDirectories result: %s' % result)
            return result

        return path_result

    def getDiskUsageForProjectsDirAndSubDirectories(self, force_update=False):
        logger.info("cache.getDiskUsageForProjectsDirAndSubDirectories")

        projects_path = self.disk_usage.path_resolver.projects_path
        project_subdirs_result = self.disk_usage.path_resolver.getSubDirectories(projects_path)
        subdir_paths = [os.path.join(projects_path, sd) for sd in project_subdirs_result['sub_directories']] if project_subdirs_result['found'] else []

        # get all du's in parallel over all paths
        paths = [projects_path] + subdir_paths
        paths_du_result = self.getDiskUsageForPaths(paths, force_update=force_update)

        # split into projects dir and subdir results
        projectsdir_du_result = paths_du_result.pop(projects_path)
        subdirs_du_result = paths_du_result

        # create total result dict
        result = {'found': True, 'projectdir': projectsdir_du_result, 'sub_directories': subdirs_du_result}
        logger.info('cache.getDiskUsageForProjectsDirAndSubDirectories result: %s' % result)
        return result


if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)s %(levelname)s %(threadName)s %(message)s', level=logging.INFO)

    with CacheManager() as cm:
        waitForInterrupt()
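
# Illustrative use from other code (assumes a reachable messaging broker and a
# mounted CEP4 filesystem; the id and result values are made up):
#   with CacheManager() as cm:
#       du = cm.getDiskUsageForOTDBId(123456)
#       # e.g. {'found': True, 'path': '/data/projects/LC9_001/L123456',
#       #       'disk_usage': 1234567890, 'disk_usage_readable': '1.1GB', ...}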