Skip to content
Snippets Groups Projects
Commit 80f17913 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #11155: fixed bugs. parallel scan and cache updates. made cache file path a cmdline option

parent fa26f2d9
No related branches found
No related tags found
No related merge requests found
......@@ -11,9 +11,11 @@ import ast
from optparse import OptionParser
from threading import Thread, RLock
import os.path
from pprint import pformat
from lofar.messaging import EventMessage, ToBus
from lofar.common.util import humanreadablesize
from lofar.common.datetimeutils import format_timedelta
from lofar.sas.datamanagement.storagequery.diskusage import getDiskUsageForPath as du_getDiskUsageForPath
from lofar.sas.datamanagement.storagequery.diskusage import DiskUsage
from lofar.sas.datamanagement.common.datamanagementbuslistener import DataManagementBusListener
......@@ -32,6 +34,7 @@ MAX_CACHE_ENTRY_AGE = datetime.timedelta(days=1)
class CacheManager:
def __init__(self,
cache_path='.du_cache.py',
mountpoint=CEP4_DATA_MOUNTPOINT,
radb_busname=RADB_BUSNAME,
radb_servicename=RADB_SERVICENAME,
......@@ -42,6 +45,9 @@ class CacheManager:
dm_notification_busname=DEFAULT_DM_NOTIFICATION_BUSNAME,
dm_notification_prefix=DEFAULT_DM_NOTIFICATION_PREFIX,
broker=None):
self._cache_path = cache_path
self.otdb_listener = OTDBBusListener(busname=otdb_notification_busname,
subject=otdb_notification_subject,
broker=broker,
......@@ -60,9 +66,10 @@ class CacheManager:
self.notification_prefix = dm_notification_prefix
self.event_bus = ToBus(dm_notification_busname, broker=broker)
self._scanProjectsTreeThread = None
self._updateCacheThread = None
self._updateCacheThreadRunning = False
self._continueUpdateCacheThread = False
self._cacheThreadsRunning = False
self._cacheLock = RLock()
self._cache = {'path_du_results': {}, 'otdb_id2path': {} }
......@@ -90,8 +97,8 @@ class CacheManager:
def _readCacheFromDisk(self):
# maybe this cache on disk is slow, if so, revert to proper db solution
try:
if os.path.exists('.du_cache.py'):
with open('.du_cache.py', 'r') as file:
if os.path.exists(self._cache_path):
with open(self._cache_path, 'r') as file:
with self._cacheLock:
self._cache = eval(file.read().strip())
if not isinstance(self._cache, dict):
......@@ -104,9 +111,11 @@ class CacheManager:
def _writeCacheToDisk(self):
    """Persist the in-memory disk-usage cache to self._cache_path.

    The cache dict is serialized with pprint.pformat and written to a
    temporary file first, which is then atomically renamed onto the real
    cache file, so concurrent readers never observe a half-written file.
    Errors are logged and swallowed: persisting the cache is best-effort.
    """
    try:
        # Create the temp file in the same directory as the final cache
        # file: os.replace is only atomic -- and only works at all --
        # within a single filesystem. Writing to /tmp and renaming into
        # e.g. $LOFARROOT/etc would raise EXDEV on most deployments.
        tmp_path = self._cache_path + '.tmp'
        with open(tmp_path, 'w') as file:
            # hold the lock only while serializing the dict
            with self._cacheLock:
                file.write(pformat(self._cache) + '\n')
        os.replace(tmp_path, self._cache_path)
    except Exception as e:
        logger.error("Error while writing du cache: %s", e)
......@@ -186,7 +195,7 @@ class CacheManager:
# mark cache entry for directory to be updated
path_cache[directory]['needs_update'] = True
if not self._updateCacheThreadRunning:
if not self._cacheThreadsRunning:
return
if depth < 3:
......@@ -208,7 +217,7 @@ class CacheManager:
def _updateOldEntriesInCache(self):
logger.info('starting updating old cache entries')
while self._updateCacheThreadRunning:
while self._cacheThreadsRunning:
try:
now = datetime.datetime.utcnow()
with self._cacheLock:
......@@ -247,11 +256,11 @@ class CacheManager:
try:
path = cache_entry.get('path')
if path:
logger.info('examining entry %s/%s in cache. timestamp:%s age:%s needs_update:%s path: %s',
logger.info('_updateOldEntriesInCache: examining entry %s/%s. timestamp:%s age:%s needs_update:%s path: %s',
i,
len(updateable_entries),
cache_entry['cache_timestamp'],
now - cache_entry['cache_timestamp'],
format_timedelta(now - cache_entry['cache_timestamp']),
cache_entry.get('needs_update', False),
path)
result = du_getDiskUsageForPath(path)
......@@ -260,7 +269,7 @@ class CacheManager:
except Exception as e:
logger.error(str(e))
if not self._updateCacheThreadRunning:
if not self._cacheThreadsRunning:
return
if datetime.datetime.utcnow() - cacheUpdateStart > datetime.timedelta(minutes=10):
......@@ -269,22 +278,15 @@ class CacheManager:
logger.info('skipping remaining %s old cache entries updates, they will be updated next time', len(updateable_entries)-i)
break
#sleep for a minute, (or stop if requested)
for i in range(60):
sleep(1)
if not self._updateCacheThreadRunning:
if not self._cacheThreadsRunning:
return
if self._continueUpdateCacheThread:
# break out of sleep loop and continue updating old cache entries
self._continueUpdateCacheThread = False
break
except Exception as e:
logger.error(str(e))
def _scanProjectsTreeAndUpdateOldEntriesInCache(self):
    # NOTE(review): the bare `return` below makes the remainder of this
    # method unreachable, so calling it is currently a no-op. The scan and
    # update work is presumably performed by dedicated threads started in
    # start() instead -- confirm, and then delete this dead method rather
    # than keeping it disabled.
    return
    self._scanProjectsTree()
    self._updateOldEntriesInCache()
def _updateProjectsDiskUsageInRADB(self):
try:
projects_du_result = self.getDiskUsageForPath(self.disk_usage.path_resolver.projects_path)
......@@ -312,19 +314,25 @@ class CacheManager:
self.disk_usage.open()
self.event_bus.open()
self._updateCacheThread = Thread(target=self._scanProjectsTreeAndUpdateOldEntriesInCache)
self._cacheThreadsRunning = True
self._updateCacheThread = Thread(target=self._updateOldEntriesInCache)
self._updateCacheThread.daemon = True
self._updateCacheThreadRunning = True
self._updateCacheThread.start()
self._scanProjectsTreeThread = Thread(target=self._scanProjectsTree)
self._scanProjectsTreeThread.daemon = True
self._scanProjectsTreeThread.start()
self.otdb_listener.start_listening()
self.dm_listener.start_listening()
def close(self):
    """Stop listeners and background cache threads, then close busses.

    Counterpart of start(): clears the shared running flag that both
    cache-maintenance threads poll, joins them, and closes the event bus
    and disk-usage helper.
    """
    self.otdb_listener.stop_listening()
    self.dm_listener.stop_listening()

    # Signal both the update-thread and the scan-thread to stop, then
    # wait for them to finish. (The old per-thread flag
    # _updateCacheThreadRunning was replaced by this single shared flag;
    # the stale assignment to the old name was removed -- nothing reads it.)
    self._cacheThreadsRunning = False
    self._updateCacheThread.join()
    self._scanProjectsTreeThread.join()

    self.event_bus.close()
    self.disk_usage.close()
......@@ -367,9 +375,6 @@ class CacheManager:
path = parent_path
# trigger update cache thread
self._continueUpdateCacheThread = True
def getDiskUsageForOTDBId(self, otdb_id, include_scratch_paths=True, force_update=False):
    """Convenience wrapper: disk usage for the task identified by otdb_id.

    Delegates to getDiskUsageForTask; see that method for the parameter
    semantics and the structure of the returned result.
    """
    return self.getDiskUsageForTask(otdb_id=otdb_id, include_scratch_paths=include_scratch_paths, force_update=force_update)
......@@ -380,7 +385,7 @@ class CacheManager:
return self.getDiskUsageForTask(radb_id=radb_id, include_scratch_paths=include_scratch_paths, force_update=force_update)
def getDiskUsageForTask(self, radb_id=None, mom_id=None, otdb_id=None, include_scratch_paths=True, force_update=False):
logger.info("\n\ncache.getDiskUsageForTask(radb_id=%s, mom_id=%s, otdb_id=%s, include_scratch_paths=%s, force_update=%s)",
logger.info("cache.getDiskUsageForTask(radb_id=%s, mom_id=%s, otdb_id=%s, include_scratch_paths=%s, force_update=%s)",
radb_id, mom_id, otdb_id, include_scratch_paths, force_update)
if otdb_id != None and not include_scratch_paths:
......@@ -510,5 +515,5 @@ class CacheManager:
if __name__ == '__main__':
    # Manual test-run entry point: start the cache manager against the
    # default broker (localhost) and keep it alive until Ctrl-C.
    # (The hard-coded development host 'scu099.control.lofar' was a
    # leftover debug value; the service entry point passes the broker via
    # the --broker command-line option instead.)
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)

    with CacheManager() as cm:
        waitForInterrupt()
......@@ -85,6 +85,9 @@ def main():
# Check the invocation arguments
parser = OptionParser("%prog [options]",
description='runs the storagequery service')
parser.add_option('-c', '--cache_path', dest='cache_path', type='string',
default=os.path.expandvars('$LOFARROOT/etc/storagequery_cache.py'),
help='path of the cache file, default: %default')
parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost')
parser.add_option("-b", "--busname", dest="busname", type="string", default=DEFAULT_BUSNAME, help="Name of the bus exchange on the qpid broker, default: %s" % DEFAULT_BUSNAME)
parser.add_option("-s", "--servicename", dest="servicename", type="string", default=DEFAULT_SERVICENAME, help="Name for this service, default: %s" % DEFAULT_SERVICENAME)
......@@ -99,7 +102,7 @@ def main():
level=logging.DEBUG if options.verbose else logging.INFO)
setQpidLogLevel(logging.INFO)
with CacheManager(broker=options.broker) as cache_manager:
with CacheManager(broker=options.broker, cache_path=options.cache_path) as cache_manager:
with createService(busname=options.busname,
servicename=options.servicename,
broker=options.broker,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment