#!/usr/bin/python
# $Id$
'''Cache manager for disk usage (du) results on the CEP4 storage cluster.

Keeps du results per path (plus a mapping from otdb_id to path) in an
in-memory cache which is persisted to disk, refreshes old or invalidated
entries in a background thread, and broadcasts DiskUsageChanged events on
the notification bus. Results for unknown paths are kept in the cache as
well, so they can also be returned quickly.
'''
import logging
import datetime
from time import sleep
from threading import Thread, RLock
import os.path
from lofar.messaging import EventMessage, ToBus
from lofar.common.util import humanreadablesize, waitForInterrupt
from lofar.sas.datamanagement.storagequery.diskusage import getDiskUsageForPath as du_getDiskUsageForPath
from lofar.sas.datamanagement.storagequery.diskusage import DiskUsage
from lofar.sas.datamanagement.common.datamanagementbuslistener import DataManagementBusListener
from lofar.sas.otdb.OTDBBusListener import OTDBBusListener
from lofar.sas.datamanagement.common.config import CEP4_DATA_MOUNTPOINT
from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME, DEFAULT_OTDB_NOTIFICATION_SUBJECT
from lofar.sas.datamanagement.common.config import DEFAULT_DM_NOTIFICATION_BUSNAME, DEFAULT_DM_NOTIFICATION_PREFIX
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as RADB_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
logger = logging.getLogger(__name__)
MAX_CACHE_ENTRY_AGE = datetime.timedelta(days=1)
class CacheManager:
def __init__(self,
mountpoint=CEP4_DATA_MOUNTPOINT,
radb_busname=RADB_BUSNAME,
radb_servicename=RADB_SERVICENAME,
mom_busname=DEFAULT_MOMQUERY_BUSNAME,
mom_servicename=DEFAULT_MOMQUERY_SERVICENAME,
otdb_notification_busname=DEFAULT_OTDB_NOTIFICATION_BUSNAME,
otdb_notification_subject=DEFAULT_OTDB_NOTIFICATION_SUBJECT,
dm_notification_busname=DEFAULT_DM_NOTIFICATION_BUSNAME,
dm_notification_prefix=DEFAULT_DM_NOTIFICATION_PREFIX,
broker=None):
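        '''Construct a CacheManager which listens for OTDB and DataManagement
        notifications on the given busses, keeps a persistent cache of disk
        usage results for paths on the given mountpoint, and sends
        DiskUsageChanged events on the dm notification bus.'''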
self.otdb_listener = OTDBBusListener(busname=otdb_notification_busname,
subject=otdb_notification_subject,
broker=broker,
numthreads=2)
self.otdb_listener.onObservationAborted = self.onObservationAborted
self.otdb_listener.onObservationFinished = self.onObservationFinished
self.dm_listener = DataManagementBusListener(busname=dm_notification_busname,
subjects=dm_notification_prefix + '*',
broker=broker,
numthreads=2)
self.dm_listener.onTaskDeleted = self.onTaskDeleted
self.notification_prefix = dm_notification_prefix
self.event_bus = ToBus(dm_notification_busname, broker=broker)
self._updateCacheThread = None
self._updateCacheThreadRunning = False
self._continueUpdateCacheThread = False
self._cacheLock = RLock()
self._cache = {'path_du_results': {}, 'otdb_id2path': {} }
self._readCacheFromDisk()
self.disk_usage = DiskUsage(mountpoint=mountpoint,
radb_busname=radb_busname,
radb_servicename=radb_servicename,
mom_busname=mom_busname,
mom_servicename=mom_servicename,
broker=broker)
def _sendDiskUsageChangedNotification(self, path, disk_usage, otdb_id=None):
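        '''send a DiskUsageChanged event for the given path (and optional otdb_id) on the notification bus'''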
try:
msg = EventMessage(context=self.notification_prefix + 'DiskUsageChanged',
content={ 'path': path,
'disk_usage': disk_usage,
'disk_usage_readable': humanreadablesize(disk_usage),
'otdb_id': otdb_id })
logger.info('Sending notification with subject %s to %s: %s', msg.subject, self.event_bus.address, msg.content)
self.event_bus.send(msg)
except Exception as e:
logger.error(str(e))
def _readCacheFromDisk(self):
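        '''restore the cache from the .du_cache.py file (if present) in the current working directory'''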
        # note: this on-disk cache might be slow; if so, revert to a proper db solution
        try:
            if os.path.exists('.du_cache.py'):
                with open('.du_cache.py', 'r') as file:
                    with self._cacheLock:
                        # eval rather than ast.literal_eval, because the cached
                        # entries contain datetime objects; the file is written
                        # only by this service itself (see _writeCacheToDisk)
                        self._cache = eval(file.read().strip())
if not isinstance(self._cache, dict):
self._cache = {'path_du_results': {}, 'otdb_id2path': {} }
except Exception as e:
logger.error("Error while reading in du cache: %s", e)
with self._cacheLock:
self._cache = {'path_du_results': {}, 'otdb_id2path': {} }
def _writeCacheToDisk(self):
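        '''persist the current cache to the .du_cache.py file in the current working directory'''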
try:
with open('.du_cache.py', 'w') as file:
with self._cacheLock:
file.write(str(self._cache))
except Exception as e:
logger.error("Error while writing du cache: %s", e)
def _updateCache(self, du_result):
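        '''store the given du_result in the cache, also when the path was not
        found, so such results can be returned quickly as well. Sends a
        DiskUsageChanged notification when the cached disk usage changed, and
        updates the RADB storage availability when the projects dir changed.'''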
        if 'path' not in du_result:
return
path = du_result['path']
otdb_id = du_result.get('otdb_id')
with self._cacheLock:
path_cache = self._cache['path_du_results']
otdb_id2path_cache = self._cache['otdb_id2path']
            if otdb_id is None:
                # try to look up the otdb_id in the path cache
                if path in path_cache:
                    otdb_id = path_cache[path].get('otdb_id')
if not du_result['found']:
#make sure disk_usage is set when not found
du_result['disk_usage'] = 0
du_result['disk_usage_readable'] = '0B'
            if path not in path_cache or path_cache[path]['disk_usage'] != du_result['disk_usage']:
                # update the cache entry, even when no du result was found,
                # because that will save disk queries next time.
logger.info('updating cache entry: %s', du_result)
path_cache[path] = du_result
path_cache[path]['cache_timestamp'] = datetime.datetime.utcnow()
path_cache[path]['needs_update'] = False
                if otdb_id is not None:
otdb_id2path_cache[otdb_id] = path
self._writeCacheToDisk()
self._sendDiskUsageChangedNotification(path, du_result['disk_usage'], otdb_id)
if du_result.get('path') == self.disk_usage.path_resolver.projects_path:
self._updateProjectsDiskUsageInRADB()
def _invalidateCacheEntryForPath(self, path):
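        '''mark the cache entry for the given path so the cache update thread will refresh it'''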
with self._cacheLock:
path_cache = self._cache['path_du_results']
if path in path_cache:
path_cache[path]['needs_update'] = True
def getDiskUsagesForAllOtdbIds(self, force_update=False):
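        '''get the disk usage result for each otdb_id currently known in the cache'''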
        with self._cacheLock:
            # take a copy of the keys, so we can iterate them safely outside the lock
            otdb_ids = list(self._cache['otdb_id2path'].keys())
result = {}
for otdb_id in otdb_ids:
result[otdb_id] = self.getDiskUsageForOTDBId(otdb_id, force_update)
return result
def _scanProjectsTree(self):
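        '''scan the projects tree up to 3 levels deep, adding any yet unknown
        directory to the cache with an empty disk_usage, marked for an
        update by the cache update thread.'''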
try:
def addSubDirectoriesToCache(directory):
depth = len(directory.replace(self.disk_usage.path_resolver.projects_path, '').strip('/').split('/'))
if depth > 3:
return
with self._cacheLock:
path_cache = self._cache['path_du_results']
if not directory in path_cache:
logger.info('tree scan: adding \'%s\' with empty disk_usage to cache which will be du\'ed later', directory)
empty_du_result = {'found': True, 'disk_usage': None, 'path': directory, 'name': directory.split('/')[-1]}
self._updateCache(empty_du_result)
if directory in path_cache:
# mark cache entry for directory to be updated
path_cache[directory]['needs_update'] = True
if not self._updateCacheThreadRunning:
return
if depth < 3:
logger.info('tree scan: scanning \'%s\'', directory)
sd_result = self.disk_usage.path_resolver.getSubDirectories(directory)
if sd_result['found']:
subdir_paths = [os.path.join(directory,sd) for sd in sd_result['sub_directories']]
for subdir_path in subdir_paths:
# recurse
addSubDirectoriesToCache(subdir_path)
addSubDirectoriesToCache(self.disk_usage.path_resolver.projects_path)
logger.info('tree scan complete')
except Exception as e:
logger.error(str(e))
def _updateOldEntriesInCache(self):
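        '''loop (while the manager is open) over all cache entries which are
        older than MAX_CACHE_ENTRY_AGE or marked with needs_update, and
        refresh them with a fresh du call, invalidated and oldest first.'''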
logger.info('starting updating old cache entries')
while self._updateCacheThreadRunning:
try:
now = datetime.datetime.utcnow()
with self._cacheLock:
path_cache = self._cache['path_du_results']
old_entries = [cache_entry for cache_entry in path_cache.values()
if now - cache_entry['cache_timestamp'] > MAX_CACHE_ENTRY_AGE]
needs_update_entries = [cache_entry for cache_entry in path_cache.values()
if cache_entry.get('needs_update', False)]
updateable_entries = old_entries + needs_update_entries
if updateable_entries:
logger.info('%s old cache entries need to be updated, #age:%s #needs_update:%s',
len(updateable_entries),
len(old_entries),
len(needs_update_entries))
                    # sort them oldest to newest, 'needs_update' paths first
                    updateable_entries = sorted(updateable_entries,
                                                key=lambda e: (not e.get('needs_update', False),
                                                               e['cache_timestamp']))
cacheUpdateStart = datetime.datetime.utcnow()
for i, cache_entry in enumerate(updateable_entries):
try:
path = cache_entry.get('path')
if path:
logger.info('examining entry %s/%s in cache. timestamp:%s age:%s needs_update:%s path: %s',
i,
len(updateable_entries),
cache_entry['cache_timestamp'],
now - cache_entry['cache_timestamp'],
cache_entry.get('needs_update', False),
path)
result = du_getDiskUsageForPath(path)
logger.debug('trying to update old entry in cache: %s', result)
self._updateCache(result)
except Exception as e:
logger.error(str(e))
if not self._updateCacheThreadRunning:
return
if datetime.datetime.utcnow() - cacheUpdateStart > datetime.timedelta(minutes=10):
# break out of cache update loop if full update takes more than 10min
# next loop we'll start with the oldest cache entries again
                        logger.info('skipping updates for the remaining %s old cache entries; they will be updated next time', len(updateable_entries)-i)
break
for i in range(60):
sleep(1)
if not self._updateCacheThreadRunning:
return
if self._continueUpdateCacheThread:
# break out of sleep loop and continue updating old cache entries
self._continueUpdateCacheThread = False
break
except Exception as e:
logger.error(str(e))
    def _scanProjectsTreeAndUpdateOldEntriesInCache(self):
        '''background thread method: scan the projects tree once, then keep
        refreshing old/invalidated cache entries until the manager is closed'''
        self._scanProjectsTree()
        self._updateOldEntriesInCache()
def _updateProjectsDiskUsageInRADB(self):
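        '''update the available capacity of the CEP4 storage resource in the
        RADB, based on its total capacity and the current disk usage of the
        projects path'''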
try:
projects_du_result = self.getDiskUsageForPath(self.disk_usage.path_resolver.projects_path)
if projects_du_result['found']:
#get the total used space, and update the resource availability in the radb
radbrpc = self.disk_usage.path_resolver.radbrpc
storage_resources = radbrpc.getResources(resource_types='storage', include_availability=True)
cep4_storage_resource = next(x for x in storage_resources if 'CEP4' in x['name'])
total_capacity = cep4_storage_resource.get('total_capacity')
used_capacity = projects_du_result.get('disk_usage')
                if total_capacity is not None and used_capacity is not None:
available_capacity = total_capacity - used_capacity
logger.info('updating availability capacity for %s (id=%s) to %s in the RADB',
cep4_storage_resource['name'],
cep4_storage_resource['id'],
humanreadablesize(available_capacity))
radbrpc.updateResourceAvailability(cep4_storage_resource['id'], available_capacity=available_capacity)
except Exception as e:
logger.error(e)
def open(self):
self.disk_usage.open()
self.event_bus.open()
self._updateCacheThread = Thread(target=self._scanProjectsTreeAndUpdateOldEntriesInCache)
self._updateCacheThread.daemon = True
self._updateCacheThreadRunning = True
self._updateCacheThread.start()
self.otdb_listener.start_listening()
self.dm_listener.start_listening()
def close(self):
self.otdb_listener.stop_listening()
self.dm_listener.stop_listening()
self._updateCacheThreadRunning = False
self._updateCacheThread.join()
self.event_bus.close()
self.disk_usage.close()
def __enter__(self):
self.open()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def onObservationFinished(self, otdb_id, modificationTime):
self._onDiskActivityForOTDBId(otdb_id)
def onObservationAborted(self, otdb_id, modificationTime):
self._onDiskActivityForOTDBId(otdb_id)
def onTaskDeleted(self, otdb_id, deleted, paths, message=''):
self._onDiskActivityForOTDBId(otdb_id)
def _onDiskActivityForOTDBId(self, otdb_id):
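        '''common handler for finished/aborted/deleted tasks: refresh the du
        result for the task's path, invalidate all cache entries up the tree
        to the projects path, and wake up the cache update thread.'''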
result = self.disk_usage.getDiskUsageForOTDBId(otdb_id)
self._updateCache(result)
task_path = result.get('path')
projects_path = self.disk_usage.path_resolver.projects_path
# update all paths up the tree up to the projects_path
# update the resource availability in the radb as well
path = task_path
while path:
parent_path = '/'.join(path.split('/')[:-1])
if projects_path.startswith(parent_path) and len(parent_path) < len(projects_path):
break
logger.info('invalidating cache entry for %s because disk usage for task %s in %s changed', parent_path, otdb_id, task_path)
self._invalidateCacheEntryForPath(parent_path)
path = parent_path
# trigger update cache thread
self._continueUpdateCacheThread = True
def getDiskUsageForOTDBId(self, otdb_id, include_scratch_paths=True, force_update=False):
return self.getDiskUsageForTask(otdb_id=otdb_id, include_scratch_paths=include_scratch_paths, force_update=force_update)
def getDiskUsageForMoMId(self, mom_id, include_scratch_paths=True, force_update=False):
return self.getDiskUsageForTask(mom_id=mom_id, include_scratch_paths=include_scratch_paths, force_update=force_update)
def getDiskUsageForRADBId(self, radb_id, include_scratch_paths=True, force_update=False):
return self.getDiskUsageForTask(radb_id=radb_id, include_scratch_paths=include_scratch_paths, force_update=force_update)
def getDiskUsageForTask(self, radb_id=None, mom_id=None, otdb_id=None, include_scratch_paths=True, force_update=False):
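        '''get the (cached) disk usage for the task identified by its radb_id,
        mom_id or otdb_id, optionally including du results for its scratch
        paths. Returns {'found': False, ...} when no path could be resolved.'''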
logger.info("\n\ncache.getDiskUsageForTask(radb_id=%s, mom_id=%s, otdb_id=%s, include_scratch_paths=%s, force_update=%s)",
radb_id, mom_id, otdb_id, include_scratch_paths, force_update)
        if otdb_id is not None and not include_scratch_paths:
with self._cacheLock:
path = self._cache['otdb_id2path'].get(otdb_id)
if path:
logger.info('Using path from cache for otdb_id %s %s', otdb_id, path)
return self.getDiskUsageForPath(path, force_update=force_update)
logger.info("cache.getDiskUsageForTask could not find path in cache, determining path...")
path_result = self.disk_usage.path_resolver.getPathForTask(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, include_scratch_paths=include_scratch_paths)
if path_result['found']:
path_du_result = self.getDiskUsageForPath(path_result['path'], force_update=force_update)
path_du_result['radb_id'] = path_result.get('radb_id')
path_du_result['mom_id'] = path_result.get('mom_id')
path_du_result['otdb_id'] = path_result.get('otdb_id')
if 'scratch_paths' in path_result:
path_du_result['scratch_paths'] = {}
for scratch_path in path_result['scratch_paths']:
scratch_path_du_result = self.getDiskUsageForPath(scratch_path, force_update=force_update)
path_du_result['scratch_paths'][scratch_path] = scratch_path_du_result
self._updateCache(path_du_result)
return path_du_result
self._updateCache(path_result)
return {'found': False, 'path': path_result['path']}
def getDiskUsageForTasks(self, radb_ids=None, mom_ids=None, otdb_ids=None, include_scratch_paths=True, force_update=False):
logger.info("cache.getDiskUsageForTask(radb_ids=%s, mom_ids=%s, otdb_ids=%s)" % (radb_ids, mom_ids, otdb_ids))
result = {'radb_ids': {}, 'mom_ids': {}, 'otdb_ids': {}}
if radb_ids:
for radb_id in radb_ids:
result['radb_ids'][str(radb_id)] = self.getDiskUsageForTask(radb_id=radb_id, include_scratch_paths=include_scratch_paths, force_update=force_update)
if mom_ids:
for mom_id in mom_ids:
result['mom_ids'][str(mom_id)] = self.getDiskUsageForTask(mom_id=mom_id, include_scratch_paths=include_scratch_paths, force_update=force_update)
if otdb_ids:
for otdb_id in otdb_ids:
result['otdb_ids'][str(otdb_id)] = self.getDiskUsageForTask(otdb_id=otdb_id, include_scratch_paths=include_scratch_paths, force_update=force_update)
return result
def getDiskUsageForPath(self, path, force_update=False):
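        '''get the disk usage for the given path, from the cache if possible.
        Unknown paths are du'ed on disk once and then kept in the cache as
        well (even when not found), so they can be returned quickly later.'''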
logger.info("cache.getDiskUsageForPath(%s, force_update=%s)", path, force_update)
needs_cache_update = False
with self._cacheLock:
needs_cache_update |= path not in self._cache['path_du_results']
if needs_cache_update or force_update:
logger.info("cache update needed for %s", path)
result = du_getDiskUsageForPath(path)
self._updateCache(result)
with self._cacheLock:
if path in self._cache['path_du_results']:
result = self._cache['path_du_results'][path]
else:
result = { 'found': False, 'path':path, 'message': 'unknown error' }
if not self.disk_usage.path_resolver.pathExists(path):
result['message'] = 'No such path: %s' % path
result['disk_usage_readable'] = humanreadablesize(result.get('disk_usage', 0))
logger.info('cache.getDiskUsageForPath result: %s' % result)
return result
def getDiskUsageForTaskAndSubDirectories(self, radb_id=None, mom_id=None, otdb_id=None, force_update=False):
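        '''get the disk usage for the task identified by one of its ids, plus
        a du result for each of its subdirectories'''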
logger.info("cache.getDiskUsageForTaskAndSubDirectories(radb_id=%s, mom_id=%s, otdb_id=%s)" % (radb_id, mom_id, otdb_id))
task_du_result = self.getDiskUsageForTask(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, force_update=force_update)
if task_du_result['found']:
task_sd_result = self.disk_usage.path_resolver.getSubDirectories(task_du_result['path'])
if task_sd_result['found']:
subdir_paths = [os.path.join(task_du_result['path'],sd) for sd in task_sd_result['sub_directories']]
#TODO: potential for parallelization
subdirs_du_result = { sd: self.getDiskUsageForPath(sd, force_update=force_update) for sd in subdir_paths }
result = {'found':True, 'task_directory': task_du_result, 'sub_directories': subdirs_du_result }
logger.info("result for cache.getDiskUsageForTaskAndSubDirectories(radb_id=%s, mom_id=%s, otdb_id=%s): %s", radb_id, mom_id, otdb_id, result)
return result
        logger.warning("result for cache.getDiskUsageForTaskAndSubDirectories(radb_id=%s, mom_id=%s, otdb_id=%s): %s", radb_id, mom_id, otdb_id, task_du_result)
return task_du_result
def getDiskUsageForProjectDirAndSubDirectories(self, radb_id=None, mom_id=None, otdb_id=None, project_name=None, force_update=False):
logger.info("cache.getDiskUsageForProjectDirAndSubDirectories(radb_id=%s, mom_id=%s, otdb_id=%s)" % (radb_id, mom_id, otdb_id))
path_result = self.disk_usage.path_resolver.getProjectDirAndSubDirectories(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, project_name=project_name)
if path_result['found']:
projectdir_du_result = self.getDiskUsageForPath(path_result['path'], force_update=force_update)
subdir_paths = [os.path.join(path_result['path'],sd) for sd in path_result['sub_directories']]
#TODO: potential for parallelization
subdirs_du_result = { sd: self.getDiskUsageForPath(sd, force_update=force_update) for sd in subdir_paths }
result = {'found':True, 'projectdir': projectdir_du_result, 'sub_directories': subdirs_du_result }
logger.info('cache.getDiskUsageForProjectDirAndSubDirectories result: %s' % result)
return result
return path_result
def getDiskUsageForProjectsDirAndSubDirectories(self, force_update=False):
logger.info("cache.getDiskUsageForProjectsDirAndSubDirectories")
projects_path = self.disk_usage.path_resolver.projects_path
projectsdir_du_result = self.getDiskUsageForPath(projects_path, force_update=force_update)
result = {'found':True, 'projectdir': projectsdir_du_result }
project_subdirs_result = self.disk_usage.path_resolver.getSubDirectories(projects_path)
if project_subdirs_result['found']:
subdir_paths = [os.path.join(projects_path,sd) for sd in project_subdirs_result['sub_directories']]
#TODO: potential for parallelization
subdirs_du_result = { sd: self.getDiskUsageForPath(sd, force_update=force_update) for sd in subdir_paths }
result['sub_directories'] = subdirs_du_result
logger.info('cache.getDiskUsageForProjectsDirAndSubDirectories result: %s' % result)
return result
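# Example usage (a minimal sketch; otdb_id 123456 below is a made-up
# illustration, and the broker hostname should be replaced by your own
# qpid broker):
#
#   with CacheManager(broker='scu099.control.lofar') as cm:
#       du_result = cm.getDiskUsageForOTDBId(123456)
#       print du_result.get('disk_usage_readable')
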
if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
with CacheManager(broker='scu099.control.lofar') as cm:
waitForInterrupt()