Skip to content
Snippets Groups Projects
Commit e96f7b37 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-699: adaptations to new rpc.py module. Not fully finished yet for all...

SW-699: adaptations to new rpc.py module. Not fully finished yet for all datamanagement services and rpcs
parent 1e2bb7bd
Branches
Tags
No related merge requests found
......@@ -21,7 +21,7 @@
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
from lofar.messaging.messagebus import AbstractBusListener
from lofar.messaging import AbstractMessageHandler, BusListener, LofarMessage, EventMessage
from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.sas.datamanagement.common.config import DEFAULT_DM_NOTIFICATION_PREFIX
from lofar.common.util import waitForInterrupt
......@@ -30,27 +30,28 @@ import logging
logger = logging.getLogger(__name__)
class DataManagementBusListener(AbstractBusListener):
def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
super(DataManagementBusListener, self).__init__(exchange=busname, routing_key=DEFAULT_DM_NOTIFICATION_PREFIX+"#", broker=broker)
class DataManagementEventMessageHandler(AbstractMessageHandler):
def handle_message(self, msg: LofarMessage):
if not isinstance(msg, EventMessage):
raise ValueError("%s: Ignoring non-EventMessage: %s" % (self.__class__.__name__, msg))
def _handleMessage(self, msg):
logger.info("on%s: %s" % (msg.subject.replace(self.subject_prefix, ''), str(msg.content).replace('\n', ' ')))
stripped_subject = msg.subject.replace("%s." % DEFAULT_DM_NOTIFICATION_PREFIX, '')
logger.info("on%s: %s" % (stripped_subject, str(msg.content).replace('\n', ' ')))
if msg.subject == '%sTaskDeleting' % self.subject_prefix:
if stripped_subject == 'TaskDeleting':
self.onTaskDeleting(msg.content.get('otdb_id'))
elif msg.subject == '%sTaskDeleted' % self.subject_prefix:
elif stripped_subject == 'TaskDeleted':
self.onTaskDeleted(msg.content.get('otdb_id'), msg.content.get('deleted'), msg.content.get('paths'), msg.content.get('message', ''))
elif msg.subject == '%sTaskDataPinned' % self.subject_prefix:
elif stripped_subject == 'TaskDataPinned':
self.onTaskDataPinned(msg.content.get('otdb_id'), msg.content.get('pinned'))
elif msg.subject == '%sPathDeleting' % self.subject_prefix:
elif stripped_subject == 'PathDeleting':
self.onPathDeleting(msg.content.get('path'))
elif msg.subject == '%sPathDeleted' % self.subject_prefix:
elif stripped_subject == 'PathDeleted':
self.onPathDeleted(msg.content.get('path'), msg.content.get('deleted'), msg.content.get('message', ''))
elif msg.subject == '%sDiskUsageChanged' % self.subject_prefix:
elif stripped_subject == 'DiskUsageChanged':
self.onDiskUsageChanged(msg.content.get('path'), msg.content.get('disk_usage'), msg.content.get('otdb_id'))
else:
logger.error("DataManagementBusListener.handleMessage: unknown subject: %s" %str(msg.subject))
raise ValueError("DataManagementBusListener.handleMessage: unknown subject: %s" % msg.subject)
def onTaskDeleting(self, otdb_id):
'''onTaskDeleting is called upon receiving a TaskDeleting message.
......@@ -90,9 +91,29 @@ class DataManagementBusListener(AbstractBusListener):
pass
class DataManagementBusListener(BusListener):
def __init__(self,
handler_type: DataManagementEventMessageHandler.__class__ = DataManagementEventMessageHandler,
handler_kwargs: dict = None,
exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER,
num_threads: int=1):
"""
DataManagementBusListener listens on the lofar notification message bus and calls (empty) on<SomeMessage> methods when such a message is received.
Typical usage is to derive your own subclass from DataManagementBusListener and implement the specific on<SomeMessage> methods that you are interested in.
:param busname: valid Qpid address
:param broker: valid Qpid broker host
"""
if not issubclass(handler_type, DataManagementEventMessageHandler):
raise TypeError("handler_type should be a DataManagementEventMessageHandler subclass")
super().__init__(handler_type=handler_type, handler_kwargs=handler_kwargs,
exchange=exchange, broker=broker,
routing_key="%s.#" % DEFAULT_DM_NOTIFICATION_PREFIX,
num_threads=num_threads)
if __name__ == '__main__':
with DataManagementBusListener(broker=None) as listener:
with DataManagementBusListener() as listener:
waitForInterrupt()
__all__ = ["DataManagementBusListener"]
......@@ -6,14 +6,14 @@ import logging
import socket
import subprocess
from lofar.common import isProductionEnvironment, isTestEnvironment
from lofar.common import isProductionEnvironment
from lofar.common.subprocess_utils import communicate_returning_strings
from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.sas.datamanagement.common.config import CEP4_DATA_MOUNTPOINT
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC as RADBRPC
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
......@@ -22,7 +22,7 @@ logger = logging.getLogger(__name__)
class PathResolver:
def __init__(self,
mountpoint=CEP4_DATA_MOUNTPOINT,
busname=DEFAULT_BUSNAME,
exchange=DEFAULT_BUSNAME,
broker=DEFAULT_BROKER):
self.mountpoint = mountpoint
......@@ -30,8 +30,8 @@ class PathResolver:
self.scratch_path = os.path.join(self.mountpoint, 'scratch', 'pipeline')
self.share_path = os.path.join(self.mountpoint, 'share', 'pipeline')
self.radbrpc = RADBRPC(busname=busname, broker=broker)
self.momrpc = MoMQueryRPC(busname=busname, broker=broker)
self.radbrpc = RADBRPC.create(exchange=exchange, broker=broker)
self.momrpc = MoMQueryRPC.create(exchange=exchange, broker=broker)
def open(self):
self.radbrpc.open()
......
......@@ -7,10 +7,9 @@ TODO: add doc
import logging
import datetime
from time import sleep
import ast
from optparse import OptionParser
from threading import Thread, RLock
import os.path
import shutil
from functools import cmp_to_key
from lofar.messaging import EventMessage, ToBus, DEFAULT_BROKER, DEFAULT_BUSNAME
......@@ -19,8 +18,8 @@ from lofar.common.datetimeutils import format_timedelta
from lofar.sas.datamanagement.storagequery.diskusage import getDiskUsageForPath as du_getDiskUsageForPath
from lofar.sas.datamanagement.storagequery.diskusage import getOTDBIdFromPath
from lofar.sas.datamanagement.storagequery.diskusage import DiskUsage
from lofar.sas.datamanagement.common.datamanagementbuslistener import DataManagementBusListener
from lofar.sas.otdb.OTDBBusListener import OTDBBusListener
from lofar.sas.datamanagement.common.datamanagementbuslistener import DataManagementBusListener, DataManagementEventMessageHandler
from lofar.sas.otdb.OTDBBusListener import OTDBBusListener, OTDBEventMessageHandler
from lofar.common.util import waitForInterrupt
from lofar.sas.datamanagement.common.config import CEP4_DATA_MOUNTPOINT
from lofar.sas.datamanagement.common.config import DEFAULT_DM_NOTIFICATION_PREFIX
......@@ -29,30 +28,48 @@ logger = logging.getLogger(__name__)
MAX_CACHE_ENTRY_AGE = datetime.timedelta(hours=3*24)
class _CacheManagerOTDBEventMessageHandler(OTDBEventMessageHandler):
def __init__(self, cache_manager: 'CacheManager'):
self.cache_manager = cache_manager
def onObservationAborted(self, treeId, modificationTime):
self.cache_manager.onObservationAborted(treeId, modificationTime)
def onObservationFinished(self, treeId, modificationTime):
self.cache_manager.onObservationFinished(treeId, modificationTime)
class _CacheManagerDataManagementEventMessageHandler(DataManagementEventMessageHandler):
def __init__(self, cache_manager: 'CacheManager'):
self.cache_manager = cache_manager
def onTaskDeleted(self, otdb_id, deleted, paths, message=''):
self.cache_manager.onTaskDeleted(otdb_id, deleted, paths, message)
class CacheManager:
def __init__(self,
cache_path='.du_cache.py',
mountpoint=CEP4_DATA_MOUNTPOINT,
busname=DEFAULT_BUSNAME,
exchange=DEFAULT_BUSNAME,
broker=DEFAULT_BROKER):
self._cache_path = cache_path
self.otdb_listener = OTDBBusListener(busname=busname,
self.otdb_listener = OTDBBusListener(_CacheManagerOTDBEventMessageHandler,
handler_kwargs={'cache_manager': self},
exchange=exchange,
broker=broker,
numthreads=1)
self.otdb_listener.onObservationAborted = self.onObservationAborted
self.otdb_listener.onObservationFinished = self.onObservationFinished
num_threads=1)
self.dm_listener = DataManagementBusListener(busname=busname,
self.dm_listener = DataManagementBusListener(_CacheManagerDataManagementEventMessageHandler,
handler_kwargs={'cache_manager': self},
exchange=exchange,
broker=broker,
numthreads=1)
num_threads=1)
self.dm_listener.onTaskDeleted = self.onTaskDeleted
self.notification_prefix = DEFAULT_DM_NOTIFICATION_PREFIX
self.event_bus = ToBus(address=busname, broker=broker)
self.event_bus = ToBus(exchange=exchange, broker=broker)
self._scanProjectsTreeThread = None
self._updateCacheThread = None
......@@ -65,17 +82,17 @@ class CacheManager:
self._readCacheFromDisk()
self.disk_usage = DiskUsage(mountpoint=mountpoint,
busname=busname,
exchange=exchange,
broker=broker)
def _sendDiskUsageChangedNotification(self, path, disk_usage, otdb_id=None):
try:
msg = EventMessage(context=self.notification_prefix + 'DiskUsageChanged',
msg = EventMessage(subject='%s.DiskUsageChanged' % DEFAULT_DM_NOTIFICATION_PREFIX,
content={ 'path': path,
'disk_usage': disk_usage,
'disk_usage_readable': humanreadablesize(disk_usage),
'otdb_id': otdb_id })
logger.info('Sending notification with subject %s to %s: %s', msg.subject, self.event_bus.address, msg.content)
logger.info('Sending notification with subject %s to %s: %s', msg.subject, self.event_bus.exchange, msg.content)
self.event_bus.send(msg)
except Exception as e:
logger.error(str(e))
......@@ -104,7 +121,6 @@ class CacheManager:
try:
# only persist (a subset of) the cache to disk every once in a while.
if datetime.datetime.utcnow() - self._last_cache_write_timestamp > datetime.timedelta(minutes=5):
tmp_path = '/tmp/tmp_storagequery_cache.py'
cache_str = ''
with self._cacheLock:
# Take a subset of the entire cache
......@@ -117,9 +133,11 @@ class CacheManager:
if self.getDepthToProjectsDir(path) <= 1 and du_result.get('found') }
cache_str = str(sub_cache)
tmp_path = '/tmp/tmp_storagequery_cache.py'
with open(tmp_path, 'w') as file:
file.write(cache_str)
os.rename(tmp_path, self._cache_path)
os.makedirs(os.path.dirname(self._cache_path), exist_ok=True)
shutil.move(tmp_path, self._cache_path)
self._last_cache_write_timestamp = datetime.datetime.utcnow()
except Exception as e:
logger.error("Error while writing du cache: %s", e)
......
#!/usr/bin/env python3
# $Id$
DEFAULT_SERVICENAME = 'StorageQueryService'
DEFAULT_STORAGEQUERY_SERVICENAME = 'StorageQueryService'
......@@ -177,10 +177,10 @@ def getDiskFreeSpaceForMountpoint(mountpoint=CEP4_DATA_MOUNTPOINT):
class DiskUsage:
def __init__(self,
mountpoint=CEP4_DATA_MOUNTPOINT,
busname=DEFAULT_BUSNAME,
exchange=DEFAULT_BUSNAME,
broker=DEFAULT_BROKER):
self.path_resolver = PathResolver(mountpoint=mountpoint,
busname=busname,
exchange=exchange,
broker=broker)
def open(self):
......
......@@ -6,61 +6,45 @@ import subprocess
import socket
import os.path
from optparse import OptionParser
from lofar.messaging import Service, DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.messaging.Service import MessageHandlerInterface
from lofar.messaging import setQpidLogLevel
from lofar.common.util import waitForInterrupt, convertIntKeysToString
from lofar.messaging import RPCService, DEFAULT_BROKER, DEFAULT_BUSNAME, ServiceMessageHandler
from lofar.common.util import waitForInterrupt
from lofar.messaging.Service import MessageHandlerInterface
from lofar.sas.datamanagement.common.config import CEP4_DATA_MOUNTPOINT
from lofar.sas.datamanagement.storagequery.config import DEFAULT_STORAGEQUERY_SERVICENAME
from lofar.sas.datamanagement.storagequery.cache import CacheManager
from lofar.sas.datamanagement.storagequery.diskusage import DiskUsage
logger = logging.getLogger(__name__)
class StorageQueryHandler(MessageHandlerInterface):
def __init__(self,
mountpoint=CEP4_DATA_MOUNTPOINT,
busname=DEFAULT_BUSNAME,
broker=DEFAULT_BROKER,
**kwargs):
class StorageQueryHandler(ServiceMessageHandler):
def __init__(self, cache_manager: CacheManager, mountpoint=CEP4_DATA_MOUNTPOINT):
super(StorageQueryHandler, self).__init__()
self.mount_point = mountpoint
self.cache = cache_manager
self.cache = kwargs.pop('cache_manager')
def init(self, service_name: str, exchange: str, broker: str):
super().init(service_name, exchange, broker)
self.register_service_method('GetPathForOTDBId', self.cache.disk_usage.path_resolver.getPathForOTDBId)
self.register_service_method('GetDiskUsageForOTDBId', self.cache.getDiskUsageForOTDBId)
self.register_service_method('GetDiskUsageForMoMId', self.cache.getDiskUsageForMoMId)
self.register_service_method('GetDiskUsageForRADBId', self.cache.getDiskUsageForRADBId)
self.register_service_method('GetDiskUsageForTask', self.cache.getDiskUsageForTask)
self.register_service_method('GetDiskUsageForTasks', self.cache.getDiskUsageForTasks)
self.register_service_method('GetDiskUsageForTaskAndSubDirectories', self.cache.getDiskUsageForTaskAndSubDirectories)
self.register_service_method('GetDiskUsageForProjectDirAndSubDirectories', self.cache.getDiskUsageForProjectDirAndSubDirectories)
self.register_service_method('GetDiskUsageForProjectsDirAndSubDirectories', self.cache.getDiskUsageForProjectsDirAndSubDirectories)
self.register_service_method('GetDiskUsageForPath', self.cache.getDiskUsageForPath)
self.register_service_method('GetDiskUsagesForAllOtdbIds', self.cache.getDiskUsagesForAllOtdbIds)
super(StorageQueryHandler, self).__init__(**kwargs)
self.disk_usage = DiskUsage(mountpoint=mountpoint,
busname=busname,
broker=broker)
self.service2MethodMap = {'GetPathForOTDBId': self.cache.disk_usage.path_resolver.getPathForOTDBId,
'GetDiskUsageForOTDBId': self.cache.getDiskUsageForOTDBId,
'GetDiskUsageForMoMId': self.cache.getDiskUsageForMoMId,
'GetDiskUsageForRADBId': self.cache.getDiskUsageForRADBId,
'GetDiskUsageForTask': self.cache.getDiskUsageForTask,
'GetDiskUsageForTasks': self.cache.getDiskUsageForTasks,
'GetDiskUsageForTaskAndSubDirectories': self.cache.getDiskUsageForTaskAndSubDirectories,
'GetDiskUsageForProjectDirAndSubDirectories': self.cache.getDiskUsageForProjectDirAndSubDirectories,
'GetDiskUsageForProjectsDirAndSubDirectories': self.cache.getDiskUsageForProjectsDirAndSubDirectories,
'GetDiskUsageForPath': self.cache.getDiskUsageForPath,
'GetDiskUsagesForAllOtdbIds': self.getDiskUsagesForAllOtdbIds}
def getDiskUsagesForAllOtdbIds(self, force_update=False):
return convertIntKeysToString(self.cache.getDiskUsagesForAllOtdbIds(force_update))
def createService(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER,
mountpoint=CEP4_DATA_MOUNTPOINT, verbose=False,
cache_manager=None):
return Service(DEFAULT_SERVICENAME,
StorageQueryHandler,
busname=busname,
def createService(exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER,
mountpoint=CEP4_DATA_MOUNTPOINT, cache_manager=None):
return RPCService(service_name=DEFAULT_STORAGEQUERY_SERVICENAME,
handler_type=StorageQueryHandler,
handler_kwargs={'mountpoint': mountpoint,
'cache_manager':cache_manager},
exchange=exchange,
broker=broker,
use_service_methods=True,
numthreads=1,
verbose=verbose,
handler_args={'mountpoint': mountpoint,
'busname':busname,
'broker':broker,
'cache_manager':cache_manager})
num_threads=6)
def main():
# make sure we run in UTC timezone
......@@ -73,19 +57,19 @@ def main():
parser.add_option('-c', '--cache_path', dest='cache_path', type='string',
default=os.path.expandvars('$LOFARROOT/etc/storagequery_cache.py'),
help='path of the cache file, default: %default')
parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost')
parser.add_option("-b", "--busname", dest="busname", type="string", default=DEFAULT_BUSNAME, help="Name of the bus exchange on the qpid broker, default: %s" % DEFAULT_BUSNAME)
parser.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER,
help='Address of the messaging broker, default: %default')
parser.add_option("-e", "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME,
help="Name of the bus exchange on the broker, [default: %default]")
parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
(options, args) = parser.parse_args()
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
level=logging.DEBUG if options.verbose else logging.INFO)
setQpidLogLevel(logging.INFO)
with CacheManager(busname=options.busname, broker=options.broker, cache_path=options.cache_path) as cache_manager:
with createService(busname=options.busname,
with CacheManager(exchange=options.exchange, broker=options.broker, cache_path=options.cache_path) as cache_manager:
with createService(exchange=options.exchange,
broker=options.broker,
verbose=options.verbose,
cache_manager=cache_manager):
waitForInterrupt()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment