Skip to content
Snippets Groups Projects
Commit e438f46a authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #8887: further improved performance of insert claims and claim properties

parent 1cf044fc
No related branches found
No related tags found
No related merge requests found
......@@ -557,19 +557,35 @@ class RADatabase:
return list(self._executeQuery(query, qargs, fetch=_FETCH_ALL))
def insertResourceClaimProperty(self, claim_id, property_type, value, commit=False):
if property_type and isinstance(property_type, basestring):
#convert property_type string to id
property_type = self.getResourceClaimPropertyTypeId(property_type)
return self.insertResourceClaimProperties(claim_id, [(property_type, value)], commit)
def insertResourceClaimProperties(self, claim_id, type_value_pairs, commit=False):
type_strings = set([tv[0] for tv in type_value_pairs if isinstance(tv[0], basestring)])
if type_strings:
# convert all type strings to id's
type_string2id = {t:self.getResourceClaimPropertyTypeId(t) for t in type_strings}
for tv in type_value_pairs:
if isinstance(tv[0], basestring):
tv[0] = type_string2id[tv[0]]
insert_values = ','.join(self.cursor.mogrify('(%s, %s, %s)', (claim_id, tv[0], tv[1])) for tv in type_value_pairs)
query = '''INSERT INTO resource_allocation.resource_claim_property
(resource_claim_id, type_id, value)
VALUES (%s, %s, %s)
RETURNING id;'''
VALUES {values}
RETURNING id;'''.format(values=insert_values)
ids = [x['id'] for x in self._executeQuery(query, fetch=_FETCH_ALL)]
if [x for x in ids if x < 0]:
logger.error("One or more properties could not be inserted. Rolling back.")
self.rollback()
return None
id = self._executeQuery(query, (claim_id, property_type, value), fetch=_FETCH_ONE)['id']
if commit:
self.commit()
return id
return ids
def getResourceClaims(self, claim_ids=None, lower_bound=None, upper_bound=None, resource_ids=None, task_ids=None, status=None, resource_type=None, extended=False):
extended |= resource_type is not None
......@@ -651,9 +667,7 @@ class RADatabase:
if conditions:
query += ' WHERE ' + ' AND '.join(conditions)
result = self._executeQuery(query, qargs, fetch=_FETCH_ALL)
result = list(result)
return result
return list(self._executeQuery(query, qargs, fetch=_FETCH_ALL))
def getResourceClaim(self, id):
query = '''SELECT * from resource_allocation.resource_claim_view rcv
......@@ -664,65 +678,82 @@ class RADatabase:
return dict(result) if result else None
def insertResourceClaim(self, resource_id, task_id, starttime, endtime, status, session_id, claim_size, username, user_id, properties=None, validate=True, commit=True):
if status and isinstance(status, basestring):
#convert status string to status.id
status = self.getResourceClaimStatusId(status)
query = '''INSERT INTO resource_allocation.resource_claim
(resource_id, task_id, starttime, endtime, status_id, session_id, claim_size, username, user_id)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
RETURNING id;'''
id = self._executeQuery(query, (resource_id, task_id, starttime, endtime, status, session_id, claim_size, username, user_id), fetch=_FETCH_ONE)['id']
try:
property_ids = set()
for p in properties:
property_ids.add(self.insertResourceClaimProperty(id, p['type'], p['value'], False))
if [x for x in property_ids if x < 0]:
logger.error("One or more properties cloud not be inserted. Rolling back.")
self.rollback()
return -1
except KeyError as ke:
logger.error(ke)
self.rollback()
return -1
if validate:
insertedClaim = self.getResourceClaim(id)
self.validateResourceClaimsStatus([insertedClaim], False)
if commit:
self.commit()
return id
# for code re-use:
# put the one claim in a list
# and do a bulk insert of the one-item-list
claim = {'resource_id':resource_id,
'starttime':starttime,
'endtime':endtime,
'status':status,
'claim_size':1,
'properties':properties}
result = self.insertResourceClaims(task_id, [claim], session_id, username, user_id, commit)
if result:
return result[0]
def insertResourceClaims(self, task_id, claims, session_id, username, user_id, commit=True):
'''bulk insert of resource claims for a task
claims is a list of dicts. Each dict is a claim for one resource containing the fields: starttime, endtime, status, claim_size
'''
status_strings = set([c['status'] for c in claims if isinstance(c['status'], basestring)])
if status_strings:
status_string2id = {s:self.getResourceClaimStatusId(s) for s in status_strings}
claimIds = set()
for c in claims:
id = self.insertResourceClaim(c['resource_id'], task_id, c['starttime'], c['endtime'],
status_string2id[c['status']], session_id, c['claim_size'],
username, user_id, c.get('properties'), False, False)
claimIds.add(id)
if isinstance(c['status'], basestring):
c['status'] = status_string2id[c['status']]
claim_values = [(c['resource_id'], task_id, c['starttime'], c['endtime'],
c['status'], session_id, c['claim_size'],
username, i) for i, c in enumerate(claims)]
# use psycopg2 mogrify to parse and build the insert values
# this way we can insert many values in one insert query, returning the id's of each inserted row.
# this is much faster than psycopg2's insertMany method
insert_values = ','.join(self.cursor.mogrify('(%s, %s, %s, %s, %s, %s, %s, %s, %s)', cv) for cv in claim_values)
query = '''INSERT INTO resource_allocation.resource_claim
(resource_id, task_id, starttime, endtime, status_id, session_id, claim_size, username, user_id)
VALUES {values}
RETURNING id;'''.format(values=insert_values)
claimIds = [x['id'] for x in self._executeQuery(query, fetch=_FETCH_ALL)]
if [x for x in claimIds if x < 0]:
logger.error("One or more claims cloud not be inserted. Rolling back.")
self.rollback()
return {'inserted': False, 'resource_claim_ids': None }
return None
# map properties as tuple of claims with properties to claim_id
claimId2Props = { claim_id: [(p['type'], p['value']) for p in claim['properties']]
for claim_id, claim
in zip(claimIds, claims)
if 'properties' in claim }
# convert all type strings to id's
# this saves a lot of lookup queries in the db
prop_type_strings = set(p[0] for p in claimId2Props.values() if isinstance(p[0], basestring))
type_string2id = {t:self.getResourceClaimPropertyTypeId(t) for t in prop_type_strings}
for p in claimId2Props.values():
if isinstance(p[0], basestring):
p[0] = type_string2id[p[0]]
# and insert all properties for each claim with properties
for claim_id, props in claimId2Props.items():
property_ids = self.insertResourceClaimProperties(claim_id, props, False)
if property_ids == None:
return None
# get the claims as they were inserted
# and validate them against all other claims
insertedClaims = self.getResourceClaims(claim_ids=claimIds)
self.validateResourceClaimsStatus(insertedClaims, False)
if commit:
self.commit()
return {'inserted': True, 'resource_claim_ids': claimIds }
return claimIds
def deleteResourceClaim(self, resource_claim_id, commit=True):
query = '''DELETE FROM resource_allocation.resource_claim
......@@ -997,9 +1028,21 @@ if __name__ == '__main__':
#print
#print db.getResourceClaims()
for s in db.getSpecifications():
db.deleteSpecification(s['id'])
#c = db.cursor
#query = '''INSERT INTO resource_allocation.resource_claim
#(resource_id, task_id, starttime, endtime, status_id, session_id, claim_size, username, user_id)
#VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
#RETURNING id;'''
#print c.mogrify(query, [(0, 0, datetime.utcnow(), datetime.utcnow(), 200, 1, 1, 'piet', 1)])
#exit(0)
from lofar.common.datetimeutils import totalSeconds
begin = datetime.utcnow()
for i in range(15):
......@@ -1018,7 +1061,7 @@ if __name__ == '__main__':
'status':'claimed',
'claim_size':1} for r in resources[:]]
for c in claims:
for c in claims[:2]:
c['properties'] = [{'type':0, 'value':10}, {'type':1, 'value':20}, {'type':2, 'value':30}]
db.insertResourceClaims(task['id'], claims, 1, 'paulus', 1, False)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment