diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py index 4834185aa5aa4ae8a2ea98ca44d820994e81ad09..0c8a09ea74559ce482ded540abab9ee920831ac7 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py @@ -29,6 +29,7 @@ from datetime import datetime import time import collections +from lofar.common.util import humanreadablesize from lofar.messaging.RPC import RPC, RPCException from lofar.parameterset import parameterset @@ -132,10 +133,6 @@ class ResourceAssigner(): taskId = self.radbrpc.insertTask(momId, otdb_id, status, taskType, specificationId)['id'] logger.info('doAssignment: insertTask taskId=%s' % (taskId,)) - #analyze the parset for needed and available resources and claim these in the radb - cluster = self.parseSpecification(mainParset) - available = self.getAvailableResources(cluster) - needed = self.getNeededResouces(specification_tree) logger.info('doAssignment: getNeededResouces=%s' % (needed,)) @@ -147,6 +144,14 @@ class ResourceAssigner(): logger.error("no task type %s found in estimator results %s" % (taskType, needed[str(otdb_id)])) return + try: + #analyze the parset for needed and available resources and claim these in the radb + available = self.getAvailableResources('cep4') + logger.info('doAssignment: getAvailableResources=%s' % (available,)) + except Exception as e: + logger.warning("Exception while getting available resources: %s" % str(e)) + return + main_needed = needed[str(otdb_id)] if self.checkResources(main_needed, available): task = self.radbrpc.getTask(taskId) @@ -181,12 +186,6 @@ class ResourceAssigner(): except Exception as e: logger.error(e) - def parseSpecification(self, parset): - # TODO: cluster is not part of specification yet. For now return CEP4. Add logic later. - default = "cep2" - cluster ="cep4" - return cluster - def getNeededResouces(self, specification_tree): replymessage, status = self.rerpc({"specification_tree":specification_tree}, timeout=10) logger.info('getNeededResouces: %s' % replymessage) @@ -195,30 +194,46 @@ class ResourceAssigner(): ##logger.info('Stations: %s' % stations) return replymessage - def getAvailableResources(self, cluster): - # Used settings - groupnames = {} - available = {} - while True: - try: - groupnames = self.ssdbrpc.getactivegroupnames() - groupnames = {v:k for k,v in groupnames.items()} #swap key/value for name->id lookup - logger.info('groupnames: %s' % groupnames) - if cluster in groupnames.keys(): - groupId = groupnames[cluster] - available = self.ssdbrpc.gethostsforgid(groupId) - logger.info('available: %s' % available) - else: - logger.error("group \'%s\' not known in SSDBService active groups (%s)" % (cluster, ', '.join(groupnames.keys()))) - return available - except KeyboardInterrupt: - break - except Exception as e: - logger.warning("Exception while getting available resources. Trying again... " + str(e)) - time.sleep(0.25) + def getAvailableResources(self, cluster, update_radb=True): + # find out which resources are available + # and what is their capacity + # For now, only look at CEP4 storage + # Later, also look at stations up/down for short term scheduling + + #get all active groupnames, find id for cluster group + groupnames = self.ssdbrpc.getactivegroupnames() + cluster_group_id = next(k for k,v in groupnames.items() if v == cluster) + + # for CEP4 cluster, do hard codes lookup of first and only node + node_info = self.ssdbrpc.gethostsforgid(cluster_group_id)['nodes'][0] + + if update_radb: + storage_resources = self.radbrpc.getResources(resource_types='storage', include_availability=True) + cep4_storage_resource = next(x for x in storage_resources if 'cep4' in x['name']) + active = node_info['statename'] == 'Active' + total_capacity = node_info['totalspace'] + available_capacity = total_capacity - node_info['usedspace'] + + logger.info("Updating resource availability of %s (id=%s) to active=%s available_capacity=%s total_capacity=%s" % + (cep4_storage_resource['name'], cep4_storage_resource['id'], active, available_capacity, total_capacity)) + + self.radbrpc.updateResourceAvailability(cep4_storage_resource['id'], + active=active, + available_capacity=available_capacity, + total_capacity=total_capacity) + + return node_info def checkResources(self, needed, available): - return True + # For now, only check cep4 storage + total_needed_storage = sum(x['storage']['total_size'] for x in needed.values() if 'storage' in x) + total_available_storage = available['totalspace'] + + logger.info("%senough storage resources available: needed=%s, available=%s" % + ('not ' if total_needed_storage >= total_available_storage else '', + humanreadablesize(total_needed_storage), + humanreadablesize(total_available_storage))) + return total_needed_storage < total_available_storage def claimResources(self, needed_resources, task): logger.info('claimResources: task %s needed_resources=%s' % (task, needed_resources)) @@ -253,7 +268,7 @@ class ResourceAssigner(): 'starttime':task['starttime'], 'endtime':task['endtime'], 'status':'claimed', - 'claim_size':1} + 'claim_size':needed_claim_value} # if the needed_claim_for_resource_type dict contains more kvp's, # then the subdict contains groups of properties for the claim