Skip to content
Snippets Groups Projects
Select Git revision
  • 4286656bea7ab9f2a6e17458bf2dd6093c18b60a
  • main
  • sar-277-update-docs-with-examples-for-lrc
  • st-946-automate
  • sar_302-log-fix
  • sar-287_subarray_commands_to_lrc
  • sar_302-POC_await_sub_device_state
  • sat_302_fix_pipelines
  • sar-286_lrc_one_subarry_command
  • sar-286_lrc_improvements
  • sar-288-async-controller
  • sar-276-combine-tango-queue
  • sar-255_remove_nexus_reference
  • sar-275-add-LRC
  • sar-273-add-lrc-attributes
  • sar-272
  • sp-1106-marvin-1230525148-ska-tango-base
  • sp-1106-marvin-813091765-ska-tango-base
  • sar-255/Publish-package-to-CAR
  • mccs-661-device-under-test-fixture
  • mccs-659-pep257-docstring-linting
  • 0.11.3
  • 0.11.2
  • 0.11.1
  • 0.11.0
  • 0.10.1
  • 0.10.0
  • 0.9.1
  • 0.9.0
  • 0.8.1
  • 0.8.0
  • 0.7.2
  • 0.7.1
  • 0.7.0
  • 0.6.6
  • 0.6.5
  • 0.6.4
  • 0.6.3
  • 0.6.2
  • 0.6.1
  • 0.6.0
41 results

task_queue_component_manager.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    task_queue_component_manager.py 23.07 KiB
    """
    This module provides a QueueManager, TaskResult and QueueTask classes.
    
    * **TaskResult**: is a convenience `dataclass` for parsing and formatting the
      results of a task.
    
    * **QueueTask**: is a class that instances of which can be added to the queue for execution
      by background threads.
    
    * **QueueManager**: that implements the queue and thread worker functionality.
    
    **********
    TaskResult
    **********
    
    This is a simple convenience class to parse or format the task result. The result of a task will
    be made available as a Tango device attribute named `command_result`. It will be a tuple of 3 strings.
    
    1. The unique ID
        Every command/task put on the queue for execution by background threads will have a unique ID.
    2. The Result Code
        This is the result of the task executed by a worker from the queue.
    3. The task result
        The string representation of the returned result for the task on the queue.
    
    .. code-block:: py
    
        from ska_tango_base.base.task_queue_component_manager import TaskResult
        tr = TaskResult.from_task_result(["UniqueID", "0", "The task result"])
        tr
        TaskResult(result_code=<ResultCode.OK: 0>, task_result='The task result', unique_id='UniqueID')
        tr.to_task_result()
        ('UniqueID', '0', 'The task result')
    
    *********
    QueueTask
    *********
    
    This class should be subclassed and the `do` method implemented with the required functionality.
    The `do` method will be executed by the background worker in a thread.
    
    `get_task_name` can be overridden if you want to change the name of the task as it would appear in
    the `tasks_in_queue` property.
    
    Simple example:
    
    .. code-block:: py
    
        class SimpleTask(QueueTask):
            def do(self):
                num_one = self.args[0]
                num_two = self.kwargs.get("num_two")
                return num_one + num_two
    
        return SimpleTask(2, num_two=3)
    
    3 items are added dynamically by the worker thread and is available for use in the class instance.
    
    * **is_aborting_event**: can be check periodically to determine whether
      the queue tasks have been aborted to gracefully complete the task in progress.
      The thread will stay active and once `is_aborting_event` has been unset,
      new tasks will be fetched from the queue for execution.
    
    .. code-block:: py
    
        class AbortTask(QueueTask):
            def do(self):
                sleep_time = self.args[0]
                while not self.is_aborting_event.is_set():
                    time.sleep(sleep_time)
    
        return AbortTask(0.2)
    
    * **is_stopping_event**: can be check periodically to determine whether
      the queue tasks have been stopped. In this case the thread will complete.
    
    .. code-block:: py
    
        class StopTask(QueueTask):
            def do(self):
                assert not self.is_stopping_event.is_set()
                while not self.is_stopping_event.is_set():
                    pass
    
        return StopTask()
    
    * **update_progress**: a callback that can be called wth the current progress
      of the task in progress
    
    .. code-block:: py
    
        class ProgressTask(QueueTask):
            def do(self):
                for i in range(100):
                    self.update_progress(str(i))
                    time.sleep(0.5)
    
        return ProgressTask()
    
    ************
    QueueManager
    ************
    
    The queue manager class manages the queue, workers and the update of properties.
    The number of worker threads can be specified.
    
    When `num_workers` is 0, tasks that are enqueued will *not* be put on the queue,
    but will simply be executed and thus block until done. No worker threads are started in this case.
    
    As tasks are taken off the queue and completes, the properties below will be updated. An optional callback
    `on_property_update_callback` can be specified that will be executed for every property change. This callback
    will be called with the name of the property and its current value.
    
    * **tasks_in_queue**: A list of names for the tasks in the queue.
      Changes when a queue item is added or removed.
    
    * **task_ids_in_queue**: A list of unique IDs for the tasks in the queue.
      Changes when a queue item is added or removed.
    
    * **task_result**: The result of the latest completed task. Changes when task completes.
    
    * **task_status**: A list of unique IDs and the their status.
      Currently indicates when a task is in progress and changes when a task is started or completed.
    
    Other properties of note:
    
    * **queue_full**: Indicates whether the queue is full or not.
      If the queue is full, the result of any enqueued task will immediately be set to REJECTED.
    
    * **task_progress**: When reading this property the progress of the tasks are fetched from the worker threads.
    
    Aborting tasks
    --------------
    
    When `abort_tasks` is called on the queue manager the following will happen.
    
    * Any tasks in progress will complete. Tasks that check `is_aborting_event` will know to complete otherwise
      it will complete as per normal.
    
    * Any tasks on the queue will be removed and their result set to ABORTED. They will not be executed.
    
    * Any tasks enqueued while in aborted state will immediately be removed from the queue and marked as ABORTED.
    
    * The thread stays alive.
    
    When `resume_tasks` is then called, tasks are again put on the queue, retrieved and executed as per normal.
    
    Stopping tasks
    --------------
    
    Once `stop_tasks` is called the worker threads completes as soon as possible.
    
    * Any tasks in progress will complete. Tasks that check `is_stopping_event` will know to exit gracefully.
    
    * The thread will cease.
    
    Getting the state of a task
    ---------------------------
    
    Calling `get_task_state` with the task ID will check the state of the task. A history of completed tasks
    are not kept, so it may not be found.
    
    """
    from __future__ import annotations
    import enum
    import logging
    import threading
    import time
    import traceback
    from queue import Empty, Queue
    from threading import Event
    from typing import Any, Callable, Dict, Optional, Tuple
    from attr import dataclass
    
    import tango
    
    from ska_tango_base.base.component_manager import BaseComponentManager
    from ska_tango_base.commands import ResultCode
    
    
    class TaskState(enum.IntEnum):
        """The state of the QueueTask in the QueueManager."""
    
        QUEUED = 0
        """
        The task has been accepted and will be executed at a future time
        """
    
        IN_PROGRESS = 1
        """
        The task in progress
        """
    
        ABORTED = 2
        """
        The task in progress has been aborted
        """
    
        NOT_FOUND = 3
        """
        The task is not found
        """
    
        COMPLETED = 4
        """
        The task was completed.
        """
    
        NOT_ALLOWED = 5
        """
        The task is not allowed to be executed
        """
    
    
    @dataclass
    class TaskResult:
        """Convenience class for results."""
    
        result_code: ResultCode
        task_result: str
        unique_id: str
    
        def to_task_result(self) -> Tuple[str]:
            """Convert TaskResult to task_result.
    
            :return: The task result
            :rtype: list[str]
            """
            return (f"{self.unique_id}", f"{int(self.result_code)}", f"{self.task_result}")
    
        @classmethod
        def from_task_result(cls, task_result: list) -> TaskResult:
            """Convert task_result list to TaskResult.
    
            :param task_result: The task_result [unique_id, result_code, task_result]
            :type task_result: list
            :return: The task result
            :rtype: TaskResult
            :raises: ValueError
            """
            if len(task_result) != 3:
                raise ValueError(f"Cannot parse task_result {task_result}")
    
            return TaskResult(
                result_code=ResultCode(int(task_result[1])),
                task_result=task_result[2],
                unique_id=task_result[0],
            )
    
    
    class QueueTask:
        """A task that can be put on the queue."""
    
        def __init__(self: QueueTask, *args, **kwargs) -> None:
            """Create the task. args and kwargs are stored and should be referenced in the `do` method."""
            self.args = args
            self.kwargs = kwargs
            self._update_progress_callback = None
    
        @property
        def is_aborting_event(self) -> threading.Event:
            """Worker adds is_aborting_event threading event.
    
            Indicates whether task execution have been aborted.
    
            :return: The is_aborted event.
            :rtype: threading.Event
            """
            return self.kwargs.get("is_aborting_event")
    
        @property
        def is_stopping_event(self) -> threading.Event:
            """Worker adds is_stopping_event threading event.
    
            Indicates whether task execution have been stopped.
    
            :return: The is_stopping event.
            :rtype: threading.Event
            """
            return self.kwargs.get("is_stopping_event")
    
        def update_progress(self, progress: str):
            """Call the callback to update the progress.
    
            :param progress: String that to indicate progress of task
            :type progress: str
            """
            self._update_progress_callback = self.kwargs.get("update_progress_callback")
            if self._update_progress_callback:
                self._update_progress_callback(progress)
    
        def get_task_name(self) -> str:
            """Return a custom task name.
    
            :return: The name of the task
            :rtype: str
            """
            return self.__class__.__name__
    
        def do(self: QueueTask) -> Any:
            """Implement this method with your functionality."""
            raise NotImplementedError
    
    
    class QueueManager:
        """Manages the worker threads. Updates the properties as the tasks are completed."""
    
        class Worker(threading.Thread):
            """A worker thread that takes tasks from the queue and performs them."""
    
            def __init__(
                self: QueueManager.Worker,
                queue: Queue,
                logger: logging.Logger,
                stopping_event: Event,
                result_callback: Callable,
                update_command_state_callback: Callable,
                queue_fetch_timeout: int = 0.1,
            ) -> None:
                """Initiate a worker.
    
                :param self: Worker class
                :type self: QueueManager.Worker
                :param queue: The queue from which tasks are pulled
                :type queue: Queue
                :param logger: Logger to log to
                :type logger: logging.Logger
                :param stopping_event: Indicates whether to get more tasks off the queue
                :type stopping_event: Event
                :param update_command_state_callback: Callback to update command state
                :type update_command_state_callback: Callable
                """
                super().__init__()
                self._work_queue = queue
                self._logger = logger
                self.is_stopping = stopping_event
                self.is_aborting = threading.Event()
                self._result_callback = result_callback
                self._update_command_state_callback = update_command_state_callback
                self._queue_fetch_timeout = queue_fetch_timeout
                self.current_task_progress: Optional[str] = None
                self.current_task_id: Optional[str] = None
                self.setDaemon(True)
    
            def run(self) -> None:
                """Run in the thread.
    
                Tasks are fetched off the queue and executed.
                if _is_stopping is set the thread wil exit.
                If _is_aborting is set the queue will be emptied. All new commands will be aborted until
                is_aborting cleared.
                """
                with tango.EnsureOmniThread():
                    while not self.is_stopping.is_set():
                        self.current_task_id = None
                        self.current_task_progress = ""
    
                        if self.is_aborting.is_set():
                            # Drain the Queue since self.is_aborting is set
                            while not self._work_queue.empty():
                                unique_id, _ = self._work_queue.get()
                                self.current_task_id = unique_id
                                self._logger.warning("Aborting task ID [%s]", unique_id)
                                result = TaskResult(
                                    ResultCode.ABORTED, f"{unique_id} Aborted", unique_id
                                )
                                self._result_callback(result)
                                self._work_queue.task_done()
                            time.sleep(self._queue_fetch_timeout)
                            continue  # Don't try and get work off the queue below, continue next loop
                        try:
                            (unique_id, task) = self._work_queue.get(
                                block=True, timeout=self._queue_fetch_timeout
                            )
    
                            self._update_command_state_callback(unique_id, "IN_PROGRESS")
                            self.current_task_id = unique_id
                            # Inject is_aborting, is_stopping, progress_update into task
                            task.kwargs["is_aborting_event"] = self.is_aborting
                            task.kwargs["is_stopping_event"] = self.is_stopping
                            task.kwargs[
                                "update_progress_callback"
                            ] = self._update_progress_callback
                            result = self.execute_task(task, unique_id)
                            self._result_callback(result)
                            self._work_queue.task_done()
                        except Empty:
                            continue
                    return
    
            def _update_progress_callback(self, progress: str) -> None:
                """Update the current task progress.
    
                :param progress: An indication of progress
                :type progress: str
                """
                self.current_task_progress = progress
    
            @classmethod
            def execute_task(cls, task: QueueTask, unique_id: str) -> TaskResult:
                """Execute a task, return results in a standardised format.
    
                :param task: Task to execute
                :type task: QueueTask
                :param unique_id: The task unique ID
                :type unique_id: str
                :return: The result of the task
                :rtype: TaskResult
                """
                try:
                    result = TaskResult(ResultCode.OK, f"{task.do()}", unique_id)
                except Exception as err:
                    result = TaskResult(
                        ResultCode.FAILED,
                        f"Error: {err} {traceback.format_exc()}",
                        unique_id,
                    )
                return result
    
        def __init__(
            self: QueueManager,
            logger: logging.Logger,
            max_queue_size: int = 0,
            queue_fetch_timeout: float = 0.1,
            num_workers: int = 0,
            on_property_update_callback: Optional[Callable] = None,
        ):
            """Init QueryManager.
    
            Creates the queue and starts the thread that will execute tasks
            from it.
    
            :param logger: Python logger
            :type logger: logging.Logger
            :param max_queue_size: The maximum size of the queue
            :type max_queue_size: int
            :param max_queue_size: The time to wait for items in the queue
            :type max_queue_size: float
            :param num_workers: The number of worker threads to start
            :type num_workers: float
            """
            self._logger = logger
            self._max_queue_size = max_queue_size
            self._work_queue = Queue(self._max_queue_size)
            self._queue_fetch_timeout = queue_fetch_timeout
            self._on_property_update_callback = on_property_update_callback
            self.is_stopping = threading.Event()
            self._property_update_lock = threading.Lock()
    
            self._task_result = ()
            self._tasks_in_queue: Dict[str, str] = {}  # unique_id, task_name
            self._task_status: Dict[str, str] = {}  # unique_id, status
            self._threads = []
    
            # If there's no queue, don't start threads
            if not self._max_queue_size:
                return
    
            self._threads = [
                self.Worker(
                    self._work_queue,
                    self._logger,
                    self.is_stopping,
                    self.result_callback,
                    self.update_task_state_callback,
                )
                for _ in range(num_workers)
            ]
            for thread in self._threads:
                thread.start()
    
        @property
        def queue_full(self) -> bool:
            """Check if the queue is full.
    
            :return: Whether or not the queue is full.
            :rtype: bool
            """
            return self._work_queue.full()
    
        @property
        def task_result(self) -> Tuple[str]:
            """Return the last task result.
    
            :return: Last task result
            :rtype: Tuple
            """
            return self._task_result
    
        @property
        def task_ids_in_queue(self) -> list:
            """Task IDs in the queue.
    
            :return: The task IDs in the queue
            :rtype: list
            """
            return list(self._tasks_in_queue.keys())
    
        @property
        def tasks_in_queue(self) -> list:
            """Task names in the queue.
    
            :return: The list of task names in the queue
            :rtype: list
            """
            return list(self._tasks_in_queue.values())
    
        @property
        def task_status(self) -> Dict[str, str]:
            """Return task status.
    
            :return: The task status
            :rtype: Dict[str, str]
            """
            return self._task_status.copy()
    
        @property
        def task_progress(self) -> Dict[str, str]:
            """Return the task progress.
    
            :return: The task progress
            :rtype: Dict[str, str]
            """
            progress = {}
            for worker in self._threads:
                if worker.current_task_id:
                    progress[worker.current_task_id] = worker.current_task_progress
            return progress
    
        def enqueue_task(self, task: QueueTask) -> str:
            """Add the task to be done onto the queue.
    
            :param task: The task to execute in a thread
            :type task: QueueTask
            :return: The unique ID of the command
            :rtype: string
            """
            unique_id = self.get_unique_id(task.get_task_name())
    
            # If there is no queue, just execute the command and return
            if self._max_queue_size == 0:
                self.update_task_state_callback(unique_id, "IN_PROGRESS")
                result = self.Worker.execute_task(task, unique_id)
                self.result_callback(result)
                return unique_id
    
            if self.queue_full:
                self.result_callback(
                    TaskResult(ResultCode.REJECTED, "Queue is full", unique_id)
                )
                return unique_id
    
            self._work_queue.put([unique_id, task])
            with self._property_update_lock:
                self._tasks_in_queue[unique_id] = task.get_task_name()
            self._on_property_change("tasks_in_queue")
            self._on_property_change("task_ids_in_queue")
            return unique_id
    
        def result_callback(self, task_result: TaskResult):
            """Run when the task, taken from the queue, have completed to update the appropriate attributes.
    
            :param task_result: The result of the task
            :type task_result: TaskResult
            """
            with self._property_update_lock:
                if task_result.unique_id in self._task_status:
                    del self._task_status[task_result.unique_id]
                self._task_result = task_result.to_task_result()
            self._on_property_change("task_result")
    
        def update_task_state_callback(self, unique_id: str, status: str):
            """Update the executing task state.
    
            :param unique_id: The task unique ID
            :type unique_id: str
            :param status: The state of the task
            :type status: str
            """
            if unique_id in self._tasks_in_queue:
                with self._property_update_lock:
                    del self._tasks_in_queue[unique_id]
                self._on_property_change("task_ids_in_queue")
                self._on_property_change("tasks_in_queue")
    
            with self._property_update_lock:
                self._task_status[unique_id] = status
            self._on_property_change("task_status")
    
        def _on_property_change(self, property_name: str):
            """Trigger when a property changes value.
    
            :param property_name: The property name
            :type property_name: str
            """
            if self._on_property_update_callback:
                self._on_property_update_callback(
                    property_name, getattr(self, property_name)
                )
    
        def abort_tasks(self):
            """Start aborting tasks."""
            for worker in self._threads:
                worker.is_aborting.set()
    
        def resume_tasks(self):
            """Unsets aborting so tasks can be picked up again."""
            for worker in self._threads:
                worker.is_aborting.clear()
    
        def stop_tasks(self):
            """Set is_stopping on each thread so it exists out. Killing the thread."""
            self.is_stopping.set()
    
        @property
        def is_aborting(self) -> bool:
            """Return False if any of the threads are aborting."""
            return all([worker.is_aborting.is_set() for worker in self._threads])
    
        @classmethod
        def get_unique_id(cls, task_name) -> str:
            """Generate a unique ID for the task.
    
            :param task_name: The name of the task
            :type task_name: string
            :return: The unique ID of the task
            :rtype: string
            """
            return f"{time.time()}_{task_name}"
    
        def get_task_state(self, unique_id: str) -> TaskState:
            """Attempt to get state of QueueTask.
    
            :param unique_id: Unique ID of the QueueTask
            :type unique_id: str
            :return: State of the QueueTask
            :rtype: TaskState
            """
            if self._task_result:
                _task_result = TaskResult.from_task_result(self._task_result)
                if unique_id == _task_result.unique_id:
                    return TaskState.COMPLETED
    
            if unique_id in self.task_ids_in_queue:
                return TaskState.QUEUED
    
            if unique_id in self.task_status.keys():
                return TaskState.IN_PROGRESS
    
            return TaskState.NOT_FOUND
    
        def __del__(self) -> None:
            """Release resources prior to instance deletion.
    
            - Set the workers to aborting, this will empty out the queue and set the result code
              for each task to `Aborted`.
            - Wait for the queues to empty out.
            - Set the workers to stopping, this will exit out the running thread.
            """
            if not self._threads:
                return
    
            self.abort_tasks()
            self._work_queue.join()
            for worker in self._threads:
                worker.is_stopping.set()
    
        def __len__(self) -> int:
            """Approximate length of the queue.
    
            :return: The approximate length of the queue
            :rtype: int
            """
            return self._work_queue.qsize()
    
    
    class TaskQueueComponentManager(BaseComponentManager):
        """A component manager that provides message queue functionality."""
    
        def __init__(
            self: TaskQueueComponentManager,
            message_queue: QueueManager,
            op_state_model: Any,
            *args: Any,
            **kwargs: Any,
        ) -> None:
            """Create a new component manager that puts tasks on the queue.
    
            :param message_queue: The queue manager instance
            :type message_queue: QueueManager
            :param op_state_model: The ops state model
            :type op_state_model: Any
            """
            self.message_queue = message_queue
    
            super().__init__(op_state_model, *args, **kwargs)
    
        def enqueue(
            self,
            task: QueueTask,
        ) -> str:
            """Put `task` on the queue. The unique ID for it is returned.
    
            :param task: The task to execute in the thread
            :type task: QueueTask
            :return: The unique ID of the queued command
            :rtype: str
            """
            return self.message_queue.enqueue_task(task)