diff --git a/src/ska_tango_base/base/reference_component_manager.py b/src/ska_tango_base/base/reference_component_manager.py index d69afb14f8abb0d2fa0b00d6bb6373ccd26a8dd0..0b05e8a5ac37d23cf97fe8daa0d13b5ed58358ea 100644 --- a/src/ska_tango_base/base/reference_component_manager.py +++ b/src/ska_tango_base/base/reference_component_manager.py @@ -6,12 +6,13 @@ package. """ import functools import logging -from typing import Optional, Callable +from typing import Optional, Callable, List from ska_tango_base.base import BaseComponentManager, OpStateModel from ska_tango_base.base.task_queue_manager import QueueManager from ska_tango_base.control_model import PowerMode from ska_tango_base.faults import ComponentFault +from ska_tango_base.utils import LongRunningDeviceInterface def check_communicating(func): @@ -394,7 +395,7 @@ class ReferenceBaseComponentManager(BaseComponentManager): self.op_state_model.perform_action("component_fault") -class QueueWorkerComponentManager(BaseComponentManager): +class QueueWorkerComponentManager(ReferenceBaseComponentManager): """A component manager that configures the queue manager.""" def __init__( @@ -404,6 +405,7 @@ class QueueWorkerComponentManager(BaseComponentManager): max_queue_size: int, num_workers: int, push_change_event: Optional[Callable], + child_devices: Optional[List[str]], *args, **kwargs ): @@ -419,12 +421,15 @@ class QueueWorkerComponentManager(BaseComponentManager): :type num_workers: int :param push_change_event: A method that will be called when attributes are updated :type push_change_event: Callable + :param child_devices: Names of child devices to execute long running commands on + :type child_devices: List of strings """ self.logger = logger self.max_queue_size = max_queue_size self.num_workers = num_workers self.push_change_event = push_change_event - super().__init__(op_state_model, *args, **kwargs) + self.lrc_device_interface = LongRunningDeviceInterface(child_devices, logger) + super().__init__(op_state_model, *args, logger=logger, **kwargs) def create_queue_manager(self) -> QueueManager: """Create a QueueManager. diff --git a/src/ska_tango_base/utils.py b/src/ska_tango_base/utils.py index 1cdfe12da26026d7f6ccd42dba7e0709402439e6..f0824e975f22d5c6f66c5f73fe9daa32a4ed8c70 100644 --- a/src/ska_tango_base/utils.py +++ b/src/ska_tango_base/utils.py @@ -4,12 +4,16 @@ import ast import functools import inspect import json +import logging import pydoc import traceback import sys +import uuid import warnings +from dataclasses import dataclass from datetime import datetime +from typing import Any, Callable, Dict, List import tango from tango import ( @@ -20,10 +24,13 @@ from tango import ( AttrWriteType, Except, ErrSeverity, + EventData, + EventType, ) from tango import DevState from contextlib import contextmanager from ska_tango_base.faults import GroupDefinitionsError, SKABaseError +from ska_tango_base.commands import ResultCode int_types = { tango._tango.CmdArgType.DevUShort, @@ -539,3 +546,192 @@ def for_testing_only(func, _testing_check=lambda: "pytest" in sys.modules): return func(*args, **kwargs) return _wrapper + + +@dataclass +class LongRunningRequestResponse: + """Convenience class to parse the long running command response.""" + + response_code: ResultCode + command_id: str + command_name: str + + def __init__(self, request_response): + """Create the LongRunningRequestResponse dataclass. + + :param request_response: The response from a Long Running + Request Command + :type request_response: list + """ + self.response_code = request_response[0][0] + self.command_id = request_response[1][0] + self.command_name = self.command_id.split("_")[1] + + +@dataclass +class StoredCommand: + """Used to keep track of commands scheduled across devices. + + command_name: The Tango command to execute across devices. + command_id: Every Tango device will return the command ID for the + long running command submitted to it. + is_completed: Whether the command is done or not + """ + + command_name: str + command_id: str + is_completed: bool + + +class LongRunningDeviceInterface: + """This class is a convenience class to be used by clients of devices + that implement long running commands. + + The intent of this class is that clients should not have to keep + track of command IDs or the various attributes + to determine long running command progress/results. + + This class is also useful when you want to run a long running + command across various devices. Once they all complete a callback + supplied by the user is fired. + + Using this class, a client would need to: + - Supply the Tango devices to connect to that implements long + running commands + - The Long running commands to run (including parameter) + - Optional callback that should be executed when the command + completes + + The callback will be executed once the command completes across all + devices. Thus there's no need to watch attribute changes or keep + track of commands IDs. They are handled here. + """ + + def __init__( + self, tango_devices: List[str], logger: logging.Logger + ) -> None: + """Init LRC device interface.""" + self._logger = logger + self._tango_devices = tango_devices + self._long_running_device_proxies = [] + self._result_subscriptions = [] + self._stored_commands: Dict[str, List[StoredCommand]] = {} + self._stored_callbacks: Dict[str, Callable] = {} + + def setup(self): + """Only create the device proxy and subscribe when a command is invoked.""" + if not self._long_running_device_proxies: + for device in self._tango_devices: + self._long_running_device_proxies.append( + tango.DeviceProxy(device) + ) + + if not self._result_subscriptions: + for device_proxy in self._long_running_device_proxies: + self._result_subscriptions.append( + device_proxy.subscribe_event( + "longRunningCommandResult", + EventType.CHANGE_EVENT, + self, + wait=True, + ) + ) + + def push_event(self, ev: EventData): + """Handles the attribute change events. + + For every event that comes in: + + - Update command state: + - Make sure that it's a longrunningcommandresult + - Check to see if the command ID we get from the event + is one we are keeping track of. + - If so, set that command to completed + + - Check if we should fire the callback: + Once the command across all devices have completed + (for that command) + - Check whether all have completed + - If so, fire the callback + - Clean up + """ + if ev.attr_value.name == "longrunningcommandresult": + if ev.attr_value.value: + # push change event to new attribute for all tango devices + # for tango_dev in self._tango_devices: + # tango_dev.push_change_event("lastResultCommandIDs", ev.attr_value.value[0]) + # tango_dev.push_change_event("lastResultCommandName", ev.attr_value.value[1]) + + event_command_id = ev.attr_value.value[0] + for stored_commands in self._stored_commands.values(): + for stored_command in stored_commands: + if stored_command.command_id == event_command_id: + stored_command.is_completed = True + + completed_group_keys = [] + for key, stored_command_group in self._stored_commands.items(): + if stored_command_group: + # Determine if all the commands in this group have completed + commands_are_completed = [ + stored_command.is_completed + for stored_command in stored_command_group + ] + if all(commands_are_completed): + completed_group_keys.append(key) + + # Get the command IDs + command_ids = [ + stored_command.command_id + for stored_command in stored_command_group + ] + command_name = stored_command_group[0].command_name + + # Trigger the callback, send command_name and command_ids + # as paramater + self._stored_callbacks[key](command_name, command_ids) + # Remove callback as the group completed + + # Clean up + # Remove callback and commands no longer needed + for key in completed_group_keys: + del self._stored_callbacks[key] + del self._stored_commands[key] + + def execute_long_running_command( + self, + command_name: str, + command_arg: Any = None, + on_completion_callback: Callable = None, + ): + """Execute the long running command with an argument if any. + + Once the commmand completes, then the `on_completion_callback` + will be executed with the EventData as parameter. + This class keeps track of the command ID and events + used to determine when this commmand has completed. + + :param command_name: A long running command that exists on the + target Tango device. + :type command_name: str + :param command_arg: The argument to be used in the long running + command method. + :type command_arg: Any, optional + :param on_completion_callback: The method to execute when the + long running command has completed. + :type on_completion_callback: callable, optional + """ + self.setup() + unique_id = uuid.uuid4() + self._stored_callbacks[unique_id] = on_completion_callback + self._stored_commands[unique_id] = [] + for device_proxy in self._long_running_device_proxies: + response = LongRunningRequestResponse( + device_proxy.command_inout(command_name, command_arg) + ) + self._stored_commands[unique_id].append( + StoredCommand( + command_name, + response.command_id, + False, + ) + ) diff --git a/tests/conftest.py b/tests/conftest.py index 87da31a373856fb5f6d1386c7c37e005b5af2c31..7652044902f3500724e02991a535361979aca9f5 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,10 +1,12 @@ """This module defines elements of the pytest test harness shared by all tests.""" import logging +import socket from queue import Empty, Queue import pytest +import tango from tango import EventType -from tango.test_context import DeviceTestContext +from tango.test_context import get_host_ip, DeviceTestContext, MultiDeviceTestContext @pytest.fixture(scope="class") @@ -219,3 +221,42 @@ def tango_change_event_helper(device_under_test): def logger(): """Fixture that returns a default logger for tests.""" return logging.Logger("Test logger") + + +@pytest.fixture(scope="module") +def devices_to_test(request): + yield getattr(request.module, "devices_to_test") + + +@pytest.fixture(scope="function") +def multi_device_tango_context( + mocker, devices_to_test # pylint: disable=redefined-outer-name +): + """ + Creates and returns a TANGO MultiDeviceTestContext object, with + tango.DeviceProxy patched to work around a name-resolving issue. + """ + + def _get_open_port(): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.bind(("", 0)) + s.listen(1) + port = s.getsockname()[1] + s.close() + return port + + HOST = get_host_ip() + PORT = _get_open_port() + _DeviceProxy = tango.DeviceProxy + mocker.patch( + "tango.DeviceProxy", + wraps=lambda fqdn, *args, **kwargs: _DeviceProxy( + "tango://{0}:{1}/{2}#dbase=no".format(HOST, PORT, fqdn), + *args, + **kwargs + ), + ) + with MultiDeviceTestContext( + devices_to_test, host=HOST, port=PORT, process=True + ) as context: + yield context diff --git a/tests/long_running_tasks/reference_base_device.py b/tests/long_running_tasks/reference_base_device.py index 547c5956bbf60fedde22a00e870696aa25544912..a9dde348c1ab5e144022c7545c467ae437b1e3f2 100644 --- a/tests/long_running_tasks/reference_base_device.py +++ b/tests/long_running_tasks/reference_base_device.py @@ -10,7 +10,7 @@ There are two versions used for testing long running commands. It is provided to support testing of the BaseDevice. """ import time -from tango.server import command +from tango.server import command, device_property from tango import DebugIt from ska_tango_base.base.reference_component_manager import QueueWorkerComponentManager @@ -22,6 +22,8 @@ from ska_tango_base.commands import ResponseCommand class LongRunningCommandBaseTestDevice(SKABaseDevice): """Implement commands to test queued work.""" + client_devices = device_property(dtype="DevVarStringArray") + def init_command_objects(self): """Initialise the command handlers.""" super().init_command_objects() @@ -52,6 +54,16 @@ class LongRunningCommandBaseTestDevice(SKABaseDevice): self.TestProgressCommand(self.component_manager, logger=self.logger), ) + self.register_command_object( + "TestProgressNoArgs", + self.TestProgressNoArgsCommand(self.component_manager, logger=self.logger), + ) + + self.register_command_object( + "TestProgressWithArgs", + self.TestProgressWithArgsCommand(self.component_manager, logger=self.logger), + ) + class ShortCommand(ResponseCommand): """The command class for the Short command.""" @@ -176,6 +188,50 @@ class LongRunningCommandBaseTestDevice(SKABaseDevice): (return_code, message) = self.component_manager.enqueue(handler, argin) return f"{return_code}", f"{message}" + class TestProgressNoArgsCommand(ResponseCommand): + """The command class for the TestProgressNoArgsCommand command.""" + + def do(self): + """Execute something on the long running device.""" + interface = self.target.lrc_device_interface + interface.execute_long_running_command( + "TestProgress", 0.5, None) + self.logger.info("In TestProgressNoArgsCommand") + return (ResultCode.OK, "Done TestProgressNoArgsCommand") + + @command( + dtype_in=None, + dtype_out="DevVarStringArray", + ) + @DebugIt() + def TestProgressNoArgs(self): + """Command to execute the TestProgress on long running command device.""" + handler = self.get_command_object("TestProgressNoArgs") + (return_code, message) = self.component_manager.enqueue(handler) + return f"{return_code}", f"{message}" + + class TestProgressWithArgsCommand(ResponseCommand): + """The command class for the TestProgressWithArgsCommand command.""" + + def do(self, argin): + """Execute something on the long running device.""" + interface = self.target.lrc_device_interface + interface.execute_long_running_command( + "TestProgress", argin, None) + self.logger.info("In TestProgressWithArgs") + return (ResultCode.OK, "Done TestProgressWithArgsCommand") + + @command( + dtype_in=float, + dtype_out="DevVarStringArray", + ) + @DebugIt() + def TestProgressWithArgs(self, argin): + """Command to execute the TestProgress on long running command device.""" + handler = self.get_command_object("TestProgressWithArgs") + (return_code, message) = self.component_manager.enqueue(handler, argin) + return f"{return_code}", f"{message}" + class BlockingBaseDevice(LongRunningCommandBaseTestDevice): """Test device that has a component manager with the default queue manager that has no workers.""" @@ -194,4 +250,20 @@ class AsyncBaseDevice(LongRunningCommandBaseTestDevice): max_queue_size=20, num_workers=3, push_change_event=self.push_change_event, + child_devices=self.client_devices, + ) + + +class AsyncClientDevice(LongRunningCommandBaseTestDevice): + """Async client device.""" + + def create_component_manager(self: SKABaseDevice): + """Create the component manager with a queue manager that has workers.""" + return QueueWorkerComponentManager( + op_state_model=self.op_state_model, + logger=self.logger, + max_queue_size=20, + num_workers=3, + push_change_event=self.push_change_event, + child_devices=self.client_devices, ) diff --git a/tests/long_running_tasks/test_reference_base_device.py b/tests/long_running_tasks/test_reference_base_device.py index 470c5e52ffb3e141009903367dbf266a18d5fa9b..b29e1260a419c07007dd0c8d6976c777716362e4 100644 --- a/tests/long_running_tasks/test_reference_base_device.py +++ b/tests/long_running_tasks/test_reference_base_device.py @@ -9,9 +9,10 @@ from unittest import mock from tango import EventType from tango.test_context import DeviceTestContext from tango.utils import EventCallback -from reference_base_device import ( +from ska_tango_base.base.reference_base_device import ( BlockingBaseDevice, AsyncBaseDevice, + AsyncClientDevice, ) from ska_tango_base.base.task_queue_manager import TaskResult from ska_tango_base.commands import ResultCode @@ -199,3 +200,175 @@ def test_events(): ] for index, progress in enumerate(["1", "25", "50", "74", "100"]): assert progress_event_values[index][1] == progress + + +devices_to_test = [ + { + "class": AsyncBaseDevice, + "devices": [ + {"name": "test/asyncdevice/1"}, + {"name": "test/asyncdevice/2"}, + ], + }, + { + "class": AsyncClientDevice, + "devices": [ + { + "name": "test/asyncclientdevice/1", + "properties": { + "client_devices": [ + "test/asyncdevice/1", + ], + }, + }, + { + "name": "test/asyncclientdevice/2", + "properties": { + "client_devices": [ + "test/asyncdevice/1", + "test/asyncdevice/2", + ], + }, + }, + ], + }, +] + + +class TestMultiDevice: + """Multi-device tests.""" + + # TODO track events from underlying device(s) instead + + def test_two_async_devices_communicating(self, multi_device_tango_context): + """Test only two devices communication.""" + client = multi_device_tango_context.get_device( + "test/asyncclientdevice/1" + ) + + commands_in_queue_events = EventCallback(fd=StringIO()) + client.subscribe_event( + "longRunningCommandsInQueue", + EventType.CHANGE_EVENT, + commands_in_queue_events, + wait=True, + ) + + command_ids_in_queue_events = EventCallback(fd=StringIO()) + client.subscribe_event( + "longRunningCommandIDsInQueue", + EventType.CHANGE_EVENT, + command_ids_in_queue_events, + wait=True, + ) + + client.TestProgressNoArgs() + client.ping() + time.sleep(2) + + commands_in_queue = [ + i.attr_value.value[0] + for i in commands_in_queue_events.get_events() + if i.attr_value.value + ] + # though two commands were triggered, only one is registered in the + # event callback since ping is not registered as a long running command + assert commands_in_queue == ['TestProgressNoArgsCommand'] + + command_ids_in_queue = [ + i.attr_value.value + for i in command_ids_in_queue_events.get_events() + if i.attr_value.value + ] + # there should be 1 ID since 1 command was triggered + assert len(command_ids_in_queue) == 1 + + def test_multiple_async_devices_communicating(self, multi_device_tango_context): + """Test multiple devices.""" + client = multi_device_tango_context.get_device( + "test/asyncclientdevice/2" + ) + + commands_in_queue_events = EventCallback(fd=StringIO()) + client.subscribe_event( + "longRunningCommandsInQueue", + EventType.CHANGE_EVENT, + commands_in_queue_events, + wait=True, + ) + + command_ids_in_queue_events = EventCallback(fd=StringIO()) + client.subscribe_event( + "longRunningCommandIDsInQueue", + EventType.CHANGE_EVENT, + command_ids_in_queue_events, + wait=True, + ) + + client.TestProgressNoArgs() + client.ping() + client.TestProgressWithArgs(0.5) + time.sleep(3) + + commands_in_queue = [ + i.attr_value.value[0] + for i in commands_in_queue_events.get_events() + if i.attr_value.value + ] + assert commands_in_queue == ['TestProgressNoArgsCommand', 'TestProgressWithArgsCommand'] + + commands_ids_in_queue = [ + i.attr_value.value + for i in command_ids_in_queue_events.get_events() + if i.attr_value.value + ] + # should be checking for 4 since each comand triggers + # two devices under the hood, see TODO in class + assert len(commands_ids_in_queue) == 2 + + def test_multiple_async_devices_communicating_with_duplicate_commands(self, multi_device_tango_context): + """Test multiple devices with duplicate commands.""" + client = multi_device_tango_context.get_device( + "test/asyncclientdevice/2" + ) + + commands_in_queue_events = EventCallback(fd=StringIO()) + client.subscribe_event( + "longRunningCommandsInQueue", + EventType.CHANGE_EVENT, + commands_in_queue_events, + wait=True, + ) + + command_ids_in_queue_events = EventCallback(fd=StringIO()) + client.subscribe_event( + "longRunningCommandIDsInQueue", + EventType.CHANGE_EVENT, + command_ids_in_queue_events, + wait=True, + ) + + client.TestProgressNoArgs() + client.TestProgressNoArgs() + client.TestProgressNoArgs() + client.ping() + client.TestProgressWithArgs(0.5) + time.sleep(3) + + commands_in_queue = [ + i.attr_value.value + for i in commands_in_queue_events.get_events() + if i.attr_value.value + ] + assert commands_in_queue == [ + 'TestProgressNoArgsCommand', 'TestProgressNoArgsCommand', + 'TestProgressNoArgsCommand', 'TestProgressWithArgsCommand'] + + commands_ids_in_queue = [ + i.attr_value.value + for i in command_ids_in_queue_events.get_events() + if i.attr_value.value + ] + # should be checking for 8 since each comand triggers + # two devices under the hood, see TODO in class + assert len(commands_ids_in_queue) == 4 diff --git a/tests/long_running_tasks/test_task_queue_manager.py b/tests/long_running_tasks/test_task_queue_manager.py index 1907102350167e0425272fac10460b5cc972ca25..ab32b961b5383901355a08d153a8889d2182a6cc 100644 --- a/tests/long_running_tasks/test_task_queue_manager.py +++ b/tests/long_running_tasks/test_task_queue_manager.py @@ -402,6 +402,7 @@ class TestQueueManagerExit: max_queue_size=10, num_workers=2, push_change_event=catch_updates, + child_devices=[], ) cm.enqueue(abort_task(), 0.1) @@ -463,6 +464,7 @@ class TestQueueManagerExit: max_queue_size=5, num_workers=2, push_change_event=None, + child_devices=[], ) cm.enqueue(stop_task()) @@ -490,6 +492,7 @@ class TestQueueManagerExit: max_queue_size=8, num_workers=2, push_change_event=None, + child_devices=[], ) cm.enqueue(slow_task()) cm.enqueue(stop_task()) @@ -517,6 +520,7 @@ class TestComponentManager: max_queue_size=0, num_workers=1, push_change_event=None, + child_devices=[], ) assert cm.task_ids_in_queue == ()