Skip to content
Snippets Groups Projects
Unverified Commit c055956e authored by SKAJohanVenter's avatar SKAJohanVenter
Browse files

SAR-275 Added more docs

parent 8696a1b7
Branches
No related tags found
No related merge requests found
"""
This module provides a QueueManager and TaskResult classes.
This module provides a QueueManager, TaskResult and QueueTask classes.
* **TaskResult**: is a convenience `dataclass` for parsing and formatting the
results of a task.
......@@ -36,9 +36,12 @@ be made available as a Tango device attribute named `command_result`. It will be
QueueTask
*********
This class should be subclasssed and the `do` method implemented with the required functionality.
This class should be subclassed and the `do` method implemented with the required functionality.
The `do` method will be executed by the background worker in a thread.
`get_task_name` can be overridden if you want to change the name of the task as it would appear in
the `tasks_in_queue` property.
Simple example:
.. code-block:: py
......@@ -93,6 +96,70 @@ Simple example:
time.sleep(0.5)
return ProgressTask()
************
QueueManager
************
The queue manager class manages the queue, workers and the update of properties.
The number of worker threads can be specified.
In the case of no worker threads, tasks that are enqueued will *not* be put on the queue,
but will simply be executed and thus block until done. No worker threads are started in this case.
As tasks are taken off the queue and completes, the properties below will be updated. An optional callback
`on_property_update_callback` can be specified that will be executed for every property change. This callback
will be called with the name of the property and its current value.
* **tasks_in_queue**: A list of names for the tasks in the queue.
Changes when a queue item is added or removed.
* **task_ids_in_queue**: A list of unique IDs for the tasks in the queue.
Changes when a queue item is added or removed.
* **task_result**: The result of the latest completed task. Changes when task completes.
* **task_status**: A list of unique IDs and the their status.
Currently indicates when a task is in progress and changes when a task is started or completed.
Other properties of note:
* **queue_full**: Indicates whether the queue is full or not.
If the queue is full, the result of any enqueued task will immediately be set to REJECTED.
* **task_progress**: When reading this property the progress of the tasks are fetched from the worker threads.
Aborting tasks
--------------
When `abort_tasks` is called on the queue manager the following will happen.
* Any tasks in progress will complete. Tasks that check `is_aborting_event` will know to complete otherwise
it will complete as per normal.
* Any tasks on the queue will be removed and their result set to ABORTED. They will not be executed.
* Any tasks enqueued while in aborted state will immediately be removed from the queue and marked as ABORTED.
* The thread stays alive.
When `resume_tasks` is then called, tasks are again put on the queue, retrieved and executed as per normal.
Stopping tasks
--------------
Once `stop_tasks` is called the worker threads completes as soon as possible.
* Any tasks in progress will complete. Tasks that check `is_stopping_event` will know to exit gracefully.
* The thread will cease.
Getting the state of a task
---------------------------
Calling `get_task_state` with the task ID will check the state of the task. A history of completed tasks
are not kept, so it may not be found.
"""
from __future__ import annotations
import enum
......@@ -361,7 +428,7 @@ class QueueManager:
):
"""Init QueryManager.
Creates the queue and starts the thread that will execute commands
Creates the queue and starts the thread that will execute tasks
from it.
:param logger: Python logger
......@@ -533,17 +600,17 @@ class QueueManager:
property_name, getattr(self, property_name)
)
def abort_commands(self):
"""Start aborting commands."""
def abort_tasks(self):
"""Start aborting tasks."""
for worker in self._threads:
worker.is_aborting.set()
def resume_commands(self):
"""Unsets aborting so commands can be picked up again."""
def resume_tasks(self):
"""Unsets aborting so tasks can be picked up again."""
for worker in self._threads:
worker.is_aborting.clear()
def stop_commands(self):
def stop_tasks(self):
"""Set is_stopping on each thread so it exists out. Killing the thread."""
self.is_stopping.set()
......@@ -595,7 +662,7 @@ class QueueManager:
if not self._threads:
return
self.abort_commands()
self.abort_tasks()
self._work_queue.join()
for worker in self._threads:
worker.is_stopping.set()
......
......@@ -398,7 +398,7 @@ class TestQueueManagerExit:
while not qm.task_status:
pass
# Start aborting
cm.message_queue.abort_commands()
cm.message_queue.abort_tasks()
# Wait for the exit
while not qm.task_result:
pass
......@@ -411,7 +411,7 @@ class TestQueueManagerExit:
break
# Resume the commands
qm.resume_commands()
qm.resume_tasks()
assert not qm.is_aborting
# Wait for my slow command to finish
......@@ -440,7 +440,7 @@ class TestQueueManagerExit:
while not qm.task_status:
pass
# Stop all threads
cm.message_queue.stop_commands()
cm.message_queue.stop_tasks()
# Wait for the exit
while not qm.task_result:
pass
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment