Skip to content
Snippets Groups Projects
Unverified Commit 2378de03 authored by SKAJohanVenter's avatar SKAJohanVenter
Browse files

SAR-275 Added docs to TaskResult and QueueTask

parent 66c4420c
No related branches found
No related tags found
No related merge requests found
......@@ -32,3 +32,4 @@ API
Faults<faults>
Release<release>
Utils<utils>
QueueManager<queue_manager>
=============
Queue Manager
=============
.. automodule:: ska_tango_base.base.task_queue_component_manager
:members:
"""This module implements a component manager that can queue tasks for execution by background threads."""
"""
This module provides a QueueManager and TaskResult classes.
* **TaskResult**: is a convenience `dataclass` for parsing and formatting the
results of a task.
* **QueueTask**: is a class that instances of which can be added to the queue for execution
by background threads.
* **QueueManager**: that implements the queue and thread worker functionality.
**********
TaskResult
**********
This is a simple convenience class to parse or format the task result. The result of a task will
be made available as a Tango device attribute named `command_result`. It will be a list of 3 strings.
1. The unique ID
Every command/task put on the queue for execution by background threads will have a unique ID.
2. The Result Code
This is the result of the task executed by a worker from the queue.
3. The task result
The string representation of the returned result for the task on the queue.
.. code-block:: py
from ska_tango_base.base.task_queue_component_manager import TaskResult
tr = TaskResult.from_task_result(["UniqueID", "0", "The task result"])
tr
TaskResult(result_code=<ResultCode.OK: 0>, task_result='The task result', unique_id='UniqueID')
tr.to_task_result()
['UniqueID', '0', 'The task result']
*********
QueueTask
*********
This class should be subclasssed and the `do` method implemented with the required functionality.
The `do` method will be executed by the background worker in a thread.
Simple example:
.. code-block:: py
class SimpleTask(QueueTask):
def do(self):
num_one = self.args[0]
num_two = self.kwargs.get("num_two")
return num_one + num_two
return SimpleTask(2, num_two=3)
3 items are added dynamically by the worker thread and is available for use in the class instance.
* **is_aborting_event**: can be check periodically to determine whether
the queue tasks have been aborted to gracefully complete the task in progress.
The thread will stay active and `once is_aborting_event` has been unset,
new tasks will be fetched from the queue for execution.
.. code-block:: py
class AbortTask(QueueTask):
def do(self):
sleep_time = self.args[0]
while not self.is_aborting_event.is_set():
time.sleep(sleep_time)
return AbortTask(0.2)
* **is_stopping_event**: can be check periodically to determine whether
the queue tasks have been stopped. In this case the thread will complete.
.. code-block:: py
class StopTask(QueueTask):
def do(self):
assert not self.is_stopping_event.is_set()
while not self.is_stopping_event.is_set():
pass
return StopTask()
* **update_progress**: a callback that can be called wth the current progress
of the task in progress
.. code-block:: py
class ProgressTask(QueueTask):
def do(self):
for i in range(100):
self.update_progress(str(i))
time.sleep(0.5)
return ProgressTask()
"""
from __future__ import annotations
import logging
import threading
......@@ -59,6 +154,28 @@ class QueueTask:
self.args = args
self.kwargs = kwargs
@property
def is_aborting_event(self) -> threading.Event:
"""Worker adds is_aborting_event threading event.
Indicates whether task execution have been aborted.
:return: The is_aborted event.
:rtype: threading.Event
"""
return self.kwargs.get("is_aborting_event")
@property
def is_stopping_event(self) -> threading.Event:
"""Worker adds is_stopping_event threading event.
Indicates whether task execution have been stopped.
:return: The is_stopping event.
:rtype: threading.Event
"""
return self.kwargs.get("is_stopping_event")
def update_progress(self, progress: str):
"""Private method to call the callback to update the progress.
......
......@@ -95,8 +95,7 @@ def abort_task():
class AbortTask(QueueTask):
def do(self):
sleep_time = self.args[0]
is_aborting_event = self.kwargs.get("is_aborting_event")
while not is_aborting_event.is_set():
while not self.is_aborting_event.is_set():
time.sleep(sleep_time)
return AbortTask(0.2)
......@@ -111,9 +110,8 @@ def stop_task():
def get_task():
class StopTask(QueueTask):
def do(self):
is_stopping_event = self.kwargs.get("is_stopping_event")
assert not is_stopping_event.is_set()
while not is_stopping_event.is_set():
assert not self.is_stopping_event.is_set()
while not self.is_stopping_event.is_set():
pass
return StopTask()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment