Skip to content
Snippets Groups Projects
Commit f094843a authored by Ruud Beukema's avatar Ruud Beukema
Browse files

Task #10560: Added functionality for adding RCU capacity to the claims, and...

Task #10560: Added functionality for adding RCU capacity to the claims, and added functionality for storing the  RCU bit patterns to the RADB. Please, mind the TODOs!!!
parent 45363586
Branches
Tags
No related merge requests found
...@@ -317,6 +317,8 @@ class ResourceAssigner(): ...@@ -317,6 +317,8 @@ class ResourceAssigner():
if claims: if claims:
logger.info('doAssignment: inserting %d claims in the radb: %s' % (len(claims), claims)) logger.info('doAssignment: inserting %d claims in the radb: %s' % (len(claims), claims))
try: try:
claims = self.__insert_rcu_specifications_and_add_rcu_id_references_to_claims(claims)
claim_ids = self.radbrpc.insertResourceClaims(task['id'], claims, 1, 'anonymous', -1)['ids'] claim_ids = self.radbrpc.insertResourceClaims(task['id'], claims, 1, 'anonymous', -1)['ids']
logger.info('doAssignment: %d claims were inserted in the radb' % len(claim_ids)) logger.info('doAssignment: %d claims were inserted in the radb' % len(claim_ids))
if len(claim_ids) != len(claims): if len(claim_ids) != len(claims):
...@@ -350,6 +352,22 @@ class ResourceAssigner(): ...@@ -350,6 +352,22 @@ class ResourceAssigner():
# send notification that the task was scheduled # send notification that the task was scheduled
self._sendStateChange(task, 'scheduled') self._sendStateChange(task, 'scheduled')
def __insert_rcu_specifications_and_add_rcu_id_references_to_claims(self, claims):
""" Insert RCU specifications for claims that require it and construct a list of references to them
:param claims:
:return: claims with updated claim['rcu_id'] field
"""
rcu_ids = []
rcu_bit_patterns = [c['rcu_bit_pattern'] for c in claims if c['rcu_bit_pattern'] is not None]
_rcu_ids = self.radbrpc.insertRcuSpecifications(rcu_bit_patterns)
_rcu_ids_idx = 0
for c in claims:
# TODO: Check if 'None' will become 'NULL' in the RADB?
c['rcu_id'] = _rcu_ids[_rcu_ids_idx] if c['rcu_bit_pattern'] is not None else None
_rcu_ids_idx += 1
return claims
def _sendStateChange(self, task, status): def _sendStateChange(self, task, status):
if status == 'scheduled' or status == 'conflict' or status == 'error': if status == 'scheduled' or status == 'conflict' or status == 'error':
content={'radb_id': task['id'], 'otdb_id':task['otdb_id'], 'mom_id': task['mom_id']} content={'radb_id': task['id'], 'otdb_id':task['otdb_id'], 'mom_id': task['mom_id']}
...@@ -515,6 +533,7 @@ class ResourceAssigner(): ...@@ -515,6 +533,7 @@ class ResourceAssigner():
dtypes = {'uv', 'cs', 'is', 'im', 'img', 'pulp'} dtypes = {'uv', 'cs', 'is', 'im', 'img', 'pulp'}
summablePropTypeIds = {db_resource_prop_types['nr_of_' + dt + '_files'] for dt in dtypes} summablePropTypeIds = {db_resource_prop_types['nr_of_' + dt + '_files'] for dt in dtypes}
db_storage_type_id = db_resource_types['storage'] # for sorting and to attach properties db_storage_type_id = db_resource_types['storage'] # for sorting and to attach properties
db_rcu_type_id = db_resource_types['rcu'] # for attaching rcu bit field to the claim
claims = [] claims = []
for needed_resources in needed_resources_list: for needed_resources in needed_resources_list:
...@@ -565,8 +584,8 @@ class ResourceAssigner(): ...@@ -565,8 +584,8 @@ class ResourceAssigner():
claim = None claim = None
for claimable_resources_dict in claimable_resources_list: for claimable_resources_dict in claimable_resources_list:
if self.isClaimable(needed_resources_by_type_id, claimable_resources_dict): if self.isClaimable(needed_resources_by_type_id, claimable_resources_dict):
claim = self.makeClaim(db_resource_prop_types, db_storage_type_id, task, properties, claim = self.makeClaim(db_resource_prop_types, db_storage_type_id, db_rcu_type_id, task,
needed_resources_by_type_id, claimable_resources_dict) properties, needed_resources_by_type_id, claimable_resources_dict)
logger.debug('getClaimsForTask: created claim: %s', claim) logger.debug('getClaimsForTask: created claim: %s', claim)
claims.extend(claim) claims.extend(claim)
break break
...@@ -612,7 +631,7 @@ class ResourceAssigner(): ...@@ -612,7 +631,7 @@ class ResourceAssigner():
for rid in res_group['resource_ids']: for rid in res_group['resource_ids']:
type_id = db_resource_list[rid]['type_id'] type_id = db_resource_list[rid]['type_id']
if type_id in needed_resources_by_type_id and db_resource_list[rid]['active'] and \ if type_id in needed_resources_by_type_id and db_resource_list[rid]['active'] and \
db_resource_list[rid]['available_capacity'] > 0: # <-- capacity not available for e.g. rcus db_resource_list[rid]['available_capacity'] > 0:
resources[type_id] = db_resource_list[rid] resources[type_id] = db_resource_list[rid]
type_ids_seen.add(type_id) type_ids_seen.add(type_id)
...@@ -633,7 +652,7 @@ class ResourceAssigner(): ...@@ -633,7 +652,7 @@ class ResourceAssigner():
return all(claim_size <= claimable_resources[res_type]['available_capacity'] \ return all(claim_size <= claimable_resources[res_type]['available_capacity'] \
for res_type, claim_size in needed_resources.items()) for res_type, claim_size in needed_resources.items())
def makeClaim(self, db_resource_prop_types, db_storage_type_id, task, properties, def makeClaim(self, db_resource_prop_types, db_storage_type_id, db_rcu_type_id, task, properties,
needed_resources, claimable_resources): needed_resources, claimable_resources):
""" Returns list of claims for a data product (one for each needed resource type). """ Returns list of claims for a data product (one for each needed resource type).
Note: this function also updates claimable_resources. Note: this function also updates claimable_resources.
...@@ -643,6 +662,15 @@ class ResourceAssigner(): ...@@ -643,6 +662,15 @@ class ResourceAssigner():
claims = [] claims = []
for res_type, claim_size in needed_resources.items(): for res_type, claim_size in needed_resources.items():
# RCU claim size as returned by the ResourceEstimator is actually a bit pattern (encoding which of a
# station's RCUs are requested to take part in a measurement and which not). In order to have it countable
# (as is expected of a claim size) it needs to be replaced with the number of RCUs requested. Subsequently,
# the bit pattern information is stored with the claim separately
if res_type == db_rcu_type_id:
rcu_bit_pattern = needed_resources[db_rcu_type_id] # TODO: determine if .copy() is needed here
claim_size = rcu_bit_pattern.count('1')
claim['rcu_bit_pattern'] = rcu_bit_pattern
claim = {'starttime': task['starttime'], 'endtime': task['endtime'], 'status': 'claimed'} claim = {'starttime': task['starttime'], 'endtime': task['endtime'], 'status': 'claimed'}
claim['resource_id'] = claimable_resources[res_type]['id'] claim['resource_id'] = claimable_resources[res_type]['id']
claim['claim_size'] = claim_size claim['claim_size'] = claim_size
...@@ -700,6 +728,7 @@ class ResourceAssigner(): ...@@ -700,6 +728,7 @@ class ResourceAssigner():
return properties return properties
# TODO: check and decide if we need to be able to merge claims based on claim[rcu_bit_pattern]
def mergeClaims(self, summablePropTypeIds, claims): def mergeClaims(self, summablePropTypeIds, claims):
""" Merge claims allocated onto the same resources. """ Merge claims allocated onto the same resources.
To merge claim properties, summablePropTypeIds is used. To merge claim properties, summablePropTypeIds is used.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment