-
Jorrit Schaap authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
assignment.py 10.58 KiB
#!/usr/bin/env python
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: assignment.py 1580 2015-09-30 14:18:57Z loose $
"""
ResourceAssigner inserts/updates tasks and assigns resources to them based on the incoming parset.
"""
import logging
from datetime import datetime
import time
from lofar.messaging.RPC import RPC, RPCException
from lofar.parameterset import parameterset
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as RADB_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME
from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_BUSNAME as RE_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_SERVICENAME as RE_SERVICENAME
from lofar.sas.resourceassignment.resourceassigner.rapublisher import RAPublisher
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class ResourceAssigner(object):
    """Insert/update tasks in the radb and claim resources for them, based on incoming parsets.

    Instances can be used as a context manager: the connections are opened
    on entry and closed on exit.
    """

    # Format of the Observation.startTime / Observation.stopTime parset values.
    _PARSET_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

    # Seconds to wait before retrying a failed SSDBService request.
    _RETRY_DELAY = 0.25

    def __init__(self,
                 radb_busname=RADB_BUSNAME,
                 radb_servicename=RADB_SERVICENAME,
                 radb_broker=None,
                 re_busname=RE_BUSNAME,
                 re_servicename=RE_SERVICENAME,
                 re_broker=None,
                 ssdb_busname='lofar.system',
                 ssdb_servicename='SSDBService',
                 ssdb_broker=None,
                 broker=None):
        """
        ResourceAssigner inserts/updates tasks in the radb and assigns resources to it based on incoming parset.
        :param radb_busname: busname on which the radb service listens (default: lofar.ra.command)
        :param radb_servicename: servicename of the radb service (default: RADBService)
        :param radb_broker: valid Qpid broker host (default: None, which means localhost)
        :param re_busname: busname on which the resource estimator service listens (default: lofar.ra.command)
        :param re_servicename: servicename of the resource estimator service (default: ResourceEstimation)
        :param re_broker: valid Qpid broker host (default: None, which means localhost)
        :param ssdb_busname: busname on which the ssdb service listens (default: lofar.system)
        :param ssdb_servicename: servicename of the ssdb service (default: SSDBService)
        :param ssdb_broker: valid Qpid broker host (default: None, which means localhost)
        :param broker: if specified, overrules radb_broker, re_broker and ssdb_broker. Valid Qpid broker host (default: None, which means localhost)
        """
        if broker:
            radb_broker = broker
            re_broker = broker
            ssdb_broker = broker

        self.radbrpc = RARPC(servicename=radb_servicename, busname=radb_busname, broker=radb_broker)
        # The publisher uses the same broker as the radb service.
        self.raPublisher = RAPublisher(broker=radb_broker)
        self.rerpc = RPC(re_servicename, busname=re_busname, broker=re_broker, ForwardExceptions=True)
        self.ssdbGetActiveGroupNames = RPC(ssdb_servicename+'.GetActiveGroupNames', busname=ssdb_busname, broker=ssdb_broker, ForwardExceptions=True)
        self.ssdbGetHostForGID = RPC(ssdb_servicename+'.GetHostForGID', busname=ssdb_busname, broker=ssdb_broker, ForwardExceptions=True)

    def __enter__(self):
        """Internal use only. (handles scope 'with')"""
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Internal use only. (handles scope 'with')"""
        self.close()

    def _connections(self):
        """Return every connection owned by this object, in the order in which they are opened."""
        return (self.radbrpc,
                self.raPublisher,
                self.rerpc,
                self.ssdbGetActiveGroupNames,
                self.ssdbGetHostForGID)

    @staticmethod
    def _closeAll(connections):
        """Close each of the given connections, continuing when one of them fails.

        :param connections: iterable of objects with a close() method
        :return: the first exception raised by a close() call, or None when all succeeded
        """
        firstError = None
        for connection in connections:
            try:
                connection.close()
            except Exception as e:
                logger.exception('Could not close connection %s', connection)
                if firstError is None:
                    firstError = e
        return firstError

    def open(self):
        """Open the connections to the radb service, the publisher, the resource estimator and the ssdb service.

        When one of them fails to open, the connections that were already
        opened are closed again before the error is re-raised.
        """
        opened = []
        try:
            for connection in self._connections():
                connection.open()
                opened.append(connection)
        except Exception:
            # __exit__ is not called when __enter__ fails, so clean up here.
            # The cleanup runs in a helper so that the bare raise below still
            # re-raises the original open() error.
            self._closeAll(reversed(opened))
            raise

    def close(self):
        """Close the connections to the radb service, the publisher, the resource estimator and the ssdb service.

        All connections are closed even when closing one of them fails; the
        first error encountered is raised after the others have been closed.
        """
        firstError = self._closeAll(self._connections())
        if firstError is not None:
            raise firstError

    def doAssignment(self, sasId, parsets, status='prescheduled'):
        """(Re)insert the task for the given parset in the radb and try to claim its resources.

        An already existing task (and its specification) with this otdb id is
        deleted first. When the resources can be claimed the task becomes
        'scheduled', when claiming fails it becomes 'conflict'; in both cases
        a status-changed notification is published.

        :param sasId: otdb id of the task; str(sasId) is the key of its parset in parsets
        :param parsets: dict of parset dicts, keyed by otdb id (as string). All
                        entries except the one for sasId are treated as predecessors.
        :param status: status with which the task is inserted (default: prescheduled)
        """
        logger.info('doAssignment: sasId=%s parset=%s', sasId, parsets)

        #parse main parset...
        mainParsetDict = parsets[str(sasId)]
        mainParset = parameterset(mainParsetDict)
        momId = mainParset.getInt('Observation.momID', -1)
        taskType = mainParset.getString('Task.type', '').lower()
        startTime = datetime.strptime(mainParset.getString('Observation.startTime'), self._PARSET_TIME_FORMAT)
        endTime = datetime.strptime(mainParset.getString('Observation.stopTime'), self._PARSET_TIME_FORMAT)

        #check if task already present in radb
        existingTask = self.radbrpc.getTask(otdb_id=sasId)
        if existingTask:
            #present, delete it, and create a new task
            self.radbrpc.deleteTask(existingTask['id'])
            self.radbrpc.deleteSpecification(existingTask['specification_id'])

        #insert new task and specification in the radb
        logger.info('doAssignment: insertSpecification startTime=%s endTime=%s', startTime, endTime)
        specificationId = self.radbrpc.insertSpecification(startTime, endTime, str(mainParsetDict))['id']
        logger.info('doAssignment: insertSpecification specificationId=%s', specificationId)

        logger.info('doAssignment: insertTask momId=%s sasId=%s status=%s taskType=%s specificationId=%s',
                    momId, sasId, status, taskType, specificationId)
        taskId = self.radbrpc.insertTask(momId, sasId, status, taskType, specificationId)['id']
        logger.info('doAssignment: insertTask taskId=%s', taskId)

        #analyze the parset for needed and available resources and claim these in the radb
        cluster = self.parseSpecification(mainParset)
        available = self.getAvailableResources(cluster)
        needed = self.getNeededResouces(mainParset)

        # NOTE(review): when checkResources() rejects the request, the task keeps
        # the status it was inserted with. checkResources() currently always
        # returns True, so that path is not reachable yet.
        if self.checkResources(needed, available):
            claimed, _ = self.claimResources(needed, taskId, startTime, endTime)
            if claimed:
                self.radbrpc.updateTaskAndResourceClaims(taskId, claim_status='allocated')
                self.radbrpc.updateTask(taskId, status='scheduled')
                self.raPublisher.notifyTaskStatusChanged(taskId, momId, sasId, 'scheduled')
            else:
                self.radbrpc.updateTask(taskId, status='conflict')
                self.raPublisher.notifyTaskStatusChanged(taskId, momId, sasId, 'conflict')

        # Linking predecessors is best effort: a failure here is logged but
        # does not undo the assignment above.
        try:
            mainKey = str(sasId)
            predecessor_ids = [int(otdbId) for otdbId in parsets if otdbId != mainKey]
            for predecessor_id in predecessor_ids:
                predecessor_task = self.radbrpc.getTask(otdb_id=predecessor_id)
                if predecessor_task:
                    # NOTE(review): otdb ids are passed here, not the radb ids
                    # (taskId / predecessor_task['id']) - confirm which ids
                    # insertTaskPredecessor expects.
                    self.radbrpc.insertTaskPredecessor(sasId, predecessor_id)
        except Exception as e:
            logger.exception(e)

    def parseSpecification(self, parset):
        """Return the name of the cluster on which the given parset should run.

        :param parset: the main parset (not used yet)
        :return: currently always 'cep4'
        """
        # TODO: cluster is not part of specification yet. For now return CEP4. Add logic later.
        return "cep4"

    def getNeededResouces(self, parset):
        """Ask the resource estimator service which resources the given parset needs.

        :param parset: the main parset; its dict() is sent to the estimator
        :return: the reply message of the resource estimator service
        """
        # NOTE(review): the returned status is not checked - confirm that the
        # RPC raises on failure.
        replymessage, status = self.rerpc(parset.dict(), timeout=10)
        logger.info('getNeededResouces: %s', replymessage)

        stations = parset.getStringVector('Observation.VirtualInstrument.stationList', '')
        logger.info('Stations: %s', stations)

        return replymessage

    def getAvailableResources(self, cluster):
        """Ask the SSDBService which hosts are available in the given cluster.

        The lookup is retried, after a short pause, for as long as it raises
        an exception. A KeyboardInterrupt during the lookup stops the retrying.

        :param cluster: name of the SSDBService group to look up
        :return: the reply of the SSDBService GetHostForGID request, or an empty
                 dict when the cluster is unknown, a request did not return
                 'OK', or the lookup was interrupted
        """
        while True:
            try:
                return self._queryAvailableResources(cluster)
            except KeyboardInterrupt:
                return {}
            except Exception as e:
                logger.warning("Exception while getting available resources. Trying again... %s", e)
                time.sleep(self._RETRY_DELAY)

    def _queryAvailableResources(self, cluster):
        """Do one (non-retried) lookup of the hosts in the given cluster; see getAvailableResources."""
        # All state is local to this attempt, so a retry never sees a
        # half-processed result of a previous attempt.
        groupnames = {}
        replymessage, status = self.ssdbGetActiveGroupNames()
        if status == 'OK':
            groupnames = replymessage
            logger.info('SSDBService ActiveGroupNames: %s', groupnames)
        else:
            logger.error("Could not get active group names from SSDBService: %s", status)

        groupnames = {v: k for k, v in groupnames.items()}  #swap key/value for name->id lookup
        logger.info('groupnames: %s', groupnames)

        if cluster not in groupnames:
            logger.error("group '%s' not known in SSDBService active groups (%s)",
                         cluster, ', '.join(groupnames.keys()))
            return {}

        groupId = groupnames[cluster]
        replymessage, status = self.ssdbGetHostForGID(groupId)
        if status != 'OK':
            logger.error("Could not get hosts for group %s (gid=%s) from SSDBService: %s",
                         cluster, groupId, status)
            return {}

        logger.info('available: %s', replymessage)
        return replymessage

    def checkResources(self, needed, available):
        """Tell whether the needed resources fit in the available resources.

        Placeholder: no check is done yet, the answer is always True.
        """
        return True

    def claimResources(self, resources, taskId, startTime, endTime):
        """Insert resource claims with status 'claimed' in the radb for the given task.

        :param resources: the needed resources; only resources['Observation']['total_data_size'] is read
        :param taskId: radb id of the task to claim for
        :param startTime: start of the claim
        :param endTime: end of the claim
        :return: tuple (success, resourceClaimIds); success is True when a claim
                 was inserted for every needed resource, resourceClaimIds holds
                 the results of the insertResourceClaim calls
        """
        #TEMP HACK
        # Only the 'cep4storage' resource is claimed.
        # NOTE(review): the estimated size is stored in the dict below but never
        # passed on to insertResourceClaim - confirm the intended claim size.
        cep4storage = resources['Observation']['total_data_size']
        resources = {'cep4storage': cep4storage}

        resourceNameDict = {r['name']: r for r in self.radbrpc.getResources()}
        # NOTE(review): raises StopIteration when the radb has no 'claimed' status.
        claimedStatusId = next(x['id'] for x in self.radbrpc.getResourceClaimStatuses() if x['name'].lower() == 'claimed')

        resourceClaimIds = []
        for resourceName in resources:
            if resourceName in resourceNameDict:
                resourceClaimIds.append(self.radbrpc.insertResourceClaim(resourceNameDict[resourceName]['id'], taskId, startTime, endTime, claimedStatusId, 1, -1, 'anonymous', -1))

        success = len(resourceClaimIds) == len(resources)
        return success, resourceClaimIds