diff --git a/docs/source/api/index.rst b/docs/source/api/index.rst index fc226495739e260ae76a8428f0a6ba17c88b9fdb..a7a88ba3279ccdce92a7cf2c35d58171cb6ceaf4 100644 --- a/docs/source/api/index.rst +++ b/docs/source/api/index.rst @@ -32,3 +32,4 @@ API Faults<faults> Release<release> Utils<utils> + QueueManager<queue_manager> diff --git a/docs/source/api/queue_manager.rst b/docs/source/api/queue_manager.rst new file mode 100644 index 0000000000000000000000000000000000000000..71dfdcd85a76d4a95a2363ab14a552a44ed069db --- /dev/null +++ b/docs/source/api/queue_manager.rst @@ -0,0 +1,6 @@ +============= +Queue Manager +============= + +.. automodule:: ska_tango_base.base.task_queue_component_manager + :members: diff --git a/setup.py b/setup.py index d544c2ea40a484885142781d5ea275aa8301a55f..ef6e738468a6ef19a38c37ea3599a221ce4c7ffc 100644 --- a/setup.py +++ b/setup.py @@ -42,7 +42,7 @@ setuptools.setup( "ska_ser_logging", "transitions", ], - tests_require=["pytest", "coverage", "pytest-json-report", "pytest-forked"], + tests_require=["pytest", "coverage", "pytest-json-report", "pytest-forked", "pytest-timeout"], entry_points={ "console_scripts": [ "SKAAlarmHandler=ska_tango_base.alarm_handler_device:main", diff --git a/src/ska_tango_base/base/task_queue_component_manager.py b/src/ska_tango_base/base/task_queue_component_manager.py new file mode 100644 index 0000000000000000000000000000000000000000..d729bf074b18f6c8200a16dc89a91adedea93278 --- /dev/null +++ b/src/ska_tango_base/base/task_queue_component_manager.py @@ -0,0 +1,709 @@ +""" +This module provides a QueueManager, TaskResult and QueueTask classes. + +* **TaskResult**: is a convenience `dataclass` for parsing and formatting the + results of a task. + +* **QueueTask**: is a class that instances of which can be added to the queue for execution + by background threads. + +* **QueueManager**: that implements the queue and thread worker functionality. + +********** +TaskResult +********** + +This is a simple convenience class to parse or format the task result. The result of a task will +be made available as a Tango device attribute named `command_result`. It will be a tuple of 3 strings. + +1. The unique ID + Every command/task put on the queue for execution by background threads will have a unique ID. +2. The Result Code + This is the result of the task executed by a worker from the queue. +3. The task result + The string representation of the returned result for the task on the queue. + +.. code-block:: py + + from ska_tango_base.base.task_queue_component_manager import TaskResult + tr = TaskResult.from_task_result(["UniqueID", "0", "The task result"]) + tr + TaskResult(result_code=<ResultCode.OK: 0>, task_result='The task result', unique_id='UniqueID') + tr.to_task_result() + ('UniqueID', '0', 'The task result') + +********* +QueueTask +********* + +This class should be subclassed and the `do` method implemented with the required functionality. +The `do` method will be executed by the background worker in a thread. + +`get_task_name` can be overridden if you want to change the name of the task as it would appear in +the `tasks_in_queue` property. + +Simple example: + +.. code-block:: py + + class SimpleTask(QueueTask): + def do(self): + num_one = self.args[0] + num_two = self.kwargs.get("num_two") + return num_one + num_two + + return SimpleTask(2, num_two=3) + +3 items are added dynamically by the worker thread and is available for use in the class instance. + +* **is_aborting_event**: can be check periodically to determine whether + the queue tasks have been aborted to gracefully complete the task in progress. + The thread will stay active and once `is_aborting_event` has been unset, + new tasks will be fetched from the queue for execution. + +.. code-block:: py + + class AbortTask(QueueTask): + def do(self): + sleep_time = self.args[0] + while not self.is_aborting_event.is_set(): + time.sleep(sleep_time) + + return AbortTask(0.2) + +* **is_stopping_event**: can be check periodically to determine whether + the queue tasks have been stopped. In this case the thread will complete. + +.. code-block:: py + + class StopTask(QueueTask): + def do(self): + assert not self.is_stopping_event.is_set() + while not self.is_stopping_event.is_set(): + pass + + return StopTask() + +* **update_progress**: a callback that can be called wth the current progress + of the task in progress + +.. code-block:: py + + class ProgressTask(QueueTask): + def do(self): + for i in range(100): + self.update_progress(str(i)) + time.sleep(0.5) + + return ProgressTask() + +************ +QueueManager +************ + +The queue manager class manages the queue, workers and the update of properties. +The number of worker threads can be specified. + +When `num_workers` is 0, tasks that are enqueued will *not* be put on the queue, +but will simply be executed and thus block until done. No worker threads are started in this case. + +As tasks are taken off the queue and completes, the properties below will be updated. An optional callback +`on_property_update_callback` can be specified that will be executed for every property change. This callback +will be called with the name of the property and its current value. + +* **tasks_in_queue**: A list of names for the tasks in the queue. + Changes when a queue item is added or removed. + +* **task_ids_in_queue**: A list of unique IDs for the tasks in the queue. + Changes when a queue item is added or removed. + +* **task_result**: The result of the latest completed task. Changes when task completes. + +* **task_status**: A list of unique IDs and the their status. + Currently indicates when a task is in progress and changes when a task is started or completed. + +Other properties of note: + +* **queue_full**: Indicates whether the queue is full or not. + If the queue is full, the result of any enqueued task will immediately be set to REJECTED. + +* **task_progress**: When reading this property the progress of the tasks are fetched from the worker threads. + +Aborting tasks +-------------- + +When `abort_tasks` is called on the queue manager the following will happen. + +* Any tasks in progress will complete. Tasks that check `is_aborting_event` will know to complete otherwise + it will complete as per normal. + +* Any tasks on the queue will be removed and their result set to ABORTED. They will not be executed. + +* Any tasks enqueued while in aborted state will immediately be removed from the queue and marked as ABORTED. + +* The thread stays alive. + +When `resume_tasks` is then called, tasks are again put on the queue, retrieved and executed as per normal. + +Stopping tasks +-------------- + +Once `stop_tasks` is called the worker threads completes as soon as possible. + +* Any tasks in progress will complete. Tasks that check `is_stopping_event` will know to exit gracefully. + +* The thread will cease. + +Getting the state of a task +--------------------------- + +Calling `get_task_state` with the task ID will check the state of the task. A history of completed tasks +are not kept, so it may not be found. + +""" +from __future__ import annotations +import enum +import logging +import threading +import time +import traceback +from queue import Empty, Queue +from threading import Event +from typing import Any, Callable, Dict, Optional, Tuple +from attr import dataclass + +import tango + +from ska_tango_base.base.component_manager import BaseComponentManager +from ska_tango_base.commands import ResultCode + + +class TaskState(enum.IntEnum): + """The state of the QueueTask in the QueueManager.""" + + QUEUED = 0 + """ + The task has been accepted and will be executed at a future time + """ + + IN_PROGRESS = 1 + """ + The task in progress + """ + + ABORTED = 2 + """ + The task in progress has been aborted + """ + + NOT_FOUND = 3 + """ + The task is not found + """ + + COMPLETED = 4 + """ + The task was completed. + """ + + NOT_ALLOWED = 5 + """ + The task is not allowed to be executed + """ + + +@dataclass +class TaskResult: + """Convenience class for results.""" + + result_code: ResultCode + task_result: str + unique_id: str + + def to_task_result(self) -> Tuple[str]: + """Convert TaskResult to task_result. + + :return: The task result + :rtype: list[str] + """ + return (f"{self.unique_id}", f"{int(self.result_code)}", f"{self.task_result}") + + @classmethod + def from_task_result(cls, task_result: list) -> TaskResult: + """Convert task_result list to TaskResult. + + :param task_result: The task_result [unique_id, result_code, task_result] + :type task_result: list + :return: The task result + :rtype: TaskResult + :raises: ValueError + """ + if len(task_result) != 3: + raise ValueError(f"Cannot parse task_result {task_result}") + + return TaskResult( + result_code=ResultCode(int(task_result[1])), + task_result=task_result[2], + unique_id=task_result[0], + ) + + +class QueueTask: + """A task that can be put on the queue.""" + + def __init__(self: QueueTask, *args, **kwargs) -> None: + """Create the task. args and kwargs are stored and should be referenced in the `do` method.""" + self.args = args + self.kwargs = kwargs + self._update_progress_callback = None + + @property + def is_aborting_event(self) -> threading.Event: + """Worker adds is_aborting_event threading event. + + Indicates whether task execution have been aborted. + + :return: The is_aborted event. + :rtype: threading.Event + """ + return self.kwargs.get("is_aborting_event") + + @property + def is_stopping_event(self) -> threading.Event: + """Worker adds is_stopping_event threading event. + + Indicates whether task execution have been stopped. + + :return: The is_stopping event. + :rtype: threading.Event + """ + return self.kwargs.get("is_stopping_event") + + def update_progress(self, progress: str): + """Call the callback to update the progress. + + :param progress: String that to indicate progress of task + :type progress: str + """ + self._update_progress_callback = self.kwargs.get("update_progress_callback") + if self._update_progress_callback: + self._update_progress_callback(progress) + + def get_task_name(self) -> str: + """Return a custom task name. + + :return: The name of the task + :rtype: str + """ + return self.__class__.__name__ + + def do(self: QueueTask) -> Any: + """Implement this method with your functionality.""" + raise NotImplementedError + + +class QueueManager: + """Manages the worker threads. Updates the properties as the tasks are completed.""" + + class Worker(threading.Thread): + """A worker thread that takes tasks from the queue and performs them.""" + + def __init__( + self: QueueManager.Worker, + queue: Queue, + logger: logging.Logger, + stopping_event: Event, + result_callback: Callable, + update_command_state_callback: Callable, + queue_fetch_timeout: int = 0.1, + ) -> None: + """Initiate a worker. + + :param self: Worker class + :type self: QueueManager.Worker + :param queue: The queue from which tasks are pulled + :type queue: Queue + :param logger: Logger to log to + :type logger: logging.Logger + :param stopping_event: Indicates whether to get more tasks off the queue + :type stopping_event: Event + :param update_command_state_callback: Callback to update command state + :type update_command_state_callback: Callable + """ + super().__init__() + self._work_queue = queue + self._logger = logger + self.is_stopping = stopping_event + self.is_aborting = threading.Event() + self._result_callback = result_callback + self._update_command_state_callback = update_command_state_callback + self._queue_fetch_timeout = queue_fetch_timeout + self.current_task_progress: Optional[str] = None + self.current_task_id: Optional[str] = None + self.setDaemon(True) + + def run(self) -> None: + """Run in the thread. + + Tasks are fetched off the queue and executed. + if _is_stopping is set the thread wil exit. + If _is_aborting is set the queue will be emptied. All new commands will be aborted until + is_aborting cleared. + """ + with tango.EnsureOmniThread(): + while not self.is_stopping.is_set(): + self.current_task_id = None + self.current_task_progress = "" + + if self.is_aborting.is_set(): + # Drain the Queue since self.is_aborting is set + while not self._work_queue.empty(): + unique_id, _ = self._work_queue.get() + self.current_task_id = unique_id + self._logger.warning("Aborting task ID [%s]", unique_id) + result = TaskResult( + ResultCode.ABORTED, f"{unique_id} Aborted", unique_id + ) + self._result_callback(result) + self._work_queue.task_done() + time.sleep(self._queue_fetch_timeout) + continue # Don't try and get work off the queue below, continue next loop + try: + (unique_id, task) = self._work_queue.get( + block=True, timeout=self._queue_fetch_timeout + ) + + self._update_command_state_callback(unique_id, "IN_PROGRESS") + self.current_task_id = unique_id + # Inject is_aborting, is_stopping, progress_update into task + task.kwargs["is_aborting_event"] = self.is_aborting + task.kwargs["is_stopping_event"] = self.is_stopping + task.kwargs[ + "update_progress_callback" + ] = self._update_progress_callback + result = self.execute_task(task, unique_id) + self._result_callback(result) + self._work_queue.task_done() + except Empty: + continue + return + + def _update_progress_callback(self, progress: str) -> None: + """Update the current task progress. + + :param progress: An indication of progress + :type progress: str + """ + self.current_task_progress = progress + + @classmethod + def execute_task(cls, task: QueueTask, unique_id: str) -> TaskResult: + """Execute a task, return results in a standardised format. + + :param task: Task to execute + :type task: QueueTask + :param unique_id: The task unique ID + :type unique_id: str + :return: The result of the task + :rtype: TaskResult + """ + try: + result = TaskResult(ResultCode.OK, f"{task.do()}", unique_id) + except Exception as err: + result = TaskResult( + ResultCode.FAILED, + f"Error: {err} {traceback.format_exc()}", + unique_id, + ) + return result + + def __init__( + self: QueueManager, + logger: logging.Logger, + max_queue_size: int = 0, + queue_fetch_timeout: float = 0.1, + num_workers: int = 0, + on_property_update_callback: Optional[Callable] = None, + ): + """Init QueryManager. + + Creates the queue and starts the thread that will execute tasks + from it. + + :param logger: Python logger + :type logger: logging.Logger + :param max_queue_size: The maximum size of the queue + :type max_queue_size: int + :param max_queue_size: The time to wait for items in the queue + :type max_queue_size: float + :param num_workers: The number of worker threads to start + :type num_workers: float + """ + self._logger = logger + self._max_queue_size = max_queue_size + self._work_queue = Queue(self._max_queue_size) + self._queue_fetch_timeout = queue_fetch_timeout + self._on_property_update_callback = on_property_update_callback + self.is_stopping = threading.Event() + self._property_update_lock = threading.Lock() + + self._task_result = () + self._tasks_in_queue: Dict[str, str] = {} # unique_id, task_name + self._task_status: Dict[str, str] = {} # unique_id, status + self._threads = [] + + # If there's no queue, don't start threads + if not self._max_queue_size: + return + + self._threads = [ + self.Worker( + self._work_queue, + self._logger, + self.is_stopping, + self.result_callback, + self.update_task_state_callback, + ) + for _ in range(num_workers) + ] + for thread in self._threads: + thread.start() + + @property + def queue_full(self) -> bool: + """Check if the queue is full. + + :return: Whether or not the queue is full. + :rtype: bool + """ + return self._work_queue.full() + + @property + def task_result(self) -> Tuple[str]: + """Return the last task result. + + :return: Last task result + :rtype: Tuple + """ + return self._task_result + + @property + def task_ids_in_queue(self) -> list: + """Task IDs in the queue. + + :return: The task IDs in the queue + :rtype: list + """ + return list(self._tasks_in_queue.keys()) + + @property + def tasks_in_queue(self) -> list: + """Task names in the queue. + + :return: The list of task names in the queue + :rtype: list + """ + return list(self._tasks_in_queue.values()) + + @property + def task_status(self) -> Dict[str, str]: + """Return task status. + + :return: The task status + :rtype: Dict[str, str] + """ + return self._task_status.copy() + + @property + def task_progress(self) -> Dict[str, str]: + """Return the task progress. + + :return: The task progress + :rtype: Dict[str, str] + """ + progress = {} + for worker in self._threads: + if worker.current_task_id: + progress[worker.current_task_id] = worker.current_task_progress + return progress + + def enqueue_task(self, task: QueueTask) -> str: + """Add the task to be done onto the queue. + + :param task: The task to execute in a thread + :type task: QueueTask + :return: The unique ID of the command + :rtype: string + """ + unique_id = self.get_unique_id(task.get_task_name()) + + # If there is no queue, just execute the command and return + if self._max_queue_size == 0: + self.update_task_state_callback(unique_id, "IN_PROGRESS") + result = self.Worker.execute_task(task, unique_id) + self.result_callback(result) + return unique_id + + if self.queue_full: + self.result_callback( + TaskResult(ResultCode.REJECTED, "Queue is full", unique_id) + ) + return unique_id + + self._work_queue.put([unique_id, task]) + with self._property_update_lock: + self._tasks_in_queue[unique_id] = task.get_task_name() + self._on_property_change("tasks_in_queue") + self._on_property_change("task_ids_in_queue") + return unique_id + + def result_callback(self, task_result: TaskResult): + """Run when the task, taken from the queue, have completed to update the appropriate attributes. + + :param task_result: The result of the task + :type task_result: TaskResult + """ + with self._property_update_lock: + if task_result.unique_id in self._task_status: + del self._task_status[task_result.unique_id] + self._task_result = task_result.to_task_result() + self._on_property_change("task_result") + + def update_task_state_callback(self, unique_id: str, status: str): + """Update the executing task state. + + :param unique_id: The task unique ID + :type unique_id: str + :param status: The state of the task + :type status: str + """ + if unique_id in self._tasks_in_queue: + with self._property_update_lock: + del self._tasks_in_queue[unique_id] + self._on_property_change("task_ids_in_queue") + self._on_property_change("tasks_in_queue") + + with self._property_update_lock: + self._task_status[unique_id] = status + self._on_property_change("task_status") + + def _on_property_change(self, property_name: str): + """Trigger when a property changes value. + + :param property_name: The property name + :type property_name: str + """ + if self._on_property_update_callback: + self._on_property_update_callback( + property_name, getattr(self, property_name) + ) + + def abort_tasks(self): + """Start aborting tasks.""" + for worker in self._threads: + worker.is_aborting.set() + + def resume_tasks(self): + """Unsets aborting so tasks can be picked up again.""" + for worker in self._threads: + worker.is_aborting.clear() + + def stop_tasks(self): + """Set is_stopping on each thread so it exists out. Killing the thread.""" + self.is_stopping.set() + + @property + def is_aborting(self) -> bool: + """Return False if any of the threads are aborting.""" + return all([worker.is_aborting.is_set() for worker in self._threads]) + + @classmethod + def get_unique_id(cls, task_name) -> str: + """Generate a unique ID for the task. + + :param task_name: The name of the task + :type task_name: string + :return: The unique ID of the task + :rtype: string + """ + return f"{time.time()}_{task_name}" + + def get_task_state(self, unique_id: str) -> TaskState: + """Attempt to get state of QueueTask. + + :param unique_id: Unique ID of the QueueTask + :type unique_id: str + :return: State of the QueueTask + :rtype: TaskState + """ + if self._task_result: + _task_result = TaskResult.from_task_result(self._task_result) + if unique_id == _task_result.unique_id: + return TaskState.COMPLETED + + if unique_id in self.task_ids_in_queue: + return TaskState.QUEUED + + if unique_id in self.task_status.keys(): + return TaskState.IN_PROGRESS + + return TaskState.NOT_FOUND + + def __del__(self) -> None: + """Release resources prior to instance deletion. + + - Set the workers to aborting, this will empty out the queue and set the result code + for each task to `Aborted`. + - Wait for the queues to empty out. + - Set the workers to stopping, this will exit out the running thread. + """ + if not self._threads: + return + + self.abort_tasks() + self._work_queue.join() + for worker in self._threads: + worker.is_stopping.set() + + def __len__(self) -> int: + """Approximate length of the queue. + + :return: The approximate length of the queue + :rtype: int + """ + return self._work_queue.qsize() + + +class TaskQueueComponentManager(BaseComponentManager): + """A component manager that provides message queue functionality.""" + + def __init__( + self: TaskQueueComponentManager, + message_queue: QueueManager, + op_state_model: Any, + *args: Any, + **kwargs: Any, + ) -> None: + """Create a new component manager that puts tasks on the queue. + + :param message_queue: The queue manager instance + :type message_queue: QueueManager + :param op_state_model: The ops state model + :type op_state_model: Any + """ + self.message_queue = message_queue + + super().__init__(op_state_model, *args, **kwargs) + + def enqueue( + self, + task: QueueTask, + ) -> str: + """Put `task` on the queue. The unique ID for it is returned. + + :param task: The task to execute in the thread + :type task: QueueTask + :return: The unique ID of the queued command + :rtype: str + """ + return self.message_queue.enqueue_task(task) diff --git a/src/ska_tango_base/commands.py b/src/ska_tango_base/commands.py index c14b4f1878682df12308a8d9ee25942e50445a3a..7ebf3f582072ff211eafd94514c64cdf13ca515a 100644 --- a/src/ska_tango_base/commands.py +++ b/src/ska_tango_base/commands.py @@ -96,6 +96,21 @@ class ResultCode(enum.IntEnum): The status of the command is not known. """ + REJECTED = 5 + """ + The command execution has been rejected. + """ + + NOT_ALLOWED = 6 + """ + The command is not allowed to be executed + """ + + ABORTED = 7 + """ + The command in progress has been aborted + """ + class BaseCommand: """ diff --git a/tests/test_task_queue_component_manager.py b/tests/test_task_queue_component_manager.py new file mode 100644 index 0000000000000000000000000000000000000000..4b343afc565c0f46010469133f27e890afec597c --- /dev/null +++ b/tests/test_task_queue_component_manager.py @@ -0,0 +1,487 @@ +"""Tests for QueueManager and its component manager.""" +import logging +import time +import pytest +from unittest.mock import MagicMock, patch + +from ska_tango_base.commands import ResultCode +from ska_tango_base.base.task_queue_component_manager import ( + QueueManager, + TaskResult, + TaskQueueComponentManager, + QueueTask, + TaskState, +) + +logger = logging.getLogger(__name__) + + +def check_matching_pattern(list_to_check=()): + """Check that lengths go 1,2,3,2,1 for example.""" + list_to_check = list(list_to_check) + if not list_to_check[-1]: + list_to_check.pop() + assert len(list_to_check) > 2 + while len(list_to_check) > 2: + last_e = list_to_check.pop() + first_e = list_to_check.pop(0) + assert len(last_e) == len(first_e) + + +@pytest.fixture +def progress_task(): + """Fixture for a test that throws an exception.""" + + def get_task(): + class ProgressTask(QueueTask): + def do(self): + for i in range(100): + self.update_progress(str(i)) + time.sleep(0.5) + + return ProgressTask() + + return get_task + + +@pytest.fixture +def exc_task(): + """Fixture for a test that throws an exception.""" + + def get_task(): + class ExcTask(QueueTask): + def do(self): + raise Exception("An error occurred") + + return ExcTask() + + return get_task + + +@pytest.fixture +def slow_task(): + """Fixture for a test that takes long.""" + + def get_task(): + class SlowTask(QueueTask): + def do(self): + time.sleep(2) + + return SlowTask() + + return get_task + + +@pytest.fixture +def simple_task(): + """Fixture for a very simple task.""" + + def get_task(): + class SimpleTask(QueueTask): + def do(self): + num_one = self.args[0] + num_two = self.kwargs.get("num_two") + return num_one + num_two + + return SimpleTask(2, num_two=3) + + return get_task + + +@pytest.fixture +def abort_task(): + """Fixture for a task that aborts.""" + + def get_task(): + class AbortTask(QueueTask): + def do(self): + sleep_time = self.args[0] + while not self.is_aborting_event.is_set(): + time.sleep(sleep_time) + + return AbortTask(0.2) + + return get_task + + +@pytest.fixture +def stop_task(): + """Fixture for a task that stops.""" + + def get_task(): + class StopTask(QueueTask): + def do(self): + assert not self.is_stopping_event.is_set() + while not self.is_stopping_event.is_set(): + pass + + return StopTask() + + return get_task + + +class TestQueueTask: + """Test QueueTask.""" + + def test_simple(self, simple_task): + """Test simple task.""" + assert simple_task().do() == 5 + + def test_exception(self, exc_task): + """Test that exception is thrown.""" + with pytest.raises(Exception): + exc_task().do() + + +class TestQueueManager: + """General QueueManager checks.""" + + def test_threads_start(self): + """Test that threads start up. Set stop and exit.""" + qm = QueueManager(logger, max_queue_size=2, num_workers=2) + assert len(qm._threads) == 2 + for worker in qm._threads: + assert worker.is_alive() + assert worker.daemon + + for worker in qm._threads: + worker.is_stopping.set() + + +class TestQueueManagerTasks: + """QueueManager checks for tasks executed.""" + + @pytest.mark.timeout(5) + def test_task_ids(self, simple_task): + """Check ids.""" + qm = QueueManager(logger, max_queue_size=5, num_workers=2) + unique_id_one = qm.enqueue_task(simple_task()) + unique_id_two = qm.enqueue_task(simple_task()) + assert unique_id_one.endswith("SimpleTask") + assert unique_id_one != unique_id_two + + @pytest.mark.timeout(5) + def test_task_is_executed(self, simple_task): + """Check that tasks are executed.""" + with patch.object(QueueManager, "result_callback") as my_cb: + qm = QueueManager(logger, max_queue_size=5, num_workers=2) + unique_id_one = qm.enqueue_task(simple_task()) + unique_id_two = qm.enqueue_task(simple_task()) + + while my_cb.call_count != 2: + time.sleep(0.5) + result_one = my_cb.call_args_list[0][0][0] + result_two = my_cb.call_args_list[1][0][0] + + assert result_one.unique_id.endswith("SimpleTask") + assert result_one.unique_id == unique_id_one + assert result_two.unique_id.endswith("SimpleTask") + assert result_two.unique_id == unique_id_two + + assert result_one.result_code == ResultCode.OK + assert result_two.result_code == ResultCode.OK + + assert result_one.task_result == "5" + assert result_two.task_result == "5" + + @pytest.mark.timeout(5) + def test_task_result(self, simple_task, exc_task): + """Check task results are what's expected.""" + qm = QueueManager(logger, max_queue_size=5, num_workers=2) + add_task_one = simple_task() + exc_task = exc_task() + + qm.enqueue_task(add_task_one) + while not qm.task_result: + time.sleep(0.5) + task_result = TaskResult.from_task_result(qm.task_result) + assert task_result.unique_id.endswith("SimpleTask") + assert task_result.result_code == ResultCode.OK + assert task_result.task_result == "5" + + qm.enqueue_task(exc_task) + while qm.task_result[0].endswith("SimpleTask"): + time.sleep(0.5) + task_result = TaskResult.from_task_result(qm.task_result) + assert task_result.unique_id.endswith("ExcTask") + assert task_result.result_code == ResultCode.FAILED + assert task_result.task_result.startswith( + "Error: An error occurred Traceback (" + ) + + @pytest.mark.timeout(10) + def test_full_queue(self, slow_task): + """Check full queues rejects new commands.""" + with patch.object(QueueManager, "result_callback") as my_cb: + qm = QueueManager(logger, max_queue_size=1, num_workers=1) + for i in range(10): + qm.enqueue_task(slow_task()) + + while len(my_cb.call_args_list) != 10: + time.sleep(0.5) + + results = [i[0][0].result_code for i in my_cb.call_args_list] + # 9/10 should be rejected since the first is busy and the queue length is 1 + assert results[-1] == ResultCode.OK + for res in results[:-1]: + assert res == ResultCode.REJECTED + + with patch.object(QueueManager, "result_callback") as my_cb: + qm = QueueManager(logger, max_queue_size=2, num_workers=2) + for i in range(10): + qm.enqueue_task(slow_task()) + + while len(my_cb.call_args_list) != 10: + time.sleep(0.5) + results = [i[0][0].result_code for i in my_cb.call_args_list] + # 8/10 should be rejected since two are taken to be processed. + assert results[-1] == ResultCode.OK + assert results[-2] == ResultCode.OK + for res in results[:-2]: + assert res == ResultCode.REJECTED + + @pytest.mark.timeout(5) + def test_zero_queue(self, simple_task): + """Check task_result is the same between queue and non queue.""" + expected_name = "SimpleTask" + expected_result_code = ResultCode.OK + expected_result = "5" + + # No Queue + qm = QueueManager(logger, max_queue_size=0, num_workers=1) + assert len(qm._threads) == 0 + res = qm.enqueue_task(simple_task()) + assert res.endswith(expected_name) + assert qm.task_result[0].endswith(expected_name) + assert int(qm.task_result[1]) == expected_result_code + assert qm.task_result[2] == expected_result + + # Queue + qm = QueueManager(logger, max_queue_size=2, num_workers=1) + res = qm.enqueue_task(simple_task()) + assert res.endswith(expected_name) + + # Wait for the task to be picked up + while not qm.task_result: + time.sleep(0.5) + assert qm.task_result[0].endswith(expected_name) + assert int(qm.task_result[1]) == expected_result_code + assert qm.task_result[2] == expected_result + + @pytest.mark.timeout(5) + def test_multi_jobs(self, slow_task): + """Test that multiple threads are working. Test that attribute updates fires.""" + num_of_workers = 3 + + call_back_func = MagicMock() + qm = QueueManager( + logger, + max_queue_size=5, + num_workers=num_of_workers, + on_property_update_callback=call_back_func, + ) + unique_ids = [] + for _ in range(4): + unique_id = qm.enqueue_task(slow_task()) + unique_ids.append(unique_id) + + # Wait for a item on the queue + while not qm.task_ids_in_queue: + pass + + while not qm.task_result: + pass + + # Wait for last task to finish + while unique_ids[-1] != TaskResult.from_task_result(qm.task_result).unique_id: + pass + + all_passed_params = [a_call[0] for a_call in call_back_func.call_args_list] + tasks_in_queue = [ + a_call[1] for a_call in all_passed_params if a_call[0] == "tasks_in_queue" + ] + task_ids_in_queue = [ + a_call[1] + for a_call in all_passed_params + if a_call[0] == "task_ids_in_queue" + ] + task_status = [ + a_call[1] for a_call in all_passed_params if a_call[0] == "task_status" + ] + task_result = [ + a_call[1] for a_call in all_passed_params if a_call[0] == "task_result" + ] + task_result_ids = [res[0] for res in task_result] + + check_matching_pattern(tuple(tasks_in_queue)) + check_matching_pattern(tuple(task_ids_in_queue)) + + # Since there's 3 workers there should at least once be 3 in progress + for status in task_status: + if len(status) == num_of_workers: + break + else: + assert 0, f"Length of {num_of_workers} in task_status not found" + assert len(task_result) == 4 + for unique_id in unique_ids: + assert unique_id in task_result_ids + + def test_task_progress(self, progress_task): + """Test the progress updates.""" + qm = QueueManager(logger, max_queue_size=8, num_workers=2) + unique_id_one = qm.enqueue_task(progress_task()) + unique_id_two = qm.enqueue_task(progress_task()) + + time.sleep(0.5) + assert unique_id_one in qm.task_progress + assert unique_id_two in qm.task_progress + progress_one_before = qm.task_progress[unique_id_one] + progress_two_before = qm.task_progress[unique_id_two] + time.sleep(1.0) + progress_one_after = qm.task_progress[unique_id_one] + progress_two_after = qm.task_progress[unique_id_two] + + assert int(progress_one_after) > int(progress_one_before) + assert int(progress_two_after) > int(progress_two_before) + + def test_task_get_state_completed(self, simple_task): + """Test the QueueTask get state is completed.""" + qm = QueueManager(logger, max_queue_size=8, num_workers=2) + unique_id_one = qm.enqueue_task(simple_task()) + while not qm.task_result: + pass + assert qm.get_task_state(unique_id=unique_id_one) == TaskState.COMPLETED + + def test_task_get_state_in_queued(self, slow_task): + """Test the QueueTask get state is queued.""" + qm = QueueManager(logger, max_queue_size=8, num_workers=1) + qm.enqueue_task(slow_task()) + qm.enqueue_task(slow_task()) + unique_id_last = qm.enqueue_task(slow_task()) + + assert qm.get_task_state(unique_id=unique_id_last) == TaskState.QUEUED + + def test_task_get_state_in_progress(self, progress_task): + """Test the QueueTask get state is in progress.""" + qm = QueueManager(logger, max_queue_size=8, num_workers=2) + unique_id_one = qm.enqueue_task(progress_task()) + while not qm.task_progress: + pass + + assert qm.get_task_state(unique_id=unique_id_one) == TaskState.IN_PROGRESS + + def test_task_get_state_in_not_found(self): + """Test the QueueTask get state not found.""" + qm = QueueManager(logger, max_queue_size=8, num_workers=2) + assert qm.get_task_state(unique_id="non_existing_id") == TaskState.NOT_FOUND + + +class TestQueueManagerExit: + """Test the stopping and aborting.""" + + @pytest.mark.timeout(5) + def test_exit_abort(self, abort_task, slow_task): + """Test aborting exit.""" + call_back_func = MagicMock() + qm = QueueManager( + logger, + max_queue_size=5, + num_workers=2, + on_property_update_callback=call_back_func, + ) + cm = TaskQueueComponentManager( + message_queue=qm, op_state_model=None, logger=None + ) + cm.enqueue(abort_task()) + + # Wait for the command to start + while not qm.task_status: + pass + # Start aborting + cm.message_queue.abort_tasks() + # Wait for the exit + while not qm.task_result: + pass + assert qm.is_aborting + # When aborting this should be rejected + unique_id = cm.enqueue(slow_task()) + while True: + tr = TaskResult.from_task_result(qm.task_result) + if tr.unique_id == unique_id and tr.result_code == ResultCode.ABORTED: + break + + # Resume the commands + qm.resume_tasks() + assert not qm.is_aborting + + # Wait for my slow command to finish + unique_id = cm.enqueue(slow_task()) + while True: + tr = TaskResult.from_task_result(qm.task_result) + if tr.unique_id == unique_id: + break + + @pytest.mark.timeout(10) + def test_exit_stop(self, stop_task): + """Test stopping exit.""" + call_back_func = MagicMock() + qm = QueueManager( + logger, + max_queue_size=5, + num_workers=2, + on_property_update_callback=call_back_func, + ) + cm = TaskQueueComponentManager( + message_queue=qm, op_state_model=None, logger=None + ) + cm.enqueue(stop_task()) + + # Wait for the command to start + while not qm.task_status: + pass + # Stop all threads + cm.message_queue.stop_tasks() + # Wait for the exit + while not qm.task_result: + pass + # Wait for all the workers to stop + while not any([worker.is_alive() for worker in qm._threads]): + pass + + @pytest.mark.timeout(5) + def test_delete_queue(self, slow_task, stop_task, abort_task): + """Test deleting the queue.""" + call_back_func = MagicMock() + qm = QueueManager( + logger, + max_queue_size=8, + num_workers=2, + on_property_update_callback=call_back_func, + ) + cm = TaskQueueComponentManager( + message_queue=qm, op_state_model=None, logger=None + ) + cm.enqueue(slow_task()) + cm.enqueue(stop_task()) + cm.enqueue(abort_task()) + cm.enqueue(stop_task()) + cm.enqueue(abort_task()) + cm.enqueue(stop_task()) + cm.enqueue(abort_task()) + cm.enqueue(stop_task()) + cm.enqueue(abort_task()) + + del cm.message_queue + del cm + + +class TestComponentManager: + """Tests for the component manager.""" + + def test_init(self): + """Test that we can init the component manager.""" + qm = QueueManager(logger, max_queue_size=0, num_workers=1) + cm = TaskQueueComponentManager( + message_queue=qm, op_state_model=None, logger=logger + ) + assert cm.message_queue.task_ids_in_queue == []