From e96f7b37f54e8cc97178f4504a5bf64fcf0573cc Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Tue, 11 Jun 2019 15:04:57 +0000 Subject: [PATCH] SW-699: adaptations to new rpc.py module. Not fully finished yet for all datamanagement services and rpcs --- .../datamanagementbuslistener.py | 49 +++++++--- .../DataManagementCommon/path.py | 10 +- .../StorageQueryService/cache.py | 60 +++++++----- .../StorageQueryService/config.py | 2 +- .../StorageQueryService/diskusage.py | 4 +- .../StorageQueryService/service.py | 92 ++++++++----------- 6 files changed, 120 insertions(+), 97 deletions(-) diff --git a/SAS/DataManagement/DataManagementCommon/datamanagementbuslistener.py b/SAS/DataManagement/DataManagementCommon/datamanagementbuslistener.py index 94d25ac5836..41ad97b2ee5 100644 --- a/SAS/DataManagement/DataManagementCommon/datamanagementbuslistener.py +++ b/SAS/DataManagement/DataManagementCommon/datamanagementbuslistener.py @@ -21,7 +21,7 @@ # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. # -from lofar.messaging.messagebus import AbstractBusListener +from lofar.messaging import AbstractMessageHandler, BusListener, LofarMessage, EventMessage from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME from lofar.sas.datamanagement.common.config import DEFAULT_DM_NOTIFICATION_PREFIX from lofar.common.util import waitForInterrupt @@ -30,27 +30,28 @@ import logging logger = logging.getLogger(__name__) -class DataManagementBusListener(AbstractBusListener): - def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): - super(DataManagementBusListener, self).__init__(exchange=busname, routing_key=DEFAULT_DM_NOTIFICATION_PREFIX+"#", broker=broker) +class DataManagementEventMessageHandler(AbstractMessageHandler): + def handle_message(self, msg: LofarMessage): + if not isinstance(msg, EventMessage): + raise ValueError("%s: Ignoring non-EventMessage: %s" % (self.__class__.__name__, msg)) - def _handleMessage(self, msg): - logger.info("on%s: %s" % (msg.subject.replace(self.subject_prefix, ''), str(msg.content).replace('\n', ' '))) + stripped_subject = msg.subject.replace("%s." % DEFAULT_DM_NOTIFICATION_PREFIX, '') + logger.info("on%s: %s" % (stripped_subject, str(msg.content).replace('\n', ' '))) - if msg.subject == '%sTaskDeleting' % self.subject_prefix: + if stripped_subject == 'TaskDeleting': self.onTaskDeleting(msg.content.get('otdb_id')) - elif msg.subject == '%sTaskDeleted' % self.subject_prefix: + elif stripped_subject == 'TaskDeleted': self.onTaskDeleted(msg.content.get('otdb_id'), msg.content.get('deleted'), msg.content.get('paths'), msg.content.get('message', '')) - elif msg.subject == '%sTaskDataPinned' % self.subject_prefix: + elif stripped_subject == 'TaskDataPinned': self.onTaskDataPinned(msg.content.get('otdb_id'), msg.content.get('pinned')) - elif msg.subject == '%sPathDeleting' % self.subject_prefix: + elif stripped_subject == 'PathDeleting': self.onPathDeleting(msg.content.get('path')) - elif msg.subject == '%sPathDeleted' % self.subject_prefix: + elif stripped_subject == 'PathDeleted': self.onPathDeleted(msg.content.get('path'), msg.content.get('deleted'), msg.content.get('message', '')) - elif msg.subject == '%sDiskUsageChanged' % self.subject_prefix: + elif stripped_subject == 'DiskUsageChanged': self.onDiskUsageChanged(msg.content.get('path'), msg.content.get('disk_usage'), msg.content.get('otdb_id')) else: - logger.error("DataManagementBusListener.handleMessage: unknown subject: %s" %str(msg.subject)) + raise ValueError("DataManagementBusListener.handleMessage: unknown subject: %s" % msg.subject) def onTaskDeleting(self, otdb_id): '''onTaskDeleting is called upon receiving a TaskDeleting message. @@ -90,9 +91,29 @@ class DataManagementBusListener(AbstractBusListener): pass +class DataManagementBusListener(BusListener): + def __init__(self, + handler_type: DataManagementEventMessageHandler.__class__ = DataManagementEventMessageHandler, + handler_kwargs: dict = None, + exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, + num_threads: int=1): + """ + DataManagementBusListener listens on the lofar notification message bus and calls (empty) on<SomeMessage> methods when such a message is received. + Typical usage is to derive your own subclass from DataManagementBusListener and implement the specific on<SomeMessage> methods that you are interested in. + :param busname: valid Qpid address + :param broker: valid Qpid broker host + """ + if not issubclass(handler_type, DataManagementEventMessageHandler): + raise TypeError("handler_type should be a DataManagementEventMessageHandler subclass") + + super().__init__(handler_type=handler_type, handler_kwargs=handler_kwargs, + exchange=exchange, broker=broker, + routing_key="%s.#" % DEFAULT_DM_NOTIFICATION_PREFIX, + num_threads=num_threads) + if __name__ == '__main__': - with DataManagementBusListener(broker=None) as listener: + with DataManagementBusListener() as listener: waitForInterrupt() __all__ = ["DataManagementBusListener"] diff --git a/SAS/DataManagement/DataManagementCommon/path.py b/SAS/DataManagement/DataManagementCommon/path.py index fda18683f11..8bcddfec8a8 100644 --- a/SAS/DataManagement/DataManagementCommon/path.py +++ b/SAS/DataManagement/DataManagementCommon/path.py @@ -6,14 +6,14 @@ import logging import socket import subprocess -from lofar.common import isProductionEnvironment, isTestEnvironment +from lofar.common import isProductionEnvironment from lofar.common.subprocess_utils import communicate_returning_strings from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME from lofar.sas.datamanagement.common.config import CEP4_DATA_MOUNTPOINT -from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC as RADBRPC +from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC @@ -22,7 +22,7 @@ logger = logging.getLogger(__name__) class PathResolver: def __init__(self, mountpoint=CEP4_DATA_MOUNTPOINT, - busname=DEFAULT_BUSNAME, + exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): self.mountpoint = mountpoint @@ -30,8 +30,8 @@ class PathResolver: self.scratch_path = os.path.join(self.mountpoint, 'scratch', 'pipeline') self.share_path = os.path.join(self.mountpoint, 'share', 'pipeline') - self.radbrpc = RADBRPC(busname=busname, broker=broker) - self.momrpc = MoMQueryRPC(busname=busname, broker=broker) + self.radbrpc = RADBRPC.create(exchange=exchange, broker=broker) + self.momrpc = MoMQueryRPC.create(exchange=exchange, broker=broker) def open(self): self.radbrpc.open() diff --git a/SAS/DataManagement/StorageQueryService/cache.py b/SAS/DataManagement/StorageQueryService/cache.py index ab65b9f669c..4b1747b8f35 100644 --- a/SAS/DataManagement/StorageQueryService/cache.py +++ b/SAS/DataManagement/StorageQueryService/cache.py @@ -7,10 +7,9 @@ TODO: add doc import logging import datetime from time import sleep -import ast -from optparse import OptionParser from threading import Thread, RLock import os.path +import shutil from functools import cmp_to_key from lofar.messaging import EventMessage, ToBus, DEFAULT_BROKER, DEFAULT_BUSNAME @@ -19,8 +18,8 @@ from lofar.common.datetimeutils import format_timedelta from lofar.sas.datamanagement.storagequery.diskusage import getDiskUsageForPath as du_getDiskUsageForPath from lofar.sas.datamanagement.storagequery.diskusage import getOTDBIdFromPath from lofar.sas.datamanagement.storagequery.diskusage import DiskUsage -from lofar.sas.datamanagement.common.datamanagementbuslistener import DataManagementBusListener -from lofar.sas.otdb.OTDBBusListener import OTDBBusListener +from lofar.sas.datamanagement.common.datamanagementbuslistener import DataManagementBusListener, DataManagementEventMessageHandler +from lofar.sas.otdb.OTDBBusListener import OTDBBusListener, OTDBEventMessageHandler from lofar.common.util import waitForInterrupt from lofar.sas.datamanagement.common.config import CEP4_DATA_MOUNTPOINT from lofar.sas.datamanagement.common.config import DEFAULT_DM_NOTIFICATION_PREFIX @@ -29,30 +28,48 @@ logger = logging.getLogger(__name__) MAX_CACHE_ENTRY_AGE = datetime.timedelta(hours=3*24) + +class _CacheManagerOTDBEventMessageHandler(OTDBEventMessageHandler): + def __init__(self, cache_manager: 'CacheManager'): + self.cache_manager = cache_manager + + def onObservationAborted(self, treeId, modificationTime): + self.cache_manager.onObservationAborted(treeId, modificationTime) + + def onObservationFinished(self, treeId, modificationTime): + self.cache_manager.onObservationFinished(treeId, modificationTime) + + +class _CacheManagerDataManagementEventMessageHandler(DataManagementEventMessageHandler): + def __init__(self, cache_manager: 'CacheManager'): + self.cache_manager = cache_manager + + def onTaskDeleted(self, otdb_id, deleted, paths, message=''): + self.cache_manager.onTaskDeleted(otdb_id, deleted, paths, message) + + class CacheManager: def __init__(self, cache_path='.du_cache.py', mountpoint=CEP4_DATA_MOUNTPOINT, - busname=DEFAULT_BUSNAME, + exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): self._cache_path = cache_path - self.otdb_listener = OTDBBusListener(busname=busname, + self.otdb_listener = OTDBBusListener(_CacheManagerOTDBEventMessageHandler, + handler_kwargs={'cache_manager': self}, + exchange=exchange, broker=broker, - numthreads=1) - - self.otdb_listener.onObservationAborted = self.onObservationAborted - self.otdb_listener.onObservationFinished = self.onObservationFinished + num_threads=1) - self.dm_listener = DataManagementBusListener(busname=busname, + self.dm_listener = DataManagementBusListener(_CacheManagerDataManagementEventMessageHandler, + handler_kwargs={'cache_manager': self}, + exchange=exchange, broker=broker, - numthreads=1) + num_threads=1) - self.dm_listener.onTaskDeleted = self.onTaskDeleted - - self.notification_prefix = DEFAULT_DM_NOTIFICATION_PREFIX - self.event_bus = ToBus(address=busname, broker=broker) + self.event_bus = ToBus(exchange=exchange, broker=broker) self._scanProjectsTreeThread = None self._updateCacheThread = None @@ -65,17 +82,17 @@ class CacheManager: self._readCacheFromDisk() self.disk_usage = DiskUsage(mountpoint=mountpoint, - busname=busname, + exchange=exchange, broker=broker) def _sendDiskUsageChangedNotification(self, path, disk_usage, otdb_id=None): try: - msg = EventMessage(context=self.notification_prefix + 'DiskUsageChanged', + msg = EventMessage(subject='%s.DiskUsageChanged' % DEFAULT_DM_NOTIFICATION_PREFIX, content={ 'path': path, 'disk_usage': disk_usage, 'disk_usage_readable': humanreadablesize(disk_usage), 'otdb_id': otdb_id }) - logger.info('Sending notification with subject %s to %s: %s', msg.subject, self.event_bus.address, msg.content) + logger.info('Sending notification with subject %s to %s: %s', msg.subject, self.event_bus.exchange, msg.content) self.event_bus.send(msg) except Exception as e: logger.error(str(e)) @@ -104,7 +121,6 @@ class CacheManager: try: # only persist (a subset of) the cache to disk every once in a while. if datetime.datetime.utcnow() - self._last_cache_write_timestamp > datetime.timedelta(minutes=5): - tmp_path = '/tmp/tmp_storagequery_cache.py' cache_str = '' with self._cacheLock: # Take a subset of the entire cache @@ -117,9 +133,11 @@ class CacheManager: if self.getDepthToProjectsDir(path) <= 1 and du_result.get('found') } cache_str = str(sub_cache) + tmp_path = '/tmp/tmp_storagequery_cache.py' with open(tmp_path, 'w') as file: file.write(cache_str) - os.rename(tmp_path, self._cache_path) + os.makedirs(os.path.dirname(self._cache_path), exist_ok=True) + shutil.move(tmp_path, self._cache_path) self._last_cache_write_timestamp = datetime.datetime.utcnow() except Exception as e: logger.error("Error while writing du cache: %s", e) diff --git a/SAS/DataManagement/StorageQueryService/config.py b/SAS/DataManagement/StorageQueryService/config.py index f9fb4ae3683..6add9d35caa 100644 --- a/SAS/DataManagement/StorageQueryService/config.py +++ b/SAS/DataManagement/StorageQueryService/config.py @@ -1,4 +1,4 @@ #!/usr/bin/env python3 # $Id$ -DEFAULT_SERVICENAME = 'StorageQueryService' +DEFAULT_STORAGEQUERY_SERVICENAME = 'StorageQueryService' diff --git a/SAS/DataManagement/StorageQueryService/diskusage.py b/SAS/DataManagement/StorageQueryService/diskusage.py index f996c82d380..620465779f0 100644 --- a/SAS/DataManagement/StorageQueryService/diskusage.py +++ b/SAS/DataManagement/StorageQueryService/diskusage.py @@ -177,10 +177,10 @@ def getDiskFreeSpaceForMountpoint(mountpoint=CEP4_DATA_MOUNTPOINT): class DiskUsage: def __init__(self, mountpoint=CEP4_DATA_MOUNTPOINT, - busname=DEFAULT_BUSNAME, + exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): self.path_resolver = PathResolver(mountpoint=mountpoint, - busname=busname, + exchange=exchange, broker=broker) def open(self): diff --git a/SAS/DataManagement/StorageQueryService/service.py b/SAS/DataManagement/StorageQueryService/service.py index 68ae6f32b59..def1aef75c7 100644 --- a/SAS/DataManagement/StorageQueryService/service.py +++ b/SAS/DataManagement/StorageQueryService/service.py @@ -6,61 +6,45 @@ import subprocess import socket import os.path from optparse import OptionParser -from lofar.messaging import Service, DEFAULT_BROKER, DEFAULT_BUSNAME -from lofar.messaging.Service import MessageHandlerInterface -from lofar.messaging import setQpidLogLevel -from lofar.common.util import waitForInterrupt, convertIntKeysToString +from lofar.messaging import RPCService, DEFAULT_BROKER, DEFAULT_BUSNAME, ServiceMessageHandler +from lofar.common.util import waitForInterrupt -from lofar.messaging.Service import MessageHandlerInterface from lofar.sas.datamanagement.common.config import CEP4_DATA_MOUNTPOINT +from lofar.sas.datamanagement.storagequery.config import DEFAULT_STORAGEQUERY_SERVICENAME from lofar.sas.datamanagement.storagequery.cache import CacheManager from lofar.sas.datamanagement.storagequery.diskusage import DiskUsage logger = logging.getLogger(__name__) -class StorageQueryHandler(MessageHandlerInterface): - def __init__(self, - mountpoint=CEP4_DATA_MOUNTPOINT, - busname=DEFAULT_BUSNAME, - broker=DEFAULT_BROKER, - **kwargs): +class StorageQueryHandler(ServiceMessageHandler): + def __init__(self, cache_manager: CacheManager, mountpoint=CEP4_DATA_MOUNTPOINT): + super(StorageQueryHandler, self).__init__() + self.mount_point = mountpoint + self.cache = cache_manager - self.cache = kwargs.pop('cache_manager') + def init(self, service_name: str, exchange: str, broker: str): + super().init(service_name, exchange, broker) + self.register_service_method('GetPathForOTDBId', self.cache.disk_usage.path_resolver.getPathForOTDBId) + self.register_service_method('GetDiskUsageForOTDBId', self.cache.getDiskUsageForOTDBId) + self.register_service_method('GetDiskUsageForMoMId', self.cache.getDiskUsageForMoMId) + self.register_service_method('GetDiskUsageForRADBId', self.cache.getDiskUsageForRADBId) + self.register_service_method('GetDiskUsageForTask', self.cache.getDiskUsageForTask) + self.register_service_method('GetDiskUsageForTasks', self.cache.getDiskUsageForTasks) + self.register_service_method('GetDiskUsageForTaskAndSubDirectories', self.cache.getDiskUsageForTaskAndSubDirectories) + self.register_service_method('GetDiskUsageForProjectDirAndSubDirectories', self.cache.getDiskUsageForProjectDirAndSubDirectories) + self.register_service_method('GetDiskUsageForProjectsDirAndSubDirectories', self.cache.getDiskUsageForProjectsDirAndSubDirectories) + self.register_service_method('GetDiskUsageForPath', self.cache.getDiskUsageForPath) + self.register_service_method('GetDiskUsagesForAllOtdbIds', self.cache.getDiskUsagesForAllOtdbIds) - super(StorageQueryHandler, self).__init__(**kwargs) - self.disk_usage = DiskUsage(mountpoint=mountpoint, - busname=busname, - broker=broker) - - self.service2MethodMap = {'GetPathForOTDBId': self.cache.disk_usage.path_resolver.getPathForOTDBId, - 'GetDiskUsageForOTDBId': self.cache.getDiskUsageForOTDBId, - 'GetDiskUsageForMoMId': self.cache.getDiskUsageForMoMId, - 'GetDiskUsageForRADBId': self.cache.getDiskUsageForRADBId, - 'GetDiskUsageForTask': self.cache.getDiskUsageForTask, - 'GetDiskUsageForTasks': self.cache.getDiskUsageForTasks, - 'GetDiskUsageForTaskAndSubDirectories': self.cache.getDiskUsageForTaskAndSubDirectories, - 'GetDiskUsageForProjectDirAndSubDirectories': self.cache.getDiskUsageForProjectDirAndSubDirectories, - 'GetDiskUsageForProjectsDirAndSubDirectories': self.cache.getDiskUsageForProjectsDirAndSubDirectories, - 'GetDiskUsageForPath': self.cache.getDiskUsageForPath, - 'GetDiskUsagesForAllOtdbIds': self.getDiskUsagesForAllOtdbIds} - - def getDiskUsagesForAllOtdbIds(self, force_update=False): - return convertIntKeysToString(self.cache.getDiskUsagesForAllOtdbIds(force_update)) - -def createService(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, - mountpoint=CEP4_DATA_MOUNTPOINT, verbose=False, - cache_manager=None): - return Service(DEFAULT_SERVICENAME, - StorageQueryHandler, - busname=busname, - broker=broker, - use_service_methods=True, - numthreads=1, - verbose=verbose, - handler_args={'mountpoint': mountpoint, - 'busname':busname, - 'broker':broker, - 'cache_manager':cache_manager}) +def createService(exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, + mountpoint=CEP4_DATA_MOUNTPOINT, cache_manager=None): + return RPCService(service_name=DEFAULT_STORAGEQUERY_SERVICENAME, + handler_type=StorageQueryHandler, + handler_kwargs={'mountpoint': mountpoint, + 'cache_manager':cache_manager}, + exchange=exchange, + broker=broker, + num_threads=6) def main(): # make sure we run in UTC timezone @@ -73,20 +57,20 @@ def main(): parser.add_option('-c', '--cache_path', dest='cache_path', type='string', default=os.path.expandvars('$LOFARROOT/etc/storagequery_cache.py'), help='path of the cache file, default: %default') - parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost') - parser.add_option("-b", "--busname", dest="busname", type="string", default=DEFAULT_BUSNAME, help="Name of the bus exchange on the qpid broker, default: %s" % DEFAULT_BUSNAME) + parser.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, + help='Address of the messaging broker, default: %default') + parser.add_option("-e", "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME, + help="Name of the bus exchange on the broker, [default: %default]") parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging') (options, args) = parser.parse_args() logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG if options.verbose else logging.INFO) - setQpidLogLevel(logging.INFO) - with CacheManager(busname=options.busname, broker=options.broker, cache_path=options.cache_path) as cache_manager: - with createService(busname=options.busname, - broker=options.broker, - verbose=options.verbose, - cache_manager=cache_manager): + with CacheManager(exchange=options.exchange, broker=options.broker, cache_path=options.cache_path) as cache_manager: + with createService(exchange=options.exchange, + broker=options.broker, + cache_manager=cache_manager): waitForInterrupt() if __name__ == '__main__': -- GitLab