# Skip to content
# Snippets Groups Projects
# Code owners
# Assign users and groups as approvers for specific file changes. Learn more.
# service.py 6.24 KiB
#!/usr/bin/python
# $Id$

import logging
import subprocess
import socket
import os.path
from optparse import OptionParser
from lofar.messaging import Service
from lofar.messaging.Service import MessageHandlerInterface
from lofar.messaging import setQpidLogLevel
from lofar.common.util import waitForInterrupt

from lofar.messaging.Service import MessageHandlerInterface
from lofar.sas.datamanagement.common.config import CEP4_DATA_MOUNTPOINT
from lofar.sas.datamanagement.storagequery.cache import CacheManager
from lofar.sas.datamanagement.storagequery.diskusage import DiskUsage
from lofar.sas.datamanagement.storagequery.config import DEFAULT_BUSNAME, DEFAULT_SERVICENAME
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as RADB_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

class StorageQueryHandler(MessageHandlerInterface):
    """Message handler exposing the storage-query methods by service-method name.

    The handler requires a ``cache_manager`` keyword argument; it is stored as
    ``self.cache`` and most of the exposed methods are bound methods of that
    object. A separate ``DiskUsage`` instance is also created and kept in
    ``self.disk_usage``.
    """

    def __init__(self,
                 mountpoint=CEP4_DATA_MOUNTPOINT,
                 radb_busname=RADB_BUSNAME,
                 radb_servicename=RADB_SERVICENAME,
                 mom_busname=DEFAULT_MOMQUERY_BUSNAME,
                 mom_servicename=DEFAULT_MOMQUERY_SERVICENAME,
                 broker=None,
                 **kwargs):
        """Set up the handler and its name-to-method table.

        :param mountpoint: forwarded to ``DiskUsage``.
        :param radb_busname: forwarded to ``DiskUsage``.
        :param radb_servicename: forwarded to ``DiskUsage``.
        :param mom_busname: forwarded to ``DiskUsage``.
        :param mom_servicename: forwarded to ``DiskUsage``.
        :param broker: forwarded to ``DiskUsage``.
        :param kwargs: must contain ``cache_manager``; everything else is
                       passed on to ``MessageHandlerInterface.__init__``.
        :raises KeyError: if ``cache_manager`` is not present in ``kwargs``.
        """
        # Take cache_manager out of kwargs *before* calling the base class,
        # so that the base constructor never receives it.
        cache = kwargs.pop('cache_manager')
        self.cache = cache

        super(StorageQueryHandler, self).__init__(**kwargs)

        disk_usage_settings = {'mountpoint': mountpoint,
                               'radb_busname': radb_busname,
                               'radb_servicename': radb_servicename,
                               'mom_busname': mom_busname,
                               'mom_servicename': mom_servicename,
                               'broker': broker}
        self.disk_usage = DiskUsage(**disk_usage_settings)

        # Path lookups go through the cache manager's own disk_usage object
        # (not self.disk_usage); all disk-usage queries go through the cache.
        resolver = cache.disk_usage.path_resolver

        # Service-method name -> callable. NOTE(review): presumably consulted
        # by the messaging framework for dispatch (createService passes
        # use_service_methods=True) -- confirm against MessageHandlerInterface.
        self.service2MethodMap = {
            'GetPathForOTDBId': resolver.getPathForOTDBId,
            'GetDiskUsageForOTDBId': cache.getDiskUsageForOTDBId,
            'GetDiskUsageForMoMId': cache.getDiskUsageForMoMId,
            'GetDiskUsageForRADBId': cache.getDiskUsageForRADBId,
            'GetDiskUsageForTask': cache.getDiskUsageForTask,
            'GetDiskUsageForTasks': cache.getDiskUsageForTasks,
            'GetDiskUsageForTaskAndSubDirectories': cache.getDiskUsageForTaskAndSubDirectories,
            'GetDiskUsageForProjectDirAndSubDirectories': cache.getDiskUsageForProjectDirAndSubDirectories,
            'GetDiskUsageForProjectsDirAndSubDirectories': cache.getDiskUsageForProjectsDirAndSubDirectories,
        }

