diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py index 69828b7aff8b3522c3f7c6dddac2666c740d844c..a7d304dafd59710d6145bd03216f68b4f12885bd 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py @@ -557,19 +557,35 @@ class RADatabase: return list(self._executeQuery(query, qargs, fetch=_FETCH_ALL)) def insertResourceClaimProperty(self, claim_id, property_type, value, commit=False): - if property_type and isinstance(property_type, basestring): - #convert property_type string to id - property_type = self.getResourceClaimPropertyTypeId(property_type) + return self.insertResourceClaimProperties(claim_id, [(property_type, value)], commit) + + def insertResourceClaimProperties(self, claim_id, type_value_pairs, commit=False): + type_strings = set([tv[0] for tv in type_value_pairs if isinstance(tv[0], basestring)]) + + if type_strings: + # convert all type strings to id's + type_string2id = {t:self.getResourceClaimPropertyTypeId(t) for t in type_strings} + for tv in type_value_pairs: + if isinstance(tv[0], basestring): + tv[0] = type_string2id[tv[0]] + + insert_values = ','.join(self.cursor.mogrify('(%s, %s, %s)', (claim_id, tv[0], tv[1])) for tv in type_value_pairs) query = '''INSERT INTO resource_allocation.resource_claim_property (resource_claim_id, type_id, value) - VALUES (%s, %s, %s) - RETURNING id;''' + VALUES {values} + RETURNING id;'''.format(values=insert_values) + + ids = [x['id'] for x in self._executeQuery(query, fetch=_FETCH_ALL)] + + if [x for x in ids if x < 0]: + logger.error("One or more properties could not be inserted. Rolling back.") + self.rollback() + return None - id = self._executeQuery(query, (claim_id, property_type, value), fetch=_FETCH_ONE)['id'] if commit: self.commit() - return id + return ids def getResourceClaims(self, claim_ids=None, lower_bound=None, upper_bound=None, resource_ids=None, task_ids=None, status=None, resource_type=None, extended=False): extended |= resource_type is not None @@ -651,9 +667,7 @@ class RADatabase: if conditions: query += ' WHERE ' + ' AND '.join(conditions) - result = self._executeQuery(query, qargs, fetch=_FETCH_ALL) - result = list(result) - return result + return list(self._executeQuery(query, qargs, fetch=_FETCH_ALL)) def getResourceClaim(self, id): query = '''SELECT * from resource_allocation.resource_claim_view rcv @@ -664,65 +678,82 @@ class RADatabase: return dict(result) if result else None def insertResourceClaim(self, resource_id, task_id, starttime, endtime, status, session_id, claim_size, username, user_id, properties=None, validate=True, commit=True): - if status and isinstance(status, basestring): - #convert status string to status.id - status = self.getResourceClaimStatusId(status) - - query = '''INSERT INTO resource_allocation.resource_claim - (resource_id, task_id, starttime, endtime, status_id, session_id, claim_size, username, user_id) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) - RETURNING id;''' - - id = self._executeQuery(query, (resource_id, task_id, starttime, endtime, status, session_id, claim_size, username, user_id), fetch=_FETCH_ONE)['id'] - - try: - property_ids = set() - for p in properties: - property_ids.add(self.insertResourceClaimProperty(id, p['type'], p['value'], False)) - - if [x for x in property_ids if x < 0]: - logger.error("One or more properties cloud not be inserted. Rolling back.") - self.rollback() - return -1 - except KeyError as ke: - logger.error(ke) - self.rollback() - return -1 - - if validate: - insertedClaim = self.getResourceClaim(id) - self.validateResourceClaimsStatus([insertedClaim], False) - if commit: - self.commit() - return id + # for code re-use: + # put the one claim in a list + # and do a bulk insert of the one-item-list + claim = {'resource_id':resource_id, + 'starttime':starttime, + 'endtime':endtime, + 'status':status, + 'claim_size':1, + 'properties':properties} + + result = self.insertResourceClaims(task_id, [claim], session_id, username, user_id, commit) + if result: + return result[0] def insertResourceClaims(self, task_id, claims, session_id, username, user_id, commit=True): '''bulk insert of resource claims for a task claims is a list of dicts. Each dict is a claim for one resource containing the fields: starttime, endtime, status, claim_size ''' status_strings = set([c['status'] for c in claims if isinstance(c['status'], basestring)]) - status_string2id = {s:self.getResourceClaimStatusId(s) for s in status_strings} + if status_strings: + status_string2id = {s:self.getResourceClaimStatusId(s) for s in status_strings} + for c in claims: + if isinstance(c['status'], basestring): + c['status'] = status_string2id[c['status']] + + claim_values = [(c['resource_id'], task_id, c['starttime'], c['endtime'], + c['status'], session_id, c['claim_size'], + username, i) for i, c in enumerate(claims)] - claimIds = set() - for c in claims: - id = self.insertResourceClaim(c['resource_id'], task_id, c['starttime'], c['endtime'], - status_string2id[c['status']], session_id, c['claim_size'], - username, user_id, c.get('properties'), False, False) - claimIds.add(id) + # use psycopg2 mogrify to parse and build the insert values + # this way we can insert many values in one insert query, returning the id's of each inserted row. + # this is much faster than psycopg2's insertMany method + insert_values = ','.join(self.cursor.mogrify('(%s, %s, %s, %s, %s, %s, %s, %s, %s)', cv) for cv in claim_values) + + query = '''INSERT INTO resource_allocation.resource_claim + (resource_id, task_id, starttime, endtime, status_id, session_id, claim_size, username, user_id) + VALUES {values} + RETURNING id;'''.format(values=insert_values) + + claimIds = [x['id'] for x in self._executeQuery(query, fetch=_FETCH_ALL)] if [x for x in claimIds if x < 0]: logger.error("One or more claims cloud not be inserted. Rolling back.") self.rollback() - return {'inserted': False, 'resource_claim_ids': None } - + return None + + # map properties as tuple of claims with properties to claim_id + claimId2Props = { claim_id: [(p['type'], p['value']) for p in claim['properties']] + for claim_id, claim + in zip(claimIds, claims) + if 'properties' in claim } + + # convert all type strings to id's + # this saves a lot of lookup queries in the db + prop_type_strings = set(p[0] for p in claimId2Props.values() if isinstance(p[0], basestring)) + type_string2id = {t:self.getResourceClaimPropertyTypeId(t) for t in prop_type_strings} + for p in claimId2Props.values(): + if isinstance(p[0], basestring): + p[0] = type_string2id[p[0]] + + # and insert all properties for each claim with properties + for claim_id, props in claimId2Props.items(): + property_ids = self.insertResourceClaimProperties(claim_id, props, False) + if property_ids == None: + return None + + # get the claims as they were inserted + # and validate them against all other claims insertedClaims = self.getResourceClaims(claim_ids=claimIds) self.validateResourceClaimsStatus(insertedClaims, False) if commit: self.commit() - return {'inserted': True, 'resource_claim_ids': claimIds } + return claimIds def deleteResourceClaim(self, resource_claim_id, commit=True): query = '''DELETE FROM resource_allocation.resource_claim @@ -997,9 +1028,21 @@ if __name__ == '__main__': #print #print db.getResourceClaims() + for s in db.getSpecifications(): db.deleteSpecification(s['id']) + + #c = db.cursor + #query = '''INSERT INTO resource_allocation.resource_claim + #(resource_id, task_id, starttime, endtime, status_id, session_id, claim_size, username, user_id) + #VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) + #RETURNING id;''' + + #print c.mogrify(query, [(0, 0, datetime.utcnow(), datetime.utcnow(), 200, 1, 1, 'piet', 1)]) + #exit(0) + + from lofar.common.datetimeutils import totalSeconds begin = datetime.utcnow() for i in range(15): @@ -1018,7 +1061,7 @@ if __name__ == '__main__': 'status':'claimed', 'claim_size':1} for r in resources[:]] - for c in claims: + for c in claims[:2]: c['properties'] = [{'type':0, 'value':10}, {'type':1, 'value':20}, {'type':2, 'value':30}] db.insertResourceClaims(task['id'], claims, 1, 'paulus', 1, False)