diff --git a/SAS/DataManagement/DataManagementCommon/path.py b/SAS/DataManagement/DataManagementCommon/path.py index 77adfac0f484f57e54e9b822f3a65ec38dfca7ac..d1653fc96f55d8b128b693e9142ecb74099d594d 100644 --- a/SAS/DataManagement/DataManagementCommon/path.py +++ b/SAS/DataManagement/DataManagementCommon/path.py @@ -163,7 +163,7 @@ class PathResolver: def getSubDirectories(self, path): logger.debug('getSubDirectories(%s)', path) # get the subdirectories of the given path - cmd = ['lfs', 'find', '--type', 'd', '--maxdepth', '1', path.rstrip('/')] + cmd = ['find', path.rstrip('/'), '-maxdepth', '1', '-type', 'd'] cmd = wrap_command_in_cep4_head_node_ssh_call_if_needed(cmd) logger.debug(' '.join(cmd)) proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) diff --git a/SAS/DataManagement/StorageQueryService/cache.py b/SAS/DataManagement/StorageQueryService/cache.py index 8586205e5617bce5225a7f6a7c6b86755fff00c8..824514ad2864a66c461f60adb5b5c79047ac181b 100644 --- a/SAS/DataManagement/StorageQueryService/cache.py +++ b/SAS/DataManagement/StorageQueryService/cache.py @@ -72,9 +72,8 @@ class CacheManager: self.event_bus = ToBus(exchange=exchange, broker=broker) - self._scanProjectsTreeThread = None self._updateCacheThread = None - self._cacheThreadsRunning = False + self._running = False self._cacheLock = RLock() @@ -149,7 +148,7 @@ class CacheManager: except Exception as e: logger.error("Error while writing du cache: %s", e) - def _updateCache(self, du_result): + def _updateCache(self, du_result, send_notification=True): if not 'path' in du_result: return @@ -190,7 +189,8 @@ class CacheManager: self._writeCacheToDisk() - self._sendDiskUsageChangedNotification(path, du_result['disk_usage'], otdb_id) + if send_notification: + self._sendDiskUsageChangedNotification(path, du_result['disk_usage'], otdb_id) def _invalidateCacheEntryForPath(self, path): with self._cacheLock: @@ -218,7 +218,7 @@ class CacheManager: def _scanProjectsTree(self): try: def addSubDirectoriesToCache(directory): - if not self._cacheThreadsRunning: + if not self._running: return depth = self.getDepthToProjectsDir(directory) @@ -241,7 +241,6 @@ class CacheManager: # recurse addSubDirectoriesToCache(subdir_path) - add_empty_du_result_to_cache = False with self._cacheLock: path_cache = self._cache['path_du_results'] add_empty_du_result_to_cache = not directory in path_cache @@ -249,7 +248,7 @@ class CacheManager: if add_empty_du_result_to_cache: logger.info('tree scan: adding \'%s\' with empty disk_usage to cache which will be du\'ed later', directory) empty_du_result = {'found': True, 'disk_usage': None, 'path': directory, 'name': directory.split('/')[-1]} - self._updateCache(empty_du_result) + self._updateCache(empty_du_result, send_notification=False) with self._cacheLock: path_cache = self._cache['path_du_results'] @@ -261,11 +260,11 @@ class CacheManager: logger.info('tree scan complete') except Exception as e: - logger.error(str(e)) + logger.exception(str(e)) def _updateOldEntriesInCache(self): logger.info('starting updating old cache entries') - while self._cacheThreadsRunning: + while self._running: try: now = datetime.datetime.utcnow() with self._cacheLock: @@ -307,16 +306,6 @@ class CacheManager: cacheUpdateStart = datetime.datetime.utcnow() - #do a quick update of each entry by applying the sum of the subdirs to the path's du result... - #this make a best guess immediately available... - for cache_entry in updateable_entries: - try: - path = cache_entry.get('path') - if path: - self._updatePathCacheEntryToSubDirTotal(path, False) - except Exception as e: - logger.error(str(e)) - for i, cache_entry in enumerate(updateable_entries): try: # it might be that the cache_entry was already updated via another way @@ -340,7 +329,7 @@ class CacheManager: except Exception as e: logger.error(str(e)) - if not self._cacheThreadsRunning: + if not self._running: logger.info('exiting _updateCacheThread') return @@ -356,33 +345,13 @@ class CacheManager: #sleep for a while, (or stop if requested) for i in range(60): sleep(1) - if not self._cacheThreadsRunning: + if not self._running: logger.info('exiting _updateCacheThread') return except Exception as e: logger.exception(str(e)) - def _updatePathCacheEntryToSubDirTotal(self, path, force_update=False): - with self._cacheLock: - path_cache_result = self._cache['path_du_results'].get(path) - - if path_cache_result: - path_depth = path.count('/') - all_dirs = list(self._cache['path_du_results'].keys()) - subdir_paths = [sdp for sdp in all_dirs - if sdp.startswith(path) and sdp.count('/') == path_depth+1] - - subdir_du_results = self.getDiskUsageForPaths(subdir_paths, force_update=force_update).values() - valid_subdir_du_results = [du for du in subdir_du_results if du.get('disk_usage')] - sum_du = sum([du['disk_usage'] for du in valid_subdir_du_results]) - - if sum_du > 0: - logger.info('_updatePathCacheEntryToSubDirTotal: setting disk usage for %s to sum of %s known cached subdirs of %s', - path, len(valid_subdir_du_results), humanreadablesize(sum_du)) - path_cache_result['disk_usage'] = sum_du - path_cache_result['disk_usage_readable'] = humanreadablesize(sum_du) - def _updateCEP4CapacitiesInRADB(self): try: df_result = self.disk_usage.getDiskFreeSpace() @@ -410,31 +379,33 @@ class CacheManager: logger.error('_updateCEP4CapacitiesInRADB: %s', e) def open(self): + logger.info("opening storagequeryservice cache...") + self._running = True + self.disk_usage.open() self.event_bus.open() - self._cacheThreadsRunning = True + self._scanProjectsTree() self._updateCacheThread = Thread(target=self._updateOldEntriesInCache) self._updateCacheThread.daemon = True self._updateCacheThread.start() - self._scanProjectsTreeThread = Thread(target=self._scanProjectsTree) - self._scanProjectsTreeThread.daemon = True - self._scanProjectsTreeThread.start() - self.otdb_listener.start_listening() self.dm_listener.start_listening() + logger.info("opened storagequeryservice cache") def close(self): + logger.info("closing storagequeryservice cache...") + self._running = False + self.otdb_listener.stop_listening() self.dm_listener.stop_listening() - self._cacheThreadsRunning = False self._updateCacheThread.join() - self._scanProjectsTreeThread.join() self.event_bus.close() self.disk_usage.close() + logger.info("closed storagequeryservice cache") def __enter__(self): self.open() @@ -527,24 +498,32 @@ class CacheManager: def getDiskUsageForTasks(self, radb_ids=None, mom_ids=None, otdb_ids=None, include_scratch_paths=True, force_update=False): logger.info("cache.getDiskUsageForTasks(radb_ids=%s, mom_ids=%s, otdb_ids=%s)" % (radb_ids, mom_ids, otdb_ids)) tasks_result = {'radb_ids': {}, 'mom_ids': {}, 'otdb_ids': {}} + if radb_ids is None: + radb_ids = [] + if mom_ids is None: + mom_ids = [] + if otdb_ids is None: + otdb_ids = [] with futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor: - parallel_kwargs = [] - if radb_ids: - parallel_kwargs += [{'radb_id':radb_id, 'include_scratch_paths': include_scratch_paths, 'force_update':force_update} for radb_id in radb_ids] - if mom_ids: - parallel_kwargs += [{'mom_id':mom_id, 'include_scratch_paths': include_scratch_paths, 'force_update':force_update} for mom_id in mom_ids] - if otdb_ids: - parallel_kwargs += [{'otdb_id':otdb_id, 'include_scratch_paths': include_scratch_paths, 'force_update':force_update} for otdb_id in otdb_ids] - results = list(executor.map(lambda p_kwarg: self.getDiskUsageForTask(**p_kwarg), parallel_kwargs)) + # helper function to expand parallel p_kwarg dict into kwargs + def parallel_getDiskUsageForTask(p_kwarg): + return self.getDiskUsageForTask(**p_kwarg) + + parallel_kwargs = [{'radb_id':radb_id, 'include_scratch_paths': include_scratch_paths, 'force_update':force_update} for radb_id in radb_ids] + parallel_kwargs += [{'mom_id':mom_id, 'include_scratch_paths': include_scratch_paths, 'force_update':force_update} for mom_id in mom_ids] + parallel_kwargs += [{'otdb_id':otdb_id, 'include_scratch_paths': include_scratch_paths, 'force_update':force_update} for otdb_id in otdb_ids] + results = list(executor.map(parallel_getDiskUsageForTask, parallel_kwargs)) + + # collect results in a dict grouped by id_type for result in results: - if result.get('radb_id') is not None: - tasks_result['radb_ids'][str(result['radb_id'])] = results - if result.get('mom_id') is not None: - tasks_result['mom_ids'][str(result['mom_id'])] = results - if result.get('otdb_id') is not None: - tasks_result['otdb_ids'][str(result['otdb_id'])] = results + if result.get('radb_id') in radb_ids: + tasks_result['radb_ids'][result['radb_id']] = result + if result.get('mom_id') in mom_ids: + tasks_result['mom_ids'][result['mom_id']] = result + if result.get('otdb_id') in otdb_ids: + tasks_result['otdb_ids'][result['otdb_id']] = result logger.info("cache.getDiskUsageForTasks(radb_ids=%s, mom_ids=%s, otdb_ids=%s) returning: %s" % (radb_ids, mom_ids, otdb_ids, tasks_result)) @@ -552,8 +531,14 @@ class CacheManager: def getDiskUsageForPaths(self, paths, force_update=False): with futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor: + + # helper function to expand parallel p_arg tuple into list of args + def parallel_getDiskUsageForPath(p_arg): + return self.getDiskUsageForPath(*p_arg) + parallel_args = [(path, force_update) for path in paths] - results = list(executor.map(lambda p_arg: self.getDiskUsageForPath(*p_arg), parallel_args)) + results = list(executor.map(parallel_getDiskUsageForPath, parallel_args)) + return { result['path']:result for result in results } def getDiskUsageForPath(self, path, force_update=False): @@ -592,9 +577,9 @@ class CacheManager: path_threading_event.set() del self._du_threading_events[path] else: - logger.info("thread=%s waiting for du call on other thread that will update the cache for %s", current_thread().name, path) + logger.info("waiting for du call on other thread that will update the cache for %s current_thread=%s", path, current_thread().name) path_threading_event.wait() - logger.info("thread=%s another thread just updated the cache for %s", current_thread().name, path) + logger.info("another thread just updated the cache for %s current_thread=%s", path, current_thread().name) with self._cacheLock: if path in self._cache['path_du_results']: diff --git a/SAS/DataManagement/StorageQueryService/rpc.py b/SAS/DataManagement/StorageQueryService/rpc.py index 8cf9eb91c01afa4c652d6a1059d7f96f246af7b2..ef327aeb5943768ce221a13481ddcc2ae3976ad5 100644 --- a/SAS/DataManagement/StorageQueryService/rpc.py +++ b/SAS/DataManagement/StorageQueryService/rpc.py @@ -88,38 +88,38 @@ def main(): with StorageQueryRPC.create(exchange=options.exchange, broker=options.broker) as rpc: if options.projects: - result = rpc.getDiskUsageForProjectsDirAndSubDirectories(force_update=options.force_update) + result = rpc.getDiskUsageForProjectsDirAndSubDirectories(force_update=bool(options.force_update)) if result['found']: pprint(result) else: print(result['message']) exit(1) elif options.project: - result = rpc.getDiskUsageForProjectDirAndSubDirectories(otdb_id=options.otdb_id, mom_id=options.mom_id, radb_id=options.radb_id, project_name=options.project, force_update=options.force_update) + result = rpc.getDiskUsageForProjectDirAndSubDirectories(otdb_id=options.otdb_id, mom_id=options.mom_id, radb_id=options.radb_id, project_name=options.project, force_update=bool(options.force_update)) if result['found']: pprint(result) else: print(result['message']) exit(1) elif options.subdirs: - result = rpc.getDiskUsageForTaskAndSubDirectories(otdb_id=options.otdb_id, mom_id=options.mom_id, radb_id=options.radb_id, force_update=options.force_update) + result = rpc.getDiskUsageForTaskAndSubDirectories(otdb_id=options.otdb_id, mom_id=options.mom_id, radb_id=options.radb_id, force_update=bool(options.force_update)) if result['found']: pprint(result) else: print(result['message']) exit(1) elif options.dir_path: - result = rpc.getDiskUsageForPath(path=options.dir_path, force_update=options.force_update) + result = rpc.getDiskUsageForPath(path=options.dir_path, force_update=bool(options.force_update)) if result['found']: pprint(result) else: print(result['message']) exit(1) elif options.all: - result = rpc.getDiskUsagesForAllOtdbIds(force_update=options.force_update) + result = rpc.getDiskUsagesForAllOtdbIds(force_update=bool(options.force_update)) pprint(result) else: - result = rpc.getDiskUsageForTask(otdb_id=options.otdb_id, mom_id=options.mom_id, radb_id=options.radb_id, force_update=options.force_update) + result = rpc.getDiskUsageForTask(otdb_id=options.otdb_id, mom_id=options.mom_id, radb_id=options.radb_id, force_update=bool(options.force_update)) if result['found']: print('path %s' % result['path']) print('disk_usage %s %s' % (result.get('disk_usage'), result.get('disk_usage_readable'))) diff --git a/SAS/DataManagement/StorageQueryService/service.py b/SAS/DataManagement/StorageQueryService/service.py index b99d990ec8ef15d87eb568b2e5fb0db96451e07d..37ad51fb3cd9bb866eca4a45893bb32927bdaf3f 100644 --- a/SAS/DataManagement/StorageQueryService/service.py +++ b/SAS/DataManagement/StorageQueryService/service.py @@ -16,9 +16,8 @@ from lofar.sas.datamanagement.storagequery.cache import CacheManager logger = logging.getLogger(__name__) class StorageQueryHandler(ServiceMessageHandler): - def __init__(self, cache_manager: CacheManager, mountpoint=CEP4_DATA_MOUNTPOINT): + def __init__(self, cache_manager: CacheManager): super(StorageQueryHandler, self).__init__() - self.mount_point = mountpoint self.cache = cache_manager def init_service_handler(self, service_name: str): @@ -36,12 +35,10 @@ class StorageQueryHandler(ServiceMessageHandler): self.register_service_method('GetDiskUsagesForAllOtdbIds', self.cache.getDiskUsagesForAllOtdbIds) self.register_service_method('GetOtdbIdsFoundOnDisk', self.cache.getOtdbIdsFoundOnDisk) -def createService(exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, - mountpoint=CEP4_DATA_MOUNTPOINT, cache_manager=None): +def createService(exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, cache_manager=None): return RPCService(service_name=DEFAULT_STORAGEQUERY_SERVICENAME, handler_type=StorageQueryHandler, - handler_kwargs={'mountpoint': mountpoint, - 'cache_manager':cache_manager}, + handler_kwargs={'cache_manager':cache_manager}, exchange=exchange, broker=broker, num_threads=6) diff --git a/SAS/DataManagement/StorageQueryService/test/test_storagequery_service_and_rpc.py b/SAS/DataManagement/StorageQueryService/test/test_storagequery_service_and_rpc.py index a5a6db2e1d342053045f434ed342b41bcf2faa12..3e49ba34a2a40a8f99c7a25902bf81e24849c4f2 100755 --- a/SAS/DataManagement/StorageQueryService/test/test_storagequery_service_and_rpc.py +++ b/SAS/DataManagement/StorageQueryService/test/test_storagequery_service_and_rpc.py @@ -1,31 +1,163 @@ #!/usr/bin/env python3 -import unittest -import uuid -import datetime +import unittest, unittest.mock import logging -from lofar.messaging import Service -from lofar.messaging.messagebus import TemporaryQueue +import os +import shutil +import sys +from threading import Thread +from lofar.messaging.messagebus import TemporaryExchange from lofar.sas.datamanagement.storagequery.service import createService from lofar.sas.datamanagement.storagequery.rpc import StorageQueryRPC from lofar.sas.datamanagement.storagequery.cache import CacheManager -logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) logger = logging.getLogger(__name__) +logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s %(threadName)s', level=logging.INFO, stream=sys.stdout) + +class TestStorageQueryServiceAndRPC(unittest.TestCase): + @classmethod + def setUpClass(cls): + + cls.stored_otdb_ids = set() + + DIR_BYTES = 4096 # each dir adds 4096 bytes to a du + cls.total_bytes_stored = DIR_BYTES + cls.projects_bytes_stored = {} + cls.observation_bytes_stored = {} + + cls.DATA_DIR_PATH = os.path.join(os.getcwd(), "test_data") + cls.PROJECTS_DIR_PATH = os.path.join(cls.DATA_DIR_PATH, "test-projects") + cls.PROJECTS = ['LC100_001', 'LC100_002'] + cls.otdb_id2project_map = {} + + for i, project in enumerate(cls.PROJECTS): + cls.projects_bytes_stored[project] = DIR_BYTES + cls.total_bytes_stored += DIR_BYTES + + for j in range(10): + otdb_id = 999000 + (10*i+j) + + cls.stored_otdb_ids.add(otdb_id) + cls.otdb_id2project_map[otdb_id] = project + + obs_dir = 'L%d' % (otdb_id,) + obs_dir_path = os.path.join(cls.PROJECTS_DIR_PATH, project, obs_dir) + os.makedirs(obs_dir_path, exist_ok=True) + + cls.observation_bytes_stored[otdb_id] = DIR_BYTES + cls.projects_bytes_stored[project] += DIR_BYTES + cls.total_bytes_stored += DIR_BYTES + + obs_data_file_path = os.path.join(obs_dir_path, 'data.txt') + + with open(obs_data_file_path, 'wt') as file: + data = 1000*(i+1)*(j+1)*r'a' + file.write(data) + num_bytes = len(data) + cls.total_bytes_stored += num_bytes + cls.projects_bytes_stored[project] += num_bytes + cls.observation_bytes_stored[otdb_id] += num_bytes + + cls.ssh_cmd_list_patcher = unittest.mock.patch('lofar.common.cep4_utils.ssh_cmd_list', lambda host,user: []) + cls.ssh_cmd_list_patcher.start() + + cls.updateCEP4CapacitiesInRADB_patcher = unittest.mock.patch('lofar.sas.datamanagement.storagequery.cache.CacheManager._updateCEP4CapacitiesInRADB') + cls.updateCEP4CapacitiesInRADB_patcher.start() + + # patch RADBRPC.getTask call and return the given otdb_id for each radb/mom/otdb id + cls.radbrpc_patcher = unittest.mock.patch('lofar.sas.datamanagement.common.path.RADBRPC.getTask') + radbrpc_mock = cls.radbrpc_patcher.start() + radbrpc_mock.side_effect = lambda id, mom_id, otdb_id: { 'id': otdb_id, 'mom_id': otdb_id, 'otdb_id': otdb_id, + 'type': 'observation' } + + cls.momrpc_patcher = unittest.mock.patch('lofar.sas.datamanagement.common.path.MoMQueryRPC.getObjectDetails') + momrpc_mock = cls.momrpc_patcher.start() + momrpc_mock.side_effect = lambda mom_id: { mom_id: {'project_name': cls.otdb_id2project_map[mom_id]} } + + cls.tmp_exchange = TemporaryExchange(cls.__class__.__name__) + cls.tmp_exchange.open() + + cls.cache = CacheManager(mountpoint=cls.DATA_DIR_PATH, exchange=cls.tmp_exchange.address) + cls.cache.open() + + cls.service = createService(cls.tmp_exchange.address, cache_manager=cls.cache) + cls.service.start_listening() + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.DATA_DIR_PATH) + cls.cache.close() + cls.service.stop_listening() + cls.tmp_exchange.close() + cls.ssh_cmd_list_patcher.stop() + cls.updateCEP4CapacitiesInRADB_patcher.stop() + cls.radbrpc_patcher.stop() + cls.momrpc_patcher.stop() + + + def test_getOtdbIdsFoundOnDisk(self): + with StorageQueryRPC.create(self.tmp_exchange.address) as rpc: + found_otdb_ids = set(rpc.getOtdbIdsFoundOnDisk()) + self.assertEqual(self.stored_otdb_ids, found_otdb_ids) + + def test_getDiskUsagesForAllOtdbIds(self): + with StorageQueryRPC.create(self.tmp_exchange.address) as rpc: + results = rpc.getDiskUsagesForAllOtdbIds(force_update=True) + self.assertEqual(self.stored_otdb_ids, set(results.keys())) + + for otdb_id in self.stored_otdb_ids: + self.assertEqual(self.observation_bytes_stored[otdb_id], results[otdb_id]['disk_usage']) + + def test_getDiskUsageForProjectsDirAndSubDirectories(self): + with StorageQueryRPC.create(self.tmp_exchange.address) as rpc: + result = rpc.getDiskUsageForProjectsDirAndSubDirectories(force_update=True) + self.assertTrue(result['found']) + + self.assertEqual(self.PROJECTS_DIR_PATH, result['projectdir']['path']) + self.assertEqual(self.total_bytes_stored, result['projectdir']['disk_usage']) + + for project in self.PROJECTS: + project_path = os.path.join(self.PROJECTS_DIR_PATH, project) + self.assertTrue(project_path in result['sub_directories']) + self.assertEqual(self.projects_bytes_stored[project], result['sub_directories'][project_path]['disk_usage']) + + def test_getDiskUsageForProjectDirAndSubDirectories(self): + with StorageQueryRPC.create(self.tmp_exchange.address) as rpc: + for project in self.PROJECTS: + result = rpc.getDiskUsageForProjectDirAndSubDirectories(project_name=project, force_update=True) + self.assertEqual(self.projects_bytes_stored[project], result['projectdir']['disk_usage']) + + def test_getDiskUsageForTask(self): + with StorageQueryRPC.create(self.tmp_exchange.address) as rpc: + for otdb_id in self.stored_otdb_ids: + results = rpc.getDiskUsageForTask(otdb_id=otdb_id, force_update=True) + self.assertEqual(self.observation_bytes_stored[otdb_id], results['disk_usage']) + + def test_getDiskUsageForTasks(self): + with StorageQueryRPC.create(self.tmp_exchange.address) as rpc: + results = rpc.getDiskUsageForTasks(otdb_ids=list(self.stored_otdb_ids), force_update=True) + for otdb_id in self.stored_otdb_ids: + self.assertTrue(otdb_id in results['otdb_ids']) + self.assertEqual(self.observation_bytes_stored[otdb_id], results['otdb_ids'][otdb_id]['disk_usage']) + + def test_survive_ddos(self): + '''spam the service. It should be able to handle that. + It's interesting to analyze the logging, which reports on how all requests are handled in parallel.''' + with StorageQueryRPC.create(self.tmp_exchange.address) as rpc: + + # spamming, spawn many large getDiskUsageForTasks calls in parallel + threads = [] + for i in range(10): + threads.append(Thread(target=rpc.getDiskUsageForTasks, + kwargs={'otdb_ids': list(self.stored_otdb_ids), 'force_update': True})) + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + # and to a final check. Service should still be reachable and deliver proper results + self.test_getDiskUsageForTasks() + +if __name__ == '__main__': + unittest.main() -with TemporaryQueue(__name__) as tmp_queue: - # add test service busname - busname = tmp_queue.address - - class TestCleanupServiceAndRPC(unittest.TestCase): - def test(self): - '''basic test ''' - rpc = StorageQueryRPC(busname=busname) - #self.assertEqual('foo', rpc.foo()) - - # TODO: fix test - ## create and run the service - #with CacheManager(busname=busname) as cache_manager: - #with createService(busname=busname, cache_manager=cache_manager): - ## and run all tests - #unittest.main() diff --git a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/storage.py b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/storage.py index bef0883a11cbf185794b37305921ca3a15131476..220424037c6b443588f269c3833377e780bf648b 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/storage.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/storage.py @@ -57,7 +57,7 @@ def updateTaskStorageDetails(tasks, sqrpc, curpc): if usages: for task in cep4_tasks: - otdb_id = str(task['otdb_id']) + otdb_id = task['otdb_id'] if otdb_id in usages: usage = usages[otdb_id] task['disk_usage'] = usage.get('disk_usage')