def createService(busname=DEFAULT_BUSNAME, servicename=DEFAULT_SERVICENAME, broker=None,
                  mountpoint=CEP4_DATA_MOUNTPOINT, verbose=False,
                  radb_busname=RADB_BUSNAME, radb_servicename=RADB_SERVICENAME,
                  mom_busname=DEFAULT_MOMQUERY_BUSNAME, mom_servicename=DEFAULT_MOMQUERY_SERVICENAME,
                  cache_manager=None):
    """Create the storagequery ``Service`` backed by ``StorageQueryHandler``.

    :param busname: bus name passed to ``Service``.
    :param servicename: name under which the service is created.
    :param broker: broker passed to ``Service`` and to the handler.
    :param mountpoint: mountpoint passed to the handler.
    :param verbose: verbosity flag passed to ``Service``.
    :param radb_busname: RADB bus name passed to the handler.
    :param radb_servicename: RADB service name passed to the handler.
    :param mom_busname: MoM bus name passed to the handler.
    :param mom_servicename: MoM service name passed to the handler.
    :param cache_manager: object passed to the handler as ``cache_manager``.
                          The handler dereferences ``cache_manager.disk_usage``
                          in its constructor, so leaving this at ``None`` will
                          fail once a handler is instantiated.
    :return: the constructed ``Service`` instance.
    """
    # Forward the caller-supplied RADB/MoM settings. Previously the module-level
    # default constants were passed here, so the arguments were silently ignored.
    handler_args = {'mountpoint': mountpoint,
                    'radb_busname': radb_busname,
                    'radb_servicename': radb_servicename,
                    'mom_busname': mom_busname,
                    'mom_servicename': mom_servicename,
                    'broker': broker,
                    'cache_manager': cache_manager}

    return Service(servicename,
                   StorageQueryHandler,
                   busname=busname,
                   broker=broker,
                   use_service_methods=True,
                   numthreads=4,
                   verbose=verbose,
                   handler_args=handler_args)

def main():
    """Parse the command line and run the storagequery service until interrupted.

    Sets the ``TZ`` environment variable to ``UTC``, configures logging, then
    opens a ``CacheManager`` and the service as nested context managers and
    blocks in ``waitForInterrupt()``.
    """
    # make sure we run in UTC timezone
    import os
    os.environ['TZ'] = 'UTC'

    # Check the invocation arguments
    parser = OptionParser("%prog [options]",
                          description='runs the storagequery service')
    parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost')
    parser.add_option("-b", "--busname", dest="busname", type="string", default=DEFAULT_BUSNAME, help="Name of the bus exchange on the qpid broker, default: %s" % DEFAULT_BUSNAME)
    parser.add_option("-s", "--servicename", dest="servicename", type="string", default=DEFAULT_SERVICENAME, help="Name for this service, default: %s" % DEFAULT_SERVICENAME)
    parser.add_option("--radb_busname", dest="radb_busname", type="string", default=RADB_BUSNAME, help="Name of the bus on which the RADB service listens, default: %default")
    parser.add_option("--radb_servicename", dest="radb_servicename", type="string", default=RADB_SERVICENAME, help="Name of the RADB service, default: %default")
    parser.add_option("--mom_busname", dest="mom_busname", type="string", default=DEFAULT_MOMQUERY_BUSNAME, help="Name of the bus on which the MoM service listens, default: %default")
    parser.add_option("--mom_servicename", dest="mom_servicename", type="string", default=DEFAULT_MOMQUERY_SERVICENAME, help="Name of the MoM service, default: %default")
    parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
    # Positional arguments are not used by this script.
    options, _ = parser.parse_args()

    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=logging.DEBUG if options.verbose else logging.INFO)
    setQpidLogLevel(logging.INFO)

    # The cache manager is opened first and closed last, so it is available
    # for as long as the service is running.
    with CacheManager(broker=options.broker) as cache_manager:
        with createService(busname=options.busname,
                           servicename=options.servicename,
                           broker=options.broker,
                           verbose=options.verbose,
                           radb_busname=options.radb_busname,
                           radb_servicename=options.radb_servicename,
                           mom_busname=options.mom_busname,
                           # Fixed: this used to pass options.mom_busname, which
                           # made the --mom_servicename option ineffective.
                           mom_servicename=options.mom_servicename,
                           cache_manager=cache_manager):
            waitForInterrupt()

# Run the service only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()