diff --git a/src/ska_tango_base/base/task_queue_component_manager.py b/src/ska_tango_base/base/task_queue_component_manager.py index 30669a7cdff449e34e199d96280e89208a908cc6..f7768d8343c9fea8ca1841b715f2555c1654ef50 100644 --- a/src/ska_tango_base/base/task_queue_component_manager.py +++ b/src/ska_tango_base/base/task_queue_component_manager.py @@ -1,5 +1,5 @@ """ -This module provides a QueueManager and TaskResult classes. +This module provides a QueueManager, TaskResult and QueueTask classes. * **TaskResult**: is a convenience `dataclass` for parsing and formatting the results of a task. @@ -36,9 +36,12 @@ be made available as a Tango device attribute named `command_result`. It will be QueueTask ********* -This class should be subclasssed and the `do` method implemented with the required functionality. +This class should be subclassed and the `do` method implemented with the required functionality. The `do` method will be executed by the background worker in a thread. +`get_task_name` can be overridden if you want to change the name of the task as it would appear in +the `tasks_in_queue` property. + Simple example: .. code-block:: py @@ -93,6 +96,70 @@ Simple example: time.sleep(0.5) return ProgressTask() + +************ +QueueManager +************ + +The queue manager class manages the queue, workers and the update of properties. +The number of worker threads can be specified. + +In the case of no worker threads, tasks that are enqueued will *not* be put on the queue, +but will simply be executed and thus block until done. No worker threads are started in this case. + +As tasks are taken off the queue and completes, the properties below will be updated. An optional callback +`on_property_update_callback` can be specified that will be executed for every property change. This callback +will be called with the name of the property and its current value. + +* **tasks_in_queue**: A list of names for the tasks in the queue. + Changes when a queue item is added or removed. + +* **task_ids_in_queue**: A list of unique IDs for the tasks in the queue. + Changes when a queue item is added or removed. + +* **task_result**: The result of the latest completed task. Changes when task completes. + +* **task_status**: A list of unique IDs and the their status. + Currently indicates when a task is in progress and changes when a task is started or completed. + +Other properties of note: + +* **queue_full**: Indicates whether the queue is full or not. + If the queue is full, the result of any enqueued task will immediately be set to REJECTED. + +* **task_progress**: When reading this property the progress of the tasks are fetched from the worker threads. + +Aborting tasks +-------------- + +When `abort_tasks` is called on the queue manager the following will happen. + +* Any tasks in progress will complete. Tasks that check `is_aborting_event` will know to complete otherwise + it will complete as per normal. + +* Any tasks on the queue will be removed and their result set to ABORTED. They will not be executed. + +* Any tasks enqueued while in aborted state will immediately be removed from the queue and marked as ABORTED. + +* The thread stays alive. + +When `resume_tasks` is then called, tasks are again put on the queue, retrieved and executed as per normal. + +Stopping tasks +-------------- + +Once `stop_tasks` is called the worker threads completes as soon as possible. + +* Any tasks in progress will complete. Tasks that check `is_stopping_event` will know to exit gracefully. + +* The thread will cease. + +Getting the state of a task +--------------------------- + +Calling `get_task_state` with the task ID will check the state of the task. A history of completed tasks +are not kept, so it may not be found. + """ from __future__ import annotations import enum @@ -361,7 +428,7 @@ class QueueManager: ): """Init QueryManager. - Creates the queue and starts the thread that will execute commands + Creates the queue and starts the thread that will execute tasks from it. :param logger: Python logger @@ -533,17 +600,17 @@ class QueueManager: property_name, getattr(self, property_name) ) - def abort_commands(self): - """Start aborting commands.""" + def abort_tasks(self): + """Start aborting tasks.""" for worker in self._threads: worker.is_aborting.set() - def resume_commands(self): - """Unsets aborting so commands can be picked up again.""" + def resume_tasks(self): + """Unsets aborting so tasks can be picked up again.""" for worker in self._threads: worker.is_aborting.clear() - def stop_commands(self): + def stop_tasks(self): """Set is_stopping on each thread so it exists out. Killing the thread.""" self.is_stopping.set() @@ -595,7 +662,7 @@ class QueueManager: if not self._threads: return - self.abort_commands() + self.abort_tasks() self._work_queue.join() for worker in self._threads: worker.is_stopping.set() diff --git a/tests/test_task_queue_component_manager.py b/tests/test_task_queue_component_manager.py index cffc15bbb3f502144e31b36db696c2af2f21ebf3..5f79cd7d82d67b687e777c6681027858c1311d3f 100644 --- a/tests/test_task_queue_component_manager.py +++ b/tests/test_task_queue_component_manager.py @@ -398,7 +398,7 @@ class TestQueueManagerExit: while not qm.task_status: pass # Start aborting - cm.message_queue.abort_commands() + cm.message_queue.abort_tasks() # Wait for the exit while not qm.task_result: pass @@ -411,7 +411,7 @@ class TestQueueManagerExit: break # Resume the commands - qm.resume_commands() + qm.resume_tasks() assert not qm.is_aborting # Wait for my slow command to finish @@ -440,7 +440,7 @@ class TestQueueManagerExit: while not qm.task_status: pass # Stop all threads - cm.message_queue.stop_commands() + cm.message_queue.stop_tasks() # Wait for the exit while not qm.task_result: pass