diff --git a/SAS/ResourceAssignment/OTDBtoRATaskStatusPropagator/propagator.py b/SAS/ResourceAssignment/OTDBtoRATaskStatusPropagator/propagator.py index 30bad8597e179277c9129c1381bd3cd5c8984545..5828719360adc423abf230507c6b796d548fdcba 100644 --- a/SAS/ResourceAssignment/OTDBtoRATaskStatusPropagator/propagator.py +++ b/SAS/ResourceAssignment/OTDBtoRATaskStatusPropagator/propagator.py @@ -38,15 +38,11 @@ class OTDBtoRATaskStatusPropagator(OTDBBusListener): super(OTDBtoRATaskStatusPropagator, self).stop_listening(**kwargs) def _update_radb_task_status(self, otdb_id, task_status): - task = self.radb.getTask(otdb_id=otdb_id) + logger.info("updating task with otdb_id %s to status %s" % (otdb_id, task_status)) + result = self.radb.updateTaskStatusForOtdbId(otdb_id=otdb_id, status=task_status) - if not task: - logger.warning("Task with otdb_id %s in not present in the RADB" % otdb_id) - return - - task_id = task['id'] - logger.info("updating task %s with otdb_id %s to status %s" % (task_id, otdb_id, task_status)) - self.radb.updateTask(task_id=task_id, status=task_status) + if not result or 'updated' not in result or not result['updated']: + logger.warning("could not update task with otdb_id %s to status %s" % (otdb_id, task_status)) def onObservationPrepared(self, treeId, modificationTime): self._update_radb_task_status(treeId, 'prepared') diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py index 1993c0c4582c67d1e17cd25e83e05d4b2e5e7173..c90337f9cd15107e55a9991e637537be20850906 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py @@ -223,6 +223,22 @@ class RADatabase: self.commit() return self.cursor.rowcount > 0 + def updateTaskStatusForOtdbId(self, otdb_id, task_status, commit=True): + '''converts task_status and task_type to id's in case one and/or the other are strings''' + if task_status and isinstance(task_status, basestring): + #convert task_status string to task_status.id + task_status = self.getTaskStatusId(task_status) + + query = '''UPDATE resource_allocation.task + SET (status_id) = (%s) + WHERE resource_allocation.task.otdb_id = %s;''' + + self._executeQuery(query, [task_status, otdb_id]) + if commit: + self.commit() + + return self.cursor.rowcount > 0 + def updateTask(self, task_id, mom_id=None, otdb_id=None, task_status=None, task_type=None, specification_id=None, commit=True): task_status, task_type = self._convertTaskTypeAndStatusToIds(task_status, task_type) @@ -1310,26 +1326,27 @@ class RADatabase: for resource in resources: resource_id = resource['id'] - resource_usages = all_usages[resource_id] - # copy resource capacities - for item in ['total_capacity', 'available_capacity', 'used_capacity']: - try: - resource_usages[item] = 0 - if item in resource: - resource_usages[item] = resource[item] - if item == 'used_capacity': - # and compute unaccounted-for usage, - # which is the actual used_capacity minus the currently allocated total claim size - # defaults to used_capacity if no currently allocated total claim size - resource_usages['misc_used_capacity'] = resource['used_capacity'] - utcnow = datetime.utcnow() - allocated_usages = resource_usages['usages'].get('allocated', []) - past_allocated_usages = sorted([au for au in allocated_usages if au['timestamp'] <= utcnow]) - if past_allocated_usages: - currently_allocated_usage = past_allocated_usages[-1] - resource_usages['misc_used_capacity'] = resource['used_capacity'] - currently_allocated_usage['value'] - except Exception as e: - logger.error(e) + if resource_id in all_usages: + resource_usages = all_usages[resource_id] + # copy resource capacities + for item in ['total_capacity', 'available_capacity', 'used_capacity']: + try: + resource_usages[item] = 0 + if item in resource: + resource_usages[item] = resource[item] + if item == 'used_capacity': + # and compute unaccounted-for usage, + # which is the actual used_capacity minus the currently allocated total claim size + # defaults to used_capacity if no currently allocated total claim size + resource_usages['misc_used_capacity'] = resource['used_capacity'] + utcnow = datetime.utcnow() + allocated_usages = resource_usages['usages'].get('allocated', []) + past_allocated_usages = sorted([au for au in allocated_usages if au['timestamp'] <= utcnow]) + if past_allocated_usages: + currently_allocated_usage = past_allocated_usages[-1] + resource_usages['misc_used_capacity'] = resource['used_capacity'] - currently_allocated_usage['value'] + except Exception as e: + logger.error(e) all_usages_list = all_usages.values() return all_usages_list diff --git a/SAS/ResourceAssignment/ResourceAssignmentService/rpc.py b/SAS/ResourceAssignment/ResourceAssignmentService/rpc.py index 7365ca8b1d71a98f67484eb57488f81321b47adf..996db5835d23c92a188c6cfba6797f9a22e2f1ee 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentService/rpc.py +++ b/SAS/ResourceAssignment/ResourceAssignmentService/rpc.py @@ -169,6 +169,11 @@ class RARPC(RPCWrapper): task_type=task_type, specification_id=specification_id) + def updateTaskStatusForOtdbId(self, otdb_id, status): + return self.rpc('UpdateTaskStatusForOtdbId', + otdb_id=otdb_id, + status=status) + def getTasks(self): tasks = self.rpc('GetTasks') for task in tasks: diff --git a/SAS/ResourceAssignment/ResourceAssignmentService/service.py b/SAS/ResourceAssignment/ResourceAssignmentService/service.py index a4d1a9d67608400f7c8ab1f23b35ddb5e3002661..f1f96a1ced3f60d32b6df0a46e9fd095bc12d354 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentService/service.py +++ b/SAS/ResourceAssignment/ResourceAssignmentService/service.py @@ -61,6 +61,7 @@ class RADBHandler(MessageHandlerInterface): 'InsertTask': self._insertTask, 'DeleteTask': self._deleteTask, 'UpdateTask': self._updateTask, + 'UpdateTaskStatusForOtdbId': self._updateTaskStatusForOtdbId, 'InsertTaskPredecessor': self._insertTaskPredecessor, 'insertTaskPredecessors': self._insertTaskPredecessors, 'GetTaskStatuses': self._getTaskStatuses, @@ -239,6 +240,13 @@ class RADBHandler(MessageHandlerInterface): deleted = self.radb.deleteTask(id) return {'id': id, 'deleted': deleted} + def _updateTaskStatusForOtdbId(self, **kwargs): + logger.info('UpdateTaskStatusForOtdbId: %s' % dict({k:v for k,v in kwargs.items() if v != None})) + otdb_id=kwargs.get('otdb_id') + updated = self.radb.updateTaskStatusForOtdbId(otdb_id=otdb_id, + task_status=kwargs.get('status_id', kwargs.get('status'))) + return {'otdb_id': otdb_id, 'updated': updated} + def _updateTask(self, **kwargs): logger.info('UpdateTask: %s' % dict({k:v for k,v in kwargs.items() if v != None})) id = kwargs['id']