Skip to content
Snippets Groups Projects
Commit b7d3f9c9 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #8887: modified getResourceClaims so we can query for time-window,...

Task #8887: modified getResourceClaims so we can query for time-window, status, resource_type or task_id
parent b2ea66cc
No related branches found
No related tags found
No related merge requests found
...@@ -25,7 +25,7 @@ TODO: documentation ...@@ -25,7 +25,7 @@ TODO: documentation
import logging import logging
import psycopg2 import psycopg2
import psycopg2.extras import psycopg2.extras
import datetime from datetime import datetime, timedelta
import time import time
from optparse import OptionParser from optparse import OptionParser
from lofar.common import dbcredentials from lofar.common import dbcredentials
...@@ -505,10 +505,53 @@ class RADatabase: ...@@ -505,10 +505,53 @@ class RADatabase:
return result return result
def getResourceClaims(self): def getResourceClaims(self, lower_bound=None, upper_bound=None, task_id=None, status=None, resource_type=None, extended=False):
query = '''SELECT * from resource_allocation.resource_claim_view''' extended |= resource_type is not None
query = '''SELECT * from %s''' % ('resource_allocation.resource_claim_extended_view' if extended else 'resource_allocation.resource_claim_view')
return list(self._executeQuery(query, fetch=_FETCH_ALL)) if lower_bound and not isinstance(lower_bound, datetime):
lower_bound = None
if upper_bound and not isinstance(upper_bound, datetime):
upper_bound = None
if status is not None and isinstance(status, basestring):
#convert status string to status.id
status = self.getResourceClaimStatusId(status)
if resource_type is not None and isinstance(resource_type, basestring):
#convert resource_type string to resource_type.id
resource_type = self.getResourceTypeId(resource_type)
qargs = None
if lower_bound or upper_bound or task_id is not None or status is not None or resource_type is not None:
conditions = []
qargs = []
if lower_bound:
conditions.append('endtime >= %s')
qargs.append(lower_bound)
if upper_bound:
conditions.append('starttime <= %s')
qargs.append(upper_bound)
if task_id is not None:
conditions.append('task_id = %s')
qargs.append(task_id)
if status is not None:
conditions.append('status_id = %s')
qargs.append(status)
if resource_type is not None and extended:
conditions.append('resource_type_id = %s')
qargs.append(resource_type)
query += ' WHERE ' + ' AND '.join(conditions)
return list(self._executeQuery(query, qargs, fetch=_FETCH_ALL))
def getResourceClaim(self, id): def getResourceClaim(self, id):
query = '''SELECT * from resource_allocation.resource_claim_view rcv query = '''SELECT * from resource_allocation.resource_claim_view rcv
...@@ -622,11 +665,6 @@ class RADatabase: ...@@ -622,11 +665,6 @@ class RADatabase:
return self.cursor.rowcount > 0 return self.cursor.rowcount > 0
def getResourceClaimsForTask(self, task_id):
query = '''SELECT * from resource_allocation.resource_claim_view
WHERE resource_allocation.resource_claim_view.task_id = %s'''
return list(self._executeQuery(query, [task_id], fetch=_FETCH_ALL))
def updateTaskAndResourceClaims(self, task_id, starttime=None, endtime=None, task_status=None, claim_status=None, session_id=None, username=None, user_id=None, commit=True): def updateTaskAndResourceClaims(self, task_id, starttime=None, endtime=None, task_status=None, claim_status=None, session_id=None, username=None, user_id=None, commit=True):
if claim_status and isinstance(claim_status, basestring): if claim_status and isinstance(claim_status, basestring):
...@@ -729,7 +767,19 @@ if __name__ == '__main__': ...@@ -729,7 +767,19 @@ if __name__ == '__main__':
print '\n-- ' + str(method.__name__) + ' --' print '\n-- ' + str(method.__name__) + ' --'
print '\n'.join([str(x) for x in method()]) print '\n'.join([str(x) for x in method()])
print db.getResourceClaims()
print
print db.getResourceClaims(task_id=440)
print
print db.getResourceClaims(lower_bound=datetime.utcnow() + timedelta(days=9))
print
print db.getResourceClaims(upper_bound=datetime.utcnow() + timedelta(days=19))
print
print db.getResourceClaims(status='allocated')
print
print db.getResourceClaims(status='claimed')
print
print db.getResourceClaims(resource_type='storage')
#resultPrint(db.getTaskStatuses) #resultPrint(db.getTaskStatuses)
#resultPrint(db.getTaskStatusNames) #resultPrint(db.getTaskStatusNames)
#resultPrint(db.getTaskTypes) #resultPrint(db.getTaskTypes)
...@@ -754,27 +804,27 @@ if __name__ == '__main__': ...@@ -754,27 +804,27 @@ if __name__ == '__main__':
#for s in db.getSpecifications(): #for s in db.getSpecifications():
#db.deleteSpecification()) #db.deleteSpecification())
result = db.insertSpecificationAndTask(1234, 5678, 600, 0, datetime.datetime.utcnow(), datetime.datetime.utcnow() + datetime.timedelta(hours=1), "", False) #result = db.insertSpecificationAndTask(1234, 5678, 600, 0, datetime.utcnow(), datetime.utcnow() + timedelta(hours=1), "", False)
print result #print result
resultPrint(db.getSpecifications) #resultPrint(db.getSpecifications)
resultPrint(db.getTasks) #resultPrint(db.getTasks)
task = db.getTask(result['task_id']) #task = db.getTask(result['task_id'])
resources = db.getResources() #resources = db.getResources()
claims = [{'resource_id':r['id'], #claims = [{'resource_id':r['id'],
'starttime':task['starttime'], #'starttime':task['starttime'],
'endtime':task['endtime'], #'endtime':task['endtime'],
'status':'claimed', #'status':'claimed',
'claim_size':1} for r in resources] #'claim_size':1} for r in resources]
db.insertResourceClaims(task['id'], claims, 1, 'paulus', 1, False) #db.insertResourceClaims(task['id'], claims, 1, 'paulus', 1, False)
resultPrint(db.getResourceClaims) #resultPrint(db.getResourceClaims)
raw_input() #raw_input()
db.commit() #db.commit()
...@@ -785,7 +835,7 @@ if __name__ == '__main__': ...@@ -785,7 +835,7 @@ if __name__ == '__main__':
#predTaskId = None #predTaskId = None
#for i in range(2): #for i in range(2):
#specId = db.insertSpecification(datetime.datetime.utcnow(), datetime.datetime.utcnow() + datetime.timedelta(hours=4), "") #specId = db.insertSpecification(datetime.utcnow(), datetime.utcnow() + timedelta(hours=4), "")
#taskId = db.insertTask(1234+i, 5678+i, 600, 0, specId) #taskId = db.insertTask(1234+i, 5678+i, 600, 0, specId)
#if predTaskId: #if predTaskId:
...@@ -794,7 +844,7 @@ if __name__ == '__main__': ...@@ -794,7 +844,7 @@ if __name__ == '__main__':
#resources = db.getResources() #resources = db.getResources()
#for r in resources: #for r in resources:
#rcId = db.insertResourceClaim(r['id'], taskId, datetime.datetime.utcnow() + datetime.timedelta(hours=2*i), datetime.datetime.utcnow() + datetime.timedelta(hours=2*(i+1)), 0, 1, 10, 'einstein', -1) #rcId = db.insertResourceClaim(r['id'], taskId, datetime.utcnow() + timedelta(hours=2*i), datetime.utcnow() + timedelta(hours=2*(i+1)), 0, 1, 10, 'einstein', -1)
##tasks = db.getTasks() ##tasks = db.getTasks()
...@@ -808,7 +858,7 @@ if __name__ == '__main__': ...@@ -808,7 +858,7 @@ if __name__ == '__main__':
##for i in range(1): ##for i in range(1):
##taskId = db.insertTask(1234, 5678, 600, 0, 1) ##taskId = db.insertTask(1234, 5678, 600, 0, 1)
##for j in range(2*i): ##for j in range(2*i):
##rcId = db.insertResourceClaim(j, taskId, datetime.datetime.utcnow() + datetime.timedelta(hours=4*i), datetime.datetime.utcnow() + datetime.timedelta(hours=4*i+3.5), 0, 4, 10, 'einstein', -1) ##rcId = db.insertResourceClaim(j, taskId, datetime.utcnow() + timedelta(hours=4*i), datetime.utcnow() + timedelta(hours=4*i+3.5), 0, 4, 10, 'einstein', -1)
##time.sleep(0.5) ##time.sleep(0.5)
......
...@@ -27,8 +27,13 @@ class RARPC(RPCWrapper): ...@@ -27,8 +27,13 @@ class RARPC(RPCWrapper):
def getResourceClaimStatuses(self): def getResourceClaimStatuses(self):
return self.rpc('GetResourceClaimStatuses') return self.rpc('GetResourceClaimStatuses')
def getResourceClaims(self): def getResourceClaims(self, lower_bound=None, upper_bound=None, task_id=None, status=None, resource_type=None, extended=False):
claims = self.rpc('GetResourceClaims') claims = self.rpc('GetResourceClaims', lower_bound=lower_bound,
upper_bound=upper_bound,
task_id=task_id,
status=status,
resource_type=resource_type,
extended=extended)
for claim in claims: for claim in claims:
claim['starttime'] = claim['starttime'].datetime() claim['starttime'] = claim['starttime'].datetime()
claim['endtime'] = claim['endtime'].datetime() claim['endtime'] = claim['endtime'].datetime()
......
...@@ -78,8 +78,13 @@ class RADBHandler(MessageHandlerInterface): ...@@ -78,8 +78,13 @@ class RADBHandler(MessageHandlerInterface):
def _getResourceClaimStatuses(self): def _getResourceClaimStatuses(self):
return self.radb.getResourceClaimStatuses() return self.radb.getResourceClaimStatuses()
def _getResourceClaims(self): def _getResourceClaims(self, **kwargs):
return self.radb.getResourceClaims() return self.radb.getResourceClaims(lower_bound=kwargs.get('lower_bound'),
upper_bound=kwargs.get('upper_bound'),
task_id=kwargs.get('task_id'),
status=kwargs.get('status'),
resource_type=kwargs.get('resource_type'),
extended=kwargs.get('extended', False))
def _getResourceClaim(self, **kwargs): def _getResourceClaim(self, **kwargs):
claim = self.radb.getResourceClaim(kwargs['id']) claim = self.radb.getResourceClaim(kwargs['id'])
...@@ -122,7 +127,7 @@ class RADBHandler(MessageHandlerInterface): ...@@ -122,7 +127,7 @@ class RADBHandler(MessageHandlerInterface):
return {'id': id, 'updated': updated} return {'id': id, 'updated': updated}
def _getResourceClaimsForTask(self, task_id): def _getResourceClaimsForTask(self, task_id):
claims = self.radb.getResourceClaimsForTask(task_id) claims = self.radb.getResourceClaims(task_id=task_id)
return claims return claims
def _updateTaskAndResourceClaims(self, **kwargs): def _updateTaskAndResourceClaims(self, **kwargs):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment