diff --git a/.gitattributes b/.gitattributes index ca4f85ece13bbc039c652d31c2d25e6bde0d6dfe..f2b0632a5e5103e97beb8092b18438e3f4347089 100644 --- a/.gitattributes +++ b/.gitattributes @@ -4950,10 +4950,11 @@ SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/bin/CMakeLists.txt -t SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/bin/rotspservice -text SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/bin/rotspservice.ini -text SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/CMakeLists.txt -text +SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/RABusListener.py -text SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/__init__.py -text SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/config.py -text +SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py -text SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/rotspservice.py -text -SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py -text SAS/ResourceAssignment/ResourceAssigner/CMakeLists.txt -text SAS/ResourceAssignment/ResourceAssigner/bin/CMakeLists.txt -text SAS/ResourceAssignment/ResourceAssigner/bin/resourceassigner -text diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/CMakeLists.txt b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/CMakeLists.txt index 658a170041262a2c9dcd476386390870c97aba0c..bb148abf7e0f5f79eeb8b39bce94c7cac2c3e435 100644 --- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/CMakeLists.txt +++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/CMakeLists.txt @@ -2,8 +2,8 @@ python_install( __init__.py - raservice.py - assignment.py + rotspservice.py + translator.py config.py - DESTINATION lofar/sas/resourceassignment/resourceassigner) + DESTINATION lofar/sas/resourceassignment/RAtoOTDBTaskSpecificationPropagator) diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/RABusListener.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/RABusListener.py new file mode 100644 index 0000000000000000000000000000000000000000..765dbc93caf6b927465e856a7acc81531d7bdbd4 --- /dev/null +++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/RABusListener.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python + +# RABusListener.py: RABusListener listens on the lofar ra message bus and calls (empty) on<SomeMessage> methods when such a message is received. +# +# Copyright (C) 2015 +# ASTRON (Netherlands Institute for Radio Astronomy) +# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it +# and/or modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be +# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. +# +# $Id: RABusListener.py 33534 2016-02-08 14:28:26Z schaap $ + +""" +RABusListener listens on the lofar ra message bus and calls (empty) on<SomeMessage> methods when such a message is received. +Typical usage is to derive your own subclass from RABusListener and implement the specific on<SomeMessage> methods that you are interested in. +""" + +from lofar.messaging.messagebus import AbstractBusListener +from .config import DEFAULT_NOTIFICATION_BUSNAME, RATASKSCHEDULED_NOTIFICATIONNAME + +import qpid.messaging +import logging +from datetime import datetime + +logger = logging.getLogger(__name__) + + +class RATaskScheduledBusListener(AbstractBusListener): + def __init__(self, busname=DEFAULT_NOTIFICATION_BUSNAME, subject=RATASKSCHEDULED_NOTIFICATIONNAME, broker=None, **kwargs): + """ + RATaskScheduledBusListener listens on the lofar ra message bus and calls (empty) on<SomeMessage> methods when such a message is received. + Typical usage is to derive your own subclass from RATaskScheduledBusListener and implement the specific on<SomeMessage> methods that you are interested in. + :param address: valid Qpid address (default: lofar.ra.notification) + :param broker: valid Qpid broker host (default: None, which means localhost) + additional parameters in kwargs: + options= <dict> Dictionary of options passed to QPID + exclusive= <bool> Create an exclusive binding so no other services can consume duplicate messages (default: False) + numthreads= <int> Number of parallel threads processing messages (default: 1) + verbose= <bool> Output extra logging over stdout (default: False) + """ + address = "%s/%s" % (busname, subject) + super(RATaskScheduledBusListener, self).__init__(address, broker, **kwargs) + + def _handleMessage(self, msg): + logger.debug("RABusListener.handleMessage: %s" %str(msg)) + + otdbId = msg.content['otdbID'] + momId = msg.content['momID'] + modificationTime = msg.content['time_of_change'].datetime() + + self.onTaskScheduled(otdbId, momId, modificationTime) + + def onTaskScheduled(self, otdbId, momId, modificationTime): + pass + +## def onTaskConflict(self, otdbId, momId, modificationTime): +## pass + + +__all__ = ["RATaskScheduledBusListener"] diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/config.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/config.py index d66e897267d2253b9f39465787b31d7fca61bdad..269359f0440aa869e365a86ef203840dabd4f6ef 100644 --- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/config.py +++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/config.py @@ -5,8 +5,9 @@ DEFAULT_BUSNAME = 'lofar.ra.command' DEFAULT_SERVICENAME = 'RAService' try: - from lofar.sas.resourceassignment.rataskspecified.config import DEFAULT_NOTIFICATION_BUSNAME as RATASKSPECIFIED_NOTIFICATION_BUSNAME - from lofar.sas.resourceassignment.rataskspecified.config import RATASKSPECIFIED_NOTIFICATIONNAME as RATASKSPECIFIED_NOTIFICATIONNAME + from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_NOTIFICATION_BUSNAME as RATASKSCHEDULED_NOTIFICATION_BUSNAME + from lofar.sas.resourceassignment.resourceassigner.config import RATASKSCHEDULED_NOTIFICATIONNAME as RATASKSCHEDULED_NOTIFICATIONNAME except ImportError: - RATASKSPECIFIED_NOTIFICATION_BUSNAME = 'lofar.ra.notification' - RATASKSPECIFIED_NOTIFICATIONNAME = 'OTDB.TaskSpecified' + RATASKSCHEDULED_NOTIFICATION_BUSNAME = 'lofar.ra.notification' + RATASKSCHEDULED_NOTIFICATIONNAME = 'RA.TaskScheduled' +# RATASKCONFLICT_NOTIFICATIONNAME = 'RA.TaskConflict' diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py similarity index 78% rename from SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py rename to SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py index dc6e1b61a754dc1c842ddf7de0279d577e408032..0295ca8ba6dcbbf595627a565e3f5cbd547e67fc 100755 --- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py +++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py @@ -35,45 +35,37 @@ from lofar.parameterset import parameterset from lofar.sas.resourceassignment.rotspservice.rpc import RARPC from lofar.sas.resourceassignment.rotspservice.config import DEFAULT_BUSNAME as RADB_BUSNAME from lofar.sas.resourceassignment.rotspservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME -#from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_BUSNAME as RE_BUSNAME -#from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_SERVICENAME as RE_SERVICENAME +from lofar.sas.otdb.otdbservice.config import DEFAULT_BUSNAME as OTDB_BUSNAME +from lofar.sas.otdb.otdbservice.config import DEFAULT_SERVICENAME as OTDB_SERVICENAME logger = logging.getLogger(__name__) -class ResourceAssigner(): + +class RAtoOTDBTranslator(): def __init__(self, radb_busname=RADB_BUSNAME, radb_servicename=RADB_SERVICENAME, radb_broker=None, - re_busname=RE_BUSNAME, - re_servicename=RE_SERVICENAME, - re_broker=None, - ssdb_busname='lofar.system', - ssdb_servicename='SSDBService', - ssdb_broker=None, + otdb_busname=OTDB_BUSNAME, + otdb_servicename=OTDB_SERVICENAME, + otdb_broker=None, broker=None): """ - ResourceAssigner inserts/updates tasks in the radb and assigns resources to it based on incoming parset. + RAtoOTDBTranslator inserts/updates tasks in the radb and assigns resources to it based on incoming parset. :param radb_busname: busname on which the radb service listens (default: lofar.ra.command) :param radb_servicename: servicename of the radb service (default: RADBService) :param radb_broker: valid Qpid broker host (default: None, which means localhost) - :param re_busname: busname on which the resource estimator service listens (default: lofar.ra.command) - :param re_servicename: servicename of the resource estimator service (default: ResourceEstimation) - :param re_broker: valid Qpid broker host (default: None, which means localhost) - :param ssdb_busname: busname on which the ssdb service listens (default: lofar.system) - :param ssdb_servicename: servicename of the radb service (default: SSDBService) - :param ssdb_broker: valid Qpid broker host (default: None, which means localhost) - :param broker: if specified, overrules radb_broker, re_broker and ssdb_broker. Valid Qpid broker host (default: None, which means localhost) + :param otdb_busname: busname on which the OTDB service listens (default: lofar.otdb.command) + :param otdb_servicename: servicename of the OTDB service (default: OTDBService) + :param otdb_broker: valid Qpid broker host (default: None, which means localhost) + :param broker: if specified, overrules radb_broker and otdb_broker. Valid Qpid broker host (default: None, which means localhost) """ if broker: radb_broker = broker - re_broker = broker - ssdb_broker = broker + otdb_broker = broker self.radbrpc = RARPC(servicename=radb_servicename, busname=radb_busname, broker=radb_broker) - self.rerpc = RPC(re_servicename, busname=re_busname, broker=re_broker, ForwardExceptions=True) - self.ssdbGetActiveGroupNames = RPC(ssdb_servicename+'.GetActiveGroupNames', busname=ssdb_busname, broker=ssdb_broker, ForwardExceptions=True) - self.ssdbGetHostForGID = RPC(ssdb_servicename+'.GetHostForGID', busname=ssdb_busname, broker=ssdb_broker, ForwardExceptions=True) + self.otdbrpc = RPC(otdb_servicename, busname=otdb_busname, broker=otdb_broker, ForwardExceptions=True) def __enter__(self): """Internal use only. (handles scope 'with')""" @@ -87,19 +79,15 @@ class ResourceAssigner(): def open(self): """Open rpc connections to radb service and resource estimator service""" self.radbrpc.open() - self.rerpc.open() - self.ssdbGetActiveGroupNames.open() - self.ssdbGetHostForGID.open() + self.otdbrpc.open() def close(self): """Close rpc connections to radb service and resource estimator service""" self.radbrpc.close() - self.rerpc.close() - self.ssdbGetActiveGroupNames.close() - self.ssdbGetHostForGID.close() + self.otdbrpc.close() - def doAssignment(self, sasId, parsets, status='prescheduled'): - logger.info('doAssignment: sasId=%s parset=%s' % (sasId, parsets)) + def doTranslation(self, otdbId, momId, status='scheduled'): + logger.info('doTranslation: otdbId=%s momId=%s' % (sasId, momId)) #parse main parset... mainParsetDict = parsets[str(sasId)] diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/rotspservice.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/rotspservice.py index c57d89e33bc5786f8200ad2006777fdea553e796..1eade68326857a346d8ab1390508fc483b507c6d 100755 --- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/rotspservice.py +++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/rotspservice.py @@ -23,8 +23,8 @@ # $Id: rotspservice.py 1580 2015-09-30 14:18:57Z loose $ """ -TaskScheduledListener listens to a bus on which scheduled tasks get published. It will then try -to propagate the changes to OTDB. +TaskScheduledListener listens to a bus on which tasks handled by the ResourceAssigner get published. +It will then try to propagate the changes to OTDB as Scheduled or Conflict. """ import qpid.messaging @@ -32,17 +32,17 @@ import logging from datetime import datetime import time -from lofar.sas.resourceassignment.rataskscheduled.RABusListener import RATaskScheduledBusListener +from lofar.sas.resourceassignment.RAtoOTDBTaskSpecificationPropagator.RABusListener import RATaskScheduledBusListener from lofar.messaging.RPC import RPC, RPCException -import lofar.sas.resourceassignment.resourceassignmentservice.rpc as rarpc +import lofar.sas.resourceassignment.resourceassignmentservice.rpc as rarpc ## RA DB from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_BUSNAME, DEFAULT_SERVICENAME from lofar.sas.resourceassignment.resourceassigner.config import RATASKSCHEDULED_NOTIFICATION_BUSNAME, RATASKSCHEDULED_NOTIFICATIONNAME -from lofar.sas.resourceassignment.resourceassigner.assignment import ResourceAssigner +from lofar.sas.resourceassignment.RAtoOTDBTaskSpecificationPropagator.translator import RAtoOTDBTranslator logger = logging.getLogger(__name__) -class ScheduledTaskListener(RATaskScheduledBusListener): +class RATaskScheduledListener(RATaskScheduledBusListener): def __init__(self, busname=RATASKSCHEDULED_NOTIFICATION_BUSNAME, subject=RATASKSCHEDULED_NOTIFICATIONNAME, @@ -50,8 +50,8 @@ class ScheduledTaskListener(RATaskScheduledBusListener): assigner=None, **kwargs): """ - SpecifiedTaskListener listens on the lofar ?? bus and calls onTaskSpecified - :param busname: valid Qpid address (default: lofar.otdb.status) + RATaskScheduledListener listens on the lofar ?? bus and calls onTaskScheduled + :param busname: valid Qpid address (default: lofar.ra.notification) :param broker: valid Qpid broker host (default: None, which means localhost) additional parameters in kwargs: options= <dict> Dictionary of options passed to QPID @@ -59,18 +59,24 @@ class ScheduledTaskListener(RATaskScheduledBusListener): numthreads= <int> Number of parallel threads processing messages (default: 1) verbose= <bool> Output extra logging over stdout (default: False) """ - super(SpecifiedTaskListener, self).__init__(busname=busname, subject=subject, broker=broker, **kwargs) + super(RATaskScheduledListener, self).__init__(busname=busname, subject=subject, broker=broker, **kwargs) - self.assigner = assigner - if not self.assigner: - self.assigner = ResourceAssigner() + self.translator = translator + if not self.translator: + self.translator = RAtoOTDBTranslator() - def onTaskSpecified(self, sasId, modificationTime, resourceIndicators): - logger.info('onTaskSpecified: sasId=%s' % sasId) + def onTaskScheduled(self, otdbId, momId, modificationTime): + logger.info('onTaskScheduled: otdbId=%s' % otdbId) - self.assigner.doAssignment(sasId, resourceIndicators, 'prescheduled') + self.translator.doTranslation(otdbId, momId, 'scheduled') -__all__ = ["SpecifiedTaskListener"] +# def onTaskConflict(self, otdbId, momId, modificationTime): +# logger.info('onTaskConflict: otdbId=%s' % otdbId) +# +# self.translator.doTranslation(otdbId, momId, 'conflict') + + +__all__ = ["TaskScheduledListener"] def main(): from optparse import OptionParser @@ -79,10 +85,8 @@ def main(): from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as RADB_BUSNAME from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME - from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_BUSNAME as RE_BUSNAME - from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_SERVICENAME as RE_SERVICENAME - SSDB_BUSNAME = 'lofar.system' #TODO, import from future ssdb config - SSDB_SERVICENAME = 'SSDBService' #TODO, import from future ssdb config + from lofar.sas.resourceassignment.otdbservice.config import DEFAULT_BUSNAME as OTDB_BUSNAME + from lofar.sas.resourceassignment.otdb.config import DEFAULT_SERVICENAME as OTDB_SERVICENAME # Check the invocation arguments parser = OptionParser("%prog [options]", @@ -90,14 +94,12 @@ def main(): parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost') parser.add_option("-b", "--busname", dest="busname", type="string", default=DEFAULT_BUSNAME, help="Name of the bus exchange on the qpid broker, default: %s" % DEFAULT_BUSNAME) parser.add_option("-s", "--servicename", dest="servicename", type="string", default=DEFAULT_SERVICENAME, help="Name for this service, default: %s" % DEFAULT_SERVICENAME) - parser.add_option("--notification_busname", dest="notification_busname", type="string", default=RATASKSPECIFIED_NOTIFICATION_BUSNAME, help="Name of the notification bus on which taskspecified messages are published, default: %s" % RATASKSPECIFIED_NOTIFICATION_BUSNAME) - parser.add_option("--notification_subject", dest="notification_subject", type="string", default=RATASKSPECIFIED_NOTIFICATIONNAME, help="Subject of the published taskspecified messages to listen for, default: %s" % RATASKSPECIFIED_NOTIFICATIONNAME) - parser.add_option("--radb_busname", dest="radb_busname", type="string", default=RADB_BUSNAME, help="Name of the bus on which the radb service listens, default: %s" % RADB_BUSNAME) - parser.add_option("--radb_servicename", dest="radb_servicename", type="string", default=RADB_SERVICENAME, help="Name of the radb service, default: %s" % RADB_SERVICENAME) - parser.add_option("--re_busname", dest="re_busname", type="string", default=RE_BUSNAME, help="Name of the bus on which the resource estimator service listens, default: %s" % RE_BUSNAME) - parser.add_option("--re_servicename", dest="re_servicename", type="string", default=RE_SERVICENAME, help="Name of the resource estimator service, default: %s" % RE_SERVICENAME) - parser.add_option("--ssdb_busname", dest="ssdb_busname", type="string", default=SSDB_BUSNAME, help="Name of the bus on which the ssdb service listens, default: %s" % SSDB_BUSNAME) - parser.add_option("--ssdb_servicename", dest="ssdb_servicename", type="string", default=SSDB_SERVICENAME, help="Name of the ssdb service, default: %s" % SSDB_SERVICENAME) + parser.add_option("--notification_busname", dest="notification_busname", type="string", default=RATASKSCHEDULED_NOTIFICATION_BUSNAME, help="Name of the notification bus on which taskscheduled messages are published, default: %s" % RATASKSCHEDULED_NOTIFICATION_BUSNAME) + parser.add_option("--notification_subject", dest="notification_subject", type="string", default=RATASKSCHEDULED_NOTIFICATIONNAME, help="Subject of the published taskscheduled messages to listen for, default: %s" % RATASKSCHEDULED_NOTIFICATIONNAME) + parser.add_option("--radb_busname", dest="radb_busname", type="string", default=RADB_BUSNAME, help="Name of the bus on which the RADB service listens, default: %s" % RADB_BUSNAME) + parser.add_option("--radb_servicename", dest="radb_servicename", type="string", default=RADB_SERVICENAME, help="Name of the RADB service, default: %s" % RADB_SERVICENAME) + parser.add_option("--otdb_busname", dest="otdb_busname", type="string", default=OTDB_BUSNAME, help="Name of the bus on which the OTDB service listens, default: %s" % OTDB_BUSNAME) + parser.add_option("--otdb_servicename", dest="otdb_servicename", type="string", default=OTDB_SERVICENAME, help="Name of the OTDB service, default: %s" % OTDB_SERVICENAME) parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging') (options, args) = parser.parse_args() @@ -105,17 +107,15 @@ def main(): logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG if options.verbose else logging.INFO) - with ResourceAssigner(radb_busname=options.radb_busname, - radb_servicename=options.radb_servicename, - re_busname=options.re_busname, - re_servicename=options.re_servicename, - ssdb_busname=options.ssdb_busname, - ssdb_servicename=options.ssdb_servicename, - broker=options.broker) as assigner: - with SpecifiedTaskListener(busname=options.notification_busname, + with RAtoOTDBTranslator(radb_busname=options.radb_busname, + radb_servicename=options.radb_servicename, + otdb_busname=options.otdb_busname, + otdb_servicename=options.otdb_servicename, + broker=options.broker) as translator: + with TaskScheduledListener(busname=options.notification_busname, subject=options.notification_subject, broker=options.broker, - assigner=assigner) as listener: + translator=translator) as listener: waitForInterrupt() if __name__ == '__main__':