Commit be5f73c9 authored by Johan Venter

Merge branch 'sar-275-add-LRC' into 'main'

SAR-275 Add Task Queue Component Manager

See merge request ska-telescope/ska-tango-base!62
parents f1b24b7a 4286656b
...@@ -32,3 +32,4 @@ API
Faults<faults>
Release<release>
Utils<utils>
QueueManager<queue_manager>
=============
Queue Manager
=============
.. automodule:: ska_tango_base.base.task_queue_component_manager
:members:
...@@ -42,7 +42,7 @@ setuptools.setup(
"ska_ser_logging",
"transitions",
],
- tests_require=["pytest", "coverage", "pytest-json-report", "pytest-forked"],
+ tests_require=["pytest", "coverage", "pytest-json-report", "pytest-forked", "pytest-timeout"],
entry_points={
"console_scripts": [
"SKAAlarmHandler=ska_tango_base.alarm_handler_device:main",
...
"""
This module provides the QueueManager, TaskResult and QueueTask classes.
* **TaskResult**: a convenience `dataclass` for parsing and formatting the
results of a task.
* **QueueTask**: a class whose instances can be added to the queue for execution
by background worker threads.
* **QueueManager**: the class that implements the queue and worker thread functionality.
**********
TaskResult
**********
This is a simple convenience class to parse or format the task result. The result of a task will
be made available as a Tango device attribute named `command_result`. It will be a tuple of 3 strings.
1. The unique ID
Every command/task put on the queue for execution by background threads will have a unique ID.
2. The Result Code
This is the result of the task executed by a worker from the queue.
3. The task result
The string representation of the returned result for the task on the queue.
.. code-block:: py
from ska_tango_base.base.task_queue_component_manager import TaskResult
tr = TaskResult.from_task_result(["UniqueID", "0", "The task result"])
tr
TaskResult(result_code=<ResultCode.OK: 0>, task_result='The task result', unique_id='UniqueID')
tr.to_task_result()
('UniqueID', '0', 'The task result')
*********
QueueTask
*********
This class should be subclassed and the `do` method implemented with the required functionality.
The `do` method will be executed by the background worker in a thread.
`get_task_name` can be overridden if you want to change the name of the task as it appears in
the `tasks_in_queue` property; a sketch of such an override follows the simple example below.
Simple example:
.. code-block:: py
class SimpleTask(QueueTask):
def do(self):
num_one = self.args[0]
num_two = self.kwargs.get("num_two")
return num_one + num_two
return SimpleTask(2, num_two=3)
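A minimal sketch of overriding `get_task_name`, building on the `SimpleTask` above; the class name `NamedAddTask` and the returned name are illustrative only.
.. code-block:: py

    class NamedAddTask(SimpleTask):
        def get_task_name(self):
            # This name, rather than the class name, appears in tasks_in_queue
            return "AddTwoNumbers"

    task = NamedAddTask(2, num_two=3)
    assert task.get_task_name() == "AddTwoNumbers"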
Three items are added dynamically by the worker thread and are available for use in the class instance:
* **is_aborting_event**: can be checked periodically to determine whether
the queued tasks have been aborted, so that the task in progress can complete gracefully.
The thread stays active, and once `is_aborting_event` has been cleared,
new tasks will be fetched from the queue for execution.
.. code-block:: py
class AbortTask(QueueTask):
def do(self):
sleep_time = self.args[0]
while not self.is_aborting_event.is_set():
time.sleep(sleep_time)
return AbortTask(0.2)
* **is_stopping_event**: can be checked periodically to determine whether
the queued tasks have been stopped. In this case the worker thread will exit.
.. code-block:: py
class StopTask(QueueTask):
def do(self):
assert not self.is_stopping_event.is_set()
while not self.is_stopping_event.is_set():
pass
return StopTask()
* **update_progress**: a callback that can be called with the current progress
of the task in progress.
.. code-block:: py
class ProgressTask(QueueTask):
def do(self):
for i in range(100):
self.update_progress(str(i))
time.sleep(0.5)
return ProgressTask()
************
QueueManager
************
The queue manager class manages the queue, workers and the update of properties.
The number of worker threads can be specified.
When `num_workers` is 0, tasks that are enqueued will *not* be put on the queue,
but will simply be executed and thus block until done. No worker threads are started in this case.
As tasks are taken off the queue and complete, the properties below are updated. An optional callback
`on_property_update_callback` can be specified that will be executed for every property change. This callback
is called with the name of the property and its current value; see the sketch after the list below.
* **tasks_in_queue**: A list of names for the tasks in the queue.
Changes when a queue item is added or removed.
* **task_ids_in_queue**: A list of unique IDs for the tasks in the queue.
Changes when a queue item is added or removed.
* **task_result**: The result of the latest completed task. Changes when task completes.
* **task_status**: A list of unique IDs and their status.
Currently indicates when a task is in progress and changes when a task is started or completed.
Other properties of note:
* **queue_full**: Indicates whether the queue is full or not.
If the queue is full, the result of any enqueued task will immediately be set to REJECTED.
* **task_progress**: When reading this property the progress of the tasks is fetched from the worker threads.
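A minimal sketch of constructing a `QueueManager` with a property-update callback; `on_property_update` is an illustrative callback and `SimpleTask` is the example task defined above.
.. code-block:: py

    import logging

    def on_property_update(property_name, value):
        # Called for every property change with the property name and its current value
        print(f"{property_name} -> {value}")

    queue_manager = QueueManager(
        logger=logging.getLogger(__name__),
        max_queue_size=5,
        num_workers=2,
        on_property_update_callback=on_property_update,
    )
    unique_id = queue_manager.enqueue_task(SimpleTask(2, num_two=3))

    # With max_queue_size=0 no workers are started; the task executes in-line and blocks until done
    blocking_manager = QueueManager(logging.getLogger(__name__), max_queue_size=0)
    blocking_manager.enqueue_task(SimpleTask(2, num_two=3))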
Aborting tasks
--------------
When `abort_tasks` is called on the queue manager the following will happen.
* Any task in progress will run to completion. Tasks that check `is_aborting_event` can finish early;
otherwise they complete as normal.
* Any tasks on the queue will be removed and their result set to ABORTED. They will not be executed.
* Any tasks enqueued while in aborted state will immediately be removed from the queue and marked as ABORTED.
* The thread stays alive.
When `resume_tasks` is then called, tasks are again put on the queue, retrieved and executed as normal (see the sketch below).
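A minimal sketch of the abort/resume cycle, assuming the `queue_manager` constructed above and the `AbortTask` from the earlier example:
.. code-block:: py

    queue_manager.enqueue_task(AbortTask(0.2))
    queue_manager.abort_tasks()
    # Queued tasks are drained and marked ABORTED; anything enqueued now is also ABORTED
    assert queue_manager.is_aborting
    queue_manager.resume_tasks()
    # Workers fetch and execute tasks from the queue again
    assert not queue_manager.is_aborting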
Stopping tasks
--------------
Once `stop_tasks` is called the worker threads complete as soon as possible.
* Any tasks in progress will complete. Tasks that check `is_stopping_event` will know to exit gracefully.
* The worker threads will exit.
Getting the state of a task
---------------------------
Calling `get_task_state` with the task ID will check the state of the task. A history of completed tasks
is not kept, so a task may not be found; see the sketch below.
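A minimal sketch of querying a task's state, assuming the `queue_manager` and `SimpleTask` from the earlier examples:
.. code-block:: py

    unique_id = queue_manager.enqueue_task(SimpleTask(2, num_two=3))
    state = queue_manager.get_task_state(unique_id)
    # One of TaskState.QUEUED, IN_PROGRESS, COMPLETED or NOT_FOUND
    print(state)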
"""
from __future__ import annotations
import enum
import logging
import threading
import time
import traceback
from queue import Empty, Queue
from threading import Event
from typing import Any, Callable, Dict, Optional, Tuple
from attr import dataclass
import tango
from ska_tango_base.base.component_manager import BaseComponentManager
from ska_tango_base.commands import ResultCode
class TaskState(enum.IntEnum):
"""The state of the QueueTask in the QueueManager."""
QUEUED = 0
"""
The task has been accepted and will be executed at a future time
"""
IN_PROGRESS = 1
"""
The task is in progress
"""
ABORTED = 2
"""
The task in progress has been aborted
"""
NOT_FOUND = 3
"""
The task is not found
"""
COMPLETED = 4
"""
The task was completed.
"""
NOT_ALLOWED = 5
"""
The task is not allowed to be executed
"""
@dataclass
class TaskResult:
"""Convenience class for results."""
result_code: ResultCode
task_result: str
unique_id: str
def to_task_result(self) -> Tuple[str, str, str]:
"""Convert TaskResult to task_result.
:return: The task result as (unique_id, result_code, task_result)
:rtype: tuple
"""
return (f"{self.unique_id}", f"{int(self.result_code)}", f"{self.task_result}")
@classmethod
def from_task_result(cls, task_result: list) -> TaskResult:
"""Convert task_result list to TaskResult.
:param task_result: The task_result [unique_id, result_code, task_result]
:type task_result: list
:return: The task result
:rtype: TaskResult
:raises: ValueError
"""
if len(task_result) != 3:
raise ValueError(f"Cannot parse task_result {task_result}")
return TaskResult(
result_code=ResultCode(int(task_result[1])),
task_result=task_result[2],
unique_id=task_result[0],
)
class QueueTask:
"""A task that can be put on the queue."""
def __init__(self: QueueTask, *args, **kwargs) -> None:
"""Create the task. args and kwargs are stored and should be referenced in the `do` method."""
self.args = args
self.kwargs = kwargs
self._update_progress_callback = None
@property
def is_aborting_event(self) -> threading.Event:
"""Worker adds is_aborting_event threading event.
Indicates whether task execution have been aborted.
:return: The is_aborted event.
:rtype: threading.Event
"""
return self.kwargs.get("is_aborting_event")
@property
def is_stopping_event(self) -> threading.Event:
"""Worker adds is_stopping_event threading event.
Indicates whether task execution have been stopped.
:return: The is_stopping event.
:rtype: threading.Event
"""
return self.kwargs.get("is_stopping_event")
def update_progress(self, progress: str):
"""Call the callback to update the progress.
:param progress: String indicating the progress of the task
:type progress: str
"""
self._update_progress_callback = self.kwargs.get("update_progress_callback")
if self._update_progress_callback:
self._update_progress_callback(progress)
def get_task_name(self) -> str:
"""Return a custom task name.
:return: The name of the task
:rtype: str
"""
return self.__class__.__name__
def do(self: QueueTask) -> Any:
"""Implement this method with your functionality."""
raise NotImplementedError
class QueueManager:
"""Manages the worker threads. Updates the properties as the tasks are completed."""
class Worker(threading.Thread):
"""A worker thread that takes tasks from the queue and performs them."""
def __init__(
self: QueueManager.Worker,
queue: Queue,
logger: logging.Logger,
stopping_event: Event,
result_callback: Callable,
update_command_state_callback: Callable,
queue_fetch_timeout: float = 0.1,
) -> None:
"""Initiate a worker.
:param self: Worker class
:type self: QueueManager.Worker
:param queue: The queue from which tasks are pulled
:type queue: Queue
:param logger: Logger to log to
:type logger: logging.Logger
:param stopping_event: Indicates whether to get more tasks off the queue
:type stopping_event: Event
:param result_callback: Callback called with the TaskResult when a task completes
:type result_callback: Callable
:param update_command_state_callback: Callback to update command state
:type update_command_state_callback: Callable
:param queue_fetch_timeout: Time in seconds to wait for an item on the queue
:type queue_fetch_timeout: float
"""
super().__init__()
self._work_queue = queue
self._logger = logger
self.is_stopping = stopping_event
self.is_aborting = threading.Event()
self._result_callback = result_callback
self._update_command_state_callback = update_command_state_callback
self._queue_fetch_timeout = queue_fetch_timeout
self.current_task_progress: Optional[str] = None
self.current_task_id: Optional[str] = None
self.daemon = True
def run(self) -> None:
"""Run in the thread.
Tasks are fetched off the queue and executed.
If is_stopping is set the thread will exit.
If is_aborting is set the queue will be emptied. All new commands will be aborted until
is_aborting is cleared.
"""
with tango.EnsureOmniThread():
while not self.is_stopping.is_set():
self.current_task_id = None
self.current_task_progress = ""
if self.is_aborting.is_set():
# Drain the Queue since self.is_aborting is set
while not self._work_queue.empty():
unique_id, _ = self._work_queue.get()
self.current_task_id = unique_id
self._logger.warning("Aborting task ID [%s]", unique_id)
result = TaskResult(
ResultCode.ABORTED, f"{unique_id} Aborted", unique_id
)
self._result_callback(result)
self._work_queue.task_done()
time.sleep(self._queue_fetch_timeout)
continue # Don't try and get work off the queue below, continue next loop
try:
(unique_id, task) = self._work_queue.get(
block=True, timeout=self._queue_fetch_timeout
)
self._update_command_state_callback(unique_id, "IN_PROGRESS")
self.current_task_id = unique_id
# Inject is_aborting, is_stopping, progress_update into task
task.kwargs["is_aborting_event"] = self.is_aborting
task.kwargs["is_stopping_event"] = self.is_stopping
task.kwargs[
"update_progress_callback"
] = self._update_progress_callback
result = self.execute_task(task, unique_id)
self._result_callback(result)
self._work_queue.task_done()
except Empty:
continue
return
def _update_progress_callback(self, progress: str) -> None:
"""Update the current task progress.
:param progress: An indication of progress
:type progress: str
"""
self.current_task_progress = progress
@classmethod
def execute_task(cls, task: QueueTask, unique_id: str) -> TaskResult:
"""Execute a task, return results in a standardised format.
:param task: Task to execute
:type task: QueueTask
:param unique_id: The task unique ID
:type unique_id: str
:return: The result of the task
:rtype: TaskResult
"""
try:
result = TaskResult(ResultCode.OK, f"{task.do()}", unique_id)
except Exception as err:
result = TaskResult(
ResultCode.FAILED,
f"Error: {err} {traceback.format_exc()}",
unique_id,
)
return result
def __init__(
self: QueueManager,
logger: logging.Logger,
max_queue_size: int = 0,
queue_fetch_timeout: float = 0.1,
num_workers: int = 0,
on_property_update_callback: Optional[Callable] = None,
):
"""Init QueryManager.
Creates the queue and starts the thread that will execute tasks
from it.
:param logger: Python logger
:type logger: logging.Logger
:param max_queue_size: The maximum size of the queue
:type max_queue_size: int
:param queue_fetch_timeout: The time to wait for items in the queue
:type queue_fetch_timeout: float
:param num_workers: The number of worker threads to start
:type num_workers: int
:param on_property_update_callback: Optional callback executed with the property name and its value on every property change
:type on_property_update_callback: Callable
"""
self._logger = logger
self._max_queue_size = max_queue_size
self._work_queue = Queue(self._max_queue_size)
self._queue_fetch_timeout = queue_fetch_timeout
self._on_property_update_callback = on_property_update_callback
self.is_stopping = threading.Event()
self._property_update_lock = threading.Lock()
self._task_result = ()
self._tasks_in_queue: Dict[str, str] = {} # unique_id, task_name
self._task_status: Dict[str, str] = {} # unique_id, status
self._threads = []
# If there's no queue, don't start threads
if not self._max_queue_size:
return
self._threads = [
self.Worker(
self._work_queue,
self._logger,
self.is_stopping,
self.result_callback,
self.update_task_state_callback,
)
for _ in range(num_workers)
]
for thread in self._threads:
thread.start()
@property
def queue_full(self) -> bool:
"""Check if the queue is full.
:return: Whether or not the queue is full.
:rtype: bool
"""
return self._work_queue.full()
@property
def task_result(self) -> Tuple[str]:
"""Return the last task result.
:return: Last task result
:rtype: Tuple
"""
return self._task_result
@property
def task_ids_in_queue(self) -> list:
"""Task IDs in the queue.
:return: The task IDs in the queue
:rtype: list
"""
return list(self._tasks_in_queue.keys())
@property
def tasks_in_queue(self) -> list:
"""Task names in the queue.
:return: The list of task names in the queue
:rtype: list
"""
return list(self._tasks_in_queue.values())
@property
def task_status(self) -> Dict[str, str]:
"""Return task status.
:return: The task status
:rtype: Dict[str, str]
"""
return self._task_status.copy()
@property
def task_progress(self) -> Dict[str, str]:
"""Return the task progress.
:return: The task progress
:rtype: Dict[str, str]
"""
progress = {}
for worker in self._threads:
if worker.current_task_id:
progress[worker.current_task_id] = worker.current_task_progress
return progress
def enqueue_task(self, task: QueueTask) -> str:
"""Add the task to be done onto the queue.
:param task: The task to execute in a thread
:type task: QueueTask
:return: The unique ID of the command
:rtype: string
"""
unique_id = self.get_unique_id(task.get_task_name())
# If there is no queue, just execute the command and return
if self._max_queue_size == 0:
self.update_task_state_callback(unique_id, "IN_PROGRESS")
result = self.Worker.execute_task(task, unique_id)
self.result_callback(result)
return unique_id
if self.queue_full:
self.result_callback(
TaskResult(ResultCode.REJECTED, "Queue is full", unique_id)
)
return unique_id
self._work_queue.put([unique_id, task])
with self._property_update_lock:
self._tasks_in_queue[unique_id] = task.get_task_name()
self._on_property_change("tasks_in_queue")
self._on_property_change("task_ids_in_queue")
return unique_id
def result_callback(self, task_result: TaskResult):
"""Run when the task, taken from the queue, have completed to update the appropriate attributes.
:param task_result: The result of the task
:type task_result: TaskResult
"""
with self._property_update_lock:
if task_result.unique_id in self._task_status:
del self._task_status[task_result.unique_id]
self._task_result = task_result.to_task_result()
self._on_property_change("task_result")
def update_task_state_callback(self, unique_id: str, status: str):
"""Update the executing task state.
:param unique_id: The task unique ID
:type unique_id: str
:param status: The state of the task
:type status: str
"""
if unique_id in self._tasks_in_queue:
with self._property_update_lock:
del self._tasks_in_queue[unique_id]
self._on_property_change("task_ids_in_queue")
self._on_property_change("tasks_in_queue")
with self._property_update_lock:
self._task_status[unique_id] = status
self._on_property_change("task_status")
def _on_property_change(self, property_name: str):
"""Trigger when a property changes value.
:param property_name: The property name
:type property_name: str
"""
if self._on_property_update_callback:
self._on_property_update_callback(
property_name, getattr(self, property_name)
)
def abort_tasks(self):
"""Start aborting tasks."""
for worker in self._threads:
worker.is_aborting.set()
def resume_tasks(self):
"""Unsets aborting so tasks can be picked up again."""
for worker in self._threads:
worker.is_aborting.clear()
def stop_tasks(self):
"""Set is_stopping on each thread so it exists out. Killing the thread."""
self.is_stopping.set()
@property
def is_aborting(self) -> bool:
"""Return False if any of the threads are aborting."""
return all([worker.is_aborting.is_set() for worker in self._threads])
@classmethod
def get_unique_id(cls, task_name) -> str:
"""Generate a unique ID for the task.
:param task_name: The name of the task
:type task_name: string
:return: The unique ID of the task
:rtype: string
"""
return f"{time.time()}_{task_name}"
def get_task_state(self, unique_id: str) -> TaskState:
"""Attempt to get state of QueueTask.
:param unique_id: Unique ID of the QueueTask
:type unique_id: str
:return: State of the QueueTask
:rtype: TaskState
"""
if self._task_result:
_task_result = TaskResult.from_task_result(self._task_result)
if unique_id == _task_result.unique_id:
return TaskState.COMPLETED
if unique_id in self.task_ids_in_queue:
return TaskState.QUEUED
if unique_id in self.task_status.keys():
return TaskState.IN_PROGRESS
return TaskState.NOT_FOUND
def __del__(self) -> None:
"""Release resources prior to instance deletion.
- Set the workers to aborting, this will empty out the queue and set the result code
for each task to `Aborted`.
- Wait for the queues to empty out.
- Set the workers to stopping, this will exit out the running thread.
"""
if not self._threads:
return
self.abort_tasks()
self._work_queue.join()
for worker in self._threads:
worker.is_stopping.set()
def __len__(self) -> int:
"""Approximate length of the queue.
:return: The approximate length of the queue
:rtype: int
"""
return self._work_queue.qsize()
class TaskQueueComponentManager(BaseComponentManager):
"""A component manager that provides message queue functionality."""
def __init__(
self: TaskQueueComponentManager,
message_queue: QueueManager,
op_state_model: Any,
*args: Any,
**kwargs: Any,
) -> None:
"""Create a new component manager that puts tasks on the queue.
:param message_queue: The queue manager instance
:type message_queue: QueueManager
:param op_state_model: The ops state model
:type op_state_model: Any
"""
self.message_queue = message_queue
super().__init__(op_state_model, *args, **kwargs)
def enqueue(
self,
task: QueueTask,
) -> str:
"""Put `task` on the queue. The unique ID for it is returned.
:param task: The task to execute in the thread
:type task: QueueTask
:return: The unique ID of the queued command
:rtype: str
"""
return self.message_queue.enqueue_task(task)
...@@ -96,6 +96,21 @@ class ResultCode(enum.IntEnum):
The status of the command is not known.
"""
REJECTED = 5
"""
The command execution has been rejected.
"""
NOT_ALLOWED = 6
"""
The command is not allowed to be executed
"""
ABORTED = 7
"""
The command in progress has been aborted
"""
class BaseCommand:
"""
...
"""Tests for QueueManager and its component manager."""
import logging
import time
import pytest
from unittest.mock import MagicMock, patch
from ska_tango_base.commands import ResultCode
from ska_tango_base.base.task_queue_component_manager import (
QueueManager,
TaskResult,
TaskQueueComponentManager,
QueueTask,
TaskState,
)
logger = logging.getLogger(__name__)
def check_matching_pattern(list_to_check=()):
"""Check that lengths go 1,2,3,2,1 for example."""
list_to_check = list(list_to_check)
if not list_to_check[-1]:
list_to_check.pop()
assert len(list_to_check) > 2
while len(list_to_check) > 2:
last_e = list_to_check.pop()
first_e = list_to_check.pop(0)
assert len(last_e) == len(first_e)
@pytest.fixture
def progress_task():
"""Fixture for a test that throws an exception."""
def get_task():
class ProgressTask(QueueTask):
def do(self):
for i in range(100):
self.update_progress(str(i))
time.sleep(0.5)
return ProgressTask()
return get_task
@pytest.fixture
def exc_task():
"""Fixture for a test that throws an exception."""
def get_task():
class ExcTask(QueueTask):
def do(self):
raise Exception("An error occurred")
return ExcTask()
return get_task
@pytest.fixture
def slow_task():
"""Fixture for a test that takes long."""
def get_task():
class SlowTask(QueueTask):
def do(self):
time.sleep(2)
return SlowTask()
return get_task
@pytest.fixture
def simple_task():
"""Fixture for a very simple task."""
def get_task():
class SimpleTask(QueueTask):
def do(self):
num_one = self.args[0]
num_two = self.kwargs.get("num_two")
return num_one + num_two
return SimpleTask(2, num_two=3)
return get_task
@pytest.fixture
def abort_task():
"""Fixture for a task that aborts."""
def get_task():
class AbortTask(QueueTask):
def do(self):
sleep_time = self.args[0]
while not self.is_aborting_event.is_set():
time.sleep(sleep_time)
return AbortTask(0.2)
return get_task
@pytest.fixture
def stop_task():
"""Fixture for a task that stops."""
def get_task():
class StopTask(QueueTask):
def do(self):
assert not self.is_stopping_event.is_set()
while not self.is_stopping_event.is_set():
pass
return StopTask()
return get_task
class TestQueueTask:
"""Test QueueTask."""
def test_simple(self, simple_task):
"""Test simple task."""
assert simple_task().do() == 5
def test_exception(self, exc_task):
"""Test that exception is thrown."""
with pytest.raises(Exception):
exc_task().do()
class TestQueueManager:
"""General QueueManager checks."""
def test_threads_start(self):
"""Test that threads start up. Set stop and exit."""
qm = QueueManager(logger, max_queue_size=2, num_workers=2)
assert len(qm._threads) == 2
for worker in qm._threads:
assert worker.is_alive()
assert worker.daemon
for worker in qm._threads:
worker.is_stopping.set()
class TestQueueManagerTasks:
"""QueueManager checks for tasks executed."""
@pytest.mark.timeout(5)
def test_task_ids(self, simple_task):
"""Check ids."""
qm = QueueManager(logger, max_queue_size=5, num_workers=2)
unique_id_one = qm.enqueue_task(simple_task())
unique_id_two = qm.enqueue_task(simple_task())
assert unique_id_one.endswith("SimpleTask")
assert unique_id_one != unique_id_two
@pytest.mark.timeout(5)
def test_task_is_executed(self, simple_task):
"""Check that tasks are executed."""
with patch.object(QueueManager, "result_callback") as my_cb:
qm = QueueManager(logger, max_queue_size=5, num_workers=2)
unique_id_one = qm.enqueue_task(simple_task())
unique_id_two = qm.enqueue_task(simple_task())
while my_cb.call_count != 2:
time.sleep(0.5)
result_one = my_cb.call_args_list[0][0][0]
result_two = my_cb.call_args_list[1][0][0]
assert result_one.unique_id.endswith("SimpleTask")
assert result_one.unique_id == unique_id_one
assert result_two.unique_id.endswith("SimpleTask")
assert result_two.unique_id == unique_id_two
assert result_one.result_code == ResultCode.OK
assert result_two.result_code == ResultCode.OK
assert result_one.task_result == "5"
assert result_two.task_result == "5"
@pytest.mark.timeout(5)
def test_task_result(self, simple_task, exc_task):
"""Check task results are what's expected."""
qm = QueueManager(logger, max_queue_size=5, num_workers=2)
add_task_one = simple_task()
exc_task = exc_task()
qm.enqueue_task(add_task_one)
while not qm.task_result:
time.sleep(0.5)
task_result = TaskResult.from_task_result(qm.task_result)
assert task_result.unique_id.endswith("SimpleTask")
assert task_result.result_code == ResultCode.OK
assert task_result.task_result == "5"
qm.enqueue_task(exc_task)
while qm.task_result[0].endswith("SimpleTask"):
time.sleep(0.5)
task_result = TaskResult.from_task_result(qm.task_result)
assert task_result.unique_id.endswith("ExcTask")
assert task_result.result_code == ResultCode.FAILED
assert task_result.task_result.startswith(
"Error: An error occurred Traceback ("
)
@pytest.mark.timeout(10)
def test_full_queue(self, slow_task):
"""Check full queues rejects new commands."""
with patch.object(QueueManager, "result_callback") as my_cb:
qm = QueueManager(logger, max_queue_size=1, num_workers=1)
for i in range(10):
qm.enqueue_task(slow_task())
while len(my_cb.call_args_list) != 10:
time.sleep(0.5)
results = [i[0][0].result_code for i in my_cb.call_args_list]
# 9/10 should be rejected since the first is busy and the queue length is 1
assert results[-1] == ResultCode.OK
for res in results[:-1]:
assert res == ResultCode.REJECTED
with patch.object(QueueManager, "result_callback") as my_cb:
qm = QueueManager(logger, max_queue_size=2, num_workers=2)
for i in range(10):
qm.enqueue_task(slow_task())
while len(my_cb.call_args_list) != 10:
time.sleep(0.5)
results = [i[0][0].result_code for i in my_cb.call_args_list]
# 8/10 should be rejected since two are taken to be processed.
assert results[-1] == ResultCode.OK
assert results[-2] == ResultCode.OK
for res in results[:-2]:
assert res == ResultCode.REJECTED
@pytest.mark.timeout(5)
def test_zero_queue(self, simple_task):
"""Check task_result is the same between queue and non queue."""
expected_name = "SimpleTask"
expected_result_code = ResultCode.OK
expected_result = "5"
# No Queue
qm = QueueManager(logger, max_queue_size=0, num_workers=1)
assert len(qm._threads) == 0
res = qm.enqueue_task(simple_task())
assert res.endswith(expected_name)
assert qm.task_result[0].endswith(expected_name)
assert int(qm.task_result[1]) == expected_result_code
assert qm.task_result[2] == expected_result
# Queue
qm = QueueManager(logger, max_queue_size=2, num_workers=1)
res = qm.enqueue_task(simple_task())
assert res.endswith(expected_name)
# Wait for the task to be picked up
while not qm.task_result:
time.sleep(0.5)
assert qm.task_result[0].endswith(expected_name)
assert int(qm.task_result[1]) == expected_result_code
assert qm.task_result[2] == expected_result
@pytest.mark.timeout(5)
def test_multi_jobs(self, slow_task):
"""Test that multiple threads are working. Test that attribute updates fires."""
num_of_workers = 3
call_back_func = MagicMock()
qm = QueueManager(
logger,
max_queue_size=5,
num_workers=num_of_workers,
on_property_update_callback=call_back_func,
)
unique_ids = []
for _ in range(4):
unique_id = qm.enqueue_task(slow_task())
unique_ids.append(unique_id)
# Wait for an item on the queue
while not qm.task_ids_in_queue:
pass
while not qm.task_result:
pass
# Wait for last task to finish
while unique_ids[-1] != TaskResult.from_task_result(qm.task_result).unique_id:
pass
all_passed_params = [a_call[0] for a_call in call_back_func.call_args_list]
tasks_in_queue = [
a_call[1] for a_call in all_passed_params if a_call[0] == "tasks_in_queue"
]
task_ids_in_queue = [
a_call[1]
for a_call in all_passed_params
if a_call[0] == "task_ids_in_queue"
]
task_status = [
a_call[1] for a_call in all_passed_params if a_call[0] == "task_status"
]
task_result = [
a_call[1] for a_call in all_passed_params if a_call[0] == "task_result"
]
task_result_ids = [res[0] for res in task_result]
check_matching_pattern(tuple(tasks_in_queue))
check_matching_pattern(tuple(task_ids_in_queue))
# Since there are 3 workers, at some point 3 tasks should be in progress at once
for status in task_status:
if len(status) == num_of_workers:
break
else:
assert 0, f"Length of {num_of_workers} in task_status not found"
assert len(task_result) == 4
for unique_id in unique_ids:
assert unique_id in task_result_ids
def test_task_progress(self, progress_task):
"""Test the progress updates."""
qm = QueueManager(logger, max_queue_size=8, num_workers=2)
unique_id_one = qm.enqueue_task(progress_task())
unique_id_two = qm.enqueue_task(progress_task())
time.sleep(0.5)
assert unique_id_one in qm.task_progress
assert unique_id_two in qm.task_progress
progress_one_before = qm.task_progress[unique_id_one]
progress_two_before = qm.task_progress[unique_id_two]
time.sleep(1.0)
progress_one_after = qm.task_progress[unique_id_one]
progress_two_after = qm.task_progress[unique_id_two]
assert int(progress_one_after) > int(progress_one_before)
assert int(progress_two_after) > int(progress_two_before)
def test_task_get_state_completed(self, simple_task):
"""Test the QueueTask get state is completed."""
qm = QueueManager(logger, max_queue_size=8, num_workers=2)
unique_id_one = qm.enqueue_task(simple_task())
while not qm.task_result:
pass
assert qm.get_task_state(unique_id=unique_id_one) == TaskState.COMPLETED
def test_task_get_state_in_queued(self, slow_task):
"""Test the QueueTask get state is queued."""
qm = QueueManager(logger, max_queue_size=8, num_workers=1)
qm.enqueue_task(slow_task())
qm.enqueue_task(slow_task())
unique_id_last = qm.enqueue_task(slow_task())
assert qm.get_task_state(unique_id=unique_id_last) == TaskState.QUEUED
def test_task_get_state_in_progress(self, progress_task):
"""Test the QueueTask get state is in progress."""
qm = QueueManager(logger, max_queue_size=8, num_workers=2)
unique_id_one = qm.enqueue_task(progress_task())
while not qm.task_progress:
pass
assert qm.get_task_state(unique_id=unique_id_one) == TaskState.IN_PROGRESS
def test_task_get_state_in_not_found(self):
"""Test the QueueTask get state not found."""
qm = QueueManager(logger, max_queue_size=8, num_workers=2)
assert qm.get_task_state(unique_id="non_existing_id") == TaskState.NOT_FOUND
class TestQueueManagerExit:
"""Test the stopping and aborting."""
@pytest.mark.timeout(5)
def test_exit_abort(self, abort_task, slow_task):
"""Test aborting exit."""
call_back_func = MagicMock()
qm = QueueManager(
logger,
max_queue_size=5,
num_workers=2,
on_property_update_callback=call_back_func,
)
cm = TaskQueueComponentManager(
message_queue=qm, op_state_model=None, logger=None
)
cm.enqueue(abort_task())
# Wait for the command to start
while not qm.task_status:
pass
# Start aborting
cm.message_queue.abort_tasks()
# Wait for the exit
while not qm.task_result:
pass
assert qm.is_aborting
# While aborting, this task should immediately be marked as ABORTED
unique_id = cm.enqueue(slow_task())
while True:
tr = TaskResult.from_task_result(qm.task_result)
if tr.unique_id == unique_id and tr.result_code == ResultCode.ABORTED:
break
# Resume the commands
qm.resume_tasks()
assert not qm.is_aborting
# Wait for my slow command to finish
unique_id = cm.enqueue(slow_task())
while True:
tr = TaskResult.from_task_result(qm.task_result)
if tr.unique_id == unique_id:
break
@pytest.mark.timeout(10)
def test_exit_stop(self, stop_task):
"""Test stopping exit."""
call_back_func = MagicMock()
qm = QueueManager(
logger,
max_queue_size=5,
num_workers=2,
on_property_update_callback=call_back_func,
)
cm = TaskQueueComponentManager(
message_queue=qm, op_state_model=None, logger=None
)
cm.enqueue(stop_task())
# Wait for the command to start
while not qm.task_status:
pass
# Stop all threads
cm.message_queue.stop_tasks()
# Wait for the exit
while not qm.task_result:
pass
# Wait for all the workers to stop
while any([worker.is_alive() for worker in qm._threads]):
pass
@pytest.mark.timeout(5)
def test_delete_queue(self, slow_task, stop_task, abort_task):
"""Test deleting the queue."""
call_back_func = MagicMock()
qm = QueueManager(
logger,
max_queue_size=8,
num_workers=2,
on_property_update_callback=call_back_func,
)
cm = TaskQueueComponentManager(
message_queue=qm, op_state_model=None, logger=None
)
cm.enqueue(slow_task())
cm.enqueue(stop_task())
cm.enqueue(abort_task())
cm.enqueue(stop_task())
cm.enqueue(abort_task())
cm.enqueue(stop_task())
cm.enqueue(abort_task())
cm.enqueue(stop_task())
cm.enqueue(abort_task())
del cm.message_queue
del cm
class TestComponentManager:
"""Tests for the component manager."""
def test_init(self):
"""Test that we can init the component manager."""
qm = QueueManager(logger, max_queue_size=0, num_workers=1)
cm = TaskQueueComponentManager(
message_queue=qm, op_state_model=None, logger=logger
)
assert cm.message_queue.task_ids_in_queue == []