Skip to content
Snippets Groups Projects
Select Git revision
  • 70765073e1c0a28bf38a48784fbfd11851f9a2f5
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

radbchangeshandler.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    radbchangeshandler.py 6.87 KiB
    #!/usr/bin/env python
    
    # RADBChangesHandler.py
    #
    # Copyright (C) 2015
    # ASTRON (Netherlands Institute for Radio Astronomy)
    # P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
    #
    # This file is part of the LOFAR software suite.
    # The LOFAR software suite is free software: you can redistribute it
    # and/or modify it under the terms of the GNU General Public License as
    # published by the Free Software Foundation, either version 3 of the
    # License, or (at your option) any later version.
    #
    # The LOFAR software suite is distributed in the hope that it will be
    # useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License along
    # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
    #
    # $Id: RADBChangesHandler.py 1580 2015-09-30 14:18:57Z loose $
    
    """
    RADBChangesHandler listens on the lofar notification message bus and calls (empty) on<SomeMessage> methods when such a message is received.
    Typical usage is to derive your own subclass from RADBChangesHandler and implement the specific on<SomeMessage> methods that you are interested in.
    """
    
    from lofar.sas.resourceassignment.database.config import DEFAULT_BUSNAME
    from lofar.sas.resourceassignment.resourceassignmentservice.radbbuslistener import RADBBusListener
    from lofar.common.util import waitForInterrupt
    from lofar.mom.momqueryservice.momqueryrpc import MoMRPC
    from lofar.mom.momqueryservice.config import DEFAULT_BUSNAME as DEFAULT_MOM_BUSNAME
    from lofar.mom.momqueryservice.config import DEFAULT_SERVICENAME as DEFAULT_MOM_SERVICENAME
    from lofar.sas.resourceassignment.resourceassignmenteditor.mom import updateTaskMomDetails
    
    import qpid.messaging
    import logging
    from datetime import datetime, timedelta
    from threading import Lock, Condition
    
    logger = logging.getLogger(__name__)
    
    CHANGE_UPDATE_TYPE = 'update'
    CHANGE_INSERT_TYPE = 'insert'
    CHANGE_DELETE_TYPE = 'delete'
    
    class RADBChangesHandler(RADBBusListener):
        def __init__(self, busname=DEFAULT_BUSNAME, subject='RADB.*', broker=None, momrpc=None, **kwargs):
            """
            RADBChangesHandler listens on the lofar notification message bus and keeps track of all the change notifications.
            :param busname: valid Qpid address (default: lofar.ra.notification)
            :param broker: valid Qpid broker host (default: None, which means localhost)
            additional parameters in kwargs:
                options=   <dict>  Dictionary of options passed to QPID
                exclusive= <bool>  Create an exclusive binding so no other services can consume duplicate messages (default: False)
                numthreads= <int>  Number of parallel threads processing messages (default: 1)
                verbose=   <bool>  Output extra logging over stdout (default: False)
            """
            address = "%s/%s" % (busname, subject)
            super(RADBChangesHandler, self).__init__(busname=busname, subject=subject, broker=broker, **kwargs)
    
            self._changes = []
            self._lock = Lock()
            self._changedCondition = Condition()
            self._changeNumber = 0L
            self._momrpc = momrpc
    
        def _handleChange(self, change):
            '''_handleChange appends a change in the changes list and calls the onChangedCallback.
            :param change: dictionary with the change'''
            with self._lock:
                change['timestamp'] = datetime.utcnow().isoformat()
                self._changeNumber += 1
                change['changeNumber'] = self._changeNumber
                self._changes.append(change)
    
            self.clearChangesBefore(datetime.utcnow()-timedelta(minutes=5))
    
            with self._changedCondition:
                self._changedCondition.notifyAll()
    
        def onTaskUpdated(self, task):
            '''onTaskUpdated is called upon receiving a TaskUpdated message.
            :param task: dictionary with the updated task'''
            task['starttime'] = task['starttime'].datetime()
            task['endtime'] = task['endtime'].datetime()
            updateTaskMomDetails(task, self._momrpc)
            task_change = {'changeType':CHANGE_UPDATE_TYPE, 'objectType':'task', 'value':task}
            self._handleChange(task_change)
    
        def onTaskInserted(self, task):
            '''onTaskInserted is called upon receiving a TaskInserted message.
            :param task: dictionary with the inserted task'''
            task['starttime'] = task['starttime'].datetime()
            task['endtime'] = task['endtime'].datetime()
            updateTaskMomDetails(task, self._momrpc)
            task_change = {'changeType':CHANGE_INSERT_TYPE, 'objectType':'task', 'value':task}
            self._handleChange(task_change)
    
        def onTaskDeleted(self, task):
            '''onTaskDeleted is called upon receiving a TaskDeleted message.
            :param task: dictionary with the deleted task'''
            task_change = {'changeType':CHANGE_DELETE_TYPE, 'objectType':'task', 'value':task}
            self._handleChange(task_change)
    
        def onResourceClaimUpdated(self, claim):
            '''onResourceClaimUpdated is called upon receiving a ResourceClaimUpdated message.
            :param task: dictionary with the updated claim'''
            claim['starttime'] = claim['starttime'].datetime()
            claim['endtime'] = claim['endtime'].datetime()
            claim_change = {'changeType':CHANGE_UPDATE_TYPE, 'objectType':'resourceClaim', 'value':claim}
            self._handleChange(claim_change)
    
        def onResourceClaimInserted(self, claim):
            '''onResourceClaimInserted is called upon receiving a ResourceClaimInserted message.
            :param claim: dictionary with the inserted claim'''
            claim['starttime'] = claim['starttime'].datetime()
            claim['endtime'] = claim['endtime'].datetime()
            claim_change = {'changeType':CHANGE_INSERT_TYPE, 'objectType':'resourceClaim', 'value':claim}
            self._handleChange(claim_change)
    
        def onResourceClaimDeleted(self, claim):
            '''onResourceClaimDeleted is called upon receiving a ResourceClaimDeleted message.
            :param claim: dictionary with the deleted claim'''
            claim_change = {'changeType':CHANGE_DELETE_TYPE, 'objectType':'resourceClaim', 'value':claim}
            self._handleChange(claim_change)
    
        def getMostRecentChangeNumber(self):
            with self._lock:
                if self._changes:
                    return self._changes[-1]['changeNumber']
            return -1L
    
        def clearChangesBefore(self, timestamp):
            if isinstance(timestamp, datetime):
                timestamp = timestamp.isoformat()
    
            with self._lock:
                self._changes = [x for x in self._changes if x['timestamp'] >= timestamp]
    
        def getChangesSince(self, changeNumber):
            with self._changedCondition:
                while True:
                    with self._lock:
                        changesSince = [x for x in self._changes if x['changeNumber'] > changeNumber]
    
                        if changesSince:
                            return changesSince
    
                    self._changedCondition.wait()