diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py index df37a39fe28675ac5f7416453bdb14aafe72f68f..fe9d7a24966417c6c55394c4c082aede2126e9ff 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py @@ -29,6 +29,7 @@ from datetime import datetime, timedelta import time from lofar.common.util import humanreadablesize +from lofar.common.cache import cache from lofar.common.datetimeutils import totalSeconds, parseDatetime from lofar.messaging.messages import EventMessage from lofar.messaging.messagebus import ToBus @@ -130,6 +131,35 @@ class ResourceAssigner(): def taskTypeProducesOutput( type, subtype ): return type != 'reservation' + @property + @cache + def resource_group_relations(self): + """ Returns a dict of resource groups and their relations. Does not include resources. + + Each dict element has the resource group id as key, and the following value: + + { "child_ids": list of child resource groups + "parent_ids": list of parent resource groups + "resource_ids": list of resources in this group. } """ + + memberships = self.radbrpc.getResourceGroupMemberships() + return memberships['groups'] # resource-group-to-resource-group relations + + @property + @cache + def resource_types(self): + """ Returns a dict of all the resource types, to convert name->id. """ + + return {rt['name']:rt['id'] for rt in self.radbrpc.getResourceTypes()} + + + @property + @cache + def resource_claim_property_types(self): + """ Returns a dict of all the resource claim property types, to convert name->id. """ + + return {rcpt['name']:rcpt['id'] for rcpt in self.radbrpc.getResourceClaimPropertyTypes()} + def doAssignment(self, specification_tree): logger.info('doAssignment: specification_tree=%s' % (specification_tree)) @@ -298,20 +328,14 @@ class ResourceAssigner(): # get resources and related info from radb try: - db_resource_group_mships = self.radbrpc.getResourceGroupMemberships() - db_rgp2rgp = db_resource_group_mships['groups'] # resource-group-to-resource-group relations - # resource-to-resource-group relations are under db_resource_group_mships['resources'] - db_resource_list = self.radbrpc.getResources(include_availability=True) - db_resource_types = {rt['name']:rt['id'] for rt in self.radbrpc.getResourceTypes()} - db_resource_prop_types = {rcpt['name']:rcpt['id'] for rcpt in self.radbrpc.getResourceClaimPropertyTypes()} db_resource_max_fill_ratios = self.radbrpc.getResourceAllocationConfig(sql_like_name_pattern='max_fill_ratio_%') except Exception as e: logger.error(str(e)) self._sendStateChange(task, errStatus) return - self.applyMaxFillRatios(db_resource_list, db_rgp2rgp, db_resource_types, db_resource_max_fill_ratios) + self.applyMaxFillRatios(db_resource_list, db_resource_max_fill_ratios) # Assume estimates are (close enough to) accurate to determine resource claims for this task. # Try to get a set of non-conflicting claims from availability info in the RA DB. @@ -319,8 +343,7 @@ class ResourceAssigner(): # Also, inserted claims are still automatically validated, there can be a race. # If not enough resources are available after all, claims are put to conflict status. # If any claim is in conflict state, then the task is put to conflict status as well. - all_fit, claims, _ = self.getClaimsForTask(task, estimates, db_resource_list, db_rgp2rgp, db_resource_types, - db_resource_prop_types) + all_fit, claims, _ = self.getClaimsForTask(task, estimates, db_resource_list) if not all_fit: self._sendStateChange(task, 'error') return @@ -457,7 +480,7 @@ class ResourceAssigner(): return clusterNames - def applyMaxFillRatios(self, db_resource_list, db_rgp2rgp, db_resource_types, + def applyMaxFillRatios(self, db_resource_list, db_resource_max_fill_ratios): ''' Applies db_resource_max_fill_ratios to db_resource_list. db_resource_max_fill_ratios is e.g. [{'name': max_fill_ratio_CEP4_storage, 'value': 0.85}, ...] @@ -465,17 +488,16 @@ class ResourceAssigner(): prefix = 'max_fill_ratio_' # + resource_group_name + '_' + resource_type_name for ratio_dict in db_resource_max_fill_ratios: - for res_type_name in db_resource_types: + for res_type_name, res_type_id in self.resource_types.iteritems(): if not ratio_dict['name'].endswith('_' + res_type_name): continue - res_type_id = db_resource_types[res_type_name] res_group_name = ratio_dict['name'][len(prefix) : -len(res_type_name)-1] - res_group_id = self.getResourceGroupIdByName(db_rgp2rgp, res_group_name) + res_group_id = self.getResourceGroupIdByName(res_group_name) if res_group_id is None: logger.warn('applyMaxFillRatios: could not find resource group for %s', ratio_dict['name']) break - res_group = db_rgp2rgp[res_group_id] + res_group = self.resource_group_relations[res_group_id] for res_id in res_group['resource_ids']: res = db_resource_list[res_id] if res['type_id'] != res_type_id: @@ -491,17 +513,13 @@ class ResourceAssigner(): res['available_capacity'] = min(res['available_capacity'], int(ratio * res['total_capacity'])) logger.info('applyMaxFillRatios: applied %s = %f', ratio_dict['name'], ratio) - def getClaimsForTask(self, task, needed_resources_list, db_resource_list, db_rgp2rgp, db_resource_types, - db_resource_prop_types): + def getClaimsForTask(self, task, needed_resources_list, db_resource_list): """ Return claims that satisfy needed_resources_list within db_resource_list, or an empty claim list if no non-conflicting claims could be found, or None on error. :param task: an instance of an RADB task object :param needed_resources_list: a list of resources to be claimed :param db_resource_list: all resources in RADB with availability information - :param db_rgp2rgp: all group->group relations from RADB - :param db_resource_types: all virtual instrument resource types (and their units) from RADB - :param db_resource_prop_types: all resource claim property types from RADB :returns claims """ @@ -523,15 +541,10 @@ class ResourceAssigner(): # producing 'conflict' may be ok, if this saves implementation or excessive computational complexity. # Users will then have to free more resources than strictly necessary before retrying. - logger.debug('getClaimsForTask: db_rgp2rgp: %s', db_rgp2rgp) # big! logger.debug('getClaimsForTask: db_resource_list: %s', db_resource_list) # big! - logger.debug('getClaimsForTask: db_resource_types: %s', db_resource_types) - logger.debug('getClaimsForTask: db_resource_prop_types: %s', db_resource_prop_types) dtypes = {'uv', 'cs', 'is', 'im', 'img', 'pulp'} - summablePropTypeIds = {db_resource_prop_types['nr_of_' + dt + '_files'] for dt in dtypes} - db_storage_type_id = db_resource_types['storage'] # for sorting and to attach properties - db_rcu_type_id = db_resource_types['rcu'] # for attaching rcu bit field to the claim + summablePropTypeIds = {self.resource_claim_property_types['nr_of_' + dt + '_files'] for dt in dtypes} claims = [] @@ -540,26 +553,26 @@ class ResourceAssigner(): for needed_resources in needed_resources_list: # Replace resource names by type ids: easy matching w/ other data structs - needed_resources_by_type_id = {db_resource_types[name]: needed_resources['resource_types'][name] \ + needed_resources_by_type_id = {self.resource_types[name]: needed_resources['resource_types'][name] \ for name in needed_resources['resource_types']} # e.g. {3: 16536, 5: 170016} logger.info('getClaimsForTask: needed_resources_by_type_id: %s', needed_resources_by_type_id) # Find group id ('gid') of needed_resources['root_resource_group'], # then get list of claimable resources at root_gid and its children - root_gid = self.getResourceGroupIdByName(db_rgp2rgp, needed_resources['root_resource_group']) + root_gid = self.getResourceGroupIdByName(needed_resources['root_resource_group']) if root_gid is None: logger.error('getClaimsForTask: cannot find resources to claim: unknown root_resource_group \'%s\'', needed_resources['root_resource_group']) return None claimable_resources_list = self.getSubtreeResourcesList(root_gid, needed_resources_by_type_id, - db_resource_list, db_rgp2rgp) # e.g. [{3: <resource_dict>, 5: <resource_dict>}, ...] + db_resource_list) # e.g. [{3: <resource_dict>, 5: <resource_dict>}, ...] logger.info('getClaimsForTask: considering %d claimable resource dict(s)', len(claimable_resources_list)) logger.debug('getClaimsForTask: claimable_resources_list: %s', claimable_resources_list) input_files = needed_resources.get('input_files') output_files = needed_resources.get('output_files') - properties = self.getProperties(db_resource_prop_types, input_files, 'input') - properties.extend(self.getProperties(db_resource_prop_types, output_files, 'output')) + properties = self.getProperties(input_files, 'input') + properties.extend(self.getProperties(output_files, 'output')) # Collapse needed resources if only 1 claimable resource dict, e.g. global filesystem if len(claimable_resources_list) == 1: @@ -571,8 +584,8 @@ class ResourceAssigner(): prop['value'] *= needed_resources['resource_count'] needed_resources['resource_count'] = 1 - if db_storage_type_id in needed_resources_by_type_id: - sort_res_type = db_storage_type_id + if self.resource_types['storage'] in needed_resources_by_type_id: + sort_res_type = self.resource_types['storage'] else: sort_res_type = needed_resources_by_type_id.keys()[0] # some other if not storage @@ -590,11 +603,10 @@ class ResourceAssigner(): # Ignore check on claimable capacity of RCUs is_claimable = self.is_claimable_capacity_wise(needed_resources_by_type_id, claimable_resources_dict, - ignore_type_ids=[db_rcu_type_id]) + ignore_type_ids=[self.resource_types['rcu']]) if is_claimable: - claim = self.makeClaim(db_resource_prop_types, db_storage_type_id, db_rcu_type_id, task, - properties, needed_resources_by_type_id, claimable_resources_dict) + claim = self.makeClaim(task, properties, needed_resources_by_type_id, claimable_resources_dict) logger.debug('getClaimsForTask: created claim: %s', claim) claims_this_resource.extend(claim) break @@ -614,7 +626,7 @@ class ResourceAssigner(): return (all_fit, claims, unclaimable_resources) - def getResourceGroupIdByName(self, db_rgp2rgp, name): + def getResourceGroupIdByName(self, name): """ Returns group id of resource group named name, or None if name was not found. The search happens breadth-first. """ @@ -622,7 +634,7 @@ class ResourceAssigner(): i = 0 while i < len(gids): # careful iterating while modifying - res_group = db_rgp2rgp[gids[i]] + res_group = self.resource_group_relations[gids[i]] if res_group['resource_group_name'] == name: return gids[i] gids.extend(res_group['child_ids']) @@ -630,7 +642,7 @@ class ResourceAssigner(): return None - def getSubtreeResourcesList(self, root_gid, needed_resources_by_type_id, db_resource_list, db_rgp2rgp): + def getSubtreeResourcesList(self, root_gid, needed_resources_by_type_id, db_resource_list): """ Returns list of available resources of type id in needed_resources_by_type_id.keys() starting at group id root_gid in the format [{type_id: {<resource_dict>}, ...}, ...]. """ @@ -643,7 +655,7 @@ class ResourceAssigner(): resources = {} type_ids_seen = set() - res_group = db_rgp2rgp[gids[i]] + res_group = self.resource_group_relations[gids[i]] for rid in res_group['resource_ids']: type_id = db_resource_list[rid]['type_id'] if type_id in needed_resources_by_type_id and db_resource_list[rid]['active'] and \ @@ -676,7 +688,7 @@ class ResourceAssigner(): return is_claimable - def makeClaim(self, db_resource_prop_types, db_storage_type_id, db_rcu_type_id, task, properties, + def makeClaim(self, task, properties, needed_resources, claimable_resources): """ Returns list of claims for a data product (one for each needed resource type). Note: this function also updates claimable_resources. @@ -693,8 +705,8 @@ class ResourceAssigner(): # station's RCUs are requested to take part in a measurement and which not). In order to have it countable # (as is expected of a claim size) it needs to be replaced with the number of RCUs requested. Subsequently, # the bit pattern information is stored with the claim separately - if res_type == db_rcu_type_id: - used_rcus = needed_resources[db_rcu_type_id] + if res_type == self.resource_types['rcu']: + used_rcus = needed_resources[self.resource_types['rcu']] claim_size = used_rcus.count('1') claim['used_rcus'] = used_rcus else: @@ -703,7 +715,7 @@ class ResourceAssigner(): claim['claim_size'] = claim_size claimable_resources[res_type]['available_capacity'] -= claim_size - if res_type == db_storage_type_id: + if res_type == self.resource_types['storage']: # FIXME: find proper way to extend storage time with a year # 2016-09-27 scisup would like to be involved in chosing these kind of defaults # and what to do after the claim expires @@ -717,7 +729,7 @@ class ResourceAssigner(): return claims - def getProperties(self, db_resource_prop_types, files_dict, io_type): + def getProperties(self, files_dict, io_type): """ Return list of properties in claim format converted from files_dict. E.g. files_dict: {'cs': [ {'sap_nr': 2, ..., 'properties': {'nr_of_uv_files': 123, ...}}, {...} ], 'is': ...} """ @@ -732,7 +744,7 @@ class ResourceAssigner(): sap_nr = dptype_dict.get('sap_nr') # only with obs output and obs successor input for prop_type_name, prop_value in dptype_dict['properties'].items(): - rc_property_type_id = db_resource_prop_types.get(prop_type_name) + rc_property_type_id = self.resource_claim_property_types.get(prop_type_name) if rc_property_type_id is None: logger.error('getProperties: ignoring unknown prop type: %s', prop_type_name) continue