-
Jorrit Schaap authored
Task #8887: resourceassigner publishes scheduled/conflict events on bus. rotspservice reacts to these events via rabuslistener instead of radbbuslistener
Jorrit Schaap authoredTask #8887: resourceassigner publishes scheduled/conflict events on bus. rotspservice reacts to these events via rabuslistener instead of radbbuslistener
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
raservice.py 8.72 KiB
#!/usr/bin/env python
# ResourceAssigner.py: ResourceAssigner listens on the lofar ?? bus and calls onTaskSpecified
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: raservice.py 1580 2015-09-30 14:18:57Z loose $
"""
TaskSpecifiedListener listens to a bus on which specified tasks get published. It will then try
to assign resources to these tasks.
"""
import qpid.messaging
import logging
from datetime import datetime
import time
from lofar.sas.resourceassignment.rataskspecified.RABusListener import RATaskSpecifiedBusListener
from lofar.messaging.RPC import RPC, RPCException
import lofar.sas.resourceassignment.resourceassignmentservice.rpc as rarpc
from lofar.sas.resourceassignment.rataskspecified.config import DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_BUSNAME
from lofar.sas.resourceassignment.rataskspecified.config import DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT
from lofar.sas.resourceassignment.resourceassigner.assignment import ResourceAssigner
logger = logging.getLogger(__name__)
class SpecifiedTaskListener(RATaskSpecifiedBusListener):
def __init__(self,
busname=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_BUSNAME,
subject=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT,
broker=None,
assigner=None,
**kwargs):
"""
SpecifiedTaskListener listens on the lofar ?? bus and calls onTaskSpecified
:param busname: valid Qpid address (default: lofar.otdb.status)
:param broker: valid Qpid broker host (default: None, which means localhost)
additional parameters in kwargs:
options= <dict> Dictionary of options passed to QPID
exclusive= <bool> Create an exclusive binding so no other services can consume duplicate messages (default: False)
numthreads= <int> Number of parallel threads processing messages (default: 1)
verbose= <bool> Output extra logging over stdout (default: False)
"""
super(SpecifiedTaskListener, self).__init__(busname=busname, subject=subject, broker=broker, **kwargs)
self.assigner = assigner
if not self.assigner:
self.assigner = ResourceAssigner()
def onTaskSpecified(self, otdb_id, specification_tree):
logger.info('onTaskSpecified: otdb_id=%s' % otdb_id)
self.assigner.doAssignment(specification_tree)
__all__ = ["SpecifiedTaskListener"]
def main():
from optparse import OptionParser
from lofar.messaging import setQpidLogLevel
from lofar.common.util import waitForInterrupt
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as RADB_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME
from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_BUSNAME as RE_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_SERVICENAME as RE_SERVICENAME
from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME
from lofar.sas.systemstatus.service.config import DEFAULT_SSDB_BUSNAME
from lofar.sas.systemstatus.service.config import DEFAULT_SSDB_SERVICENAME
from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_BUSNAME
from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_PREFIX
# Check the invocation arguments
parser = OptionParser("%prog [options]",
description='runs the resourceassigner service')
parser.add_option('-q', '--broker', dest='broker', type='string',
default=None,
help='Address of the qpid broker, default: localhost')
parser.add_option("--notification_busname", dest="notification_busname", type="string",
default=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_BUSNAME,
help="Name of the notification bus on which taskspecified messages are published. [default: %default]")
parser.add_option("--notification_subject", dest="notification_subject", type="string",
default=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT,
help="Subject of the published taskspecified messages to listen for. [default: %default]")
parser.add_option("--radb_busname", dest="radb_busname", type="string",
default=RADB_BUSNAME,
help="Name of the bus on which the radb service listens. [default: %default]")
parser.add_option("--radb_servicename", dest="radb_servicename", type="string",
default=RADB_SERVICENAME,
help="Name of the radb service. [default: %default]")
parser.add_option("--re_busname", dest="re_busname", type="string",
default=RE_BUSNAME,
help="Name of the bus on which the resource estimator service listens. [default: %default]")
parser.add_option("--re_servicename", dest="re_servicename", type="string",
default=RE_SERVICENAME,
help="Name of the resource estimator service. [default: %default]")
parser.add_option("--otdb_busname", dest="otdb_busname", type="string", default=DEFAULT_OTDB_SERVICE_BUSNAME, help="Name of the bus on which the OTDB service listens, default: %default")
parser.add_option("--otdb_servicename", dest="otdb_servicename", type="string", default=DEFAULT_OTDB_SERVICENAME, help="Name of the OTDB service, default: %default")
parser.add_option("--ssdb_busname", dest="ssdb_busname", type="string",
default=DEFAULT_SSDB_BUSNAME,
help="Name of the bus on which the ssdb service listens. [default: %default]")
parser.add_option("--ssdb_servicename", dest="ssdb_servicename", type="string",
default=DEFAULT_SSDB_SERVICENAME,
help="Name of the ssdb service. [default: %default]")
parser.add_option("--ra_notification_busname", dest="ra_notification_busname", type="string",
default=DEFAULT_RA_NOTIFICATION_BUSNAME,
help="Name of the notification bus on which the resourceassigner publishes its notifications. [default: %default]")
parser.add_option("--ra_notification_prefix", dest="ra_notification_prefix", type="string",
default=DEFAULT_RA_NOTIFICATION_PREFIX,
help="Prefix for the subject of the by the resourceassigner published notification messages. [default: %default]")
parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
(options, args) = parser.parse_args()
setQpidLogLevel(logging.INFO)
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
level=logging.DEBUG if options.verbose else logging.INFO)
with ResourceAssigner(radb_busname=options.radb_busname,
radb_servicename=options.radb_servicename,
re_busname=options.re_busname,
re_servicename=options.re_servicename,
otdb_busname=options.otdb_busname,
otdb_servicename=options.otdb_servicename,
ssdb_busname=options.ssdb_busname,
ssdb_servicename=options.ssdb_servicename,
ra_notification_busname=options.ra_notification_busname,
ra_notification_prefix=options.ra_notification_prefix,
broker=options.broker) as assigner:
with SpecifiedTaskListener(busname=options.notification_busname,
subject=options.notification_subject,
broker=options.broker,
assigner=assigner) as listener:
waitForInterrupt()
if __name__ == '__main__':
main()