diff --git a/SAS/DataManagement/Cleanup/CleanupClient/rpc.py b/SAS/DataManagement/Cleanup/CleanupClient/rpc.py index 98d7abd810aad4fcd3b4808c075081e337d7d96e..1c5c93f62e3fff09002b99e0f7f5abfe3e12eec2 100644 --- a/SAS/DataManagement/Cleanup/CleanupClient/rpc.py +++ b/SAS/DataManagement/Cleanup/CleanupClient/rpc.py @@ -1,35 +1,42 @@ #!/usr/bin/env python3 import logging -from lofar.messaging.RPC import RPC, RPCException, RPCWrapper -from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME -from lofar.sas.datamanagement.cleanup.config import DEFAULT_SERVICENAME +from lofar.messaging import RPCClient, RPCClientContextManagerMixin +from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME, DEFAULT_RPC_TIMEOUT +from lofar.sas.datamanagement.cleanup.config import DEFAULT_CLEANUP_SERVICENAME from lofar.common.util import convertStringDigitKeysToInt logger = logging.getLogger(__name__) -class CleanupRPC(RPCWrapper): - def __init__(self, busname=DEFAULT_BUSNAME, - broker=DEFAULT_BROKER): - super(CleanupRPC, self).__init__(busname, DEFAULT_SERVICENAME, broker=broker, timeout=18000) +class CleanupRPC(RPCClientContextManagerMixin): + def __init__(self, rpc_client: RPCClient = None): + """Create an instance of the CleanupRPC using the given RPCClient, + or if None given, to a default RPCClient connecting to the DEFAULT_CLEANUP_SERVICENAME service""" + super().__init__() + self._rpc_client = rpc_client or RPCClient(service_name=DEFAULT_CLEANUP_SERVICENAME) + + @staticmethod + def create(exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER, timeout: int=DEFAULT_RPC_TIMEOUT): + """Create a CleanupRPC connecting to the given exchange/broker on the default DEFAULT_CLEANUP_SERVICENAME service""" + return CleanupRPC(RPCClient(service_name=DEFAULT_CLEANUP_SERVICENAME, exchange=exchange, broker=broker, timeout=timeout)) def getPathForOTDBId(self, otdb_id): - return self.rpc('GetPathForOTDBId', otdb_id=otdb_id) + return self._rpc_client.execute('GetPathForOTDBId', otdb_id=otdb_id) def removePath(self, path): - return self.rpc('RemovePath', path=path) + return self._rpc_client.execute('RemovePath', path=path) def removeTaskData(self, otdb_id, delete_is=True, delete_cs=True, delete_uv=True, delete_im=True, delete_img=True, delete_pulp=True, delete_scratch=True, force=False): - return self.rpc('RemoveTaskData', otdb_id=otdb_id, delete_is=delete_is, delete_cs=delete_cs, delete_uv=delete_uv, delete_im=delete_im, delete_img=delete_img, delete_pulp=delete_pulp, delete_scratch=delete_scratch, force=force) + return self._rpc_client.execute('RemoveTaskData', otdb_id=otdb_id, delete_is=delete_is, delete_cs=delete_cs, delete_uv=delete_uv, delete_im=delete_im, delete_img=delete_img, delete_pulp=delete_pulp, delete_scratch=delete_scratch, force=force) def setTaskDataPinned(self, otdb_id, pinned=True): - return self.rpc('SetTaskDataPinned', otdb_id=otdb_id, pinned=pinned) + return self._rpc_client.execute('SetTaskDataPinned', otdb_id=otdb_id, pinned=pinned) def isTaskDataPinned(self, otdb_id): - return convertStringDigitKeysToInt(self.rpc('IsTaskDataPinned', otdb_id=otdb_id)).get(otdb_id, False) + return convertStringDigitKeysToInt(self._rpc_client.execute('IsTaskDataPinned', otdb_id=otdb_id)).get(otdb_id, False) def getPinnedStatuses(self): - return convertStringDigitKeysToInt(self.rpc('GetPinnedStatuses')) + return convertStringDigitKeysToInt(self._rpc_client.execute('GetPinnedStatuses')) def main(): import sys @@ -42,8 +49,10 @@ def main(): parser.add_option('-f', '--force', dest='force', action='store_true', help='in combination with --delete, always delete the data even when safety checks block deletion. (But pinned data is still kept, even when this force flag is supplied.)') parser.add_option('-p', '--pin', dest='pin', action='store_true', help='pin the data for the given otdb_id') parser.add_option('-u', '--unpin', dest='unpin', action='store_true', help='unpin the data for the given otdb_id') - parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost') - parser.add_option('-b', '--busname', dest='busname', type='string', default=DEFAULT_BUSNAME, help='Name of the bus exchange on the qpid broker, default: %s' % DEFAULT_BUSNAME) + parser.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, + help='Address of the broker, default: localhost') + parser.add_option('-e', '--exchange', dest='exchange', type='string', default=DEFAULT_BUSNAME, + help='Name of the bus exchange on the broker, default: [%default]') parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging') (options, args) = parser.parse_args() @@ -54,7 +63,7 @@ def main(): logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO if options.verbose else logging.WARN) - with CleanupRPC(busname=options.busname, broker=options.broker) as rpc: + with CleanupRPC.create(exchange=options.exchange, broker=options.broker) as rpc: otdb_id = int(args[0]) if options.pin or options.unpin: diff --git a/SAS/DataManagement/Cleanup/CleanupCommon/config.py b/SAS/DataManagement/Cleanup/CleanupCommon/config.py index 78675cd189f20ce41019829fffe4e8b0418c88af..caa915f0ed3aa7832656f67a85cbf1f12c20af38 100644 --- a/SAS/DataManagement/Cleanup/CleanupCommon/config.py +++ b/SAS/DataManagement/Cleanup/CleanupCommon/config.py @@ -1,4 +1,4 @@ #!/usr/bin/env python3 # $Id$ -DEFAULT_SERVICENAME = 'CleanupService' +DEFAULT_CLEANUP_SERVICENAME = 'CleanupService' diff --git a/SAS/DataManagement/Cleanup/CleanupService/service.py b/SAS/DataManagement/Cleanup/CleanupService/service.py index b352d31653e921394cdf859e3d5f236808d5c05d..754d05bc9fed9cbdad77854a63b0cca4af95ec52 100644 --- a/SAS/DataManagement/Cleanup/CleanupService/service.py +++ b/SAS/DataManagement/Cleanup/CleanupService/service.py @@ -10,17 +10,14 @@ import time import subprocess from datetime import datetime from optparse import OptionParser -from lofar.messaging import Service +from lofar.messaging import RPCService, ServiceMessageHandler from lofar.messaging import EventMessage, ToBus, DEFAULT_BROKER, DEFAULT_BUSNAME -from lofar.messaging.Service import MessageHandlerInterface -from lofar.messaging import setQpidLogLevel -from lofar.common.util import waitForInterrupt, convertIntKeysToString -from lofar.common.util import humanreadablesize +from lofar.common.util import waitForInterrupt, humanreadablesize from lofar.common.subprocess_utils import communicate_returning_strings from lofar.sas.datamanagement.common.config import CEP4_DATA_MOUNTPOINT from lofar.sas.datamanagement.common.path import PathResolver -from lofar.sas.datamanagement.cleanup.config import DEFAULT_SERVICENAME +from lofar.sas.datamanagement.cleanup.config import DEFAULT_CLEANUP_SERVICENAME from lofar.sas.datamanagement.common.config import DEFAULT_DM_NOTIFICATION_PREFIX from lofar.sas.datamanagement.storagequery.rpc import StorageQueryRPC @@ -77,42 +74,41 @@ def _getPinnedStatuses(): return {} -class CleanupHandler(MessageHandlerInterface): - def __init__(self, - mountpoint=CEP4_DATA_MOUNTPOINT, - busname=DEFAULT_BUSNAME, - broker=DEFAULT_BROKER, - **kwargs): +class CleanupHandler(ServiceMessageHandler): + def __init__(self, mountpoint=CEP4_DATA_MOUNTPOINT): + super().__init__() + self.mountpoint = mountpoint + self.path_resolver = None + self._sqrpc = None + self.event_bus = None - super(CleanupHandler, self).__init__(**kwargs) + def init(self, service_name: str, exchange: str, broker: str): + super().init(service_name, exchange, broker) - self.path_resolver = PathResolver(mountpoint=mountpoint, - busname=busname, - broker=broker) + self.path_resolver = PathResolver(mountpoint=self.mountpoint, exchange=exchange, broker=broker) + self.event_bus = ToBus(exchange=exchange, broker=broker) + self._sqrpc = StorageQueryRPC.create(exchange=exchange, broker=broker) - self.notification_prefix = DEFAULT_DM_NOTIFICATION_PREFIX - self.event_bus = ToBus(address=busname, broker=broker) - self._sqrpc = StorageQueryRPC(busname=busname, broker=broker) + self.register_service_method('GetPathForOTDBId', self.path_resolver.getPathForOTDBId) + self.register_service_method('RemovePath', self._removePath) + self.register_service_method('RemoveTaskData', self._removeTaskData) + self.register_service_method('SetTaskDataPinned', self._setTaskDataPinned) + self.register_service_method('IsTaskDataPinned', self._isTaskDataPinned) + self.register_service_method('GetPinnedStatuses', self._getPinnedStatuses) - self.service2MethodMap = { 'GetPathForOTDBId': self.path_resolver.getPathForOTDBId, - 'RemovePath': self._removePath, - 'RemoveTaskData': self._removeTaskData, - 'SetTaskDataPinned': self._setTaskDataPinned, - 'IsTaskDataPinned': self._isTaskDataPinned, - 'GetPinnedStatuses': self._getPinnedStatuses } - def prepare_loop(self): - super(CleanupHandler, self).prepare_loop() + def start_handling(self): + super().start_handling() self.path_resolver.open() self.event_bus.open() self._sqrpc.open() - logger.info("cleanup service started with projects_path=%s", self.path_resolver.projects_path) + logger.info("%s started with projects_path=%s", self, self.path_resolver.projects_path) - def finalize_loop(self): + def stop_handling(self): self.path_resolver.close() self.event_bus.close() self._sqrpc.close() - super(CleanupHandler, self).finalize_loop() + super().stop_handling() def _setTaskDataPinned(self, otdb_id, pinned=True): logger.info('setTaskDataPinned(otdb_id=%s, pinned=%s)', otdb_id, pinned) @@ -123,12 +119,12 @@ class CleanupHandler(MessageHandlerInterface): return { str(otdb_id): _isTaskDataPinned(otdb_id) } def _getPinnedStatuses(self): - return convertIntKeysToString(_getPinnedStatuses()) + return _getPinnedStatuses() def _sendNotification(self, subject, content): try: - msg = EventMessage(context=self.notification_prefix + subject, content=content) - logger.info('Sending notification with subject %s to %s: %s', msg.subject, self.event_bus.address, msg.content) + msg = EventMessage(subject="%s.%s" % (DEFAULT_DM_NOTIFICATION_PREFIX, subject), content=content) + logger.info('Sending notification with subject %s to %s: %s', msg.subject, self.event_bus.exchange, msg.content) self.event_bus.send(msg) except Exception as e: logger.error(str(e)) @@ -350,17 +346,14 @@ class CleanupHandler(MessageHandlerInterface): -def createService(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, - mountpoint=CEP4_DATA_MOUNTPOINT, verbose=False): - return Service(DEFAULT_SERVICENAME, - CleanupHandler, - busname=busname, +def createService(exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, + mountpoint=CEP4_DATA_MOUNTPOINT): + return RPCService(DEFAULT_CLEANUP_SERVICENAME, + handler_type=CleanupHandler, + handler_kwargs={'mountpoint': mountpoint}, + exchange=exchange, broker=broker, - use_service_methods=True, - numthreads=4, - verbose=verbose, - handler_args={'mountpoint': mountpoint, - 'broker':broker}) + num_threads=4) def main(): # make sure we run in UTC timezone @@ -370,20 +363,19 @@ def main(): # Check the invocation arguments parser = OptionParser("%prog [options]", description='runs the cleanup service') - parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost') - parser.add_option("-b", "--busname", dest="busname", type="string", default=DEFAULT_BUSNAME, help="Name of the bus exchange on the qpid broker, default: %default") - parser.add_option("-s", "--servicename", dest="servicename", type="string", default=DEFAULT_SERVICENAME, help="Name for this service, default: default %default") + parser.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, + help='Address of the messaging broker, default: %default') + parser.add_option("-e", "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME, + help="Name of the bus exchange on the broker, [default: %default]") parser.add_option("--mountpoint", dest="mountpoint", type="string", default=CEP4_DATA_MOUNTPOINT, help="path of local cep4 mount point, default: %default") parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging') (options, args) = parser.parse_args() logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG if options.verbose else logging.INFO) - setQpidLogLevel(logging.INFO) - with createService(busname=options.busname, - broker=options.broker, - verbose=options.verbose): + with createService(exchange=options.exchange, + broker=options.broker): waitForInterrupt() if __name__ == '__main__': diff --git a/SAS/DataManagement/DataManagementCommon/config.py b/SAS/DataManagement/DataManagementCommon/config.py index 6e8db9adbbe5a1d3910669601023b423e5887087..3206dbff2f1739b5dd13f4d89e30ae1249edf5d1 100644 --- a/SAS/DataManagement/DataManagementCommon/config.py +++ b/SAS/DataManagement/DataManagementCommon/config.py @@ -3,4 +3,4 @@ CEP4_DATA_MOUNTPOINT='/data' -DEFAULT_DM_NOTIFICATION_PREFIX = 'DM.' +DEFAULT_DM_NOTIFICATION_PREFIX = 'DataManagement'