Skip to content
Snippets Groups Projects
Commit 1fe3cb3f authored by Adriaan Renting's avatar Adriaan Renting
Browse files

Task #8886: started the RAtoOTDBTaskSpecificationPropagator by making a copy...

Task #8886: started the RAtoOTDBTaskSpecificationPropagator by making a copy of some resourceassigner code
parent 7ec39ba2
No related branches found
No related tags found
No related merge requests found
Showing
with 415 additions and 0 deletions
......@@ -4945,6 +4945,15 @@ SAS/OTDB_Services/test/t_TreeStatusEvents.run -text svneol=unset#application/x-s
SAS/OTDB_Services/test/t_TreeStatusEvents.sh -text svneol=unset#application/x-shellscript
SAS/OTDB_Services/test/unittest_db.dump.gz -text svneol=unset#application/x-gzip
SAS/ResourceAssignment/CMakeLists.txt -text
SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/CMakeLists.txt -text
SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/bin/CMakeLists.txt -text
SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/bin/rotspservice -text
SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/bin/rotspservice.ini -text
SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/CMakeLists.txt -text
SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/__init__.py -text
SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/config.py -text
SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/rotsservice.py -text
SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py -text
SAS/ResourceAssignment/ResourceAssigner/CMakeLists.txt -text
SAS/ResourceAssignment/ResourceAssigner/bin/CMakeLists.txt -text
SAS/ResourceAssignment/ResourceAssigner/bin/resourceassigner -text
......
# $Id: CMakeLists.txt 30355 2014-11-04 13:46:05Z loose $
lofar_package(ResourceAssigner 0.1 DEPENDS PyCommon)
include(PythonInstall)
set(USE_PYTHON_COMPILATION Off)
add_subdirectory(lib)
add_subdirectory(bin)
add_subdirectory(test)
# $Id: CMakeLists.txt 32341 2015-08-28 11:59:26Z schaap $
lofar_add_bin_scripts(resourceassigner)
# supervisord config files
install(FILES
resourceassigner.ini
DESTINATION etc/supervisord.d)
#!/usr/bin/python
# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
'''Startup script for resourceassigner service'''
import sys
from lofar.sas.resourceassignment.resourceassigner.raservice import main
if __name__ == '__main__':
main()
[program:resourceassigner]
command=/bin/bash -c 'source $LOFARROOT/lofarinit.sh;resourceassigner'
user=lofarsys
stopsignal=INT ; KeyboardInterrupt
stopasgroup=true ; bash does not propagate signals
stdout_logfile=%(program_name)s.log
redirect_stderr=true
stderr_logfile=NONE
# $Id: CMakeLists.txt 32905 2015-11-17 15:31:54Z schaap $
python_install(
__init__.py
raservice.py
assignment.py
config.py
DESTINATION lofar/sas/resourceassignment/resourceassigner)
#!/usr/bin/python
# $Id$
DEFAULT_BUSNAME = 'lofar.ra.command'
DEFAULT_SERVICENAME = 'RAService'
try:
from lofar.sas.resourceassignment.rataskspecified.config import DEFAULT_NOTIFICATION_BUSNAME as RATASKSPECIFIED_NOTIFICATION_BUSNAME
from lofar.sas.resourceassignment.rataskspecified.config import RATASKSPECIFIED_NOTIFICATIONNAME as RATASKSPECIFIED_NOTIFICATIONNAME
except ImportError:
RATASKSPECIFIED_NOTIFICATION_BUSNAME = 'lofar.ra.notification'
RATASKSPECIFIED_NOTIFICATIONNAME = 'OTDB.TaskSpecified'
#!/usr/bin/env python
# ResourceAssigner.py: ResourceAssigner listens on the lofar ?? bus and calls onTaskSpecified
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: raservice.py 1580 2015-09-30 14:18:57Z loose $
"""
TaskSpecifiedListener listens to a bus on which specified tasks get published. It will then try
to assign resources to these tasks.
"""
import qpid.messaging
import logging
from datetime import datetime
import time
from lofar.sas.resourceassignment.rataskspecified.RABusListener import RATaskSpecifiedBusListener
from lofar.messaging.RPC import RPC, RPCException
import lofar.sas.resourceassignment.resourceassignmentservice.rpc as rarpc
from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_BUSNAME, DEFAULT_SERVICENAME
from lofar.sas.resourceassignment.resourceassigner.config import RATASKSPECIFIED_NOTIFICATION_BUSNAME, RATASKSPECIFIED_NOTIFICATIONNAME
from lofar.sas.resourceassignment.resourceassigner.assignment import ResourceAssigner
logger = logging.getLogger(__name__)
class SpecifiedTaskListener(RATaskSpecifiedBusListener):
def __init__(self,
busname=RATASKSPECIFIED_NOTIFICATION_BUSNAME,
subject=RATASKSPECIFIED_NOTIFICATIONNAME,
broker=None,
assigner=None,
**kwargs):
"""
SpecifiedTaskListener listens on the lofar ?? bus and calls onTaskSpecified
:param busname: valid Qpid address (default: lofar.otdb.status)
:param broker: valid Qpid broker host (default: None, which means localhost)
additional parameters in kwargs:
options= <dict> Dictionary of options passed to QPID
exclusive= <bool> Create an exclusive binding so no other services can consume duplicate messages (default: False)
numthreads= <int> Number of parallel threads processing messages (default: 1)
verbose= <bool> Output extra logging over stdout (default: False)
"""
super(SpecifiedTaskListener, self).__init__(busname=busname, subject=subject, broker=broker, **kwargs)
self.assigner = assigner
if not self.assigner:
self.assigner = ResourceAssigner()
def onTaskSpecified(self, sasId, modificationTime, resourceIndicators):
logger.info('onTaskSpecified: sasId=%s' % sasId)
self.assigner.doAssignment(sasId, resourceIndicators, 'prescheduled')
__all__ = ["SpecifiedTaskListener"]
def main():
from optparse import OptionParser
from lofar.messaging import setQpidLogLevel
from lofar.common.util import waitForInterrupt
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as RADB_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME
from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_BUSNAME as RE_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_SERVICENAME as RE_SERVICENAME
SSDB_BUSNAME = 'lofar.system' #TODO, import from future ssdb config
SSDB_SERVICENAME = 'SSDBService' #TODO, import from future ssdb config
# Check the invocation arguments
parser = OptionParser("%prog [options]",
description='runs the resourceassigner service')
parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost')
parser.add_option("-b", "--busname", dest="busname", type="string", default=DEFAULT_BUSNAME, help="Name of the bus exchange on the qpid broker, default: %s" % DEFAULT_BUSNAME)
parser.add_option("-s", "--servicename", dest="servicename", type="string", default=DEFAULT_SERVICENAME, help="Name for this service, default: %s" % DEFAULT_SERVICENAME)
parser.add_option("--notification_busname", dest="notification_busname", type="string", default=RATASKSPECIFIED_NOTIFICATION_BUSNAME, help="Name of the notification bus on which taskspecified messages are published, default: %s" % RATASKSPECIFIED_NOTIFICATION_BUSNAME)
parser.add_option("--notification_subject", dest="notification_subject", type="string", default=RATASKSPECIFIED_NOTIFICATIONNAME, help="Subject of the published taskspecified messages to listen for, default: %s" % RATASKSPECIFIED_NOTIFICATIONNAME)
parser.add_option("--radb_busname", dest="radb_busname", type="string", default=RADB_BUSNAME, help="Name of the bus on which the radb service listens, default: %s" % RADB_BUSNAME)
parser.add_option("--radb_servicename", dest="radb_servicename", type="string", default=RADB_SERVICENAME, help="Name of the radb service, default: %s" % RADB_SERVICENAME)
parser.add_option("--re_busname", dest="re_busname", type="string", default=RE_BUSNAME, help="Name of the bus on which the resource estimator service listens, default: %s" % RE_BUSNAME)
parser.add_option("--re_servicename", dest="re_servicename", type="string", default=RE_SERVICENAME, help="Name of the resource estimator service, default: %s" % RE_SERVICENAME)
parser.add_option("--ssdb_busname", dest="ssdb_busname", type="string", default=SSDB_BUSNAME, help="Name of the bus on which the ssdb service listens, default: %s" % SSDB_BUSNAME)
parser.add_option("--ssdb_servicename", dest="ssdb_servicename", type="string", default=SSDB_SERVICENAME, help="Name of the ssdb service, default: %s" % SSDB_SERVICENAME)
parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
(options, args) = parser.parse_args()
setQpidLogLevel(logging.INFO)
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
level=logging.DEBUG if options.verbose else logging.INFO)
with ResourceAssigner(radb_busname=options.radb_busname,
radb_servicename=options.radb_servicename,
re_busname=options.re_busname,
re_servicename=options.re_servicename,
ssdb_busname=options.ssdb_busname,
ssdb_servicename=options.ssdb_servicename,
broker=options.broker) as assigner:
with SpecifiedTaskListener(busname=options.notification_busname,
subject=options.notification_subject,
broker=options.broker,
assigner=assigner) as listener:
waitForInterrupt()
if __name__ == '__main__':
main()
#!/usr/bin/env python
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: assignment.py 1580 2015-09-30 14:18:57Z loose $
"""
ResourceAssigner inserts/updates tasks and assigns resources to it based on incoming parset.
"""
import logging
from datetime import datetime
import time
from lofar.messaging.RPC import RPC, RPCException
from lofar.parameterset import parameterset
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as RADB_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME
from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_BUSNAME as RE_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_SERVICENAME as RE_SERVICENAME
logger = logging.getLogger(__name__)
class ResourceAssigner():
def __init__(self,
radb_busname=RADB_BUSNAME,
radb_servicename=RADB_SERVICENAME,
radb_broker=None,
re_busname=RE_BUSNAME,
re_servicename=RE_SERVICENAME,
re_broker=None,
ssdb_busname='lofar.system',
ssdb_servicename='SSDBService',
ssdb_broker=None,
broker=None):
"""
ResourceAssigner inserts/updates tasks in the radb and assigns resources to it based on incoming parset.
:param radb_busname: busname on which the radb service listens (default: lofar.ra.command)
:param radb_servicename: servicename of the radb service (default: RADBService)
:param radb_broker: valid Qpid broker host (default: None, which means localhost)
:param re_busname: busname on which the resource estimator service listens (default: lofar.ra.command)
:param re_servicename: servicename of the resource estimator service (default: ResourceEstimation)
:param re_broker: valid Qpid broker host (default: None, which means localhost)
:param ssdb_busname: busname on which the ssdb service listens (default: lofar.system)
:param ssdb_servicename: servicename of the radb service (default: SSDBService)
:param ssdb_broker: valid Qpid broker host (default: None, which means localhost)
:param broker: if specified, overrules radb_broker, re_broker and ssdb_broker. Valid Qpid broker host (default: None, which means localhost)
"""
if broker:
radb_broker = broker
re_broker = broker
ssdb_broker = broker
self.radbrpc = RARPC(servicename=radb_servicename, busname=radb_busname, broker=radb_broker)
self.rerpc = RPC(re_servicename, busname=re_busname, broker=re_broker, ForwardExceptions=True)
self.ssdbGetActiveGroupNames = RPC(ssdb_servicename+'.GetActiveGroupNames', busname=ssdb_busname, broker=ssdb_broker, ForwardExceptions=True)
self.ssdbGetHostForGID = RPC(ssdb_servicename+'.GetHostForGID', busname=ssdb_busname, broker=ssdb_broker, ForwardExceptions=True)
def __enter__(self):
"""Internal use only. (handles scope 'with')"""
self.open()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Internal use only. (handles scope 'with')"""
self.close()
def open(self):
"""Open rpc connections to radb service and resource estimator service"""
self.radbrpc.open()
self.rerpc.open()
self.ssdbGetActiveGroupNames.open()
self.ssdbGetHostForGID.open()
def close(self):
"""Close rpc connections to radb service and resource estimator service"""
self.radbrpc.close()
self.rerpc.close()
self.ssdbGetActiveGroupNames.close()
self.ssdbGetHostForGID.close()
def doAssignment(self, sasId, parsets, status='prescheduled'):
logger.info('doAssignment: sasId=%s parset=%s' % (sasId, parsets))
#parse main parset...
mainParsetDict = parsets[str(sasId)]
mainParset = parameterset(mainParsetDict)
momId = mainParset.getInt('Observation.momID', -1)
taskType = mainParset.getString('Task.type', '')
if taskType.lower() == 'observation':
taskType = 'Observation'
startTime = datetime.strptime(mainParset.getString('Observation.startTime'), '%Y-%m-%d %H:%M:%S')
endTime = datetime.strptime(mainParset.getString('Observation.stopTime'), '%Y-%m-%d %H:%M:%S')
##check if task already present in radb
#existingTask = self.radbrpc.getTask(otdb_id=sasId)
#if existingTask:
##present, so update task and specification in radb
#taskId = existingTask['id']
#specificationId = existingTask['specification_id']
#self.radbrpc.updateSpecification(specificationId, startTime, endTime, str(mainParsetDict))
#self.radbrpc.updateTask(taskId, momId, sasId, status, taskType, specificationId)
#else:
#insert new task and specification in the radb
specificationId = self.radbrpc.insertSpecification(startTime, endTime, str(mainParsetDict))['id']
taskId = self.radbrpc.insertTask(momId, sasId, status, taskType, specificationId)['id']
#analyze the parset for needed and available resources and claim these in the radb
cluster = self.parseSpecification(mainParset)
available = self.getAvailableResources(cluster)
needed = self.getNeededResouces(mainParset)
if self.checkResources(needed, available):
claimed, resourceIds = self.claimResources(needed, taskId, startTime, endTime)
if claimed:
self.commitResourceClaimsForTask(taskId)
self.radbrpc.updateTask(taskId, status='scheduled')
else:
self.radbrpc.updateTask(taskId, status='conflict')
def parseSpecification(self, parset):
# TODO: cluster is not part of specification yet. For now return CEP4. Add logic later.
default = "cep2"
cluster ="cep4"
return cluster
def getNeededResouces(self, parset):
replymessage, status = self.rerpc(parset.dict(), timeout=10)
logger.info('getNeededResouces: %s' % replymessage)
stations = parset.getStringVector('Observation.VirtualInstrument.stationList', '')
logger.info('Stations: %s' % stations)
return replymessage
def getAvailableResources(self, cluster):
# Used settings
groupnames = {}
available = {}
while True:
try:
replymessage, status = self.ssdbGetActiveGroupNames()
if status == 'OK':
groupnames = replymessage
logger.info('SSDBService ActiveGroupNames: %s' % groupnames)
else:
logger.error("Could not get active group names from SSDBService: %s" % status)
groupnames = {v:k for k,v in groupnames.items()} #swap key/value for name->id lookup
logger.info('groupnames: %s' % groupnames)
if cluster in groupnames.keys():
groupId = groupnames[cluster]
replymessage, status = self.ssdbGetHostForGID(groupId)
if status == 'OK':
available = replymessage
logger.info('available: %s' % available)
else:
logger.error("Could not get hosts for group %s (gid=%s) from SSDBService: %s" % (cluster, groupId, status))
else:
logger.error("group \'%s\' not known in SSDBService active groups (%s)" % (cluster, ', '.join(groupnames.keys())))
return available
except KeyboardInterrupt:
break
except Exception as e:
logger.warning("Exception while getting available resources. Trying again... " + str(e))
time.sleep(0.25)
def checkResources(self, needed, available):
return True
def claimResources(self, resources, taskId, startTime, endTime):
#TEMP HACK
cep4storage = resources['Observation']['total_data_size']
resources = dict()
resources['cep4storage'] = cep4storage
resourceNameDict = {r['name']:r for r in self.radbrpc.getResources()}
claimedStatusId = next(x['id'] for x in self.radbrpc.getResourceClaimStatuses() if x['name'].lower() == 'claimed')
resourceClaimIds = []
for r in resources:
if r in resourceNameDict:
resourceClaimIds.append(self.radbrpc.insertResourceClaim(resourceNameDict[r]['id'], taskId, startTime, endTime, claimedStatusId, 1, -1, 'anonymous', -1))
success = len(resourceClaimIds) == len(resources)
return success, resourceClaimIds
def commitResourceClaimsForTask(self, taskId):
self.radbrpc.updateResourceClaimsForTask(taskId, status='ALLOCATED')
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment