diff --git a/SAS/DataManagement/CleanupService/service.py b/SAS/DataManagement/CleanupService/service.py index cbd92cc36cfca3bfb57f016bdfc1f393fedab9db..5fd540e30bce0fb305ad27fb173fefaffd08101c 100644 --- a/SAS/DataManagement/CleanupService/service.py +++ b/SAS/DataManagement/CleanupService/service.py @@ -125,7 +125,7 @@ def createService(busname=DEFAULT_BUSNAME, servicename=DEFAULT_SERVICENAME, brok def main(): # Check the invocation arguments parser = OptionParser("%prog [options]", - description='runs the resourceassignment database service') + description='runs the cleanup service') parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost') parser.add_option("-b", "--busname", dest="busname", type="string", default=DEFAULT_BUSNAME, help="Name of the bus exchange on the qpid broker, default: %default") parser.add_option("-s", "--servicename", dest="servicename", type="string", default=DEFAULT_SERVICENAME, help="Name for this service, default: default %default") diff --git a/SAS/DataManagement/StorageQueryService/CMakeLists.txt b/SAS/DataManagement/StorageQueryService/CMakeLists.txt index 088a803803276294fd7606d73cdbf4a4bd5e26e9..1bf03b6fb94b59cfb900d44260cbdc11d1b49520 100644 --- a/SAS/DataManagement/StorageQueryService/CMakeLists.txt +++ b/SAS/DataManagement/StorageQueryService/CMakeLists.txt @@ -1,6 +1,6 @@ # $Id$ -lofar_package(StorageQueryService 1.0 DEPENDS PyMessaging MoMQueryService) +lofar_package(StorageQueryService 1.0 DEPENDS PyMessaging MoMQueryService DataManagementCommon) lofar_find_package(Python 2.6 REQUIRED) include(PythonInstall) diff --git a/SAS/DataManagement/StorageQueryService/rpc.py b/SAS/DataManagement/StorageQueryService/rpc.py index ba2d3e7717ba7796529b252624a1910fe587e385..4839501c7d2674825e1b4e606f3017bfed9cec31 100644 --- a/SAS/DataManagement/StorageQueryService/rpc.py +++ b/SAS/DataManagement/StorageQueryService/rpc.py @@ -3,6 +3,7 @@ import logging from lofar.messaging.RPC import RPC, RPCException, RPCWrapper from lofar.sas.datamanagement.storagequery.config import DEFAULT_BUSNAME, DEFAULT_SERVICENAME +from lofar.common.util import humanreadablesize logger = logging.getLogger(__name__) @@ -10,10 +11,10 @@ class StorageQueryRPC(RPCWrapper): def __init__(self, busname=DEFAULT_BUSNAME, servicename=DEFAULT_SERVICENAME, broker=None): - super(StorageQueryRPC, self).__init__(busname, servicename, broker) + super(StorageQueryRPC, self).__init__(busname, servicename, broker, timeout=60) - def foo(self): - return self.rpc('foo') + def getDiskUsageForOTDBId(self, otdb_id): + return self.rpc('GetDiskUsageForOTDBId', otdb_id=otdb_id) def main(): import sys @@ -22,14 +23,24 @@ def main(): # Check the invocation arguments parser = OptionParser('%prog [options]', description='do storage queries (on cep4) from the commandline') + parser.add_option('-o', '--otdb_id', dest='otdb_id', type='int', default=None, help='otdb_id of task to get the disk usage for') + parser.add_option('-m', '--mom_id', dest='mom_id', type='int', default=None, help='mom_id of task to get the disk usage for') + parser.add_option('-r', '--radb_id', dest='radb_id', type='int', default=None, help='radb_id of task to get the disk usage for') parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost') parser.add_option('-b', '--busname', dest='busname', type='string', default=DEFAULT_BUSNAME, help='Name of the bus exchange on the qpid broker, default: %s' % DEFAULT_BUSNAME) parser.add_option('-s', '--servicename', dest='servicename', type='string', default=DEFAULT_SERVICENAME, help='Name for this service, default: %s' % DEFAULT_SERVICENAME) parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging') (options, args) = parser.parse_args() + if not (options.otdb_id or options.mom_id or options.radb_id): + parser.print_help() + exit(1) + logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO if options.verbose else logging.WARN) with StorageQueryRPC(busname=options.busname, servicename=options.servicename, broker=options.broker) as rpc: - print rpc.foo() + result = rpc.getDiskUsageForOTDBId(options.otdb_id) + if 'disk_usage' in result: + result['disk_usage_readable'] = humanreadablesize(result['disk_usage']) + print result diff --git a/SAS/DataManagement/StorageQueryService/service.py b/SAS/DataManagement/StorageQueryService/service.py index 5ea343f93fe45028e03c7af5ec4b8bd9ad17e3a0..16d6e963f4df947a16b08717e76f59ba83aada17 100644 --- a/SAS/DataManagement/StorageQueryService/service.py +++ b/SAS/DataManagement/StorageQueryService/service.py @@ -2,44 +2,122 @@ # $Id$ import logging +import subprocess +import socket from optparse import OptionParser from lofar.messaging import Service from lofar.messaging import setQpidLogLevel -from lofar.messaging.Service import MessageHandlerInterface from lofar.common.util import waitForInterrupt -from lofar.common.util import convertIntKeysToString + +from lofar.sas.datamanagement.common.messagehandler import MessageHandler +from lofar.sas.datamanagement.common.config import CEP4_DATA_MOUNTPOINT from lofar.sas.datamanagement.storagequery.config import DEFAULT_BUSNAME, DEFAULT_SERVICENAME +from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as RADB_BUSNAME +from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME +from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME logger = logging.getLogger(__name__) -class StorageQueryHandler(MessageHandlerInterface): - def __init__(self, **kwargs): - super(StorageQueryHandler, self).__init__(**kwargs) +class StorageQueryHandler(MessageHandler): + def __init__(self, + mountpoint=CEP4_DATA_MOUNTPOINT, + radb_busname=RADB_BUSNAME, + radb_servicename=RADB_SERVICENAME, + mom_busname=DEFAULT_MOMQUERY_BUSNAME, + mom_servicename=DEFAULT_MOMQUERY_SERVICENAME, + broker=None, + **kwargs): + + super(StorageQueryHandler, self).__init__(mountpoint=mountpoint, + radb_busname=radb_busname, + radb_servicename=radb_servicename, + mom_busname=mom_busname, + mom_servicename=mom_servicename, + broker=broker, + **kwargs) - self.service2MethodMap = {'foo': self._foo} + self.service2MethodMap = {'GetDiskUsageForOTDBId': self.getDiskUsageForOTDBId} def prepare_loop(self): pass - def _foo(self): - return 'foo' + def getDiskUsageForPath(self, path): + cmd = ['rbh-du', '-bd', path] + hostname = socket.gethostname() + if not 'mgmt0' in hostname: + cmd = ['ssh', '-AXt', 'lofarsys@mgmt01.cep4.control.lofar'] + cmd + logger.info(' '.join(cmd)) + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + out, err = proc.communicate() + + if proc.returncode != 0: + logger.error(err) + return {'found': False, 'path': path, 'message': err} + + logger.info(out) + + # example of out + # Using config file '/etc/robinhood.d/tmpfs/tmp_fs_mgr_basic.conf'. + # /data/projects/2016LOFAROBS/L522380 + # dir count:3906, size:16048128, spc_used:16052224 + # file count:17568, size:42274164368, spc_used:42327519232 + + #parse out + lines = [l.strip() for l in out.split('\n')] + file_line = next(l for l in lines if 'file count' in l) + if file_line: + parts = [p.strip() for p in file_line.split(',')] + partsDict = {p.split(':')[0].strip():p.split(':')[1].strip() for p in parts} + + results = {'found': True, 'disk_usage': None, 'path': path} + + if 'size' in partsDict: + results['disk_usage'] = int(partsDict['size']) + + if 'file count' in partsDict: + results['nr_of_files'] = int(partsDict['file count']) + + logger.info('returning: %s' % results) + return results + + results = {'found': False, 'path': path } + + def getDiskUsageForOTDBId(self, otdb_id): + result = self.path_resolver.getPathForOTDBId(otdb_id) + if result['found']: + return self.getDiskUsageForPath(result['path']) + + return {'found': False, 'path': result['path']} -def createService(busname=DEFAULT_BUSNAME, servicename=DEFAULT_SERVICENAME, broker=None, verbose=False): +def createService(busname=DEFAULT_BUSNAME, servicename=DEFAULT_SERVICENAME, broker=None, + mountpoint=CEP4_DATA_MOUNTPOINT, verbose=False, + radb_busname=RADB_BUSNAME, radb_servicename=RADB_SERVICENAME, + mom_busname=DEFAULT_MOMQUERY_BUSNAME, mom_servicename=DEFAULT_MOMQUERY_SERVICENAME): return Service(servicename, StorageQueryHandler, busname=busname, broker=broker, use_service_methods=True, numthreads=2, - verbose=verbose) + verbose=verbose, + handler_args={'mountpoint': mountpoint, + 'radb_busname':RADB_BUSNAME, + 'radb_servicename':RADB_SERVICENAME, + 'mom_busname':DEFAULT_MOMQUERY_BUSNAME, + 'mom_servicename':DEFAULT_MOMQUERY_SERVICENAME, + 'broker':broker}) def main(): # Check the invocation arguments parser = OptionParser("%prog [options]", - description='runs the resourceassignment database service') + description='runs the storagequery service') parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost') parser.add_option("-b", "--busname", dest="busname", type="string", default=DEFAULT_BUSNAME, help="Name of the bus exchange on the qpid broker, default: %s" % DEFAULT_BUSNAME) parser.add_option("-s", "--servicename", dest="servicename", type="string", default=DEFAULT_SERVICENAME, help="Name for this service, default: %s" % DEFAULT_SERVICENAME) + parser.add_option("--radb_busname", dest="radb_busname", type="string", default=RADB_BUSNAME, help="Name of the bus on which the RADB service listens, default: %default") + parser.add_option("--radb_servicename", dest="radb_servicename", type="string", default=RADB_SERVICENAME, help="Name of the RADB service, default: %default") + parser.add_option("--mom_busname", dest="mom_busname", type="string", default=DEFAULT_MOMQUERY_BUSNAME, help="Name of the bus on which the MoM service listens, default: %default") + parser.add_option("--mom_servicename", dest="mom_servicename", type="string", default=DEFAULT_MOMQUERY_SERVICENAME, help="Name of the MoM service, default: %default") parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging') (options, args) = parser.parse_args() @@ -50,7 +128,11 @@ def main(): with createService(busname=options.busname, servicename=options.servicename, broker=options.broker, - verbose=options.verbose): + verbose=options.verbose, + radb_busname=options.radb_busname, + radb_servicename=options.radb_servicename, + mom_busname=options.mom_busname, + mom_servicename=options.mom_busname): waitForInterrupt() if __name__ == '__main__':