From c0abfa3d00c1685752530957ee6980b983f53597 Mon Sep 17 00:00:00 2001
From: Jorrit Schaap <schaap@astron.nl>
Date: Tue, 5 Jul 2016 11:17:20 +0000
Subject: [PATCH] Task #9351 #9353 #9355: added notifications on deletion of
 path/task. Handle notifications with cleanupbuslistener. Various fixes and
 improvements in cache

---
 .gitattributes                                |   1 +
 .../CleanupService/CMakeLists.txt             |   1 +
 .../CleanupService/cleanupbuslistener.py      |  67 ++++++++
 SAS/DataManagement/CleanupService/config.py   |   6 +-
 SAS/DataManagement/CleanupService/service.py  |  12 +-
 .../StorageQueryService/cache.py              | 149 +++++++++++++++---
 6 files changed, 205 insertions(+), 31 deletions(-)
 create mode 100644 SAS/DataManagement/CleanupService/cleanupbuslistener.py

diff --git a/.gitattributes b/.gitattributes
index 832f8a1867c..05318bb2c1a 100644
--- a/.gitattributes
+++ b/.gitattributes
@@ -4732,6 +4732,7 @@ SAS/DataManagement/CMakeLists.txt -text
 SAS/DataManagement/CleanupService/CMakeLists.txt -text
 SAS/DataManagement/CleanupService/__init__.py -text
 SAS/DataManagement/CleanupService/cleanup -text
+SAS/DataManagement/CleanupService/cleanupbuslistener.py -text
 SAS/DataManagement/CleanupService/cleanupservice -text
 SAS/DataManagement/CleanupService/cleanupservice.ini -text
 SAS/DataManagement/CleanupService/config.py -text
diff --git a/SAS/DataManagement/CleanupService/CMakeLists.txt b/SAS/DataManagement/CleanupService/CMakeLists.txt
index 9d37337b1ad..95ba5bfab4e 100644
--- a/SAS/DataManagement/CleanupService/CMakeLists.txt
+++ b/SAS/DataManagement/CleanupService/CMakeLists.txt
@@ -10,6 +10,7 @@ set(_py_files
   config.py
   rpc.py
   service.py
+  cleanupbuslistener.py
 )
 
 python_install(${_py_files} DESTINATION lofar/sas/datamanagement/cleanup)
