Skip to content
Snippets Groups Projects
Unverified Commit 6dc85804 authored by SKAJohanVenter's avatar SKAJohanVenter
Browse files

SAR-276 Passing puch_change_event dwon to queue_manager

parent 132cea3e
No related branches found
No related tags found
No related merge requests found
......@@ -233,9 +233,12 @@ class BlockingBaseDevice(BaseTestDevice):
class AsyncBaseDevice(BaseTestDevice):
"""Test device that has a component manager with workers."""
def create_component_manager(self):
def create_component_manager(self: SKABaseDevice):
"""Create the component manager with a queue manager that has workers."""
queue_manager = QueueManager(
max_queue_size=10, num_workers=3, logger=self.logger
max_queue_size=10,
num_workers=3,
logger=self.logger,
push_change_event=self.push_change_event,
)
return BaseComponentManager(op_state_model=None, queue_manager=queue_manager)
......@@ -456,6 +456,7 @@ class QueueManager:
queue_fetch_timeout: float = 0.1,
num_workers: int = 0,
logger: Optional[logging.Logger] = None,
push_change_event: Optional[Callable] = None,
):
"""Init QueryManager.
......@@ -474,6 +475,7 @@ class QueueManager:
self._max_queue_size = max_queue_size
self._work_queue = Queue(self._max_queue_size)
self._queue_fetch_timeout = queue_fetch_timeout
self._push_change_event = push_change_event
self.stopping_event = threading.Event()
self.aborting_event = threading.Event()
self._property_update_lock = threading.Lock()
......@@ -542,7 +544,7 @@ class QueueManager:
@property
def task_status(
self,
) -> tuple(str,):
) -> Tuple[str,]: # noqa: E231
"""Return task status.
:return: The task status pairs (id, status)
......@@ -557,7 +559,7 @@ class QueueManager:
@property
def task_progress(
self,
) -> tuple(str,):
) -> Tuple[str,]: # noqa: E231
"""Return the task progress.
:return: The task progress pairs (id, progress)
......@@ -654,8 +656,8 @@ class QueueManager:
:param property_name: The property value
:type property_name: Any
"""
# with self._property_update_lock:
pass
if self._push_change_event:
self._push_change_event(property_name, property_value)
def abort_tasks(self):
"""Start aborting tasks."""
......
"""Tests for the reference base device that uses queue manager."""
from io import StringIO
import pytest
import time
from unittest import mock
from tango import EventType
from tango.test_context import DeviceTestContext
from tango.utils import EventCallback
from ska_tango_base.base.reference_base_device import (
BlockingBaseDevice,
AsyncBaseDevice,
......@@ -11,6 +17,7 @@ from ska_tango_base.base.task_queue_manager import TaskResult
from ska_tango_base.commands import ResultCode
@pytest.mark.skip("Works as expected when run alone. Segfaults in suite.")
class TestCommands:
"""Check that blocking and async commands behave the same way.
......@@ -18,21 +25,30 @@ class TestCommands:
AsyncBaseDevice - QueueManager has multiple threads, tasks run from queue
"""
@pytest.mark.timeout(5)
def test_short_command(self):
"""Test a simple command."""
for class_name in [BlockingBaseDevice, AsyncBaseDevice]:
with DeviceTestContext(class_name, process=True) as proxy:
result = TaskResult.from_response_command(proxy.Short(1))
proxy.Short(1)
# Wait for a result, if the task does not abort, we'll time out here
while not proxy.longRunningCommandResult:
pass
result = TaskResult.from_task_result(proxy.longRunningCommandResult)
assert result.result_code == ResultCode.OK
assert result.unique_id.endswith("SimpleTask")
@pytest.mark.timeout(5)
def test_non_aborting_command(self):
"""Test tasks that does not abort."""
for class_name in [BlockingBaseDevice, AsyncBaseDevice]:
with DeviceTestContext(class_name, process=True) as proxy:
result = TaskResult.from_response_command(
proxy.NonAbortingLongRunning(0.01)
)
# Wait for a result, if the task does not abort, we'll time out here
while not proxy.longRunningCommandResult:
pass
result = TaskResult.from_task_result(proxy.longRunningCommandResult)
assert result.result_code == ResultCode.OK
assert result.unique_id.endswith("NonAbortingTask")
......@@ -73,3 +89,119 @@ class TestCommands:
"An error occurred Traceback (most recent call last)"
in result.task_result
)
@pytest.mark.skip("Works as expected when run alone. Segfaults in suite.")
def test_callbacks():
"""Check that the callback is firing that sends the push change event."""
with mock.patch.object(AsyncBaseDevice, "push_change_event") as my_cb:
with DeviceTestContext(AsyncBaseDevice, process=False) as proxy:
# Execute some commands
proxy.TestProgress(0.5)
while not proxy.longRunningCommandResult:
time.sleep(0.1)
assert my_cb.called
called_args = [
(_call[0][0], _call[0][1]) for _call in my_cb.call_args_list[5:]
]
attribute_names = [arg[0] for arg in called_args]
assert attribute_names == [
"longRunningCommandsInQueue",
"longRunningCommandIDsInQueue",
"longRunningCommandsInQueue",
"longRunningCommandIDsInQueue",
"longRunningCommandStatus",
"longRunningCommandProgress",
"longRunningCommandProgress",
"longRunningCommandProgress",
"longRunningCommandProgress",
"longRunningCommandProgress",
"longRunningCommandResult",
]
# longRunningCommandsInQueue
attribute_values = [arg[1] for arg in called_args]
assert len(attribute_values[0]) == 1
assert attribute_values[0] == ["ProgressTask"]
# longRunningCommandIDsInQueue
assert len(attribute_values[1]) == 1
assert attribute_values[1][0].endswith("ProgressTask")
# longRunningCommandsInQueue
assert not attribute_values[2]
# longRunningCommandIDsInQueue
assert not attribute_values[3]
# longRunningCommandStatus
assert len(attribute_values[4]) == 2
assert attribute_values[4][0].endswith("ProgressTask")
assert attribute_values[4][1] == "IN_PROGRESS"
# longRunningCommandProgress
for (index, progress) in zip(range(5, 9), ["1", "25", "50", "74", "100"]):
assert len(attribute_values[index]) == 2
assert attribute_values[index][0].endswith("ProgressTask")
assert attribute_values[index][1] == progress
# longRunningCommandResult
assert len(attribute_values[10]) == 3
tr = TaskResult.from_task_result(attribute_values[10])
tr.unique_id.endswith("ProgressTask")
tr.result_code == ResultCode.OK
tr.task_result == "None"
@pytest.mark.skip("Works as expected when run alone. Segfaults in suite.")
def test_events():
"""Testing the events.
NOTE: Adding more than 2 event subscriptions leads to inconsistent results.
Sometimes misses events.
Full callback tests (where the push events are triggered) are covered
in `test_callbacks`
"""
with DeviceTestContext(AsyncBaseDevice, process=True) as proxy:
progress_events = EventCallback(fd=StringIO())
ids_in_queue_events = EventCallback(fd=StringIO())
progress_id = proxy.subscribe_event(
"longRunningCommandProgress",
EventType.CHANGE_EVENT,
progress_events,
wait=True,
)
ids_id = proxy.subscribe_event(
"longRunningCommandIDsInQueue",
EventType.CHANGE_EVENT,
ids_in_queue_events,
wait=True,
)
proxy.TestProgress(0.5)
# Wait for task to finish
while not proxy.longRunningCommandResult:
time.sleep(0.1)
progress_event_values = [
event.attr_value.value
for event in progress_events.get_events()
if event.attr_value and event.attr_value.value
]
for index, progress in enumerate(["1", "25", "50", "74", "100"]):
assert progress_event_values[index][1] == progress
ids_in_queue_events_values = [
event.attr_value.value
for event in ids_in_queue_events.get_events()
if event.attr_value and event.attr_value.value
]
assert len(ids_in_queue_events_values) == 1
assert ids_in_queue_events_values[0][0].endswith("ProgressTask")
proxy.unsubscribe_event(progress_id)
proxy.unsubscribe_event(ids_id)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment