Skip to content
Snippets Groups Projects
Unverified Commit 3e1a5da9 authored by SKAJohanVenter's avatar SKAJohanVenter
Browse files

SAR-302 Log from the main thread

parent 833c8b26
No related branches found
No related tags found
No related merge requests found
...@@ -292,7 +292,7 @@ class QueueManager: ...@@ -292,7 +292,7 @@ class QueueManager:
def __init__( def __init__(
self: QueueManager.Worker, self: QueueManager.Worker,
queue: Queue, queue: Queue,
logger: logging.Logger, log_message: Callable,
stopping_event: Event, stopping_event: Event,
aborting_event: Event, aborting_event: Event,
result_callback: Callable, result_callback: Callable,
...@@ -317,7 +317,7 @@ class QueueManager: ...@@ -317,7 +317,7 @@ class QueueManager:
""" """
super().__init__() super().__init__()
self._work_queue = queue self._work_queue = queue
self._logger = logger self._log_message = log_message
self.stopping_event = stopping_event self.stopping_event = stopping_event
self.aborting_event = aborting_event self.aborting_event = aborting_event
self._result_callback = result_callback self._result_callback = result_callback
...@@ -346,7 +346,9 @@ class QueueManager: ...@@ -346,7 +346,9 @@ class QueueManager:
while not self._work_queue.empty(): while not self._work_queue.empty():
unique_id, _, _ = self._work_queue.get() unique_id, _, _ = self._work_queue.get()
self.current_task_id = unique_id self.current_task_id = unique_id
self._logger.warning("Aborting task ID [%s]", unique_id) self._log_message(
f"Aborting task ID [{unique_id}]", level="WARNING"
)
result = TaskResult( result = TaskResult(
ResultCode.ABORTED, f"{unique_id} Aborted", unique_id ResultCode.ABORTED, f"{unique_id} Aborted", unique_id
) )
...@@ -474,7 +476,7 @@ class QueueManager: ...@@ -474,7 +476,7 @@ class QueueManager:
self._threads = [ self._threads = [
self.Worker( self.Worker(
self._work_queue, self._work_queue,
self._logger, self._log_message,
self.stopping_event, self.stopping_event,
self.aborting_event, self.aborting_event,
self.result_callback, self.result_callback,
...@@ -701,6 +703,18 @@ class QueueManager: ...@@ -701,6 +703,18 @@ class QueueManager:
return TaskState.NOT_FOUND return TaskState.NOT_FOUND
def _log_message(self, message: str, level: str = "INFO"):
"""Log a message.
Called from worker threads as well.
:param message: Message to log
:type message: str
:param level: A valid logging level, defaults to "INFO"
:type level: str, optional
"""
self._logger.log(getattr(logging, level), message)
def __len__(self) -> int: def __len__(self) -> int:
"""Approximate length of the queue. """Approximate length of the queue.
......
...@@ -12,6 +12,7 @@ from ska_tango_base.base.task_queue_manager import ( ...@@ -12,6 +12,7 @@ from ska_tango_base.base.task_queue_manager import (
) )
from ska_tango_base.base.reference_component_manager import QueueWorkerComponentManager from ska_tango_base.base.reference_component_manager import QueueWorkerComponentManager
from ska_tango_base.commands import BaseCommand from ska_tango_base.commands import BaseCommand
from tests.test_utils import LRCAttributesStore
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -382,40 +383,29 @@ class TestQueueManagerExit: ...@@ -382,40 +383,29 @@ class TestQueueManagerExit:
@pytest.mark.forked @pytest.mark.forked
@pytest.mark.timeout(5) @pytest.mark.timeout(5)
def test_exit_abort(self, abort_task, slow_task): def test_exit_abort(self, abort_task, slow_task, caplog):
"""Test aborting exit.""" """Test aborting exit."""
results = [] attribute_store = LRCAttributesStore()
caplog.set_level(logging.INFO)
def catch_updates(name, result):
if name == "longRunningCommandResult":
tr = TaskResult.from_task_result(result)
results.append(
(
tr.unique_id,
tr.result_code,
)
)
cm = QueueWorkerComponentManager( cm = QueueWorkerComponentManager(
op_state_model=None, op_state_model=None,
logger=logger, logger=logger,
max_queue_size=10, max_queue_size=10,
num_workers=2, num_workers=2,
push_change_event=catch_updates, push_change_event=attribute_store.store_push_event,
child_devices=[], child_devices=[],
) )
cm.enqueue(abort_task(), 0.1) cm.enqueue(abort_task(), 0.1)
# Wait for the command to start # Wait for the command to start
while not cm.task_status: attribute_store.get_attribute_value("longRunningCommandStatus")
time.sleep(0.1)
# Start aborting # Start aborting
cm._queue_manager.abort_tasks() cm._queue_manager.abort_tasks()
# Wait for the exit # Wait for the exit
while not cm.task_result: attribute_store.get_attribute_value("longRunningCommandResult")
time.sleep(0.1)
# aborting state should be cleaned up since the queue is empty and # aborting state should be cleaned up since the queue is empty and
# nothing is in progress # nothing is in progress
while cm._queue_manager.is_aborting: while cm._queue_manager.is_aborting:
...@@ -433,14 +423,16 @@ class TestQueueManagerExit: ...@@ -433,14 +423,16 @@ class TestQueueManagerExit:
assert cm._queue_manager.is_aborting assert cm._queue_manager.is_aborting
# Load up some tasks that should be aborted # Load up some tasks that should be aborted
cm.enqueue(slow_task()) aborted_task_id, _ = cm.enqueue(slow_task())
cm.enqueue(slow_task()) cm.enqueue(slow_task())
unique_id, _ = cm.enqueue(slow_task()) unique_id, _ = cm.enqueue(slow_task())
while True: while True:
if (unique_id, ResultCode.ABORTED) in results: result_id, result_code, _ = attribute_store.get_attribute_value(
"longRunningCommandResult"
)
if (unique_id, ResultCode.ABORTED) == (result_id, int(result_code)):
break break
time.sleep(0.1)
# Resume the commands # Resume the commands
cm._queue_manager.resume_tasks() cm._queue_manager.resume_tasks()
...@@ -450,9 +442,14 @@ class TestQueueManagerExit: ...@@ -450,9 +442,14 @@ class TestQueueManagerExit:
unique_id, _ = cm.enqueue(slow_task()) unique_id, _ = cm.enqueue(slow_task())
while True: while True:
if (unique_id, ResultCode.OK) in results: result_id, result_code, _ = attribute_store.get_attribute_value(
"longRunningCommandResult"
)
if (unique_id, ResultCode.OK) == (result_id, int(result_code)):
break break
time.sleep(0.1)
log_messages = [rec.msg for rec in caplog.records]
assert f"Aborting task ID [{aborted_task_id}]" in log_messages
@pytest.mark.forked @pytest.mark.forked
@pytest.mark.timeout(5) @pytest.mark.timeout(5)
......
"""Tests for skabase.utils.""" """Tests for skabase.utils."""
from contextlib import nullcontext from contextlib import nullcontext
from queue import Queue
import json import json
from typing import Any
import pytest import pytest
from ska_tango_base.utils import ( from ska_tango_base.utils import (
...@@ -262,3 +264,42 @@ def test_for_testing_only_decorator(): ...@@ -262,3 +264,42 @@ def test_for_testing_only_decorator():
with pytest.warns(None) as warning_record: with pytest.warns(None) as warning_record:
assert bah() == "bah" assert bah() == "bah"
assert len(warning_record) == 0 # no warning was raised because we are testing assert len(warning_record) == 0 # no warning was raised because we are testing
class LRCAttributesStore:
"""Utility class to keep track of long running command attribute changes."""
def __init__(self) -> None:
"""Create the queues."""
self.queues = {}
for attribute in [
"longRunningCommandsInQueue",
"longRunningCommandStatus",
"longRunningCommandProgress",
"longRunningCommandIDsInQueue",
"longRunningCommandResult",
]:
self.queues[attribute] = Queue()
def store_push_event(self, attribute_name: str, value: Any):
"""Store attribute changes as they change.
:param attribute_name: a valid LCR attribute
:type attribute_name: str
:param value: The value of the attribute
:type value: Any
"""
assert attribute_name in self.queues
self.queues[attribute_name].put_nowait(value)
def get_attribute_value(self, attribute_name: str, fetch_timeout: float = 2.0):
"""Read a value from the queue.
:param attribute_name: a valid LCR attribute
:type attribute_name: str
:param fetch_timeout: How long to wait for a event, defaults to 2.0
:type fetch_timeout: float, optional
:return: An attribute value fromthe queue
:rtype: Any
"""
return self.queues[attribute_name].get(timeout=fetch_timeout)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment