#!/usr/bin/python
# $Id$

'''CacheManager: maintains a cache of disk usage (du) results for the data on the CEP4 cluster.

Disk usage results are cached per path under the projects directory.
Background threads scan the projects tree and refresh old or invalidated
cache entries. Bus listeners invalidate entries when observations finish,
are aborted, or when task data is deleted. On every cache update a
DiskUsageChanged notification is sent, and the CEP4 storage capacities
in the RADB are updated periodically.'''

import logging
import datetime
from time import sleep
from threading import Thread, RLock
import os.path

from lofar.messaging import EventMessage, ToBus
from lofar.common.util import humanreadablesize
from lofar.common.util import waitForInterrupt
from lofar.common.datetimeutils import format_timedelta
from lofar.sas.datamanagement.storagequery.diskusage import getDiskUsageForPath as du_getDiskUsageForPath
from lofar.sas.datamanagement.storagequery.diskusage import DiskUsage
from lofar.sas.datamanagement.common.datamanagementbuslistener import DataManagementBusListener
from lofar.sas.otdb.OTDBBusListener import OTDBBusListener
from lofar.sas.datamanagement.common.config import CEP4_DATA_MOUNTPOINT
from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME, DEFAULT_OTDB_NOTIFICATION_SUBJECT
from lofar.sas.datamanagement.common.config import DEFAULT_DM_NOTIFICATION_BUSNAME, DEFAULT_DM_NOTIFICATION_PREFIX
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as RADB_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME

logger = logging.getLogger(__name__)

MAX_CACHE_ENTRY_AGE = datetime.timedelta(hours=8)


class CacheManager:
    def __init__(self,
                 cache_path='.du_cache.py',
                 mountpoint=CEP4_DATA_MOUNTPOINT,
                 radb_busname=RADB_BUSNAME,
                 radb_servicename=RADB_SERVICENAME,
                 mom_busname=DEFAULT_MOMQUERY_BUSNAME,
                 mom_servicename=DEFAULT_MOMQUERY_SERVICENAME,
                 otdb_notification_busname=DEFAULT_OTDB_NOTIFICATION_BUSNAME,
                 otdb_notification_subject=DEFAULT_OTDB_NOTIFICATION_SUBJECT,
                 dm_notification_busname=DEFAULT_DM_NOTIFICATION_BUSNAME,
                 dm_notification_prefix=DEFAULT_DM_NOTIFICATION_PREFIX,
                 broker=None):
        self._cache_path = cache_path

        self.otdb_listener = OTDBBusListener(busname=otdb_notification_busname,
                                             subject=otdb_notification_subject,
                                             broker=broker,
                                             numthreads=2)
        self.otdb_listener.onObservationAborted = self.onObservationAborted
        self.otdb_listener.onObservationFinished = self.onObservationFinished

        self.dm_listener = DataManagementBusListener(busname=dm_notification_busname,
                                                     subjects=dm_notification_prefix + '*',
                                                     broker=broker,
                                                     numthreads=2)
        self.dm_listener.onTaskDeleted = self.onTaskDeleted

        self.notification_prefix = dm_notification_prefix
        self.event_bus = ToBus(dm_notification_busname, broker=broker)

        self._scanProjectsTreeThread = None
        self._updateCacheThread = None
        self._cacheThreadsRunning = False

        self._cacheLock = RLock()
        self._cache = {'path_du_results': {}, 'otdb_id2path': {}}
        self._last_cache_write_timestamp = datetime.datetime(1970, 1, 1)
        self._readCacheFromDisk()

        self.disk_usage = DiskUsage(mountpoint=mountpoint,
                                    radb_busname=radb_busname,
                                    radb_servicename=radb_servicename,
                                    mom_busname=mom_busname,
                                    mom_servicename=mom_servicename,
                                    broker=broker)

    def _sendDiskUsageChangedNotification(self, path, disk_usage, otdb_id=None):
        try:
            msg = EventMessage(context=self.notification_prefix + 'DiskUsageChanged',
                               content={'path': path,
                                        'disk_usage': disk_usage,
                                        'disk_usage_readable': humanreadablesize(disk_usage),
                                        'otdb_id': otdb_id})
            logger.info('Sending notification with subject %s to %s: %s',
                        msg.subject, self.event_bus.address, msg.content)
            self.event_bus.send(msg)
        except Exception as e:
            logger.error(str(e))
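    # The in-memory cache is a plain dict of dicts, persisted to disk as its
    # repr (see _readCacheFromDisk/_writeCacheToDisk below). A sketch of its
    # shape, with made-up values for a hypothetical project LC9_001 with
    # otdb_id 654321:
    #
    #   {'path_du_results': {'/data/projects/LC9_001': {'found': True,
    #                                                   'path': '/data/projects/LC9_001',
    #                                                   'disk_usage': 1099511627776,
    #                                                   'disk_usage_readable': '1.0TB',
    #                                                   'otdb_id': 654321,
    #                                                   'cache_timestamp': datetime.datetime(2017, 1, 1, 12, 0, 0),
    #                                                   'needs_update': False}},
    #    'otdb_id2path': {654321: '/data/projects/LC9_001'}}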
    def _readCacheFromDisk(self):
        # maybe this cache on disk is slow; if so, revert to a proper db solution
        try:
            if os.path.exists(self._cache_path):
                with open(self._cache_path, 'r') as file:
                    with self._cacheLock:
                        # the cache file holds the repr of a python dict (including
                        # datetime instances), so eval is needed to parse it back;
                        # the file is trusted local state written by _writeCacheToDisk.
                        self._cache = eval(file.read().strip())
                        if not isinstance(self._cache, dict):
                            self._cache = {'path_du_results': {}, 'otdb_id2path': {}}
                        if 'path_du_results' not in self._cache:
                            self._cache['path_du_results'] = {}
                        if 'otdb_id2path' not in self._cache:
                            self._cache['otdb_id2path'] = {}
        except Exception as e:
            logger.error("Error while reading in du cache: %s", e)
            with self._cacheLock:
                self._cache = {'path_du_results': {}, 'otdb_id2path': {}}

    def _writeCacheToDisk(self):
        try:
            # only persist (a subset of) the cache to disk at most once every 12 seconds
            if datetime.datetime.utcnow() - self._last_cache_write_timestamp > datetime.timedelta(minutes=0.2):
                cache_str = ''
                with self._cacheLock:
                    # Take a subset of the entire cache:
                    # only the path_du_results for paths at project level
                    # (like /data/projects and /data/projects/LC9_001).
                    # Do not store path_du_results for deeper levels on disk,
                    # because that makes the disk read/write too slow,
                    # while the deeper levels can be obtained via du calls quite fast anyway.
                    # Furthermore, once a deeper-level du result is stored in the memory cache,
                    # it is also available for fast lookup.
                    # We just don't store these deep levels on disk.
                    sub_cache = {path: du_result
                                 for path, du_result in self._cache['path_du_results'].items()
                                 if self.getDepthToProjectsDir(path) <= 1}
                    cache_str = str(sub_cache)

                # write to a tmp file next to the cache file, so the rename below is
                # atomic (os.rename requires source and destination on the same filesystem)
                tmp_path = self._cache_path + '.tmp'
                with open(tmp_path, 'w') as file:
                    file.write(cache_str)
                os.rename(tmp_path, self._cache_path)

                self._last_cache_write_timestamp = datetime.datetime.utcnow()
        except Exception as e:
            logger.error("Error while writing du cache: %s", e)

    def _updateCache(self, du_result):
        if 'path' not in du_result:
            return

        path = du_result['path']
        otdb_id = du_result.get('otdb_id')

        with self._cacheLock:
            path_cache = self._cache['path_du_results']
            otdb_id2path_cache = self._cache['otdb_id2path']

            if otdb_id is None:
                # try to look up the otdb_id in the path cache
                if path in path_cache:
                    otdb_id = path_cache[path].get('otdb_id')

            if not du_result['found']:
                # make sure disk_usage is set when not found
                du_result['disk_usage'] = 0
                du_result['disk_usage_readable'] = '0B'

            if path not in path_cache or path_cache[path]['disk_usage'] != du_result['disk_usage']:
                # update the cache entry, even when no du result was found,
                # because that will save disk queries next time.
                logger.info('updating cache entry: %s', du_result)
                path_cache[path] = du_result

            path_cache[path]['cache_timestamp'] = datetime.datetime.utcnow()
            path_cache[path]['needs_update'] = False

            if otdb_id is not None:
                otdb_id2path_cache[otdb_id] = path

        self._writeCacheToDisk()

        self._sendDiskUsageChangedNotification(path, du_result['disk_usage'], otdb_id)
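    # A du_result, as produced by du_getDiskUsageForPath (imported from the
    # diskusage module) and passed through _updateCache above, is assumed to
    # look roughly like this (values are made up):
    #
    #   {'found': True,
    #    'path': '/data/projects/LC9_001/L654321',
    #    'name': 'L654321',
    #    'disk_usage': 3221225472,
    #    'disk_usage_readable': '3.0GB'}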
    def _invalidateCacheEntryForPath(self, path):
        with self._cacheLock:
            path_cache = self._cache['path_du_results']
            if path in path_cache:
                path_cache[path]['needs_update'] = True

    def getDiskUsagesForAllOtdbIds(self, force_update=False):
        with self._cacheLock:
            # take a copy of the known otdb_ids, so we can iterate without holding the lock
            otdb_ids = list(self._cache['otdb_id2path'].keys())

        result = {}
        for otdb_id in otdb_ids:
            result[otdb_id] = self.getDiskUsageForOTDBId(otdb_id, force_update=force_update)
        return result

    def getDepthToProjectsDir(self, path):
        return len(path.replace(self.disk_usage.path_resolver.projects_path, '').strip('/').split('/'))
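    # Worked example for getDepthToProjectsDir, assuming projects_path is
    # '/data/projects' (the actual value comes from the path_resolver;
    # the project/obs/subdir names are made up):
    #
    #   '/data/projects'                    -> 1  (because ''.split('/') == [''])
    #   '/data/projects/LC9_001'            -> 1
    #   '/data/projects/LC9_001/L654321'    -> 2
    #   '/data/projects/LC9_001/L654321/uv' -> 3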
    def _scanProjectsTree(self):
        try:
            def addSubDirectoriesToCache(directory):
                depth = self.getDepthToProjectsDir(directory)
                # note: getDepthToProjectsDir maps both the projects dir itself and
                # projects/<project> to depth 1 (see the worked example above),
                # projects/<project>/<obs> to depth 2,
                # and projects/<project>/<obs>/<sub_dir> to depth 3
                if depth > 3:
                    return

                with self._cacheLock:
                    path_cache = self._cache['path_du_results']
                    if directory not in path_cache:
                        logger.info("tree scan: adding '%s' with empty disk_usage to cache which will be du'ed later", directory)
                        empty_du_result = {'found': True, 'disk_usage': None,
                                           'path': directory, 'name': directory.split('/')[-1]}
                        self._updateCache(empty_du_result)

                    if directory in path_cache:
                        # mark the cache entry for this directory to be updated
                        path_cache[directory]['needs_update'] = True

                if not self._cacheThreadsRunning:
                    return

                if depth < 3:
                    logger.info("tree scan: scanning '%s'", directory)
                    sd_result = self.disk_usage.path_resolver.getSubDirectories(directory)

                    if sd_result['found']:
                        subdir_paths = [os.path.join(directory, sd) for sd in sd_result['sub_directories']]

                        for subdir_path in subdir_paths:
                            # recurse
                            addSubDirectoriesToCache(subdir_path)

            addSubDirectoriesToCache(self.disk_usage.path_resolver.projects_path)
            logger.info('tree scan complete')
        except Exception as e:
            logger.error(str(e))

    def _updateOldEntriesInCache(self):
        logger.info('starting updating old cache entries')
        while self._cacheThreadsRunning:
            try:
                now = datetime.datetime.utcnow()
                with self._cacheLock:
                    path_cache = self._cache['path_du_results']
                    old_entries = [cache_entry for cache_entry in path_cache.values()
                                   if now - cache_entry['cache_timestamp'] > MAX_CACHE_ENTRY_AGE]
                    needs_update_entries = [cache_entry for cache_entry in path_cache.values()
                                            if cache_entry.get('needs_update', False)]
                    updateable_entries = old_entries + needs_update_entries

                if updateable_entries:
                    logger.info('%s old cache entries need to be updated, #age:%s #needs_update:%s',
                                len(updateable_entries), len(old_entries), len(needs_update_entries))

                    # sort them oldest to newest, 'needs_update' paths first
                    def compareFunc(entry1, entry2):
                        if entry1.get('needs_update') and not entry2.get('needs_update'):
                            return -1
                        if not entry1.get('needs_update') and entry2.get('needs_update'):
                            return 1
                        if entry1['cache_timestamp'] < entry2['cache_timestamp']:
                            return -1
                        if entry1['cache_timestamp'] > entry2['cache_timestamp']:
                            return 1
                        return 0

                    # cmp-style sorting is python2-only; python3 would need
                    # sorted(..., key=functools.cmp_to_key(compareFunc))
                    updateable_entries = sorted(updateable_entries, cmp=compareFunc)

                    cacheUpdateStart = datetime.datetime.utcnow()

                    for i, cache_entry in enumerate(updateable_entries):
                        try:
                            path = cache_entry.get('path')
                            if path:
                                logger.info("_updateOldEntriesInCache: examining entry %s/%s. timestamp:%s age:%s needs_update:%s path: '%s'",
                                            i, len(updateable_entries),
                                            cache_entry['cache_timestamp'],
                                            format_timedelta(now - cache_entry['cache_timestamp']),
                                            cache_entry.get('needs_update', False),
                                            path)

                                # do a quick update of the entry by applying the sum of the
                                # subdirs to the path's du result...
                                # this makes a best guess immediately available...
                                self._updatePathCacheEntryToSubDirTotal(path, False)

                                # ...and in the meantime, do a full update from disk,
                                # which might be (really) slow.
                                result = du_getDiskUsageForPath(path)
                                logger.debug('trying to update old entry in cache: %s', result)
                                self._updateCache(result)
                        except Exception as e:
                            logger.error(str(e))

                        if not self._cacheThreadsRunning:
                            return

                        if datetime.datetime.utcnow() - cacheUpdateStart > datetime.timedelta(minutes=10):
                            # break out of the cache update loop if the full update takes more than 10min;
                            # next loop we'll start with the oldest cache entries again
                            logger.info('skipping the remaining %s old cache entry updates, they will be updated next time',
                                        len(updateable_entries) - i)
                            break

                # update the CEP4 capacities in the RADB once in a while...
                self._updateCEP4CapacitiesInRADB()

                # sleep for a minute (or stop if requested)
                for i in range(60):
                    sleep(1)
                    if not self._cacheThreadsRunning:
                        return
            except Exception as e:
                logger.error(str(e))

    def _updatePathCacheEntryToSubDirTotal(self, path, force_update=False):
        sd_result = self.disk_usage.path_resolver.getSubDirectories(path)

        if sd_result['found']:
            subdir_paths = [os.path.join(path, sd) for sd in sd_result['sub_directories']]
            subdir_du_results = [self.getDiskUsageForPath(sd, force_update=force_update) for sd in subdir_paths]
            # skip subdirs for which no disk_usage is known (yet),
            # e.g. entries just added by the tree scan with disk_usage=None
            sum_du = sum([du['disk_usage'] for du in subdir_du_results if du.get('disk_usage')])

            with self._cacheLock:
                if path in self._cache['path_du_results']:
                    path_result = self._cache['path_du_results'][path]
                    path_result['disk_usage'] = sum_du
                    path_result['disk_usage_readable'] = humanreadablesize(sum_du)
                    path_result['needs_update'] = True
                    self._updateCache(path_result)

    def _updateCEP4CapacitiesInRADB(self):
        try:
            df_result = self.disk_usage.getDiskFreeSpace()
            if df_result['found']:
                # get the total/used/free space, and update the resource availability in the radb
                radbrpc = self.disk_usage.path_resolver.radbrpc
                storage_resources = radbrpc.getResources(resource_types='storage', include_availability=True)
                cep4_storage_resource = next(x for x in storage_resources if 'CEP4' in x['name'])

                total_capacity = df_result.get('disk_size')
                used_capacity = df_result.get('disk_usage')
                available_capacity = df_result.get('disk_free')

                logger.info("updating capacities for resource '%s' (id=%s) in the RADB: total=%s, used=%s, available=%s",
                            cep4_storage_resource['name'], cep4_storage_resource['id'],
                            humanreadablesize(total_capacity),
                            humanreadablesize(used_capacity),
                            humanreadablesize(available_capacity))

                radbrpc.updateResourceAvailability(cep4_storage_resource['id'],
                                                   available_capacity=available_capacity,
                                                   total_capacity=total_capacity)
        except Exception as e:
            logger.error('_updateCEP4CapacitiesInRADB: %s', e)

    def open(self):
        self.disk_usage.open()
        self.event_bus.open()

        self._cacheThreadsRunning = True

        self._updateCacheThread = Thread(target=self._updateOldEntriesInCache)
        self._updateCacheThread.daemon = True
        self._updateCacheThread.start()

        self._scanProjectsTreeThread = Thread(target=self._scanProjectsTree)
        self._scanProjectsTreeThread.daemon = True
        self._scanProjectsTreeThread.start()

        self.otdb_listener.start_listening()
        self.dm_listener.start_listening()
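    # The open/close pair is wrapped by the context-manager methods below,
    # so a minimal usage sketch (with a made-up otdb_id) is:
    #
    #   with CacheManager(broker='localhost') as cm:
    #       print cm.getDiskUsageForOTDBId(654321)
    #
    # which starts the bus listeners and both background threads,
    # and stops and joins them again on exit.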
    def close(self):
        self.otdb_listener.stop_listening()
        self.dm_listener.stop_listening()

        self._cacheThreadsRunning = False
        self._updateCacheThread.join()
        self._scanProjectsTreeThread.join()

        self.event_bus.close()
        self.disk_usage.close()

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def onObservationFinished(self, otdb_id, modificationTime):
        self._onDiskActivityForOTDBId(otdb_id)

    def onObservationAborted(self, otdb_id, modificationTime):
        self._onDiskActivityForOTDBId(otdb_id)

    def onTaskDeleted(self, otdb_id, deleted, paths, message=''):
        self._onDiskActivityForOTDBId(otdb_id)

    def _onDiskActivityForOTDBId(self, otdb_id):
        result = self.disk_usage.getDiskUsageForOTDBId(otdb_id)
        self._updateCache(result)

        task_path = result.get('path')
        projects_path = self.disk_usage.path_resolver.projects_path

        # invalidate all parent paths up the tree, up to the projects_path,
        # so they are re-du'ed on the next update pass
        # (which also updates the resource availability in the radb)
        path = task_path
        while path:
            parent_path = '/'.join(path.split('/')[:-1])

            if projects_path.startswith(parent_path) and len(parent_path) < len(projects_path):
                break

            logger.info('invalidating cache entry for %s because disk usage for task %s in %s changed',
                        parent_path, otdb_id, task_path)
            self._invalidateCacheEntryForPath(parent_path)

            path = parent_path

    def getDiskUsageForOTDBId(self, otdb_id, include_scratch_paths=True, force_update=False):
        return self.getDiskUsageForTask(otdb_id=otdb_id, include_scratch_paths=include_scratch_paths, force_update=force_update)

    def getDiskUsageForMoMId(self, mom_id, include_scratch_paths=True, force_update=False):
        return self.getDiskUsageForTask(mom_id=mom_id, include_scratch_paths=include_scratch_paths, force_update=force_update)

    def getDiskUsageForRADBId(self, radb_id, include_scratch_paths=True, force_update=False):
        return self.getDiskUsageForTask(radb_id=radb_id, include_scratch_paths=include_scratch_paths, force_update=force_update)

    def getDiskUsageForTask(self, radb_id=None, mom_id=None, otdb_id=None, include_scratch_paths=True, force_update=False):
        logger.info("cache.getDiskUsageForTask(radb_id=%s, mom_id=%s, otdb_id=%s, include_scratch_paths=%s, force_update=%s)",
                    radb_id, mom_id, otdb_id, include_scratch_paths, force_update)

        if otdb_id is not None and not include_scratch_paths:
            with self._cacheLock:
                path = self._cache['otdb_id2path'].get(otdb_id)

            if path:
                logger.info('Using path from cache for otdb_id %s: %s', otdb_id, path)
                return self.getDiskUsageForPath(path, force_update=force_update)

        logger.info("cache.getDiskUsageForTask could not find path in cache, determining path...")

        path_result = self.disk_usage.path_resolver.getPathForTask(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id,
                                                                   include_scratch_paths=include_scratch_paths)
        if path_result['found']:
            path_du_result = self.getDiskUsageForPath(path_result['path'], force_update=force_update)
            path_du_result['radb_id'] = path_result.get('radb_id')
            path_du_result['mom_id'] = path_result.get('mom_id')
            path_du_result['otdb_id'] = path_result.get('otdb_id')

            if 'scratch_paths' in path_result:
                path_du_result['scratch_paths'] = {}

                for scratch_path in path_result['scratch_paths']:
                    scratch_path_du_result = self.getDiskUsageForPath(scratch_path, force_update=force_update)
                    path_du_result['scratch_paths'][scratch_path] = scratch_path_du_result

            self._updateCache(path_du_result)
            return path_du_result

        self._updateCache(path_result)
        return {'found': False, 'path': path_result['path']}
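    # The dict returned by getDiskUsageForTask above extends the path du_result
    # with the resolved ids, plus (when include_scratch_paths=True and the
    # path_resolver reports scratch paths) a 'scratch_paths' mapping.
    # A sketch with made-up values and a hypothetical scratch location:
    #
    #   {'found': True,
    #    'path': '/data/projects/LC9_001/L654321',
    #    'disk_usage': 3221225472,
    #    'disk_usage_readable': '3.0GB',
    #    'radb_id': 112233, 'mom_id': 445566, 'otdb_id': 654321,
    #    'scratch_paths': {'/data/scratch/L654321': {'found': True, ...}}}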
    def getDiskUsageForTasks(self, radb_ids=None, mom_ids=None, otdb_ids=None, include_scratch_paths=True, force_update=False):
        logger.info("cache.getDiskUsageForTasks(radb_ids=%s, mom_ids=%s, otdb_ids=%s)", radb_ids, mom_ids, otdb_ids)
        result = {'radb_ids': {}, 'mom_ids': {}, 'otdb_ids': {}}

        if radb_ids:
            for radb_id in radb_ids:
                result['radb_ids'][str(radb_id)] = self.getDiskUsageForTask(radb_id=radb_id,
                                                                            include_scratch_paths=include_scratch_paths,
                                                                            force_update=force_update)
        if mom_ids:
            for mom_id in mom_ids:
                result['mom_ids'][str(mom_id)] = self.getDiskUsageForTask(mom_id=mom_id,
                                                                          include_scratch_paths=include_scratch_paths,
                                                                          force_update=force_update)
        if otdb_ids:
            for otdb_id in otdb_ids:
                result['otdb_ids'][str(otdb_id)] = self.getDiskUsageForTask(otdb_id=otdb_id,
                                                                            include_scratch_paths=include_scratch_paths,
                                                                            force_update=force_update)
        return result

    def getDiskUsageForPath(self, path, force_update=False):
        logger.info("cache.getDiskUsageForPath('%s', force_update=%s)", path, force_update)

        needs_cache_update = False
        with self._cacheLock:
            needs_cache_update |= path not in self._cache['path_du_results']

        if needs_cache_update or force_update:
            logger.info("cache update needed for %s", path)
            result = du_getDiskUsageForPath(path)
            self._updateCache(result)

        with self._cacheLock:
            if path in self._cache['path_du_results']:
                result = self._cache['path_du_results'][path]
            else:
                result = {'found': False, 'path': path, 'message': 'unknown error'}
                if not self.disk_usage.path_resolver.pathExists(path):
                    result['message'] = 'No such path: %s' % path

        result['disk_usage_readable'] = humanreadablesize(result.get('disk_usage', 0))

        logger.info("cache.getDiskUsageForPath('%s') result: %s", path, result)
        return result

    def getDiskUsageForTaskAndSubDirectories(self, radb_id=None, mom_id=None, otdb_id=None, force_update=False):
        logger.info("cache.getDiskUsageForTaskAndSubDirectories(radb_id=%s, mom_id=%s, otdb_id=%s)", radb_id, mom_id, otdb_id)

        task_du_result = self.getDiskUsageForTask(radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, force_update=force_update)

        if task_du_result['found']:
            task_sd_result = self.disk_usage.path_resolver.getSubDirectories(task_du_result['path'])

            if task_sd_result['found']:
                subdir_paths = [os.path.join(task_du_result['path'], sd) for sd in task_sd_result['sub_directories']]

                # TODO: potential for parallelization
                subdirs_du_result = {sd: self.getDiskUsageForPath(sd, force_update=force_update) for sd in subdir_paths}

                result = {'found': True, 'task_directory': task_du_result, 'sub_directories': subdirs_du_result}
                logger.info("result for cache.getDiskUsageForTaskAndSubDirectories(radb_id=%s, mom_id=%s, otdb_id=%s): %s",
                            radb_id, mom_id, otdb_id, result)
                return result

        logger.warning("result for cache.getDiskUsageForTaskAndSubDirectories(radb_id=%s, mom_id=%s, otdb_id=%s): %s",
                       radb_id, mom_id, otdb_id, task_du_result)
        return task_du_result
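    # getDiskUsageForTaskAndSubDirectories above wraps the task du_result
    # together with one du_result per subdirectory, keyed by absolute path
    # (sketch, made-up values):
    #
    #   {'found': True,
    #    'task_directory': {'found': True, 'path': '/data/projects/LC9_001/L654321', ...},
    #    'sub_directories': {'/data/projects/LC9_001/L654321/uv': {'found': True, ...}}}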
    def getDiskUsageForProjectDirAndSubDirectories(self, radb_id=None, mom_id=None, otdb_id=None, project_name=None, force_update=False):
        logger.info("cache.getDiskUsageForProjectDirAndSubDirectories(radb_id=%s, mom_id=%s, otdb_id=%s)", radb_id, mom_id, otdb_id)

        path_result = self.disk_usage.path_resolver.getProjectDirAndSubDirectories(radb_id=radb_id, mom_id=mom_id,
                                                                                   otdb_id=otdb_id, project_name=project_name)
        if path_result['found']:
            projectdir_du_result = self.getDiskUsageForPath(path_result['path'], force_update=force_update)

            subdir_paths = [os.path.join(path_result['path'], sd) for sd in path_result['sub_directories']]

            # TODO: potential for parallelization
            subdirs_du_result = {sd: self.getDiskUsageForPath(sd, force_update=force_update) for sd in subdir_paths}

            result = {'found': True, 'projectdir': projectdir_du_result, 'sub_directories': subdirs_du_result}
            logger.info('cache.getDiskUsageForProjectDirAndSubDirectories result: %s', result)
            return result

        return path_result

    def getDiskUsageForProjectsDirAndSubDirectories(self, force_update=False):
        logger.info("cache.getDiskUsageForProjectsDirAndSubDirectories")

        projects_path = self.disk_usage.path_resolver.projects_path
        projectsdir_du_result = self.getDiskUsageForPath(projects_path, force_update=force_update)
        result = {'found': True, 'projectdir': projectsdir_du_result}

        project_subdirs_result = self.disk_usage.path_resolver.getSubDirectories(projects_path)

        if project_subdirs_result['found']:
            subdir_paths = [os.path.join(projects_path, sd) for sd in project_subdirs_result['sub_directories']]

            # TODO: potential for parallelization
            subdirs_du_result = {sd: self.getDiskUsageForPath(sd, force_update=force_update) for sd in subdir_paths}
            result['sub_directories'] = subdirs_du_result

        logger.info('cache.getDiskUsageForProjectsDirAndSubDirectories result: %s', result)
        return result


if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=logging.INFO)

    with CacheManager() as cm:
        waitForInterrupt()