diff --git a/SAS/DataManagement/StorageQueryService/cache.py b/SAS/DataManagement/StorageQueryService/cache.py index 2ce9f6da7c64d43b61f32c5eee5bc3c62c8d5b43..b06e9cc78bf8f3f9c0783824cd213e83997c5c39 100644 --- a/SAS/DataManagement/StorageQueryService/cache.py +++ b/SAS/DataManagement/StorageQueryService/cache.py @@ -11,9 +11,11 @@ import ast from optparse import OptionParser from threading import Thread, RLock import os.path +from pprint import pformat from lofar.messaging import EventMessage, ToBus from lofar.common.util import humanreadablesize +from lofar.common.datetimeutils import format_timedelta from lofar.sas.datamanagement.storagequery.diskusage import getDiskUsageForPath as du_getDiskUsageForPath from lofar.sas.datamanagement.storagequery.diskusage import DiskUsage from lofar.sas.datamanagement.common.datamanagementbuslistener import DataManagementBusListener @@ -32,6 +34,7 @@ MAX_CACHE_ENTRY_AGE = datetime.timedelta(days=1) class CacheManager: def __init__(self, + cache_path='.du_cache.py', mountpoint=CEP4_DATA_MOUNTPOINT, radb_busname=RADB_BUSNAME, radb_servicename=RADB_SERVICENAME, @@ -42,6 +45,9 @@ class CacheManager: dm_notification_busname=DEFAULT_DM_NOTIFICATION_BUSNAME, dm_notification_prefix=DEFAULT_DM_NOTIFICATION_PREFIX, broker=None): + + self._cache_path = cache_path + self.otdb_listener = OTDBBusListener(busname=otdb_notification_busname, subject=otdb_notification_subject, broker=broker, @@ -60,9 +66,10 @@ class CacheManager: self.notification_prefix = dm_notification_prefix self.event_bus = ToBus(dm_notification_busname, broker=broker) + self._scanProjectsTreeThread = None self._updateCacheThread = None - self._updateCacheThreadRunning = False - self._continueUpdateCacheThread = False + self._cacheThreadsRunning = False + self._cacheLock = RLock() self._cache = {'path_du_results': {}, 'otdb_id2path': {} } @@ -90,8 +97,8 @@ class CacheManager: def _readCacheFromDisk(self): # maybe this cache on disk is slow, if so, revert to proper db solution try: - if os.path.exists('.du_cache.py'): - with open('.du_cache.py', 'r') as file: + if os.path.exists(self._cache_path): + with open(self._cache_path, 'r') as file: with self._cacheLock: self._cache = eval(file.read().strip()) if not isinstance(self._cache, dict): @@ -104,9 +111,11 @@ class CacheManager: def _writeCacheToDisk(self): try: - with open('.du_cache.py', 'w') as file: + tmp_path = '/tmp/tmp_storagequery_cache.py' + with open(tmp_path, 'w') as file: with self._cacheLock: - file.write(str(self._cache)) + file.write(pformat(self._cache) + '\n') + os.replace(tmp_path, self._cache_path) except Exception as e: logger.error("Error while writing du cache: %s", e) @@ -186,7 +195,7 @@ class CacheManager: # mark cache entry for directory to be updated path_cache[directory]['needs_update'] = True - if not self._updateCacheThreadRunning: + if not self._cacheThreadsRunning: return if depth < 3: @@ -208,7 +217,7 @@ class CacheManager: def _updateOldEntriesInCache(self): logger.info('starting updating old cache entries') - while self._updateCacheThreadRunning: + while self._cacheThreadsRunning: try: now = datetime.datetime.utcnow() with self._cacheLock: @@ -247,11 +256,11 @@ class CacheManager: try: path = cache_entry.get('path') if path: - logger.info('examining entry %s/%s in cache. timestamp:%s age:%s needs_update:%s path: %s', + logger.info('_updateOldEntriesInCache: examining entry %s/%s. timestamp:%s age:%s needs_update:%s path: %s', i, len(updateable_entries), cache_entry['cache_timestamp'], - now - cache_entry['cache_timestamp'], + format_timedelta(now - cache_entry['cache_timestamp']), cache_entry.get('needs_update', False), path) result = du_getDiskUsageForPath(path) @@ -260,7 +269,7 @@ class CacheManager: except Exception as e: logger.error(str(e)) - if not self._updateCacheThreadRunning: + if not self._cacheThreadsRunning: return if datetime.datetime.utcnow() - cacheUpdateStart > datetime.timedelta(minutes=10): @@ -269,22 +278,15 @@ class CacheManager: logger.info('skipping remaining %s old cache entries updates, they will be updated next time', len(updateable_entries)-i) break + #sleep for a minute, (or stop if requested) for i in range(60): sleep(1) - if not self._updateCacheThreadRunning: + if not self._cacheThreadsRunning: return - if self._continueUpdateCacheThread: - # break out of sleep loop and continue updating old cache entries - self._continueUpdateCacheThread = False - break + except Exception as e: logger.error(str(e)) - def _scanProjectsTreeAndUpdateOldEntriesInCache(self): - return - self._scanProjectsTree() - self._updateOldEntriesInCache() - def _updateProjectsDiskUsageInRADB(self): try: projects_du_result = self.getDiskUsageForPath(self.disk_usage.path_resolver.projects_path) @@ -312,19 +314,25 @@ class CacheManager: self.disk_usage.open() self.event_bus.open() - self._updateCacheThread = Thread(target=self._scanProjectsTreeAndUpdateOldEntriesInCache) + self._cacheThreadsRunning = True + + self._updateCacheThread = Thread(target=self._updateOldEntriesInCache) self._updateCacheThread.daemon = True - self._updateCacheThreadRunning = True self._updateCacheThread.start() + self._scanProjectsTreeThread = Thread(target=self._scanProjectsTree) + self._scanProjectsTreeThread.daemon = True + self._scanProjectsTreeThread.start() + self.otdb_listener.start_listening() self.dm_listener.start_listening() def close(self): self.otdb_listener.stop_listening() self.dm_listener.stop_listening() - self._updateCacheThreadRunning = False + self._cacheThreadsRunning = False self._updateCacheThread.join() + self._scanProjectsTreeThread.join() self.event_bus.close() self.disk_usage.close() @@ -367,9 +375,6 @@ class CacheManager: path = parent_path - # trigger update cache thread - self._continueUpdateCacheThread = True - def getDiskUsageForOTDBId(self, otdb_id, include_scratch_paths=True, force_update=False): return self.getDiskUsageForTask(otdb_id=otdb_id, include_scratch_paths=include_scratch_paths, force_update=force_update) @@ -380,7 +385,7 @@ class CacheManager: return self.getDiskUsageForTask(radb_id=radb_id, include_scratch_paths=include_scratch_paths, force_update=force_update) def getDiskUsageForTask(self, radb_id=None, mom_id=None, otdb_id=None, include_scratch_paths=True, force_update=False): - logger.info("\n\ncache.getDiskUsageForTask(radb_id=%s, mom_id=%s, otdb_id=%s, include_scratch_paths=%s, force_update=%s)", + logger.info("cache.getDiskUsageForTask(radb_id=%s, mom_id=%s, otdb_id=%s, include_scratch_paths=%s, force_update=%s)", radb_id, mom_id, otdb_id, include_scratch_paths, force_update) if otdb_id != None and not include_scratch_paths: @@ -510,5 +515,5 @@ class CacheManager: if __name__ == '__main__': logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) - with CacheManager(broker='scu099.control.lofar') as cm: + with CacheManager() as cm: waitForInterrupt() diff --git a/SAS/DataManagement/StorageQueryService/service.py b/SAS/DataManagement/StorageQueryService/service.py index 72cf369fa80e3a5f375cffd093861f6f5ef7b76b..e027fec29e2a9ac92b22c044a4824b66dad57ee3 100644 --- a/SAS/DataManagement/StorageQueryService/service.py +++ b/SAS/DataManagement/StorageQueryService/service.py @@ -85,6 +85,9 @@ def main(): # Check the invocation arguments parser = OptionParser("%prog [options]", description='runs the storagequery service') + parser.add_option('-c', '--cache_path', dest='cache_path', type='string', + default=os.path.expandvars('$LOFARROOT/etc/storagequery_cache.py'), + help='path of the cache file, default: %default') parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost') parser.add_option("-b", "--busname", dest="busname", type="string", default=DEFAULT_BUSNAME, help="Name of the bus exchange on the qpid broker, default: %s" % DEFAULT_BUSNAME) parser.add_option("-s", "--servicename", dest="servicename", type="string", default=DEFAULT_SERVICENAME, help="Name for this service, default: %s" % DEFAULT_SERVICENAME) @@ -99,7 +102,7 @@ def main(): level=logging.DEBUG if options.verbose else logging.INFO) setQpidLogLevel(logging.INFO) - with CacheManager(broker=options.broker) as cache_manager: + with CacheManager(broker=options.broker, cache_path=options.cache_path) as cache_manager: with createService(busname=options.busname, servicename=options.servicename, broker=options.broker,