diff --git a/src/ska_tango_base/base/task_queue_component_manager.py b/src/ska_tango_base/base/task_queue_component_manager.py index f7768d8343c9fea8ca1841b715f2555c1654ef50..6abcecbdfe214864559f6a63b08f593d54192b1d 100644 --- a/src/ska_tango_base/base/task_queue_component_manager.py +++ b/src/ska_tango_base/base/task_queue_component_manager.py @@ -14,7 +14,7 @@ TaskResult ********** This is a simple convenience class to parse or format the task result. The result of a task will -be made available as a Tango device attribute named `command_result`. It will be a list of 3 strings. +be made available as a Tango device attribute named `command_result`. It will be a tuple of 3 strings. 1. The unique ID Every command/task put on the queue for execution by background threads will have a unique ID. @@ -30,7 +30,7 @@ be made available as a Tango device attribute named `command_result`. It will be tr TaskResult(result_code=<ResultCode.OK: 0>, task_result='The task result', unique_id='UniqueID') tr.to_task_result() - ['UniqueID', '0', 'The task result'] + ('UniqueID', '0', 'The task result') ********* QueueTask @@ -169,7 +169,7 @@ import time import traceback from queue import Empty, Queue from threading import Event -from typing import Any, Callable, Dict, List, Optional +from typing import Any, Callable, Dict, Optional, Tuple from attr import dataclass import tango @@ -220,13 +220,13 @@ class TaskResult: task_result: str unique_id: str - def to_task_result(self) -> List[str]: + def to_task_result(self) -> Tuple[str]: """Convert TaskResult to task_result. :return: The task result :rtype: list[str] """ - return [f"{self.unique_id}", f"{int(self.result_code)}", f"{self.task_result}"] + return (f"{self.unique_id}", f"{int(self.result_code)}", f"{self.task_result}") @classmethod def from_task_result(cls, task_result: list) -> TaskResult: @@ -381,11 +381,7 @@ class QueueManager: task.kwargs[ "update_progress_callback" ] = self._update_progress_callback - - task_result = self.execute_task(task) - result = TaskResult( - task_result[0], f"{task_result[1]}", unique_id - ) + result = self.execute_task(task, unique_id) self._result_callback(result) self._work_queue.task_done() except Empty: @@ -401,20 +397,23 @@ class QueueManager: self.current_task_progress = progress @classmethod - def execute_task(cls, task: QueueTask): + def execute_task(cls, task: QueueTask, unique_id: str) -> TaskResult: """Execute a task, return results in a standardised format. :param task: Task to execute :type task: QueueTask - :return: (ResultCode, result) - :rtype: tuple + :param unique_id: The task unique ID + :type unique_id: str + :return: The result of the task + :rtype: TaskResult """ try: - result = (ResultCode.OK, task.do()) + result = TaskResult(ResultCode.OK, f"{task.do()}", unique_id) except Exception as err: - result = ( + result = TaskResult( ResultCode.FAILED, f"Error: {err} {traceback.format_exc()}", + unique_id, ) return result @@ -448,7 +447,7 @@ class QueueManager: self.is_stopping = threading.Event() self._property_update_lock = threading.Lock() - self._task_result = [] + self._task_result = () self._tasks_in_queue: Dict[str, str] = {} # unique_id, task_name self._task_status: Dict[str, str] = {} # unique_id, status self._threads = [] @@ -480,13 +479,13 @@ class QueueManager: return self._work_queue.full() @property - def task_result(self) -> list: + def task_result(self) -> Tuple[str]: """Return the last task result. :return: Last task result - :rtype: list + :rtype: Tuple """ - return self._task_result.copy() + return self._task_result @property def task_ids_in_queue(self) -> list: @@ -541,8 +540,7 @@ class QueueManager: # If there is no queue, just execute the command and return if self._max_queue_size == 0: self.update_task_state_callback(unique_id, "IN_PROGRESS") - task_result = self.Worker.execute_task(task) - result = TaskResult(task_result[0], f"{task_result[1]}", unique_id) + result = self.Worker.execute_task(task, unique_id) self.result_callback(result) return unique_id @@ -615,12 +613,12 @@ class QueueManager: self.is_stopping.set() @property - def is_aborting(self): + def is_aborting(self) -> bool: """Return False if any of the threads are aborting.""" return all([worker.is_aborting.is_set() for worker in self._threads]) @classmethod - def get_unique_id(cls, task_name): + def get_unique_id(cls, task_name) -> str: """Generate a unique ID for the task. :param task_name: The name of the task @@ -638,8 +636,8 @@ class QueueManager: :return: State of the QueueTask :rtype: TaskState """ - if self.task_result: - _task_result = TaskResult.from_task_result(self.task_result) + if self._task_result: + _task_result = TaskResult.from_task_result(self._task_result) if unique_id == _task_result.unique_id: return TaskState.COMPLETED diff --git a/tests/test_task_queue_component_manager.py b/tests/test_task_queue_component_manager.py index 5f79cd7d82d67b687e777c6681027858c1311d3f..4b343afc565c0f46010469133f27e890afec597c 100644 --- a/tests/test_task_queue_component_manager.py +++ b/tests/test_task_queue_component_manager.py @@ -337,7 +337,7 @@ class TestQueueManagerTasks: assert unique_id_two in qm.task_progress progress_one_before = qm.task_progress[unique_id_one] progress_two_before = qm.task_progress[unique_id_two] - time.sleep(0.5) + time.sleep(1.0) progress_one_after = qm.task_progress[unique_id_one] progress_two_after = qm.task_progress[unique_id_two]