diff --git a/SAS/DataManagement/CleanupService/cleanupbuslistener.py b/SAS/DataManagement/CleanupService/cleanupbuslistener.py
new file mode 100644
index 00000000000..bc7882e9081
--- /dev/null
+++ b/SAS/DataManagement/CleanupService/cleanupbuslistener.py
@@ -0,0 +1,67 @@
+#!/usr/bin/env python
+
+# CleanupBusListener.py
+#
+# Copyright (C) 2015
+# ASTRON (Netherlands Institute for Radio Astronomy)
+# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
+#
+# This file is part of the LOFAR software suite.
+# The LOFAR software suite is free software: you can redistribute it
+# and/or modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# The LOFAR software suite is distributed in the hope that it will be
+# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
+#
+
+from lofar.messaging.messagebus import AbstractBusListener
+from lofar.sas.datamanagement.cleanup.config import DEFAULT_DM_NOTIFICATION_BUSNAME, DEFAULT_DM_NOTIFICATION_SUBJECTS
+from lofar.common.util import waitForInterrupt
+
+import qpid.messaging
+import logging
+
+logger = logging.getLogger(__name__)
+
+class CleanupBusListener(AbstractBusListener):
+    def __init__(self, busname=DEFAULT_DM_NOTIFICATION_BUSNAME, subjects=DEFAULT_DM_NOTIFICATION_SUBJECTS, broker=None, **kwargs):
+        self.subject_prefix = (subjects.split('.')[0]+'.') if '.' in subjects else ''
+
+        address = "%s/%s" % (busname, subjects)
+        super(CleanupBusListener, self).__init__(address, broker, **kwargs)
+
+    def _handleMessage(self, msg):
+        logger.info("on%s: %s" % (msg.subject.replace(self.subject_prefix, ''), str(msg.content).replace('\n', ' ')))
+
+        if msg.subject == '%sTaskDeleted' % self.subject_prefix:
+            self.onTaskDeleted(msg.content.get('otdb_id'), msg.content.get('paths'))
+        elif msg.subject == '%sPathDeleted' % self.subject_prefix:
+            self.onPathDeleted(msg.content.get('path'))
+        else:
+            logger.error("CleanupBusListener.handleMessage: unknown subject: %s" %str(msg.subject))
+
+    def onTaskDeleted(self, otdb_id, paths):
+        '''onTaskDeleted is called upon receiving a TaskDeleted message.
+        :param otdb_id: otdb_id of the deleted task
+        :param paths: list of paths of the deleted task'''
+        pass
+
+    def onPathDeleted(self, path):
+        '''onPathDeleted is called upon receiving a PathDeleted message.
+        :param path: path of the deleted task'''
+        pass
+
+
+
+if __name__ == '__main__':
+    with CleanupBusListener(broker=None) as listener:
+        waitForInterrupt()
+
+__all__ = ["CleanupBusListener"]
diff --git a/SAS/DataManagement/CleanupService/config.py b/SAS/DataManagement/CleanupService/config.py
index cac900bd86e..30b010909a9 100644
--- a/SAS/DataManagement/CleanupService/config.py
+++ b/SAS/DataManagement/CleanupService/config.py
@@ -6,6 +6,6 @@ from lofar.messaging import adaptNameToEnvironment
 DEFAULT_BUSNAME = adaptNameToEnvironment('lofar.dm.command')
 DEFAULT_SERVICENAME = 'CleanupService'
 
-DEFAULT_NOTIFICATION_BUSNAME = adaptNameToEnvironment('lofar.dm.notification')
-DEFAULT_NOTIFICATION_PREFIX = 'DM.'
-DEFAULT_NOTIFICATION_SUBJECTS=DEFAULT_NOTIFICATION_PREFIX+'*'
+DEFAULT_DM_NOTIFICATION_BUSNAME = adaptNameToEnvironment('lofar.dm.notification')
+DEFAULT_DM_NOTIFICATION_PREFIX = 'DM.'
+DEFAULT_DM_NOTIFICATION_SUBJECTS=DEFAULT_DM_NOTIFICATION_PREFIX+'*'
diff --git a/SAS/DataManagement/CleanupService/service.py b/SAS/DataManagement/CleanupService/service.py
index f65b7034c00..7e4bba458ae 100644
--- a/SAS/DataManagement/CleanupService/service.py
+++ b/SAS/DataManagement/CleanupService/service.py
@@ -16,7 +16,7 @@ from lofar.common.util import waitForInterrupt
 from lofar.sas.datamanagement.common.config import CEP4_DATA_MOUNTPOINT
 from lofar.sas.datamanagement.common.path import PathResolver
 from lofar.sas.datamanagement.cleanup.config import DEFAULT_BUSNAME, DEFAULT_SERVICENAME
-from lofar.sas.datamanagement.cleanup.config import DEFAULT_NOTIFICATION_BUSNAME, DEFAULT_NOTIFICATION_PREFIX
+from lofar.sas.datamanagement.cleanup.config import DEFAULT_DM_NOTIFICATION_BUSNAME, DEFAULT_DM_NOTIFICATION_PREFIX
 from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as RADB_BUSNAME
 from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME
 from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
@@ -30,8 +30,8 @@ class CleanupHandler(MessageHandlerInterface):
                  radb_servicename=RADB_SERVICENAME,
                  mom_busname=DEFAULT_MOMQUERY_BUSNAME,
                  mom_servicename=DEFAULT_MOMQUERY_SERVICENAME,
-                 notification_busname=DEFAULT_NOTIFICATION_BUSNAME,
-                 notification_prefix=DEFAULT_NOTIFICATION_PREFIX,
+                 notification_busname=DEFAULT_DM_NOTIFICATION_BUSNAME,
+                 notification_prefix=DEFAULT_DM_NOTIFICATION_PREFIX,
                  broker=None,
                  **kwargs):
 
@@ -65,7 +65,7 @@ class CleanupHandler(MessageHandlerInterface):
     def _sendPathDeletedNotification(self, path):
         try:
             msg = EventMessage(context=self.notification_prefix + 'PathDeleted', content={ 'path': path })
-            logger.info('Sending notification: %s', msg)
+            logger.info('Sending notification: %s', str(msg).replace('\n', ' '))
             self.event_bus.send(msg)
         except Exception as e:
             logger.error(str(e))
@@ -75,7 +75,7 @@ class CleanupHandler(MessageHandlerInterface):
             msg = EventMessage(context=self.notification_prefix + 'TaskDeleted', content={'otdb_id':otdb_id,
                                                                                           'paths': paths,
                                                                                           'message': message})
-            logger.info('Sending notification: %s', msg)
+            logger.info('Sending notification: %s', str(msg).replace('\n', ' '))
             self.event_bus.send(msg)
         except Exception as e:
             logger.error(str(e))
@@ -157,7 +157,7 @@ class CleanupHandler(MessageHandlerInterface):
                 logger.error(message)
                 return {'deleted': False, 'message': message, 'path': path}
 
-        if not os.path.exists(path):
+        if not self.path_resolver.pathExists(path):
             message = "Nothing to delete, path '%s' does not exist." % (path)
             logger.warn(message)
             return {'deleted': True, 'message': message, 'path': path}
diff --git a/SAS/DataManagement/StorageQueryService/cache.py b/SAS/DataManagement/StorageQueryService/cache.py
index 878361492c9..dcab250bd68 100644
--- a/SAS/DataManagement/StorageQueryService/cache.py
+++ b/SAS/DataManagement/StorageQueryService/cache.py
@@ -12,19 +12,24 @@ from optparse import OptionParser
 from threading import Thread, RLock
 import os.path
 
+from lofar.common.util import humanreadablesize
 from lofar.sas.datamanagement.storagequery.diskusage import getDiskUsageForPath as du_getDiskUsageForPath
 from lofar.sas.datamanagement.storagequery.diskusage import DiskUsage
+from lofar.sas.datamanagement.cleanup.cleanupbuslistener import CleanupBusListener
 from lofar.sas.otdb.OTDBBusListener import OTDBBusListener
 from lofar.common.util import waitForInterrupt
 from lofar.sas.datamanagement.common.config import CEP4_DATA_MOUNTPOINT
 from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME, DEFAULT_OTDB_NOTIFICATION_SUBJECT
+from lofar.sas.datamanagement.cleanup.config import DEFAULT_DM_NOTIFICATION_BUSNAME, DEFAULT_DM_NOTIFICATION_SUBJECTS
 from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as RADB_BUSNAME
 from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME
 from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
 
 logger = logging.getLogger(__name__)
 
-class CacheManager(OTDBBusListener):
+MAX_CACHE_ENTRY_AGE = datetime.timedelta(minutes=60)
+
+class CacheManager:
     def __init__(self,
                  mountpoint=CEP4_DATA_MOUNTPOINT,
                  radb_busname=RADB_BUSNAME,
@@ -33,13 +38,25 @@ class CacheManager(OTDBBusListener):
                  mom_servicename=DEFAULT_MOMQUERY_SERVICENAME,
                  otdb_notification_busname=DEFAULT_OTDB_NOTIFICATION_BUSNAME,
                  otdb_notification_subject=DEFAULT_OTDB_NOTIFICATION_SUBJECT,
+                 dm_notification_busname=DEFAULT_DM_NOTIFICATION_BUSNAME,
+                 dm_notification_subjects=DEFAULT_DM_NOTIFICATION_SUBJECTS,
                  broker=None):
-        super(CacheManager, self).__init__(busname=otdb_notification_busname,
-                                           subject=otdb_notification_subject,
-                                           broker=broker)
+        self.otdb_listener = OTDBBusListener(busname=otdb_notification_busname,
+                                             subject=otdb_notification_subject,
+                                             broker=broker)
+
+        self.otdb_listener.onObservationAborted = self.onObservationAborted
+        self.otdb_listener.onObservationFinished = self.onObservationFinished
+
+        self.cleanup_listener = CleanupBusListener(busname=dm_notification_busname,
+                                                subjects=dm_notification_subjects,
+                                                broker=broker)
+
+        self.cleanup_listener.onTaskDeleted = self.onTaskDeleted
 
         self._updateCacheThread = None
         self._updateCacheThreadRunning = False
+        self._continueUpdateCacheThread = False
         self._cacheLock = RLock()
 
         self._cache = {'paths':{}, 'otdb_ids': {}}
@@ -97,21 +114,78 @@ class CacheManager(OTDBBusListener):
                         logger.info('removing otdb_id %s from cache', otdb_id)
                         del otdb_cache[otdb_id]
 
-    def _updateOldEntriesInCache(self):
-        while self._updateCacheThreadRunning:
-            now = datetime.datetime.utcnow()
-            with self._cacheLock:
-                old_entries = {path:du_result for path,du_result in self._cache['paths'].items() if now - du_result['cache_timestamp'] > datetime.timedelta(minutes=15)}
+        if du_result.get('path') == self.disk_usage.path_resolver.projects_path:
+            self._updateProjectsDiskUsageInRADB()
 
-            for path, du_result in old_entries.items():
-                logger.info('updating old entry in cache: %s', path)
-                result = du_getDiskUsageForPath(path)
-                self._updateCache(result)
+    def _invalidateCacheEntryForPath(self, path):
+        with self._cacheLock:
+            if path in self._cache['paths']:
+                path_cache = self._cache['paths'][path]
+                path_cache['cache_timestamp'] = path_cache['cache_timestamp'] - MAX_CACHE_ENTRY_AGE
 
-            for i in range(60):
-                sleep(1)
-                if self._updateCacheThreadRunning:
-                    return
+    def _updateOldEntriesInCache(self):
+        while self._updateCacheThreadRunning:
+            try:
+                now = datetime.datetime.utcnow()
+                with self._cacheLock:
+                    old_entries = {path:du_result for path,du_result in self._cache['paths'].items() if now - du_result['cache_timestamp'] > MAX_CACHE_ENTRY_AGE}
+
+                if old_entries:
+                    logger.info('updating %s old cache entries', len(old_entries))
+
+                    # sort them oldest to newest
+                    old_entries = sorted(old_entries.items(), key=lambda x: x[1]['cache_timestamp'])
+
+                    cacheUpdateStart = datetime.datetime.utcnow()
+
+                    for path, du_result in old_entries:
+                        try:
+                            logger.info('updating old entry in cache: %s', path)
+                            result = du_getDiskUsageForPath(path)
+                            self._updateCache(result)
+                        except Exception as e:
+                            logger.error(str(e))
+
+                        if not self._updateCacheThreadRunning:
+                            return
+
+                        if datetime.datetime.utcnow() - cacheUpdateStart > datetime.timedelta(minutes=5):
+                            # break out of cache update loop if full update takes more than 5min
+                            # next loop we'll start with the oldest cache entries again
+                            break
+
+                for i in range(60):
+                    sleep(1)
+                    if not self._updateCacheThreadRunning:
+                        return
+                    if self._continueUpdateCacheThread:
+                        # break out of sleep loop and continue updating old cache entries
+                        self._continueUpdateCacheThread = False
+                        break
+            except Exception as e:
+                logger.error(str(e))
+
+    def _updateProjectsDiskUsageInRADB(self):
+        try:
+            projects_du_result = self.getDiskUsageForPath(self.disk_usage.path_resolver.projects_path)
+            if projects_du_result['found']:
+                #get the total used space, and update the resource availability in the radb
+                radbrpc = self.disk_usage.path_resolver.radbrpc
+                storage_resources = radbrpc.getResources(resource_types='storage', include_availability=True)
+                cep4_storage_resource = next(x for x in storage_resources if 'cep4' in x['name'])
+
+                total_capacity = cep4_storage_resource['total_capacity']
+                used_capacity = projects_du_result['disk_usage']
+                available_capacity = total_capacity - used_capacity
+
+                logger.info('updating availability capacity for %s (id=%s) to %s in the RADB',
+                            cep4_storage_resource['name'],
+                            cep4_storage_resource['id'],
+                            humanreadablesize(available_capacity))
+
+                radbrpc.updateResourceAvailability(cep4_storage_resource['id'], available_capacity=available_capacity)
+        except Exception as e:
+            logger.error(e)
 
     def open(self):
         self.disk_usage.open()
@@ -121,14 +195,16 @@ class CacheManager(OTDBBusListener):
         self._updateCacheThreadRunning = True
         self._updateCacheThread.start()
 
-        super(CacheManager, self).start_listening()
+        self.otdb_listener.start_listening()
+        self.cleanup_listener.start_listening()
 
     def close(self):
+        self.otdb_listener.stop_listening()
+        self.cleanup_listener.stop_listening()
         self._updateCacheThreadRunning = False
         self._updateCacheThread.join()
 
         self.disk_usage.close()
-        super(CacheManager, self).stop_listening()
 
     def __enter__(self):
         self.open()
@@ -138,13 +214,39 @@ class CacheManager(OTDBBusListener):
         self.close()
 
     def onObservationFinished(self, otdb_id, modificationTime):
-        result = self.disk_usage.getDiskUsageForOTDBId(otdb_id)
-        self._updateCache(result)
+        self._onDiskActivityForOTDBId(otdb_id)
 
     def onObservationAborted(self, otdb_id, modificationTime):
+        self._onDiskActivityForOTDBId(otdb_id)
+
+    def onTaskDeleted(self, otdb_id, paths):
+        self._onDiskActivityForOTDBId(otdb_id)
+
+    def _onDiskActivityForOTDBId(self, otdb_id):
         result = self.disk_usage.getDiskUsageForOTDBId(otdb_id)
         self._updateCache(result)
 
+        task_path = result.get('path')
+        projects_path = self.disk_usage.path_resolver.projects_path
+
+        # update all paths up the tree up to the projects_path
+        # update the resource availability in the radb as well
+        path = task_path
+        while path:
+            parent_path = '/'.join(path.split('/')[:-1])
+
+            if projects_path.startswith(parent_path) and len(parent_path) < len(projects_path):
+                break
+
+            logger.info('invalidating cache entry for %s because disk usage for task %s in %s changed', parent_path, otdb_id, task_path)
+
+            self._invalidateCacheEntryForPath(parent_path)
+
+            path = parent_path
+
+        # trigger update cache thread
+        self._continueUpdateCacheThread = True
+
     def getDiskUsageForOTDBId(self, otdb_id):
         return self.getDiskUsageForTask(otdb_id=otdb_id)
 
@@ -166,9 +268,12 @@ class CacheManager(OTDBBusListener):
         logger.info("cache.getDiskUsageForPath(%s)", path)
         needs_cache_update = False
         with self._cacheLock:
-            needs_cache_update = path not in self._cache['paths']
+            needs_cache_update |= path not in self._cache['paths']
+            if path in self._cache['paths']:
+                needs_cache_update |= datetime.datetime.utcnow() - self._cache['paths'][path]['cache_timestamp'] > MAX_CACHE_ENTRY_AGE
 
         if needs_cache_update:
+            logger.info("cache update needed for %s", path)
             result = du_getDiskUsageForPath(path)
             self._updateCache(result)
 
-- 
GitLab