diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py index 2f35a309680bb687ebba9d20de8ad6f1b6853faa..44f02dccadc68e87599bf4a0c4ca0d64f9f85cc0 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py @@ -105,35 +105,38 @@ class ResourceAssigner(): mainParset = parameterset(mainParsetDict) momId = mainParset.getInt('Observation.momID', -1) taskType = mainParset.getString('Task.type', '') + if taskType.lower() == 'observation': + taskType = 'Observation' startTime = datetime.strptime(mainParset.getString('Observation.startTime'), '%Y-%m-%d %H:%M:%S') endTime = datetime.strptime(mainParset.getString('Observation.stopTime'), '%Y-%m-%d %H:%M:%S') - #check if task already present in radb - existingTask = self.radbrpc.getTask(otdb_id=sasId) + ##check if task already present in radb + #existingTask = self.radbrpc.getTask(otdb_id=sasId) - if existingTask: - #present, so update task and specification in radb - taskId = existingTask['id'] - specificationId = existingTask['specification_id'] - self.radbrpc.updateSpecification(specificationId, startTime, endTime, str(mainParsetDict)) - self.radbrpc.updateTask(taskId, momId, sasId, status, taskType, specificationId) - else: - #insert new task and specification in the radb - specificationId = self.radbrpc.insertSpecification(startTime, endTime, str(mainParsetDict))['id'] - taskId = self.radbrpc.insertTask(momId, sasId, status, taskType, specificationId)['id'] + #if existingTask: + ##present, so update task and specification in radb + #taskId = existingTask['id'] + #specificationId = existingTask['specification_id'] + #self.radbrpc.updateSpecification(specificationId, startTime, endTime, str(mainParsetDict)) + #self.radbrpc.updateTask(taskId, momId, sasId, status, taskType, specificationId) + #else: + #insert new task and specification in the radb + specificationId = self.radbrpc.insertSpecification(startTime, endTime, str(mainParsetDict))['id'] + taskId = self.radbrpc.insertTask(momId, sasId, status, taskType, specificationId)['id'] #analyze the parset for needed and available resources and claim these in the radb cluster = self.parseSpecification(mainParset) - needed = self.getNeededResouces(mainParset) available = self.getAvailableResources(cluster) - #if checkResources(needed, available): - #result = claimResources(needed) - #if result.success: - #commitResources(result.id) - ##SetTaskToSCHEDULED(Task.) - #else: - ##SetTaskToCONFLICT(Task.) - #pass + + needed = self.getNeededResouces(mainParset) + + if self.checkResources(needed, available): + claimed, resourceIds = self.claimResources(needed, taskId, startTime, endTime) + if claimed: + self.commitResourceClaimsForTask(taskId) + self.radbrpc.updateTask(taskId, status='scheduled') + else: + self.radbrpc.updateTask(taskId, status='conflict') def parseSpecification(self, parset): # TODO: cluster is not part of specification yet. For now return CEP4. Add logic later. @@ -142,8 +145,11 @@ class ResourceAssigner(): return cluster def getNeededResouces(self, parset): - replymessage, status = self.rerpc(parset.dict()) - print replymessage + replymessage, status = self.rerpc(parset.dict(), timeout=10) + logger.info('getNeededResouces: %s' % replymessage) + stations = parset.getStringVector('Observation.VirtualInstrument.stationList', '') + logger.info('Stations: %s' % stations) + return replymessage def getAvailableResources(self, cluster): # Used settings @@ -180,9 +186,24 @@ class ResourceAssigner(): def checkResources(self, needed, available): return True - def claimResources(self, needed): - rarpc.InsertTask() + def claimResources(self, resources, taskId, startTime, endTime): + #TEMP HACK + cep4storage = resources['Observation']['total_data_size'] + resources = dict() + resources['cep4storage'] = cep4storage + + resourceNameDict = {r['name']:r for r in self.radbrpc.getResources()} + claimedStatusId = next(x['id'] for x in self.radbrpc.getResourceClaimStatuses() if x['name'].lower() == 'claimed') + + resourceClaimIds = [] + + for r in resources: + if r in resourceNameDict: + resourceClaimIds.append(self.radbrpc.insertResourceClaim(resourceNameDict[r]['id'], taskId, startTime, endTime, claimedStatusId, 1, -1, 'anonymous', -1)) + + success = len(resourceClaimIds) == len(resources) + return success, resourceClaimIds - def commitResources(self, result_id): - pass + def commitResourceClaimsForTask(self, taskId): + self.radbrpc.updateResourceClaimsForTask(taskId, status='ALLOCATED')