diff --git a/src/ska_tango_base/base/base_device.py b/src/ska_tango_base/base/base_device.py index a84e4839d481b06be1870ca642a92fdf45bd07c0..b17afb58b1c9e19bc39eedab631bb20d921c3072 100644 --- a/src/ska_tango_base/base/base_device.py +++ b/src/ska_tango_base/base/base_device.py @@ -1545,14 +1545,14 @@ class SKABaseDevice(Device): @command( dtype_in=str, - dtype_out="DevVarShortArray", + dtype_out="DevVarLongStringArray", ) @DebugIt() def CheckLongRunningCommandStatus(self, argin): """Check the status of a long running command by ID.""" command = self.get_command_object("CheckLongRunningCommandStatus") (return_code, command_state) = command(argin) - return [return_code, command_state] + return [[return_code], [command_state]] class DebugDeviceCommand(BaseCommand): """A class for the SKABaseDevice's DebugDevice() command.""" diff --git a/src/ska_tango_base/base/reference_component_manager.py b/src/ska_tango_base/base/reference_component_manager.py index 0b05e8a5ac37d23cf97fe8daa0d13b5ed58358ea..8a8f94e33f543212c84592e0123a5f4f307dc185 100644 --- a/src/ska_tango_base/base/reference_component_manager.py +++ b/src/ska_tango_base/base/reference_component_manager.py @@ -6,13 +6,12 @@ package. """ import functools import logging -from typing import Optional, Callable, List +from typing import Optional, Callable from ska_tango_base.base import BaseComponentManager, OpStateModel from ska_tango_base.base.task_queue_manager import QueueManager from ska_tango_base.control_model import PowerMode from ska_tango_base.faults import ComponentFault -from ska_tango_base.utils import LongRunningDeviceInterface def check_communicating(func): @@ -405,7 +404,6 @@ class QueueWorkerComponentManager(ReferenceBaseComponentManager): max_queue_size: int, num_workers: int, push_change_event: Optional[Callable], - child_devices: Optional[List[str]], *args, **kwargs ): @@ -421,14 +419,11 @@ class QueueWorkerComponentManager(ReferenceBaseComponentManager): :type num_workers: int :param push_change_event: A method that will be called when attributes are updated :type push_change_event: Callable - :param child_devices: Names of child devices to execute long running commands on - :type child_devices: List of strings """ self.logger = logger self.max_queue_size = max_queue_size self.num_workers = num_workers self.push_change_event = push_change_event - self.lrc_device_interface = LongRunningDeviceInterface(child_devices, logger) super().__init__(op_state_model, *args, logger=logger, **kwargs) def create_queue_manager(self) -> QueueManager: diff --git a/src/ska_tango_base/utils.py b/src/ska_tango_base/utils.py index 3a296c2a478aad7b37dbb139271f3a88200e2642..c754579e8e62ee1769124227d554ae32775f20b1 100644 --- a/src/ska_tango_base/utils.py +++ b/src/ska_tango_base/utils.py @@ -618,7 +618,7 @@ class LongRunningDeviceInterface: For every event that comes in: - Update command state: - - Make sure that it's a longrunningcommandresult + - Make sure that it's a longRunningCommandResult - Check to see if the command ID we get from the event is one we are keeping track of. - If so, set that command to completed @@ -630,7 +630,7 @@ class LongRunningDeviceInterface: - If so, fire the callback - Clean up """ - if ev.attr_value.name == "longrunningcommandresult": + if ev.attr_value and ev.attr_value.name == "longrunningcommandresult": if ev.attr_value.value: # push change event to new attribute for all tango devices # for tango_dev in self._tango_devices: diff --git a/tests/conftest.py b/tests/conftest.py index 4a3c4078a832a0bb67b5280d25c80d389895967d..87da31a373856fb5f6d1386c7c37e005b5af2c31 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,12 +1,10 @@ """This module defines elements of the pytest test harness shared by all tests.""" import logging -import socket from queue import Empty, Queue import pytest -import tango from tango import EventType -from tango.test_context import get_host_ip, DeviceTestContext, MultiDeviceTestContext +from tango.test_context import DeviceTestContext @pytest.fixture(scope="class") @@ -221,42 +219,3 @@ def tango_change_event_helper(device_under_test): def logger(): """Fixture that returns a default logger for tests.""" return logging.Logger("Test logger") - - -@pytest.fixture(scope="module") -def devices_to_test(request): - """Fixture for devices to test.""" - yield getattr(request.module, "devices_to_test") - - -@pytest.fixture(scope="function") -def multi_device_tango_context( - mocker, devices_to_test # pylint: disable=redefined-outer-name -): - """ - Create and return a TANGO MultiDeviceTestContext object. - - tango.DeviceProxy patched to work around a name-resolving issue. - """ - - def _get_open_port(): - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.bind(("", 0)) - s.listen(1) - port = s.getsockname()[1] - s.close() - return port - - HOST = get_host_ip() - PORT = _get_open_port() - _DeviceProxy = tango.DeviceProxy - mocker.patch( - "tango.DeviceProxy", - wraps=lambda fqdn, *args, **kwargs: _DeviceProxy( - "tango://{0}:{1}/{2}#dbase=no".format(HOST, PORT, fqdn), *args, **kwargs - ), - ) - with MultiDeviceTestContext( - devices_to_test, host=HOST, port=PORT, process=True - ) as context: - yield context diff --git a/tests/long_running_tasks/conftest.py b/tests/long_running_tasks/conftest.py new file mode 100644 index 0000000000000000000000000000000000000000..1172b35468d1a23025326304dc64a9b7cee9cd2e --- /dev/null +++ b/tests/long_running_tasks/conftest.py @@ -0,0 +1,46 @@ +"""Fixtures for tests.""" +import pytest +import socket + +import tango + +from tango.test_context import get_host_ip, MultiDeviceTestContext + + +@pytest.fixture(scope="module") +def devices_to_test(request): + """Fixture for devices to test.""" + yield getattr(request.module, "devices_to_test") + + +@pytest.fixture(scope="function") +def multi_device_tango_context( + mocker, devices_to_test # pylint: disable=redefined-outer-name +): + """ + Create and return a TANGO MultiDeviceTestContext object. + + tango.DeviceProxy patched to work around a name-resolving issue. + """ + + def _get_open_port(): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.bind(("", 0)) + s.listen(1) + port = s.getsockname()[1] + s.close() + return port + + HOST = get_host_ip() + PORT = _get_open_port() + _DeviceProxy = tango.DeviceProxy + mocker.patch( + "tango.DeviceProxy", + wraps=lambda fqdn, *args, **kwargs: _DeviceProxy( + "tango://{0}:{1}/{2}#dbase=no".format(HOST, PORT, fqdn), *args, **kwargs + ), + ) + with MultiDeviceTestContext( + devices_to_test, host=HOST, port=PORT, process=True + ) as context: + yield context diff --git a/tests/long_running_tasks/reference_base_device.py b/tests/long_running_tasks/reference_base_device.py index 4d90ed45b765fd94ea0f93e704a2d3c1df9ab2f5..329d1c54e8652add3617931b6ae347bdbe6ba4bb 100644 --- a/tests/long_running_tasks/reference_base_device.py +++ b/tests/long_running_tasks/reference_base_device.py @@ -10,6 +10,7 @@ There are two versions used for testing long running commands. It is provided to support testing of the BaseDevice. """ import time +import tango from tango.server import command, device_property from tango import DebugIt @@ -55,14 +56,9 @@ class LongRunningCommandBaseTestDevice(SKABaseDevice): ) self.register_command_object( - "TestProgressNoArgs", - self.TestProgressNoArgsCommand(self.component_manager, logger=self.logger), - ) - - self.register_command_object( - "TestProgressWithArgs", - self.TestProgressWithArgsCommand( - self.component_manager, logger=self.logger + "CallChildren", + self.CallChildrenCommand( + self.component_manager, logger=self.logger, devices=self.client_devices ), ) @@ -190,46 +186,40 @@ class LongRunningCommandBaseTestDevice(SKABaseDevice): (return_code, message) = self.component_manager.enqueue(handler, argin) return f"{return_code}", f"{message}" - class TestProgressNoArgsCommand(ResponseCommand): - """The command class for the TestProgressNoArgsCommand command.""" + class CallChildrenCommand(ResponseCommand): + """The command class for the TestProgress command.""" - def do(self): - """Execute something on the long running device.""" - interface = self.target.lrc_device_interface - interface.execute_long_running_command("TestProgress", 0.5, None) - self.logger.info("In TestProgressNoArgsCommand") - return (ResultCode.OK, "Done TestProgressNoArgsCommand") + def __init__(self, target, *args, logger=None, **kwargs): + """Create ResponseCommand, add devices. - @command( - dtype_in=None, - dtype_out="DevVarStringArray", - ) - @DebugIt() - def TestProgressNoArgs(self): - """Command to execute the TestProgress on long running command device.""" - handler = self.get_command_object("TestProgressNoArgs") - (return_code, message) = self.component_manager.enqueue(handler) - return f"{return_code}", f"{message}" - - class TestProgressWithArgsCommand(ResponseCommand): - """The command class for the TestProgressWithArgsCommand command.""" + :param target: component manager + :type target: BaseComponentManager + :param logger: logger, defaults to None + :type logger: logging.Logger, optional + """ + self.devices = kwargs.pop("devices") + super().__init__(target, *args, logger=logger, **kwargs) def do(self, argin): - """Execute something on the long running device.""" - interface = self.target.lrc_device_interface - interface.execute_long_running_command("TestProgress", argin, None) - self.logger.info("In TestProgressWithArgs") - return (ResultCode.OK, "Done TestProgressWithArgsCommand") + """Call `CallChildren` on children, or block if not.""" + if self.devices: + for device in self.devices: + proxy = tango.DeviceProxy(device) + proxy.CallChildren(argin) + return ResultCode.QUEUED, f"Called children: {self.devices}" + else: + time.sleep(argin) + return ResultCode.OK, f"Slept {argin}" @command( dtype_in=float, dtype_out="DevVarStringArray", ) @DebugIt() - def TestProgressWithArgs(self, argin): - """Command to execute the TestProgress on long running command device.""" - handler = self.get_command_object("TestProgressWithArgs") - (return_code, message) = self.component_manager.enqueue(handler, argin) + def CallChildren(self, argin): + """Command to call `CallChildren` on children, or block if not.""" + command = self.get_command_object("CallChildren") + (return_code, message) = self.component_manager.enqueue(command, argin) return f"{return_code}", f"{message}" @@ -250,20 +240,4 @@ class AsyncBaseDevice(LongRunningCommandBaseTestDevice): max_queue_size=20, num_workers=3, push_change_event=self.push_change_event, - child_devices=self.client_devices, - ) - - -class AsyncClientDevice(LongRunningCommandBaseTestDevice): - """Async client device.""" - - def create_component_manager(self: SKABaseDevice): - """Create the component manager with a queue manager that has workers.""" - return QueueWorkerComponentManager( - op_state_model=self.op_state_model, - logger=self.logger, - max_queue_size=20, - num_workers=3, - push_change_event=self.push_change_event, - child_devices=self.client_devices, ) diff --git a/tests/long_running_tasks/test_multi_device.py b/tests/long_running_tasks/test_multi_device.py new file mode 100644 index 0000000000000000000000000000000000000000..01ab65638defc952c87209dd72882ea41894628d --- /dev/null +++ b/tests/long_running_tasks/test_multi_device.py @@ -0,0 +1,216 @@ +"""Test various Tango devices with long running commmands working together.""" +import time +import pytest + +from io import StringIO +from unittest.mock import MagicMock + +from tango.utils import EventCallback +from tango import EventType + +from reference_base_device import AsyncBaseDevice +from ska_tango_base.base.task_queue_manager import TaskResult, TaskState +from ska_tango_base.commands import ResultCode +from ska_tango_base.utils import LongRunningDeviceInterface + +# Testing a chain of calls +# On command `CallChildren` +# If the device has children: +# Call `CallChildren` on each child +# If no children: +# Sleep the time specified, simulating blocking work +# +# test/toplevel/1 +# test/midlevel/1 +# test/lowlevel/1 +# test/lowlevel/2 +# test/midlevel/2 +# test/lowlevel/3 +# test/lowlevel/4 +# test/midlevel/3 +# test/lowlevel/5 +# test/lowlevel/6 + + +devices_to_test = [ + { + "class": AsyncBaseDevice, + "devices": [ + { + "name": "test/toplevel/1", + "properties": { + "client_devices": [ + "test/midlevel/1", + "test/midlevel/2", + "test/midlevel/3", + ], + }, + }, + { + "name": "test/midlevel/1", + "properties": { + "client_devices": [ + "test/lowlevel/1", + "test/lowlevel/2", + ], + }, + }, + { + "name": "test/midlevel/2", + "properties": { + "client_devices": [ + "test/lowlevel/3", + "test/lowlevel/4", + ], + }, + }, + { + "name": "test/midlevel/3", + "properties": { + "client_devices": [ + "test/lowlevel/5", + "test/lowlevel/6", + ], + }, + }, + {"name": "test/lowlevel/1"}, + {"name": "test/lowlevel/2"}, + {"name": "test/lowlevel/3"}, + {"name": "test/lowlevel/4"}, + {"name": "test/lowlevel/5"}, + {"name": "test/lowlevel/6"}, + ], + }, +] + + +class TestMultiDevice: + """Multi-device tests.""" + + @pytest.mark.forked + @pytest.mark.timeout(6) + def test_chain(self, multi_device_tango_context): + """Test that commands flow from top to middle to low level.""" + # Top level + top_device = multi_device_tango_context.get_device("test/toplevel/1") + top_device_result_events = EventCallback(fd=StringIO()) + top_device.subscribe_event( + "longRunningCommandResult", + EventType.CHANGE_EVENT, + top_device_result_events, + wait=True, + ) + top_device_queue_events = EventCallback(fd=StringIO()) + top_device.subscribe_event( + "longRunningCommandsInQueue", + EventType.CHANGE_EVENT, + top_device_queue_events, + wait=True, + ) + + # Mid level + mid_device = multi_device_tango_context.get_device("test/midlevel/3") + mid_device_result_events = EventCallback(fd=StringIO()) + mid_device.subscribe_event( + "longRunningCommandResult", + EventType.CHANGE_EVENT, + mid_device_result_events, + wait=True, + ) + mid_device_queue_events = EventCallback(fd=StringIO()) + mid_device.subscribe_event( + "longRunningCommandsInQueue", + EventType.CHANGE_EVENT, + mid_device_queue_events, + wait=True, + ) + + # Low level + low_device = multi_device_tango_context.get_device("test/lowlevel/6") + low_device_result_events = EventCallback(fd=StringIO()) + low_device.subscribe_event( + "longRunningCommandResult", + EventType.CHANGE_EVENT, + low_device_result_events, + wait=True, + ) + low_device_queue_events = EventCallback(fd=StringIO()) + low_device.subscribe_event( + "longRunningCommandsInQueue", + EventType.CHANGE_EVENT, + low_device_queue_events, + wait=True, + ) + + # Call the toplevel command + # Sleep for 4 so that if a task is not queued the Tango command will time out + tr = TaskResult.from_response_command(top_device.CallChildren(4)) + assert tr.result_code == ResultCode.QUEUED + + # Get all the events + top_result_events = self.get_events(top_device_result_events, 1) + top_queue_events = self.get_events(top_device_queue_events, 1) + mid_result_events = self.get_events(mid_device_result_events, 1) + mid_queue_events = self.get_events(mid_device_queue_events, 1) + low_result_events = self.get_events(low_device_result_events, 1) + low_queue_events = self.get_events(low_device_queue_events, 1) + + # Make sure every level device command gets queued + top_queue_events[0] == ("CallChildrenCommand",) + mid_queue_events[0] == ("CallChildrenCommand",) + low_queue_events[0] == ("CallChildrenCommand",) + + top_level_taskresult = TaskResult.from_task_result(top_result_events[0]) + mid_level_taskresult = TaskResult.from_task_result(mid_result_events[0]) + low_level_taskresult = TaskResult.from_task_result(low_result_events[0]) + + # Make sure the command moved from top level to lowest level + assert ( + top_level_taskresult.task_result + == "Called children: ['test/midlevel/1', 'test/midlevel/2', 'test/midlevel/3']" + ) + assert ( + mid_level_taskresult.task_result + == "Called children: ['test/lowlevel/5', 'test/lowlevel/6']" + ) + assert low_level_taskresult.task_result == "Slept 4.0" + + @pytest.mark.forked + @pytest.mark.timeout(8) + def test_util_interface(self, multi_device_tango_context): + """Test LongRunningDeviceInterface.""" + devices = [] + for i in range(1, 5): + devices.append(f"test/lowlevel/{i}") + + mock_dunc = MagicMock() + + dev_interface = LongRunningDeviceInterface(devices, logger=None) + dev_interface.execute_long_running_command( + "CallChildren", 1.0, on_completion_callback=mock_dunc + ) + time.sleep(2) + assert mock_dunc.called + assert mock_dunc.call_args[0][0] == "CallChildren" + + task_ids = mock_dunc.call_args[0][1] + + low_device = multi_device_tango_context.get_device("test/lowlevel/1") + for id in task_ids: + res = low_device.CheckLongRunningCommandStatus(id) + if int(res[1][0]) == TaskState.COMPLETED: + break + else: + assert 0, "At least one task should be completed on device test/lowlevel/1" + + def get_events(self, event_callback: EventCallback, min_required: int): + """Keep reading events until the required count is found.""" + events = [] + while len(events) < min_required: + events = [ + i.attr_value.value + for i in event_callback.get_events() + if i.attr_value and i.attr_value.value + ] + time.sleep(0.2) + return events diff --git a/tests/long_running_tasks/test_reference_base_device.py b/tests/long_running_tasks/test_reference_base_device.py index 74099b2d5399a36db4e69e270a5cba0e2557a476..470c5e52ffb3e141009903367dbf266a18d5fa9b 100644 --- a/tests/long_running_tasks/test_reference_base_device.py +++ b/tests/long_running_tasks/test_reference_base_device.py @@ -12,7 +12,6 @@ from tango.utils import EventCallback from reference_base_device import ( BlockingBaseDevice, AsyncBaseDevice, - AsyncClientDevice, ) from ska_tango_base.base.task_queue_manager import TaskResult from ska_tango_base.commands import ResultCode @@ -200,182 +199,3 @@ def test_events(): ] for index, progress in enumerate(["1", "25", "50", "74", "100"]): assert progress_event_values[index][1] == progress - - -devices_to_test = [ - { - "class": AsyncBaseDevice, - "devices": [ - {"name": "test/asyncdevice/1"}, - {"name": "test/asyncdevice/2"}, - ], - }, - { - "class": AsyncClientDevice, - "devices": [ - { - "name": "test/asyncclientdevice/1", - "properties": { - "client_devices": [ - "test/asyncdevice/1", - ], - }, - }, - { - "name": "test/asyncclientdevice/2", - "properties": { - "client_devices": [ - "test/asyncdevice/1", - "test/asyncdevice/2", - ], - }, - }, - ], - }, -] - - -@pytest.mark.skip( - "These tests should be made more robust. Getting inconsistent results" -) -class TestMultiDevice: - """Multi-device tests.""" - - # TODO track events from underlying device(s) instead - - @pytest.mark.forked - def test_two_async_devices_communicating(self, multi_device_tango_context): - """Test only two devices communication.""" - client = multi_device_tango_context.get_device("test/asyncclientdevice/1") - - commands_in_queue_events = EventCallback(fd=StringIO()) - client.subscribe_event( - "longRunningCommandsInQueue", - EventType.CHANGE_EVENT, - commands_in_queue_events, - wait=True, - ) - - command_ids_in_queue_events = EventCallback(fd=StringIO()) - client.subscribe_event( - "longRunningCommandIDsInQueue", - EventType.CHANGE_EVENT, - command_ids_in_queue_events, - wait=True, - ) - - client.TestProgressNoArgs() - client.ping() - time.sleep(2) - - commands_in_queue = [ - i.attr_value.value[0] - for i in commands_in_queue_events.get_events() - if i.attr_value.value - ] - # though two commands were triggered, only one is registered in the - # event callback since ping is not registered as a long running command - assert commands_in_queue == ["TestProgressNoArgsCommand"] - - command_ids_in_queue = [ - i.attr_value.value - for i in command_ids_in_queue_events.get_events() - if i.attr_value.value - ] - # there should be 1 ID since 1 command was triggered - assert len(command_ids_in_queue) == 1 - - @pytest.mark.forked - def test_multiple_async_devices_communicating(self, multi_device_tango_context): - """Test multiple devices.""" - client = multi_device_tango_context.get_device("test/asyncclientdevice/2") - - commands_in_queue_events = EventCallback(fd=StringIO()) - client.subscribe_event( - "longRunningCommandsInQueue", - EventType.CHANGE_EVENT, - commands_in_queue_events, - wait=True, - ) - - command_ids_in_queue_events = EventCallback(fd=StringIO()) - client.subscribe_event( - "longRunningCommandIDsInQueue", - EventType.CHANGE_EVENT, - command_ids_in_queue_events, - wait=True, - ) - - client.TestProgressNoArgs() - client.ping() - client.TestProgressWithArgs(0.5) - time.sleep(3) - - commands_in_queue = [ - i.attr_value.value[0] - for i in commands_in_queue_events.get_events() - if i.attr_value.value - ] - assert commands_in_queue == [ - "TestProgressNoArgsCommand", - "TestProgressWithArgsCommand", - ] - - commands_ids_in_queue = [ - i.attr_value.value - for i in command_ids_in_queue_events.get_events() - if i.attr_value.value - ] - # should be checking for 4 since each comand triggers - # two devices under the hood, see TODO in class - assert len(commands_ids_in_queue) == 2 - - @pytest.mark.forked - def test_multiple_async_devices_communicating_with_duplicate_commands( - self, multi_device_tango_context - ): - """Test multiple devices with duplicate commands.""" - client = multi_device_tango_context.get_device("test/asyncclientdevice/2") - - commands_in_queue_events = EventCallback(fd=StringIO()) - client.subscribe_event( - "longRunningCommandsInQueue", - EventType.CHANGE_EVENT, - commands_in_queue_events, - wait=True, - ) - - command_ids_in_queue_events = EventCallback(fd=StringIO()) - client.subscribe_event( - "longRunningCommandIDsInQueue", - EventType.CHANGE_EVENT, - command_ids_in_queue_events, - wait=True, - ) - - client.TestProgressNoArgs() - client.TestProgressNoArgs() - client.TestProgressNoArgs() - client.TestProgressWithArgs(0.5) - time.sleep(3) - - commands_in_queue = [ - i.attr_value.value - for i in commands_in_queue_events.get_events() - if i.attr_value.value - ] - assert commands_in_queue == [ - ("TestProgressNoArgsCommand",), - ("TestProgressNoArgsCommand",), - ("TestProgressNoArgsCommand",), - ("TestProgressWithArgsCommand",), - ] - - commands_ids_in_queue = [ - i.attr_value.value - for i in command_ids_in_queue_events.get_events() - if i.attr_value.value - ] - # should be checking for 8 since each comand triggers - # two devices under the hood, see TODO in class - assert len(commands_ids_in_queue) == 4