-
Jorrit Schaap authoredJorrit Schaap authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
rotspservice.py 6.96 KiB
#!/usr/bin/env python
# rotspservice.py: RAtoOTDBTaskSpecificationPropagator listens on the lofar ?? bus and calls onTaskScheduled
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: rotspservice.py 1580 2015-09-30 14:18:57Z loose $
"""
RATaskStatusChangedListener listens to a bus on which tasks handled by the ResourceAssigner get published.
It will then try to propagate the changes to OTDB as Scheduled or Conflict.
"""
import qpid.messaging
import logging
from datetime import datetime
import time
from lofar.messaging.RPC import RPC, RPCException
import lofar.sas.resourceassignment.resourceassignmentservice.rpc as rarpc ## RA DB
from lofar.sas.resourceassignment.database.radbbuslistener import RADBBusListener
from lofar.sas.resourceassignment.database.config import DEFAULT_NOTIFICATION_BUSNAME as RA_NOTIFICATION_BUSNAME
from lofar.sas.resourceassignment.database.config import DEFAULT_NOTIFICATION_PREFIX as RA_NOTIFICATION_PREFIX
from lofar.sas.resourceassignment.ratootdbtaskspecificationpropagator.propagator import RAtoOTDBPropagator
logger = logging.getLogger(__name__)
class RATaskStatusChangedListener(RADBBusListener):
def __init__(self,
busname=RA_NOTIFICATION_BUSNAME,
subject=RA_NOTIFICATION_PREFIX + 'TaskUpdated',
broker=None,
propagator=None, ## TODO also give translator?
**kwargs):
"""
RATaskScheduledListener listens on the lofar ?? bus and calls onTaskScheduled or onTaskConclict
:param busname: valid Qpid address (default: lofar.ra.notification)
:param broker: valid Qpid broker host (default: None, which means localhost)
additional parameters in kwargs:
options= <dict> Dictionary of options passed to QPID
exclusive= <bool> Create an exclusive binding so no other services can consume duplicate messages (default: False)
numthreads= <int> Number of parallel threads processing messages (default: 1)
verbose= <bool> Output extra logging over stdout (default: False)
"""
super(RATaskStatusChangedListener, self).__init__(busname=busname, subjects=subject, broker=broker, **kwargs)
self.propagator = propagator
if not self.propagator:
self.propagator = RAtoOTDBPropagator()
def onTaskUpdated(self, old_task, new_task):
# override super onTaskUpdated
# check for status change, and call either onTaskScheduled or onTaskScheduled
if old_task['status_id'] != new_task['status_id']:
if new_task['status'] == 'scheduled':
self.onTaskScheduled(new_task['id'], new_task['otdb_id'], new_task['mom_id'])
elif new_task['status'] == 'conflict':
self.onTaskConflict(new_task['id'], new_task['otdb_id'], new_task['mom_id'])
def onTaskScheduled(self, ra_id, otdb_id, mom_id):
logger.info('onTaskScheduled: ra_id=%s otdb_id=%s mom_id=%s' % (ra_id, otdb_id, mom_id))
self.propagator.doTaskScheduled(ra_id, otdb_id, mom_id)
def onTaskConflict(self, ra_id, otdb_id, mom_id):
logger.info('onTaskConflict: ra_id=%s otdb_id=%s mom_id=%s' % (ra_id, otdb_id, mom_id))
self.propagator.doTaskConflict(ra_id, otdb_id, mom_id)
__all__ = ["RATaskStatusChangedListener"]
def main():
from optparse import OptionParser
from lofar.messaging import setQpidLogLevel
from lofar.common.util import waitForInterrupt
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as RADB_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME
from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME
# Check the invocation arguments
parser = OptionParser("%prog [options]",
description='runs the RAtoOTDBTaskSpecificationPropagator service')
parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost')
parser.add_option("--notification_busname", dest="notification_busname", type="string", default=RA_NOTIFICATION_BUSNAME, help="Name of the notification bus on which messages are published, default: %default")
parser.add_option("--notification_subject", dest="notification_subject", type="string", default=RA_NOTIFICATION_PREFIX+'TaskUpdated', help="Subject of the published messages to listen for, default: %default")
parser.add_option("--radb_busname", dest="radb_busname", type="string", default=RADB_BUSNAME, help="Name of the bus on which the RADB service listens, default: %default")
parser.add_option("--radb_servicename", dest="radb_servicename", type="string", default=RADB_SERVICENAME, help="Name of the RADB service, default: %default")
parser.add_option("--otdb_busname", dest="otdb_busname", type="string", default=DEFAULT_OTDB_SERVICE_BUSNAME, help="Name of the bus on which the OTDB service listens, default: %default")
parser.add_option("--otdb_servicename", dest="otdb_servicename", type="string", default=DEFAULT_OTDB_SERVICENAME, help="Name of the OTDB service, default: %default")
parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
(options, args) = parser.parse_args()
setQpidLogLevel(logging.INFO)
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
level=logging.DEBUG if options.verbose else logging.INFO)
with RAtoOTDBPropagator(radb_busname=options.radb_busname,
radb_servicename=options.radb_servicename,
otdb_busname=options.otdb_busname,
otdb_servicename=options.otdb_servicename,
broker=options.broker) as propagator:
with RATaskStatusChangedListener(busname=options.notification_busname,
subject=options.notification_subject,
broker=options.broker,
propagator=propagator) as listener:
waitForInterrupt()
if __name__ == '__main__':
main()