Skip to content
Snippets Groups Projects
Commit a8b10a98 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #8887: get total and used cep4 storage size from ssdb, update this to...

Task #8887: get total and used cep4 storage size from ssdb, update this to radb. check claim size against available size
parent 164cca89
No related branches found
No related tags found
No related merge requests found
......@@ -29,6 +29,7 @@ from datetime import datetime
import time
import collections
from lofar.common.util import humanreadablesize
from lofar.messaging.RPC import RPC, RPCException
from lofar.parameterset import parameterset
......@@ -132,10 +133,6 @@ class ResourceAssigner():
taskId = self.radbrpc.insertTask(momId, otdb_id, status, taskType, specificationId)['id']
logger.info('doAssignment: insertTask taskId=%s' % (taskId,))
#analyze the parset for needed and available resources and claim these in the radb
cluster = self.parseSpecification(mainParset)
available = self.getAvailableResources(cluster)
needed = self.getNeededResouces(specification_tree)
logger.info('doAssignment: getNeededResouces=%s' % (needed,))
......@@ -147,6 +144,14 @@ class ResourceAssigner():
logger.error("no task type %s found in estimator results %s" % (taskType, needed[str(otdb_id)]))
return
try:
#analyze the parset for needed and available resources and claim these in the radb
available = self.getAvailableResources('cep4')
logger.info('doAssignment: getAvailableResources=%s' % (available,))
except Exception as e:
logger.warning("Exception while getting available resources: %s" % str(e))
return
main_needed = needed[str(otdb_id)]
if self.checkResources(main_needed, available):
task = self.radbrpc.getTask(taskId)
......@@ -181,12 +186,6 @@ class ResourceAssigner():
except Exception as e:
logger.error(e)
def parseSpecification(self, parset):
# TODO: cluster is not part of specification yet. For now return CEP4. Add logic later.
default = "cep2"
cluster ="cep4"
return cluster
def getNeededResouces(self, specification_tree):
replymessage, status = self.rerpc({"specification_tree":specification_tree}, timeout=10)
logger.info('getNeededResouces: %s' % replymessage)
......@@ -195,30 +194,46 @@ class ResourceAssigner():
##logger.info('Stations: %s' % stations)
return replymessage
def getAvailableResources(self, cluster):
# Used settings
groupnames = {}
available = {}
while True:
try:
def getAvailableResources(self, cluster, update_radb=True):
# find out which resources are available
# and what is their capacity
# For now, only look at CEP4 storage
# Later, also look at stations up/down for short term scheduling
#get all active groupnames, find id for cluster group
groupnames = self.ssdbrpc.getactivegroupnames()
groupnames = {v:k for k,v in groupnames.items()} #swap key/value for name->id lookup
logger.info('groupnames: %s' % groupnames)
if cluster in groupnames.keys():
groupId = groupnames[cluster]
available = self.ssdbrpc.gethostsforgid(groupId)
logger.info('available: %s' % available)
else:
logger.error("group \'%s\' not known in SSDBService active groups (%s)" % (cluster, ', '.join(groupnames.keys())))
return available
except KeyboardInterrupt:
break
except Exception as e:
logger.warning("Exception while getting available resources. Trying again... " + str(e))
time.sleep(0.25)
cluster_group_id = next(k for k,v in groupnames.items() if v == cluster)
# for CEP4 cluster, do hard codes lookup of first and only node
node_info = self.ssdbrpc.gethostsforgid(cluster_group_id)['nodes'][0]
if update_radb:
storage_resources = self.radbrpc.getResources(resource_types='storage', include_availability=True)
cep4_storage_resource = next(x for x in storage_resources if 'cep4' in x['name'])
active = node_info['statename'] == 'Active'
total_capacity = node_info['totalspace']
available_capacity = total_capacity - node_info['usedspace']
logger.info("Updating resource availability of %s (id=%s) to active=%s available_capacity=%s total_capacity=%s" %
(cep4_storage_resource['name'], cep4_storage_resource['id'], active, available_capacity, total_capacity))
self.radbrpc.updateResourceAvailability(cep4_storage_resource['id'],
active=active,
available_capacity=available_capacity,
total_capacity=total_capacity)
return node_info
def checkResources(self, needed, available):
return True
# For now, only check cep4 storage
total_needed_storage = sum(x['storage']['total_size'] for x in needed.values() if 'storage' in x)
total_available_storage = available['totalspace']
logger.info("%senough storage resources available: needed=%s, available=%s" %
('not ' if total_needed_storage >= total_available_storage else '',
humanreadablesize(total_needed_storage),
humanreadablesize(total_available_storage)))
return total_needed_storage < total_available_storage
def claimResources(self, needed_resources, task):
logger.info('claimResources: task %s needed_resources=%s' % (task, needed_resources))
......@@ -253,7 +268,7 @@ class ResourceAssigner():
'starttime':task['starttime'],
'endtime':task['endtime'],
'status':'claimed',
'claim_size':1}
'claim_size':needed_claim_value}
# if the needed_claim_for_resource_type dict contains more kvp's,
# then the subdict contains groups of properties for the claim
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment