Skip to content
Snippets Groups Projects
Unverified Commit cdedb974 authored by samueltwum1's avatar samueltwum1
Browse files

SAR-276 Add intial test for multi device

parent 60e61660
No related branches found
No related tags found
No related merge requests found
......@@ -6,12 +6,13 @@ package.
"""
import functools
import logging
from typing import Optional, Callable
from typing import Optional, Callable, List
from ska_tango_base.base import BaseComponentManager, OpStateModel
from ska_tango_base.base.task_queue_manager import QueueManager
from ska_tango_base.control_model import PowerMode
from ska_tango_base.faults import ComponentFault
from ska_tango_base.utils import LongRunningDeviceInterface
def check_communicating(func):
......@@ -394,7 +395,7 @@ class ReferenceBaseComponentManager(BaseComponentManager):
self.op_state_model.perform_action("component_fault")
class QueueWorkerComponentManager(BaseComponentManager):
class QueueWorkerComponentManager(ReferenceBaseComponentManager):
"""A component manager that configures the queue manager."""
def __init__(
......@@ -404,6 +405,7 @@ class QueueWorkerComponentManager(BaseComponentManager):
max_queue_size: int,
num_workers: int,
push_change_event: Optional[Callable],
child_devices: Optional[List[str]],
*args,
**kwargs
):
......@@ -419,12 +421,15 @@ class QueueWorkerComponentManager(BaseComponentManager):
:type num_workers: int
:param push_change_event: A method that will be called when attributes are updated
:type push_change_event: Callable
:param child_devices: Names of child devices to execute long running commands on
:type child_devices: List of strings
"""
self.logger = logger
self.max_queue_size = max_queue_size
self.num_workers = num_workers
self.push_change_event = push_change_event
super().__init__(op_state_model, *args, **kwargs)
self.lrc_device_interface = LongRunningDeviceInterface(child_devices, logger)
super().__init__(op_state_model, *args, logger=logger, **kwargs)
def create_queue_manager(self) -> QueueManager:
"""Create a QueueManager.
......
......@@ -4,12 +4,16 @@ import ast
import functools
import inspect
import json
import logging
import pydoc
import traceback
import sys
import uuid
import warnings
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Dict, List
import tango
from tango import (
......@@ -20,10 +24,13 @@ from tango import (
AttrWriteType,
Except,
ErrSeverity,
EventData,
EventType,
)
from tango import DevState
from contextlib import contextmanager
from ska_tango_base.faults import GroupDefinitionsError, SKABaseError
from ska_tango_base.commands import ResultCode
int_types = {
tango._tango.CmdArgType.DevUShort,
......@@ -539,3 +546,192 @@ def for_testing_only(func, _testing_check=lambda: "pytest" in sys.modules):
return func(*args, **kwargs)
return _wrapper
@dataclass
class LongRunningRequestResponse:
"""Convenience class to parse the long running command response."""
response_code: ResultCode
command_id: str
command_name: str
def __init__(self, request_response):
"""Create the LongRunningRequestResponse dataclass.
:param request_response: The response from a Long Running
Request Command
:type request_response: list
"""
self.response_code = request_response[0][0]
self.command_id = request_response[1][0]
self.command_name = self.command_id.split("_")[1]
@dataclass
class StoredCommand:
"""Used to keep track of commands scheduled across devices.
command_name: The Tango command to execute across devices.
command_id: Every Tango device will return the command ID for the
long running command submitted to it.
is_completed: Whether the command is done or not
"""
command_name: str
command_id: str
is_completed: bool
class LongRunningDeviceInterface:
"""This class is a convenience class to be used by clients of devices
that implement long running commands.
The intent of this class is that clients should not have to keep
track of command IDs or the various attributes
to determine long running command progress/results.
This class is also useful when you want to run a long running
command across various devices. Once they all complete a callback
supplied by the user is fired.
Using this class, a client would need to:
- Supply the Tango devices to connect to that implements long
running commands
- The Long running commands to run (including parameter)
- Optional callback that should be executed when the command
completes
The callback will be executed once the command completes across all
devices. Thus there's no need to watch attribute changes or keep
track of commands IDs. They are handled here.
"""
def __init__(
self, tango_devices: List[str], logger: logging.Logger
) -> None:
"""Init LRC device interface."""
self._logger = logger
self._tango_devices = tango_devices
self._long_running_device_proxies = []
self._result_subscriptions = []
self._stored_commands: Dict[str, List[StoredCommand]] = {}
self._stored_callbacks: Dict[str, Callable] = {}
def setup(self):
"""Only create the device proxy and subscribe when a command is invoked."""
if not self._long_running_device_proxies:
for device in self._tango_devices:
self._long_running_device_proxies.append(
tango.DeviceProxy(device)
)
if not self._result_subscriptions:
for device_proxy in self._long_running_device_proxies:
self._result_subscriptions.append(
device_proxy.subscribe_event(
"longRunningCommandResult",
EventType.CHANGE_EVENT,
self,
wait=True,
)
)
def push_event(self, ev: EventData):
"""Handles the attribute change events.
For every event that comes in:
- Update command state:
- Make sure that it's a longrunningcommandresult
- Check to see if the command ID we get from the event
is one we are keeping track of.
- If so, set that command to completed
- Check if we should fire the callback:
Once the command across all devices have completed
(for that command)
- Check whether all have completed
- If so, fire the callback
- Clean up
"""
if ev.attr_value.name == "longrunningcommandresult":
if ev.attr_value.value:
# push change event to new attribute for all tango devices
# for tango_dev in self._tango_devices:
# tango_dev.push_change_event("lastResultCommandIDs", ev.attr_value.value[0])
# tango_dev.push_change_event("lastResultCommandName", ev.attr_value.value[1])
event_command_id = ev.attr_value.value[0]
for stored_commands in self._stored_commands.values():
for stored_command in stored_commands:
if stored_command.command_id == event_command_id:
stored_command.is_completed = True
completed_group_keys = []
for key, stored_command_group in self._stored_commands.items():
if stored_command_group:
# Determine if all the commands in this group have completed
commands_are_completed = [
stored_command.is_completed
for stored_command in stored_command_group
]
if all(commands_are_completed):
completed_group_keys.append(key)
# Get the command IDs
command_ids = [
stored_command.command_id
for stored_command in stored_command_group
]
command_name = stored_command_group[0].command_name
# Trigger the callback, send command_name and command_ids
# as paramater
self._stored_callbacks[key](command_name, command_ids)
# Remove callback as the group completed
# Clean up
# Remove callback and commands no longer needed
for key in completed_group_keys:
del self._stored_callbacks[key]
del self._stored_commands[key]
def execute_long_running_command(
self,
command_name: str,
command_arg: Any = None,
on_completion_callback: Callable = None,
):
"""Execute the long running command with an argument if any.
Once the commmand completes, then the `on_completion_callback`
will be executed with the EventData as parameter.
This class keeps track of the command ID and events
used to determine when this commmand has completed.
:param command_name: A long running command that exists on the
target Tango device.
:type command_name: str
:param command_arg: The argument to be used in the long running
command method.
:type command_arg: Any, optional
:param on_completion_callback: The method to execute when the
long running command has completed.
:type on_completion_callback: callable, optional
"""
self.setup()
unique_id = uuid.uuid4()
self._stored_callbacks[unique_id] = on_completion_callback
self._stored_commands[unique_id] = []
for device_proxy in self._long_running_device_proxies:
response = LongRunningRequestResponse(
device_proxy.command_inout(command_name, command_arg)
)
self._stored_commands[unique_id].append(
StoredCommand(
command_name,
response.command_id,
False,
)
)
"""This module defines elements of the pytest test harness shared by all tests."""
import logging
import socket
from queue import Empty, Queue
import pytest
import tango
from tango import EventType
from tango.test_context import DeviceTestContext
from tango.test_context import get_host_ip, DeviceTestContext, MultiDeviceTestContext
@pytest.fixture(scope="class")
......@@ -219,3 +221,42 @@ def tango_change_event_helper(device_under_test):
def logger():
"""Fixture that returns a default logger for tests."""
return logging.Logger("Test logger")
@pytest.fixture(scope="module")
def devices_to_test(request):
yield getattr(request.module, "devices_to_test")
@pytest.fixture(scope="function")
def multi_device_tango_context(
mocker, devices_to_test # pylint: disable=redefined-outer-name
):
"""
Creates and returns a TANGO MultiDeviceTestContext object, with
tango.DeviceProxy patched to work around a name-resolving issue.
"""
def _get_open_port():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("", 0))
s.listen(1)
port = s.getsockname()[1]
s.close()
return port
HOST = get_host_ip()
PORT = _get_open_port()
_DeviceProxy = tango.DeviceProxy
mocker.patch(
"tango.DeviceProxy",
wraps=lambda fqdn, *args, **kwargs: _DeviceProxy(
"tango://{0}:{1}/{2}#dbase=no".format(HOST, PORT, fqdn),
*args,
**kwargs
),
)
with MultiDeviceTestContext(
devices_to_test, host=HOST, port=PORT, process=True
) as context:
yield context
......@@ -10,7 +10,7 @@ There are two versions used for testing long running commands.
It is provided to support testing of the BaseDevice.
"""
import time
from tango.server import command
from tango.server import command, device_property
from tango import DebugIt
from ska_tango_base.base.reference_component_manager import QueueWorkerComponentManager
......@@ -22,6 +22,8 @@ from ska_tango_base.commands import ResponseCommand
class LongRunningCommandBaseTestDevice(SKABaseDevice):
"""Implement commands to test queued work."""
client_devices = device_property(dtype="DevVarStringArray")
def init_command_objects(self):
"""Initialise the command handlers."""
super().init_command_objects()
......@@ -52,6 +54,16 @@ class LongRunningCommandBaseTestDevice(SKABaseDevice):
self.TestProgressCommand(self.component_manager, logger=self.logger),
)
self.register_command_object(
"TestProgressNoArgs",
self.TestProgressNoArgsCommand(self.component_manager, logger=self.logger),
)
self.register_command_object(
"TestProgressWithArgs",
self.TestProgressWithArgsCommand(self.component_manager, logger=self.logger),
)
class ShortCommand(ResponseCommand):
"""The command class for the Short command."""
......@@ -176,6 +188,50 @@ class LongRunningCommandBaseTestDevice(SKABaseDevice):
(return_code, message) = self.component_manager.enqueue(handler, argin)
return f"{return_code}", f"{message}"
class TestProgressNoArgsCommand(ResponseCommand):
"""The command class for the TestProgressNoArgsCommand command."""
def do(self):
"""Execute something on the long running device."""
interface = self.target.lrc_device_interface
interface.execute_long_running_command(
"TestProgress", 0.5, None)
self.logger.info("In TestProgressNoArgsCommand")
return (ResultCode.OK, "Done TestProgressNoArgsCommand")
@command(
dtype_in=None,
dtype_out="DevVarStringArray",
)
@DebugIt()
def TestProgressNoArgs(self):
"""Command to execute the TestProgress on long running command device."""
handler = self.get_command_object("TestProgressNoArgs")
(return_code, message) = self.component_manager.enqueue(handler)
return f"{return_code}", f"{message}"
class TestProgressWithArgsCommand(ResponseCommand):
"""The command class for the TestProgressWithArgsCommand command."""
def do(self, argin):
"""Execute something on the long running device."""
interface = self.target.lrc_device_interface
interface.execute_long_running_command(
"TestProgress", argin, None)
self.logger.info("In TestProgressWithArgs")
return (ResultCode.OK, "Done TestProgressWithArgsCommand")
@command(
dtype_in=float,
dtype_out="DevVarStringArray",
)
@DebugIt()
def TestProgressWithArgs(self, argin):
"""Command to execute the TestProgress on long running command device."""
handler = self.get_command_object("TestProgressWithArgs")
(return_code, message) = self.component_manager.enqueue(handler, argin)
return f"{return_code}", f"{message}"
class BlockingBaseDevice(LongRunningCommandBaseTestDevice):
"""Test device that has a component manager with the default queue manager that has no workers."""
......@@ -194,4 +250,20 @@ class AsyncBaseDevice(LongRunningCommandBaseTestDevice):
max_queue_size=20,
num_workers=3,
push_change_event=self.push_change_event,
child_devices=self.client_devices,
)
class AsyncClientDevice(LongRunningCommandBaseTestDevice):
"""Async client device."""
def create_component_manager(self: SKABaseDevice):
"""Create the component manager with a queue manager that has workers."""
return QueueWorkerComponentManager(
op_state_model=self.op_state_model,
logger=self.logger,
max_queue_size=20,
num_workers=3,
push_change_event=self.push_change_event,
child_devices=self.client_devices,
)
......@@ -9,9 +9,10 @@ from unittest import mock
from tango import EventType
from tango.test_context import DeviceTestContext
from tango.utils import EventCallback
from reference_base_device import (
from ska_tango_base.base.reference_base_device import (
BlockingBaseDevice,
AsyncBaseDevice,
AsyncClientDevice,
)
from ska_tango_base.base.task_queue_manager import TaskResult
from ska_tango_base.commands import ResultCode
......@@ -199,3 +200,175 @@ def test_events():
]
for index, progress in enumerate(["1", "25", "50", "74", "100"]):
assert progress_event_values[index][1] == progress
devices_to_test = [
{
"class": AsyncBaseDevice,
"devices": [
{"name": "test/asyncdevice/1"},
{"name": "test/asyncdevice/2"},
],
},
{
"class": AsyncClientDevice,
"devices": [
{
"name": "test/asyncclientdevice/1",
"properties": {
"client_devices": [
"test/asyncdevice/1",
],
},
},
{
"name": "test/asyncclientdevice/2",
"properties": {
"client_devices": [
"test/asyncdevice/1",
"test/asyncdevice/2",
],
},
},
],
},
]
class TestMultiDevice:
"""Multi-device tests."""
# TODO track events from underlying device(s) instead
def test_two_async_devices_communicating(self, multi_device_tango_context):
"""Test only two devices communication."""
client = multi_device_tango_context.get_device(
"test/asyncclientdevice/1"
)
commands_in_queue_events = EventCallback(fd=StringIO())
client.subscribe_event(
"longRunningCommandsInQueue",
EventType.CHANGE_EVENT,
commands_in_queue_events,
wait=True,
)
command_ids_in_queue_events = EventCallback(fd=StringIO())
client.subscribe_event(
"longRunningCommandIDsInQueue",
EventType.CHANGE_EVENT,
command_ids_in_queue_events,
wait=True,
)
client.TestProgressNoArgs()
client.ping()
time.sleep(2)
commands_in_queue = [
i.attr_value.value[0]
for i in commands_in_queue_events.get_events()
if i.attr_value.value
]
# though two commands were triggered, only one is registered in the
# event callback since ping is not registered as a long running command
assert commands_in_queue == ['TestProgressNoArgsCommand']
command_ids_in_queue = [
i.attr_value.value
for i in command_ids_in_queue_events.get_events()
if i.attr_value.value
]
# there should be 1 ID since 1 command was triggered
assert len(command_ids_in_queue) == 1
def test_multiple_async_devices_communicating(self, multi_device_tango_context):
"""Test multiple devices."""
client = multi_device_tango_context.get_device(
"test/asyncclientdevice/2"
)
commands_in_queue_events = EventCallback(fd=StringIO())
client.subscribe_event(
"longRunningCommandsInQueue",
EventType.CHANGE_EVENT,
commands_in_queue_events,
wait=True,
)
command_ids_in_queue_events = EventCallback(fd=StringIO())
client.subscribe_event(
"longRunningCommandIDsInQueue",
EventType.CHANGE_EVENT,
command_ids_in_queue_events,
wait=True,
)
client.TestProgressNoArgs()
client.ping()
client.TestProgressWithArgs(0.5)
time.sleep(3)
commands_in_queue = [
i.attr_value.value[0]
for i in commands_in_queue_events.get_events()
if i.attr_value.value
]
assert commands_in_queue == ['TestProgressNoArgsCommand', 'TestProgressWithArgsCommand']
commands_ids_in_queue = [
i.attr_value.value
for i in command_ids_in_queue_events.get_events()
if i.attr_value.value
]
# should be checking for 4 since each comand triggers
# two devices under the hood, see TODO in class
assert len(commands_ids_in_queue) == 2
def test_multiple_async_devices_communicating_with_duplicate_commands(self, multi_device_tango_context):
"""Test multiple devices with duplicate commands."""
client = multi_device_tango_context.get_device(
"test/asyncclientdevice/2"
)
commands_in_queue_events = EventCallback(fd=StringIO())
client.subscribe_event(
"longRunningCommandsInQueue",
EventType.CHANGE_EVENT,
commands_in_queue_events,
wait=True,
)
command_ids_in_queue_events = EventCallback(fd=StringIO())
client.subscribe_event(
"longRunningCommandIDsInQueue",
EventType.CHANGE_EVENT,
command_ids_in_queue_events,
wait=True,
)
client.TestProgressNoArgs()
client.TestProgressNoArgs()
client.TestProgressNoArgs()
client.ping()
client.TestProgressWithArgs(0.5)
time.sleep(3)
commands_in_queue = [
i.attr_value.value
for i in commands_in_queue_events.get_events()
if i.attr_value.value
]
assert commands_in_queue == [
'TestProgressNoArgsCommand', 'TestProgressNoArgsCommand',
'TestProgressNoArgsCommand', 'TestProgressWithArgsCommand']
commands_ids_in_queue = [
i.attr_value.value
for i in command_ids_in_queue_events.get_events()
if i.attr_value.value
]
# should be checking for 8 since each comand triggers
# two devices under the hood, see TODO in class
assert len(commands_ids_in_queue) == 4
......@@ -402,6 +402,7 @@ class TestQueueManagerExit:
max_queue_size=10,
num_workers=2,
push_change_event=catch_updates,
child_devices=[],
)
cm.enqueue(abort_task(), 0.1)
......@@ -463,6 +464,7 @@ class TestQueueManagerExit:
max_queue_size=5,
num_workers=2,
push_change_event=None,
child_devices=[],
)
cm.enqueue(stop_task())
......@@ -490,6 +492,7 @@ class TestQueueManagerExit:
max_queue_size=8,
num_workers=2,
push_change_event=None,
child_devices=[],
)
cm.enqueue(slow_task())
cm.enqueue(stop_task())
......@@ -517,6 +520,7 @@ class TestComponentManager:
max_queue_size=0,
num_workers=1,
push_change_event=None,
child_devices=[],
)
assert cm.task_ids_in_queue == ()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment