#!/usr/bin/env python

# ChangesHandler.py
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: RADBChangesHandler.py 1580 2015-09-30 14:18:57Z loose $

"""
ChangesHandler listens on the RADB and data management notification buses
(through a RADBBusListener and a DataManagementBusListener) and records every
received notification as a numbered, timestamped change dictionary.

Typical usage is to start listening (optionally as a context manager) and to
poll for new changes with getChangesSince(changeNumber).
"""

from lofar.sas.resourceassignment.database.config import DEFAULT_NOTIFICATION_BUSNAME as RADB_NOTIFICATION_BUSNAME
from lofar.sas.resourceassignment.database.config import DEFAULT_NOTIFICATION_SUBJECTS as RADB_NOTIFICATION_SUBJECTS
from lofar.sas.resourceassignment.database.radbbuslistener import RADBBusListener
from lofar.sas.datamanagement.common.config import DEFAULT_DM_NOTIFICATION_BUSNAME, DEFAULT_DM_NOTIFICATION_SUBJECTS
from lofar.sas.datamanagement.common.datamanagementbuslistener import DataManagementBusListener
from lofar.common.util import humanreadablesize
from lofar.common.util import waitForInterrupt
from lofar.sas.resourceassignment.resourceassignmenteditor.mom import updateTaskMomDetails

import qpid.messaging
import logging
from datetime import datetime, timedelta
from threading import Lock, Condition

logger = logging.getLogger(__name__)

# Values used for the 'changeType' key of every recorded change.
CHANGE_UPDATE_TYPE = 'update'
CHANGE_INSERT_TYPE = 'insert'
CHANGE_DELETE_TYPE = 'delete'


class ChangesHandler:
    def __init__(self,
                 radb_busname=RADB_NOTIFICATION_BUSNAME,
                 radb_subjects=RADB_NOTIFICATION_SUBJECTS,
                 dm_busname=DEFAULT_DM_NOTIFICATION_BUSNAME,
                 dm_subjects=DEFAULT_DM_NOTIFICATION_SUBJECTS,
                 broker=None,
                 momqueryrpc=None,
                 radbrpc=None,
                 sqrpc=None,
                 **kwargs):
        """
        ChangesHandler listens on the lofar notification message bus and keeps track of all the change notifications.
        :param radb_busname: bus name handed to the RADBBusListener
        :param radb_subjects: subjects handed to the RADBBusListener
        :param dm_busname: bus name handed to the DataManagementBusListener
        :param dm_subjects: subjects handed to the DataManagementBusListener
        :param broker: valid Qpid broker host (default: None, which means localhost)
        :param momqueryrpc: rpc object passed to updateTaskMomDetails for inserted tasks
        :param radbrpc: rpc object used to look up a task (getTask) on disk usage changes
        :param sqrpc: accepted for interface compatibility; not used by this class
        additional parameters in kwargs (passed on to both bus listeners):
                options=     <dict>  Dictionary of options passed to QPID
                exclusive=   <bool>  Create an exclusive binding so no other services can consume duplicate messages (default: False)
                numthreads=  <int>   Number of parallel threads processing messages (default: 1)
                verbose=     <bool>  Output extra logging over stdout (default: False)
        """
        # Route the RADB listener's notification hooks to this object's handlers.
        self._radb_listener = RADBBusListener(busname=radb_busname, subjects=radb_subjects, broker=broker, **kwargs)
        self._radb_listener.onTaskUpdated = self.onTaskUpdated
        self._radb_listener.onTaskInserted = self.onTaskInserted
        self._radb_listener.onTaskDeleted = self.onTaskDeleted
        self._radb_listener.onResourceClaimUpdated = self.onResourceClaimUpdated
        self._radb_listener.onResourceClaimInserted = self.onResourceClaimInserted
        self._radb_listener.onResourceClaimDeleted = self.onResourceClaimDeleted
        self._radb_listener.onResourceAvailabilityUpdated = self.onResourceAvailabilityUpdated
        self._radb_listener.onResourceCapacityUpdated = self.onResourceCapacityUpdated

        # Same for the data management listener.
        self._dm_listener = DataManagementBusListener(busname=dm_busname, subjects=dm_subjects, broker=broker, **kwargs)
        self._dm_listener.onDiskUsageChanged = self.onDiskUsageChanged
        self._dm_listener.onTaskDeleted = self.onTaskDeletedFromDisk

        self._changes = []                    # recorded change dicts, oldest first
        self._lock = Lock()                   # guards _changes and _changeNumber
        self._changedCondition = Condition()  # signalled after each recorded change
        # Plain int instead of the Python 2-only long literal (0L); Python 2
        # promotes to long automatically when needed.
        self._changeNumber = 0
        self._momqueryrpc = momqueryrpc
        self._radbrpc = radbrpc

    def __enter__(self):
        '''Start listening on both buses when entering a with-block.'''
        self.start_listening()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        '''Stop listening on both buses when leaving a with-block.'''
        self.stop_listening()

    def start_listening(self):
        '''Start the RADB listener and the data management listener.'''
        self._radb_listener.start_listening()
        self._dm_listener.start_listening()

    def stop_listening(self):
        '''Stop the RADB listener and the data management listener.'''
        self._radb_listener.stop_listening()
        self._dm_listener.stop_listening()

    def _handleChange(self, change):
        '''_handleChange stamps the change with a timestamp and a change number,
        appends it to the changes list, drops changes older than five minutes
        and wakes up all threads waiting in getChangesSince.
        :param change: dictionary with the change'''
        with self._lock:
            change['timestamp'] = datetime.utcnow().isoformat()
            self._changeNumber += 1
            change['changeNumber'] = self._changeNumber
            self._changes.append(change)

        # Must run after releasing self._lock: clearChangesBefore acquires the
        # same (non-reentrant) Lock itself.
        self.clearChangesBefore(datetime.utcnow() - timedelta(minutes=5))

        with self._changedCondition:
            self._changedCondition.notify_all()

    def onTaskUpdated(self, updated_task):
        '''onTaskUpdated is called upon receiving a TaskUpdated message.
        :param updated_task: dictionary with the updated task'''
        # Replace the received start/end time objects by the result of their datetime() method.
        updated_task['starttime'] = updated_task['starttime'].datetime()
        updated_task['endtime'] = updated_task['endtime'].datetime()
        task_change = {'changeType': CHANGE_UPDATE_TYPE, 'objectType': 'task', 'value': updated_task}
        self._handleChange(task_change)

    def onTaskInserted(self, new_task):
        '''onTaskInserted is called upon receiving a TaskInserted message.
        :param new_task: dictionary with the inserted task'''
        new_task['starttime'] = new_task['starttime'].datetime()
        new_task['endtime'] = new_task['endtime'].datetime()
        updateTaskMomDetails(new_task, self._momqueryrpc)
        task_change = {'changeType': CHANGE_INSERT_TYPE, 'objectType': 'task', 'value': new_task}
        self._handleChange(task_change)

    def onTaskDeleted(self, old_task_id):
        '''onTaskDeleted is called upon receiving a TaskDeleted message.
        :param old_task_id: id of the deleted task'''
        task_change = {'changeType': CHANGE_DELETE_TYPE, 'objectType': 'task', 'value': {'id': old_task_id}}
        self._handleChange(task_change)

    def onResourceClaimUpdated(self, updated_claim):
        '''onResourceClaimUpdated is called upon receiving a ResourceClaimUpdated message.
        :param updated_claim: dictionary with the updated claim'''
        updated_claim['starttime'] = updated_claim['starttime'].datetime()
        updated_claim['endtime'] = updated_claim['endtime'].datetime()
        claim_change = {'changeType': CHANGE_UPDATE_TYPE, 'objectType': 'resourceClaim', 'value': updated_claim}
        self._handleChange(claim_change)

    def onResourceClaimInserted(self, new_claim):
        '''onResourceClaimInserted is called upon receiving a ResourceClaimInserted message.
        :param new_claim: dictionary with the inserted claim'''
        new_claim['starttime'] = new_claim['starttime'].datetime()
        new_claim['endtime'] = new_claim['endtime'].datetime()
        claim_change = {'changeType': CHANGE_INSERT_TYPE, 'objectType': 'resourceClaim', 'value': new_claim}
        self._handleChange(claim_change)

    def onResourceClaimDeleted(self, old_claim_id):
        '''onResourceClaimDeleted is called upon receiving a ResourceClaimDeleted message.
        :param old_claim_id: id of the deleted claim'''
        claim_change = {'changeType': CHANGE_DELETE_TYPE, 'objectType': 'resourceClaim', 'value': {'id': old_claim_id}}
        self._handleChange(claim_change)

    def onResourceAvailabilityUpdated(self, old_availability, updated_availability):
        '''Record an update change for a resource availability.
        :param old_availability: previous availability (not used)
        :param updated_availability: new availability, stored as the change value'''
        claim_change = {'changeType': CHANGE_UPDATE_TYPE, 'objectType': 'resourceAvailability', 'value': updated_availability}
        self._handleChange(claim_change)

    def onResourceCapacityUpdated(self, old_capacity, updated_capacity):
        '''Record an update change for a resource capacity.
        :param old_capacity: previous capacity (not used)
        :param updated_capacity: new capacity, stored as the change value'''
        claim_change = {'changeType': CHANGE_UPDATE_TYPE, 'objectType': 'resourceCapacity', 'value': updated_capacity}
        self._handleChange(claim_change)

    def _handleDiskUsageChange(self, disk_usage, otdb_id):
        '''Look up the task for otdb_id via the radb rpc and record a task update
        change carrying the new disk usage. Does nothing when otdb_id is None,
        when no radbrpc was supplied, or when the task cannot be found.
        :param disk_usage: new disk usage value (also rendered with humanreadablesize)
        :param otdb_id: otdb id of the task the disk usage belongs to'''
        if otdb_id is None:
            return

        if self._radbrpc is None:
            # radbrpc defaults to None in the constructor; without it the task
            # cannot be looked up, so skip instead of raising AttributeError
            # inside the bus listener thread.
            logger.warning('disk_usage change for otdb_id %s ignored: no radbrpc available', otdb_id)
            return

        task = self._radbrpc.getTask(otdb_id=otdb_id)
        if not task:
            return

        du_readable = humanreadablesize(disk_usage)
        logger.info('disk_usage change: otdb_id %s radb_id %s disk_usage %s %s', otdb_id, task['id'], disk_usage, du_readable)
        task['disk_usage'] = disk_usage
        task['disk_usage_readable'] = du_readable

        task_change = {'changeType': CHANGE_UPDATE_TYPE, 'objectType': 'task', 'value': task}
        self._handleChange(task_change)

    def onDiskUsageChanged(self, path, disk_usage, otdb_id):
        '''Called by the data management listener when disk usage changed.
        :param path: path the notification refers to (not used)
        :param disk_usage: new disk usage value
        :param otdb_id: otdb id of the affected task'''
        self._handleDiskUsageChange(disk_usage, otdb_id)

    def onTaskDeletedFromDisk(self, otdb_id, deleted, paths, message=''):
        '''Called by the data management listener when task data was deleted;
        recorded as a disk usage of 0 for that task.
        :param otdb_id: otdb id of the affected task
        :param deleted: not used
        :param paths: not used
        :param message: not used'''
        self._handleDiskUsageChange(0, otdb_id)

    def getMostRecentChangeNumber(self):
        '''Return the change number of the newest stored change,
        or -1 when no changes are currently stored.'''
        with self._lock:
            if self._changes:
                return self._changes[-1]['changeNumber']
        return -1

    def clearChangesBefore(self, timestamp):
        '''Drop all stored changes with a timestamp before the given timestamp.
        :param timestamp: datetime, or iso-format timestamp string'''
        if isinstance(timestamp, datetime):
            timestamp = timestamp.isoformat()

        # Stored timestamps are iso-format strings, so they are compared as strings.
        with self._lock:
            self._changes = [x for x in self._changes if x['timestamp'] >= timestamp]

    def getChangesSince(self, changeNumber):
        '''Return the list of stored changes with a change number greater than
        changeNumber. Blocks (without timeout) until at least one such change
        is available.
        :param changeNumber: only changes numbered above this value are returned'''
        with self._changedCondition:
            while True:
                with self._lock:
                    changesSince = [x for x in self._changes if x['changeNumber'] > changeNumber]

                    if changesSince:
                        return changesSince

                # Releases the condition's lock while waiting; _handleChange
                # notifies after every recorded change.
                self._changedCondition.wait()