Skip to content
Snippets Groups Projects
Commit 0d59e014 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-699: adaptations to new rpc.py module

parent bd97585a
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python3
import logging
from lofar.messaging.RPC import RPC, RPCException, RPCWrapper
from lofar.messaging import RPCClient, RPCClientContextManagerMixin, DEFAULT_RPC_TIMEOUT
from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.sas.datamanagement.storagequery.config import DEFAULT_SERVICENAME
from lofar.common.util import humanreadablesize, convertStringDigitKeysToInt
import proton
from pprint import pprint
from lofar.sas.datamanagement.storagequery.config import DEFAULT_STORAGEQUERY_SERVICENAME
logger = logging.getLogger(__name__)
class StorageQueryRPC(RPCWrapper):
def __init__(self, busname=DEFAULT_BUSNAME,
timeout=18000,
broker=DEFAULT_BROKER):
super(StorageQueryRPC, self).__init__(busname, DEFAULT_SERVICENAME, broker, timeout=timeout)
class StorageQueryRPC(RPCClientContextManagerMixin):
def __init__(self, rpc_client: RPCClient = None):
"""Create an instance of the StorageQueryRPC using the given RPCClient,
or if None given, to a default RPCClient connecting to the DEFAULT_STORAGEQUERY_SERVICENAME service"""
super().__init__()
self._rpc_client = rpc_client or RPCClient(service_name=DEFAULT_STORAGEQUERY_SERVICENAME)
def _convertTimestamps(self, result):
if isinstance(result, dict):
for k, v in list(result.items()):
if isinstance(v, dict):
self._convertTimestamps(v)
elif isinstance(v, proton.timestamp):
result[k] = v
return result
@staticmethod
def create(exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER, timeout: int=DEFAULT_RPC_TIMEOUT):
"""Create a StorageQueryRPC connecting to the given exchange/broker on the default DEFAULT_STORAGEQUERY_SERVICENAME service"""
return StorageQueryRPC(RPCClient(service_name=DEFAULT_STORAGEQUERY_SERVICENAME, exchange=exchange, broker=broker, timeout=timeout))
def getPathForOTDBId(self, otdb_id):
return self.rpc('GetPathForOTDBId', otdb_id=otdb_id)
return self._rpc_client.execute('GetPathForOTDBId', otdb_id=otdb_id)
def getDiskUsageForOTDBId(self, otdb_id, include_scratch_paths=True, force_update=False):
return self._convertTimestamps(self.rpc('GetDiskUsageForOTDBId', otdb_id=otdb_id, include_scratch_paths=include_scratch_paths, force_update=force_update))
return self._rpc_client.execute('GetDiskUsageForOTDBId', otdb_id=otdb_id, include_scratch_paths=include_scratch_paths, force_update=force_update)
def getDiskUsageForMoMId(self, mom_id, include_scratch_paths=True, force_update=False):
return self._convertTimestamps(self.rpc('GetDiskUsageForMoMId', mom_id=mom_id, include_scratch_paths=include_scratch_paths, force_update=force_update))
return self._rpc_client.execute('GetDiskUsageForMoMId', mom_id=mom_id, include_scratch_paths=include_scratch_paths, force_update=force_update)
def getDiskUsageForRADBId(self, radb_id, include_scratch_paths=True, force_update=False):
return self._convertTimestamps(self.rpc('GetDiskUsageForRADBId', radb_id=radb_id, include_scratch_paths=include_scratch_paths, force_update=force_update))
return self._rpc_client.execute('GetDiskUsageForRADBId', radb_id=radb_id, include_scratch_paths=include_scratch_paths, force_update=force_update)
def getDiskUsageForTask(self, radb_id=None, mom_id=None, otdb_id=None, include_scratch_paths=True, force_update=False):
return self._convertTimestamps(self.rpc('GetDiskUsageForTask', radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, include_scratch_paths=include_scratch_paths, force_update=force_update))
return self._rpc_client.execute('GetDiskUsageForTask', radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, include_scratch_paths=include_scratch_paths, force_update=force_update)
def getDiskUsageForTasks(self, radb_ids=None, mom_ids=None, otdb_ids=None, include_scratch_paths=True, force_update=False):
return self._convertTimestamps(self.rpc('GetDiskUsageForTasks', radb_ids=radb_ids, mom_ids=mom_ids, otdb_ids=otdb_ids, include_scratch_paths=include_scratch_paths, force_update=force_update))
return self._rpc_client.execute('GetDiskUsageForTasks', radb_ids=radb_ids, mom_ids=mom_ids, otdb_ids=otdb_ids, include_scratch_paths=include_scratch_paths, force_update=force_update)
def getDiskUsageForTaskAndSubDirectories(self, radb_id=None, mom_id=None, otdb_id=None, force_update=False):
return self._convertTimestamps(self.rpc('GetDiskUsageForTaskAndSubDirectories', radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, force_update=force_update))
return self._rpc_client.execute('GetDiskUsageForTaskAndSubDirectories', radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, force_update=force_update)
def getDiskUsageForProjectDirAndSubDirectories(self, radb_id=None, mom_id=None, otdb_id=None, project_name=None, force_update=False):
return self._convertTimestamps(self.rpc('GetDiskUsageForProjectDirAndSubDirectories', radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, project_name=project_name, force_update=force_update))
return self._rpc_client.execute('GetDiskUsageForProjectDirAndSubDirectories', radb_id=radb_id, mom_id=mom_id, otdb_id=otdb_id, project_name=project_name, force_update=force_update)
def getDiskUsageForProjectsDirAndSubDirectories(self, force_update=False):
return self._convertTimestamps(self.rpc('GetDiskUsageForProjectsDirAndSubDirectories', force_update=force_update))
return self._rpc_client.execute('GetDiskUsageForProjectsDirAndSubDirectories', force_update=force_update)
def getDiskUsageForPath(self, path, force_update=False):
return self.rpc('GetDiskUsageForPath', path=path, force_update=force_update)
return self._rpc_client.execute('GetDiskUsageForPath', path=path, force_update=force_update)
def getDiskUsagesForAllOtdbIds(self, force_update=False):
return convertStringDigitKeysToInt(self.rpc('GetDiskUsagesForAllOtdbIds', force_update=force_update))
return self._rpc_client.execute('GetDiskUsagesForAllOtdbIds', force_update=force_update)
def main():
from pprint import pprint
import sys
from optparse import OptionParser
# Check the invocation arguments
parser = OptionParser('%prog [options]',
description='do storage queries (on cep4) from the commandline')
parser.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER,
help='Address of the broker, default: localhost')
parser.add_option('-e', '--exchange', dest='exchange', type='string', default=DEFAULT_BUSNAME,
help='Name of the bus exchange on the broker, default: [%default]')
parser.add_option('-o', '--otdb_id', dest='otdb_id', type='int', default=None, help='otdb_id of task to get the disk usage for')
parser.add_option('-m', '--mom_id', dest='mom_id', type='int', default=None, help='mom_id of task to get the disk usage for')
parser.add_option('-r', '--radb_id', dest='radb_id', type='int', default=None, help='radb_id of task to get the disk usage for')
......@@ -75,8 +73,6 @@ def main():
parser.add_option('-d', '--dir', dest='dir_path', type='string', default=None, help='get the disk usage of the given directory path')
parser.add_option('-a', '--all', dest='all', action='store_true', help='get disk usage for all otdb ids currently on disk')
parser.add_option('-f', '--force_update', dest='force_update', action='store_true', help='force an update of the cache with a new du call')
parser.add_option('-q', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the qpid broker, default: %default')
parser.add_option('--busname', dest='busname', type='string', default=DEFAULT_BUSNAME, help='Name of the bus exchange on the qpid broker, default: %default')
parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
(options, args) = parser.parse_args()
......@@ -87,7 +83,7 @@ def main():
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
level=logging.INFO if options.verbose else logging.WARN)
with StorageQueryRPC(busname=options.busname, broker=options.broker) as rpc:
with StorageQueryRPC.create(exchange=options.exchange, broker=options.broker) as rpc:
if options.projects:
result = rpc.getDiskUsageForProjectsDirAndSubDirectories(force_update=options.force_update)
if result['found']:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment