Skip to content
Snippets Groups Projects
Unverified Commit fece502b authored by SKAJohanVenter's avatar SKAJohanVenter
Browse files

SAR-276 Added multidevice tests

parent c01d6811
No related branches found
No related tags found
No related merge requests found
......@@ -1545,14 +1545,14 @@ class SKABaseDevice(Device):
@command(
dtype_in=str,
dtype_out="DevVarShortArray",
dtype_out="DevVarLongStringArray",
)
@DebugIt()
def CheckLongRunningCommandStatus(self, argin):
"""Check the status of a long running command by ID."""
command = self.get_command_object("CheckLongRunningCommandStatus")
(return_code, command_state) = command(argin)
return [return_code, command_state]
return [[return_code], [command_state]]
class DebugDeviceCommand(BaseCommand):
"""A class for the SKABaseDevice's DebugDevice() command."""
......
......@@ -6,13 +6,12 @@ package.
"""
import functools
import logging
from typing import Optional, Callable, List
from typing import Optional, Callable
from ska_tango_base.base import BaseComponentManager, OpStateModel
from ska_tango_base.base.task_queue_manager import QueueManager
from ska_tango_base.control_model import PowerMode
from ska_tango_base.faults import ComponentFault
from ska_tango_base.utils import LongRunningDeviceInterface
def check_communicating(func):
......@@ -405,7 +404,6 @@ class QueueWorkerComponentManager(ReferenceBaseComponentManager):
max_queue_size: int,
num_workers: int,
push_change_event: Optional[Callable],
child_devices: Optional[List[str]],
*args,
**kwargs
):
......@@ -421,14 +419,11 @@ class QueueWorkerComponentManager(ReferenceBaseComponentManager):
:type num_workers: int
:param push_change_event: A method that will be called when attributes are updated
:type push_change_event: Callable
:param child_devices: Names of child devices to execute long running commands on
:type child_devices: List of strings
"""
self.logger = logger
self.max_queue_size = max_queue_size
self.num_workers = num_workers
self.push_change_event = push_change_event
self.lrc_device_interface = LongRunningDeviceInterface(child_devices, logger)
super().__init__(op_state_model, *args, logger=logger, **kwargs)
def create_queue_manager(self) -> QueueManager:
......
......@@ -618,7 +618,7 @@ class LongRunningDeviceInterface:
For every event that comes in:
- Update command state:
- Make sure that it's a longrunningcommandresult
- Make sure that it's a longRunningCommandResult
- Check to see if the command ID we get from the event
is one we are keeping track of.
- If so, set that command to completed
......@@ -630,7 +630,7 @@ class LongRunningDeviceInterface:
- If so, fire the callback
- Clean up
"""
if ev.attr_value.name == "longrunningcommandresult":
if ev.attr_value and ev.attr_value.name == "longrunningcommandresult":
if ev.attr_value.value:
# push change event to new attribute for all tango devices
# for tango_dev in self._tango_devices:
......
"""This module defines elements of the pytest test harness shared by all tests."""
import logging
import socket
from queue import Empty, Queue
import pytest
import tango
from tango import EventType
from tango.test_context import get_host_ip, DeviceTestContext, MultiDeviceTestContext
from tango.test_context import DeviceTestContext
@pytest.fixture(scope="class")
......@@ -221,42 +219,3 @@ def tango_change_event_helper(device_under_test):
def logger():
"""Fixture that returns a default logger for tests."""
return logging.Logger("Test logger")
@pytest.fixture(scope="module")
def devices_to_test(request):
"""Fixture for devices to test."""
yield getattr(request.module, "devices_to_test")
@pytest.fixture(scope="function")
def multi_device_tango_context(
mocker, devices_to_test # pylint: disable=redefined-outer-name
):
"""
Create and return a TANGO MultiDeviceTestContext object.
tango.DeviceProxy patched to work around a name-resolving issue.
"""
def _get_open_port():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("", 0))
s.listen(1)
port = s.getsockname()[1]
s.close()
return port
HOST = get_host_ip()
PORT = _get_open_port()
_DeviceProxy = tango.DeviceProxy
mocker.patch(
"tango.DeviceProxy",
wraps=lambda fqdn, *args, **kwargs: _DeviceProxy(
"tango://{0}:{1}/{2}#dbase=no".format(HOST, PORT, fqdn), *args, **kwargs
),
)
with MultiDeviceTestContext(
devices_to_test, host=HOST, port=PORT, process=True
) as context:
yield context
"""Fixtures for tests."""
import pytest
import socket
import tango
from tango.test_context import get_host_ip, MultiDeviceTestContext
@pytest.fixture(scope="module")
def devices_to_test(request):
"""Fixture for devices to test."""
yield getattr(request.module, "devices_to_test")
@pytest.fixture(scope="function")
def multi_device_tango_context(
mocker, devices_to_test # pylint: disable=redefined-outer-name
):
"""
Create and return a TANGO MultiDeviceTestContext object.
tango.DeviceProxy patched to work around a name-resolving issue.
"""
def _get_open_port():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("", 0))
s.listen(1)
port = s.getsockname()[1]
s.close()
return port
HOST = get_host_ip()
PORT = _get_open_port()
_DeviceProxy = tango.DeviceProxy
mocker.patch(
"tango.DeviceProxy",
wraps=lambda fqdn, *args, **kwargs: _DeviceProxy(
"tango://{0}:{1}/{2}#dbase=no".format(HOST, PORT, fqdn), *args, **kwargs
),
)
with MultiDeviceTestContext(
devices_to_test, host=HOST, port=PORT, process=True
) as context:
yield context
......@@ -10,6 +10,7 @@ There are two versions used for testing long running commands.
It is provided to support testing of the BaseDevice.
"""
import time
import tango
from tango.server import command, device_property
from tango import DebugIt
......@@ -55,14 +56,9 @@ class LongRunningCommandBaseTestDevice(SKABaseDevice):
)
self.register_command_object(
"TestProgressNoArgs",
self.TestProgressNoArgsCommand(self.component_manager, logger=self.logger),
)
self.register_command_object(
"TestProgressWithArgs",
self.TestProgressWithArgsCommand(
self.component_manager, logger=self.logger
"CallChildren",
self.CallChildrenCommand(
self.component_manager, logger=self.logger, devices=self.client_devices
),
)
......@@ -190,46 +186,40 @@ class LongRunningCommandBaseTestDevice(SKABaseDevice):
(return_code, message) = self.component_manager.enqueue(handler, argin)
return f"{return_code}", f"{message}"
class TestProgressNoArgsCommand(ResponseCommand):
"""The command class for the TestProgressNoArgsCommand command."""
class CallChildrenCommand(ResponseCommand):
"""The command class for the TestProgress command."""
def do(self):
"""Execute something on the long running device."""
interface = self.target.lrc_device_interface
interface.execute_long_running_command("TestProgress", 0.5, None)
self.logger.info("In TestProgressNoArgsCommand")
return (ResultCode.OK, "Done TestProgressNoArgsCommand")
def __init__(self, target, *args, logger=None, **kwargs):
"""Create ResponseCommand, add devices.
@command(
dtype_in=None,
dtype_out="DevVarStringArray",
)
@DebugIt()
def TestProgressNoArgs(self):
"""Command to execute the TestProgress on long running command device."""
handler = self.get_command_object("TestProgressNoArgs")
(return_code, message) = self.component_manager.enqueue(handler)
return f"{return_code}", f"{message}"
class TestProgressWithArgsCommand(ResponseCommand):
"""The command class for the TestProgressWithArgsCommand command."""
:param target: component manager
:type target: BaseComponentManager
:param logger: logger, defaults to None
:type logger: logging.Logger, optional
"""
self.devices = kwargs.pop("devices")
super().__init__(target, *args, logger=logger, **kwargs)
def do(self, argin):
"""Execute something on the long running device."""
interface = self.target.lrc_device_interface
interface.execute_long_running_command("TestProgress", argin, None)
self.logger.info("In TestProgressWithArgs")
return (ResultCode.OK, "Done TestProgressWithArgsCommand")
"""Call `CallChildren` on children, or block if not."""
if self.devices:
for device in self.devices:
proxy = tango.DeviceProxy(device)
proxy.CallChildren(argin)
return ResultCode.QUEUED, f"Called children: {self.devices}"
else:
time.sleep(argin)
return ResultCode.OK, f"Slept {argin}"
@command(
dtype_in=float,
dtype_out="DevVarStringArray",
)
@DebugIt()
def TestProgressWithArgs(self, argin):
"""Command to execute the TestProgress on long running command device."""
handler = self.get_command_object("TestProgressWithArgs")
(return_code, message) = self.component_manager.enqueue(handler, argin)
def CallChildren(self, argin):
"""Command to call `CallChildren` on children, or block if not."""
command = self.get_command_object("CallChildren")
(return_code, message) = self.component_manager.enqueue(command, argin)
return f"{return_code}", f"{message}"
......@@ -250,20 +240,4 @@ class AsyncBaseDevice(LongRunningCommandBaseTestDevice):
max_queue_size=20,
num_workers=3,
push_change_event=self.push_change_event,
child_devices=self.client_devices,
)
class AsyncClientDevice(LongRunningCommandBaseTestDevice):
"""Async client device."""
def create_component_manager(self: SKABaseDevice):
"""Create the component manager with a queue manager that has workers."""
return QueueWorkerComponentManager(
op_state_model=self.op_state_model,
logger=self.logger,
max_queue_size=20,
num_workers=3,
push_change_event=self.push_change_event,
child_devices=self.client_devices,
)
"""Test various Tango devices with long running commmands working together."""
import time
import pytest
from io import StringIO
from unittest.mock import MagicMock
from tango.utils import EventCallback
from tango import EventType
from reference_base_device import AsyncBaseDevice
from ska_tango_base.base.task_queue_manager import TaskResult, TaskState
from ska_tango_base.commands import ResultCode
from ska_tango_base.utils import LongRunningDeviceInterface
# Testing a chain of calls
# On command `CallChildren`
# If the device has children:
# Call `CallChildren` on each child
# If no children:
# Sleep the time specified, simulating blocking work
#
# test/toplevel/1
# test/midlevel/1
# test/lowlevel/1
# test/lowlevel/2
# test/midlevel/2
# test/lowlevel/3
# test/lowlevel/4
# test/midlevel/3
# test/lowlevel/5
# test/lowlevel/6
devices_to_test = [
{
"class": AsyncBaseDevice,
"devices": [
{
"name": "test/toplevel/1",
"properties": {
"client_devices": [
"test/midlevel/1",
"test/midlevel/2",
"test/midlevel/3",
],
},
},
{
"name": "test/midlevel/1",
"properties": {
"client_devices": [
"test/lowlevel/1",
"test/lowlevel/2",
],
},
},
{
"name": "test/midlevel/2",
"properties": {
"client_devices": [
"test/lowlevel/3",
"test/lowlevel/4",
],
},
},
{
"name": "test/midlevel/3",
"properties": {
"client_devices": [
"test/lowlevel/5",
"test/lowlevel/6",
],
},
},
{"name": "test/lowlevel/1"},
{"name": "test/lowlevel/2"},
{"name": "test/lowlevel/3"},
{"name": "test/lowlevel/4"},
{"name": "test/lowlevel/5"},
{"name": "test/lowlevel/6"},
],
},
]
class TestMultiDevice:
"""Multi-device tests."""
@pytest.mark.forked
@pytest.mark.timeout(6)
def test_chain(self, multi_device_tango_context):
"""Test that commands flow from top to middle to low level."""
# Top level
top_device = multi_device_tango_context.get_device("test/toplevel/1")
top_device_result_events = EventCallback(fd=StringIO())
top_device.subscribe_event(
"longRunningCommandResult",
EventType.CHANGE_EVENT,
top_device_result_events,
wait=True,
)
top_device_queue_events = EventCallback(fd=StringIO())
top_device.subscribe_event(
"longRunningCommandsInQueue",
EventType.CHANGE_EVENT,
top_device_queue_events,
wait=True,
)
# Mid level
mid_device = multi_device_tango_context.get_device("test/midlevel/3")
mid_device_result_events = EventCallback(fd=StringIO())
mid_device.subscribe_event(
"longRunningCommandResult",
EventType.CHANGE_EVENT,
mid_device_result_events,
wait=True,
)
mid_device_queue_events = EventCallback(fd=StringIO())
mid_device.subscribe_event(
"longRunningCommandsInQueue",
EventType.CHANGE_EVENT,
mid_device_queue_events,
wait=True,
)
# Low level
low_device = multi_device_tango_context.get_device("test/lowlevel/6")
low_device_result_events = EventCallback(fd=StringIO())
low_device.subscribe_event(
"longRunningCommandResult",
EventType.CHANGE_EVENT,
low_device_result_events,
wait=True,
)
low_device_queue_events = EventCallback(fd=StringIO())
low_device.subscribe_event(
"longRunningCommandsInQueue",
EventType.CHANGE_EVENT,
low_device_queue_events,
wait=True,
)
# Call the toplevel command
# Sleep for 4 so that if a task is not queued the Tango command will time out
tr = TaskResult.from_response_command(top_device.CallChildren(4))
assert tr.result_code == ResultCode.QUEUED
# Get all the events
top_result_events = self.get_events(top_device_result_events, 1)
top_queue_events = self.get_events(top_device_queue_events, 1)
mid_result_events = self.get_events(mid_device_result_events, 1)
mid_queue_events = self.get_events(mid_device_queue_events, 1)
low_result_events = self.get_events(low_device_result_events, 1)
low_queue_events = self.get_events(low_device_queue_events, 1)
# Make sure every level device command gets queued
top_queue_events[0] == ("CallChildrenCommand",)
mid_queue_events[0] == ("CallChildrenCommand",)
low_queue_events[0] == ("CallChildrenCommand",)
top_level_taskresult = TaskResult.from_task_result(top_result_events[0])
mid_level_taskresult = TaskResult.from_task_result(mid_result_events[0])
low_level_taskresult = TaskResult.from_task_result(low_result_events[0])
# Make sure the command moved from top level to lowest level
assert (
top_level_taskresult.task_result
== "Called children: ['test/midlevel/1', 'test/midlevel/2', 'test/midlevel/3']"
)
assert (
mid_level_taskresult.task_result
== "Called children: ['test/lowlevel/5', 'test/lowlevel/6']"
)
assert low_level_taskresult.task_result == "Slept 4.0"
@pytest.mark.forked
@pytest.mark.timeout(8)
def test_util_interface(self, multi_device_tango_context):
"""Test LongRunningDeviceInterface."""
devices = []
for i in range(1, 5):
devices.append(f"test/lowlevel/{i}")
mock_dunc = MagicMock()
dev_interface = LongRunningDeviceInterface(devices, logger=None)
dev_interface.execute_long_running_command(
"CallChildren", 1.0, on_completion_callback=mock_dunc
)
time.sleep(2)
assert mock_dunc.called
assert mock_dunc.call_args[0][0] == "CallChildren"
task_ids = mock_dunc.call_args[0][1]
low_device = multi_device_tango_context.get_device("test/lowlevel/1")
for id in task_ids:
res = low_device.CheckLongRunningCommandStatus(id)
if int(res[1][0]) == TaskState.COMPLETED:
break
else:
assert 0, "At least one task should be completed on device test/lowlevel/1"
def get_events(self, event_callback: EventCallback, min_required: int):
"""Keep reading events until the required count is found."""
events = []
while len(events) < min_required:
events = [
i.attr_value.value
for i in event_callback.get_events()
if i.attr_value and i.attr_value.value
]
time.sleep(0.2)
return events
......@@ -12,7 +12,6 @@ from tango.utils import EventCallback
from reference_base_device import (
BlockingBaseDevice,
AsyncBaseDevice,
AsyncClientDevice,
)
from ska_tango_base.base.task_queue_manager import TaskResult
from ska_tango_base.commands import ResultCode
......@@ -200,182 +199,3 @@ def test_events():
]
for index, progress in enumerate(["1", "25", "50", "74", "100"]):
assert progress_event_values[index][1] == progress
devices_to_test = [
{
"class": AsyncBaseDevice,
"devices": [
{"name": "test/asyncdevice/1"},
{"name": "test/asyncdevice/2"},
],
},
{
"class": AsyncClientDevice,
"devices": [
{
"name": "test/asyncclientdevice/1",
"properties": {
"client_devices": [
"test/asyncdevice/1",
],
},
},
{
"name": "test/asyncclientdevice/2",
"properties": {
"client_devices": [
"test/asyncdevice/1",
"test/asyncdevice/2",
],
},
},
],
},
]
@pytest.mark.skip(
"These tests should be made more robust. Getting inconsistent results"
)
class TestMultiDevice:
"""Multi-device tests."""
# TODO track events from underlying device(s) instead
@pytest.mark.forked
def test_two_async_devices_communicating(self, multi_device_tango_context):
"""Test only two devices communication."""
client = multi_device_tango_context.get_device("test/asyncclientdevice/1")
commands_in_queue_events = EventCallback(fd=StringIO())
client.subscribe_event(
"longRunningCommandsInQueue",
EventType.CHANGE_EVENT,
commands_in_queue_events,
wait=True,
)
command_ids_in_queue_events = EventCallback(fd=StringIO())
client.subscribe_event(
"longRunningCommandIDsInQueue",
EventType.CHANGE_EVENT,
command_ids_in_queue_events,
wait=True,
)
client.TestProgressNoArgs()
client.ping()
time.sleep(2)
commands_in_queue = [
i.attr_value.value[0]
for i in commands_in_queue_events.get_events()
if i.attr_value.value
]
# though two commands were triggered, only one is registered in the
# event callback since ping is not registered as a long running command
assert commands_in_queue == ["TestProgressNoArgsCommand"]
command_ids_in_queue = [
i.attr_value.value
for i in command_ids_in_queue_events.get_events()
if i.attr_value.value
]
# there should be 1 ID since 1 command was triggered
assert len(command_ids_in_queue) == 1
@pytest.mark.forked
def test_multiple_async_devices_communicating(self, multi_device_tango_context):
"""Test multiple devices."""
client = multi_device_tango_context.get_device("test/asyncclientdevice/2")
commands_in_queue_events = EventCallback(fd=StringIO())
client.subscribe_event(
"longRunningCommandsInQueue",
EventType.CHANGE_EVENT,
commands_in_queue_events,
wait=True,
)
command_ids_in_queue_events = EventCallback(fd=StringIO())
client.subscribe_event(
"longRunningCommandIDsInQueue",
EventType.CHANGE_EVENT,
command_ids_in_queue_events,
wait=True,
)
client.TestProgressNoArgs()
client.ping()
client.TestProgressWithArgs(0.5)
time.sleep(3)
commands_in_queue = [
i.attr_value.value[0]
for i in commands_in_queue_events.get_events()
if i.attr_value.value
]
assert commands_in_queue == [
"TestProgressNoArgsCommand",
"TestProgressWithArgsCommand",
]
commands_ids_in_queue = [
i.attr_value.value
for i in command_ids_in_queue_events.get_events()
if i.attr_value.value
]
# should be checking for 4 since each comand triggers
# two devices under the hood, see TODO in class
assert len(commands_ids_in_queue) == 2
@pytest.mark.forked
def test_multiple_async_devices_communicating_with_duplicate_commands(
self, multi_device_tango_context
):
"""Test multiple devices with duplicate commands."""
client = multi_device_tango_context.get_device("test/asyncclientdevice/2")
commands_in_queue_events = EventCallback(fd=StringIO())
client.subscribe_event(
"longRunningCommandsInQueue",
EventType.CHANGE_EVENT,
commands_in_queue_events,
wait=True,
)
command_ids_in_queue_events = EventCallback(fd=StringIO())
client.subscribe_event(
"longRunningCommandIDsInQueue",
EventType.CHANGE_EVENT,
command_ids_in_queue_events,
wait=True,
)
client.TestProgressNoArgs()
client.TestProgressNoArgs()
client.TestProgressNoArgs()
client.TestProgressWithArgs(0.5)
time.sleep(3)
commands_in_queue = [
i.attr_value.value
for i in commands_in_queue_events.get_events()
if i.attr_value.value
]
assert commands_in_queue == [
("TestProgressNoArgsCommand",),
("TestProgressNoArgsCommand",),
("TestProgressNoArgsCommand",),
("TestProgressWithArgsCommand",),
]
commands_ids_in_queue = [
i.attr_value.value
for i in command_ids_in_queue_events.get_events()
if i.attr_value.value
]
# should be checking for 8 since each comand triggers
# two devices under the hood, see TODO in class
assert len(commands_ids_in_queue) == 4
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment