Skip to content
Snippets Groups Projects
Commit b25dbe90 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #8887: initial working logic for taskspeficied to task/resources claimed...

Task #8887: initial working logic for taskspeficied to task/resources claimed in radb. Still quite hacky.
parent 293a24fc
No related branches found
No related tags found
No related merge requests found
......@@ -105,35 +105,38 @@ class ResourceAssigner():
mainParset = parameterset(mainParsetDict)
momId = mainParset.getInt('Observation.momID', -1)
taskType = mainParset.getString('Task.type', '')
if taskType.lower() == 'observation':
taskType = 'Observation'
startTime = datetime.strptime(mainParset.getString('Observation.startTime'), '%Y-%m-%d %H:%M:%S')
endTime = datetime.strptime(mainParset.getString('Observation.stopTime'), '%Y-%m-%d %H:%M:%S')
#check if task already present in radb
existingTask = self.radbrpc.getTask(otdb_id=sasId)
##check if task already present in radb
#existingTask = self.radbrpc.getTask(otdb_id=sasId)
if existingTask:
#present, so update task and specification in radb
taskId = existingTask['id']
specificationId = existingTask['specification_id']
self.radbrpc.updateSpecification(specificationId, startTime, endTime, str(mainParsetDict))
self.radbrpc.updateTask(taskId, momId, sasId, status, taskType, specificationId)
else:
#insert new task and specification in the radb
specificationId = self.radbrpc.insertSpecification(startTime, endTime, str(mainParsetDict))['id']
taskId = self.radbrpc.insertTask(momId, sasId, status, taskType, specificationId)['id']
#if existingTask:
##present, so update task and specification in radb
#taskId = existingTask['id']
#specificationId = existingTask['specification_id']
#self.radbrpc.updateSpecification(specificationId, startTime, endTime, str(mainParsetDict))
#self.radbrpc.updateTask(taskId, momId, sasId, status, taskType, specificationId)
#else:
#insert new task and specification in the radb
specificationId = self.radbrpc.insertSpecification(startTime, endTime, str(mainParsetDict))['id']
taskId = self.radbrpc.insertTask(momId, sasId, status, taskType, specificationId)['id']
#analyze the parset for needed and available resources and claim these in the radb
cluster = self.parseSpecification(mainParset)
needed = self.getNeededResouces(mainParset)
available = self.getAvailableResources(cluster)
#if checkResources(needed, available):
#result = claimResources(needed)
#if result.success:
#commitResources(result.id)
##SetTaskToSCHEDULED(Task.)
#else:
##SetTaskToCONFLICT(Task.)
#pass
needed = self.getNeededResouces(mainParset)
if self.checkResources(needed, available):
claimed, resourceIds = self.claimResources(needed, taskId, startTime, endTime)
if claimed:
self.commitResourceClaimsForTask(taskId)
self.radbrpc.updateTask(taskId, status='scheduled')
else:
self.radbrpc.updateTask(taskId, status='conflict')
def parseSpecification(self, parset):
# TODO: cluster is not part of specification yet. For now return CEP4. Add logic later.
......@@ -142,8 +145,11 @@ class ResourceAssigner():
return cluster
def getNeededResouces(self, parset):
replymessage, status = self.rerpc(parset.dict())
print replymessage
replymessage, status = self.rerpc(parset.dict(), timeout=10)
logger.info('getNeededResouces: %s' % replymessage)
stations = parset.getStringVector('Observation.VirtualInstrument.stationList', '')
logger.info('Stations: %s' % stations)
return replymessage
def getAvailableResources(self, cluster):
# Used settings
......@@ -180,9 +186,24 @@ class ResourceAssigner():
def checkResources(self, needed, available):
return True
def claimResources(self, needed):
rarpc.InsertTask()
def claimResources(self, resources, taskId, startTime, endTime):
#TEMP HACK
cep4storage = resources['Observation']['total_data_size']
resources = dict()
resources['cep4storage'] = cep4storage
resourceNameDict = {r['name']:r for r in self.radbrpc.getResources()}
claimedStatusId = next(x['id'] for x in self.radbrpc.getResourceClaimStatuses() if x['name'].lower() == 'claimed')
resourceClaimIds = []
for r in resources:
if r in resourceNameDict:
resourceClaimIds.append(self.radbrpc.insertResourceClaim(resourceNameDict[r]['id'], taskId, startTime, endTime, claimedStatusId, 1, -1, 'anonymous', -1))
success = len(resourceClaimIds) == len(resources)
return success, resourceClaimIds
def commitResources(self, result_id):
pass
def commitResourceClaimsForTask(self, taskId):
self.radbrpc.updateResourceClaimsForTask(taskId, status='ALLOCATED')
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment