diff --git a/README.md b/README.md index 05a8cbe2aab8cf789777d141e87fab07dd774113..5cfefc5cac2da203dc1e754f28f005b72bbcf3f2 100644 --- a/README.md +++ b/README.md @@ -115,6 +115,7 @@ Next change the version in the following places: # Release Notes +* 0.20.4 Collapse AbstractHierarchyDevice and AbstractHierarchy into one class * 0.20.3 Fix application of Field_Attenuation_R * 0.20.2 Support only one parent in hierarchies * 0.20.1 Create an abstract AntennaMapper class which implements behavior of both AntennaToSdpMapper diff --git a/tangostationcontrol/VERSION b/tangostationcontrol/VERSION index 144996ed2ce21fec05796dcb821c56088f0ae67a..6dd46024a4c072db92c9d8eb54c221f0d3996a2b 100644 --- a/tangostationcontrol/VERSION +++ b/tangostationcontrol/VERSION @@ -1 +1 @@ -0.20.3 +0.20.4 diff --git a/tangostationcontrol/integration_test/default/devices/base_device_classes/test_power_hierarchy.py b/tangostationcontrol/integration_test/default/devices/base_device_classes/test_power_hierarchy.py index 531458412ecb156cb8650b8215d9c6e1dd9d765d..816ab446670af44bda8cbe73d5a73da257aaabbb 100644 --- a/tangostationcontrol/integration_test/default/devices/base_device_classes/test_power_hierarchy.py +++ b/tangostationcontrol/integration_test/default/devices/base_device_classes/test_power_hierarchy.py @@ -4,22 +4,21 @@ """ Power Hierarchy module integration test """ +import logging from tango import DevState, DeviceProxy +from integration_test import base +from integration_test.device_proxy import TestDeviceProxy from tangostationcontrol.devices.base_device_classes.power_hierarchy import ( - PowerHierarchy, + PowerHierarchyDevice, ) -from integration_test import base -from integration_test.device_proxy import TestDeviceProxy - -import logging logger = logging.getLogger() -class TestPowerHierarchy(base.IntegrationTestCase): - """Integration Test class for PowerHierarchy methods""" +class TestPowerHierarchyDevice(base.IntegrationTestCase): + """Integration Test class for PowerHierarchyDevice methods""" pwr_attr_name = "hardware_powered_R" @@ -106,7 +105,7 @@ class TestPowerHierarchy(base.IntegrationTestCase): self.setup_stationmanager_proxy() self.setup_all_devices() - stationmanager_ph = PowerHierarchy() + stationmanager_ph = PowerHierarchyDevice() stationmanager_ph.init(self.stationmanager_name) children_hierarchy = stationmanager_ph.children(depth=2) @@ -122,7 +121,7 @@ class TestPowerHierarchy(base.IntegrationTestCase): ) # Check if PSOC retrieves correctly its parent state (StationManager -> ON) - psoc_ph = PowerHierarchy() + psoc_ph = PowerHierarchyDevice() psoc_ph.init(self.psoc_name) self.assertEqual(psoc_ph.parent_state(), DevState.ON) # Check if child reads correctly a parent attribute diff --git a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/hierarchy.py b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/hierarchy.py deleted file mode 100644 index 7b11cf11e38b78a786161c130ddf8ff0500d3615..0000000000000000000000000000000000000000 --- a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/hierarchy.py +++ /dev/null @@ -1,415 +0,0 @@ -# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy) -# SPDX-License-Identifier: Apache-2.0 - -"""Abstract Hierarchy for PyTango devices""" - -import fnmatch -import logging -from abc import ABC -from enum import Enum -from typing import Callable -from typing import Dict -from typing import List -from typing import Optional -from typing import Union - -import tango -from tango import DevState -from tango import DeviceProxy - -from tangostationcontrol.common.proxy import create_device_proxy - -logger = logging.getLogger() - - -class HierarchyMatchingFilter(Enum): - """Select to filter by exact match, substring or regex""" - - Exact = 0 - Find = 1 - Regex = 2 - - -class NotFoundException(Exception): - """The search for a node in the hierarchy turned up empty""" - - pass - - -class AbstractHierarchy(ABC): - """AbstractHierarchy""" - - children_type = Dict[str, Dict[str, Union[DeviceProxy, "children_type"]]] - child_filter_func_type = Callable[[str, str], bool] - child_filter_input_type = Union[str, Enum] - - def __init__( - self, - child_property_name: str, - children: List[str] = None, - parent: str = None, - proxies: Optional[Dict[str, DeviceProxy]] = None, - ): - """Construct the hierarchy and provide protected access - - :param child_property_name: The name of the PyTango property to identify - children from your own direct (grand)children - :param children: Your direct children, if any, can be empty list - :param parent: Your direct parent, if any, can be None - :param proxies: Pass reference to DeviceProxy cache dictionary, if None - it will be created. This can be used to ensure single - DeviceProxy instances when devices implement _multiple_ - hierarchies. - """ - - self._child_property_name = child_property_name - self._children = {} - self._parent = None - - # Store proxies internally upon creation and only pass references to - # them. Ensures only single instance of DeviceProxy is created per - # device. - self._proxies = proxies or {} - - if parent: - self._parent = create_device_proxy(parent) - - if not children: - children = [] - for child in children: - self._children[child] = None - - def __str__(self) -> str: - return f"Hierarchy of {self._child_property_name}" - - def parent(self) -> str: - """Return the parent device name. Requires the parent to be unique. - - :return: The device name of the parent if any - """ - - if self._parent: - return self._parent.dev_name().casefold() - - return None - - @staticmethod - def _get_filter(filter_type: HierarchyMatchingFilter) -> child_filter_func_type: - """Get specific filter function for exact or contains matching - - These functions ensure the matching is case-insensitive - - :param exact: Bool to determine type of filter function - :return: Function to match exactly or by contains - """ - - def f_regex(name: str, filter_str: str) -> bool: - return fnmatch.fnmatch(name.casefold(), filter_str.casefold()) - - def f_exact(name: str, filter_str: str) -> bool: - return name.casefold() == filter_str.casefold() - - def f_find(name: str, filter_str: str) -> bool: - return name.casefold().find(filter_str.casefold()) >= 0 - - match = f_find - if filter_type == HierarchyMatchingFilter.Exact: - match = f_exact - elif filter_type == HierarchyMatchingFilter.Regex: - match = f_regex - - return match - - def _get_or_create_proxy(self, device: str) -> DeviceProxy: - """Create a proxy if it does not yet exist otherwise pass reference - - :param device: full device name - :return: Reference to DeviceProxy from internal cache - """ - - device = device.casefold() - - if not self._proxies.get(device): - self._proxies[device] = create_device_proxy(device, 30000) - - return self._proxies[device] - - def _get_children(self, child: str, depth: int) -> children_type: - """Recursively create dict structure of DeviceProxy and children - - Built a depth-first recursive dict structure terminating at - :attr:`~depth` by reading the :attr:`~self._child_property` property - of each proxy. depth may be set to -1 for indefinite recursion - - Resulting datastructure of format - _children_ = { - device_string: { - 'proxy': DeviceProxy(device_string), - 'children': _children_ - }, - ... - } - - :warning: Makes no attempt to detect cycles in the tree and if they - exist will never terminate and consume infinite memory! - :param child: full device name string from the current child - :param depth: Maximum depth to recurse to, -1 for indefinite - :return: recursive datastructure of proxies and children as described - """ - - proxy = self._get_or_create_proxy(child) - - # TODO(Corne): Assert if this value changes even if the property - # has become persistent / immutable with the original value - # for the given device. If so resolve potential issues - children = proxy.get_property(self._child_property_name)[ - self._child_property_name - ] - - if len(children) == 0 or depth == 0: - return {"proxy": proxy, "children": {}} - - # Perform depth-first recursion to build tree of children and their - # children - proxies = {} - for child in children: - child = child.casefold() - - try: - proxies[child] = self._get_children(child, depth - 1) - except Exception as e: - raise NotFoundException( - f"Could not obtain proxy to child {child} of parent {proxy.dev_name()}" - ) from e - - return {"proxy": proxy, "children": proxies} - - def children(self, depth: int = 1) -> children_type: - """Retrieve DeviceProxies of children up to depth - - :param depth: Maximum steps of traversing children, -1 for unlimited - :raises tango.NonDbDevice: Raised if the child device does not exist in - the tango database - :raises tango.ConnectionFailed: Raised if connecting to the tango - database failed - :raises tango.CommunicationFailed: Raised if communication with the - tango database failed - :raises tango.DevFailed: Raised if the tango database encounters an - error - :return: Dict of DeviceProxies, children and grandchildren up to - :attr:`~depth` of recursive structure _children_ = { - device_string: { - 'proxy': DeviceProxy(device_string), - 'children': _children_ - }, - ... - } - """ - - children = {} - for child in self._children.keys(): - child = child.casefold() - children[child] = self._get_children(child, depth - 1) - - return children - - def _children_names( - self, - child_filter_str: str, - child_filter_func: child_filter_func_type, - children: children_type, - ) -> List[str]: - """Return children names matching child_filter substring - - :param child_filter_str: Substring of the device to retrieve device names list - for - :param children: Recursive dictionary of children of the caller hierarchy - device - """ - - device_names = [] - child_filter_str = child_filter_str.casefold() - - for name, data in children.items(): - name = name.casefold() - if child_filter_func(name, child_filter_str): - device_names.append(name) - - if not data["children"]: - continue - - device_names.extend( - self._children_names( - child_filter_str, child_filter_func, data["children"] - ) - ) - - return device_names - - def children_names( - self, - child_filter: child_filter_input_type, - filter_type: HierarchyMatchingFilter, - ) -> List[str]: - """Retrieve Device children names matching child_filter substring - - :param child_filter: Substring of the device to retrieve device names list - for - :param filter_type: Type of filter such as exact, substring or regex - :return A list of device names matching the child filter substring - """ - if isinstance(child_filter, Enum): - child_filter = child_filter.value - - return self._children_names( - child_filter, self._get_filter(filter_type), self.children(depth=-1) - ) - - def _child( - self, - child_filter_str: str, - child_filter_func: child_filter_func_type, - children: children_type, - ) -> Optional[DeviceProxy]: - """Recurse :attr:`~children` to find device and return it - - Only returns single device or None - """ - - for name, data in children.items(): - if child_filter_func(name, child_filter_str): - return data["proxy"] - - if not data["children"]: - continue - - result = self._child(child_filter_str, child_filter_func, data["children"]) - if result: - return result - - def child( - self, - child_filter: child_filter_input_type, - filter_type: HierarchyMatchingFilter, - ) -> DeviceProxy: - """Retrieve DeviceProxy of child matching full name :attr:`~filter` - - :param child_filter: Full name of the device to retrieve DeviceProxy for - :param filter_type: Type of filter such as exact, substring or regex - :raises ValueError: Raised if the child could not be found in the - hierarchy - :raises tango.NonDbDevice: Raised if the child device does not exist in - the tango database - :raises tango.ConnectionFailed: Raised if connecting to the tango - database failed - :raises tango.CommunicationFailed: Raised if communication with the - tango database failed - :raises tango.DevFailed: Raised if the tango database encounters an - error - :return: Return DeviceProxy of child device - """ - if isinstance(child_filter, Enum): - child_filter = child_filter.value - - child = self._child( - child_filter, self._get_filter(filter_type), self.children(depth=-1) - ) - - if not child: - raise NotFoundException( - f"Could not find child in {self} matching {child_filter} using filter {filter_type.name}" - ) - - return child - - def branch_child( - self, - child_filter: child_filter_input_type, - filter_type: HierarchyMatchingFilter, - ) -> DeviceProxy: - try: - return self.child(child_filter, filter_type) - except NotFoundException: - pass - - db = tango.Database() - children = db.get_device_property(self.parent(), self._child_property_name)[ - self._child_property_name - ] - return AbstractHierarchy( - self._child_property_name, children, self.parent(), self._proxies - ).branch_child(child_filter, filter_type) - - def branch_children_names( - self, - child_filter: child_filter_input_type, - filter_type: HierarchyMatchingFilter, - ) -> List[str]: - """Retrieve Device children names matching child_filter substring located in - the same branch of the hierarchy - - :param child_filter: Substring of the device to retrieve device names list - for - :param filter_type: Type of filter such as exact, substring or regex - :return A list of device names matching the child filter substring - """ - if isinstance(child_filter, Enum): - child_filter = child_filter.value - - children = self.children_names(child_filter, filter_type) - - if len(children) > 0: - return children - - db = tango.Database() - children = db.get_device_property(self.parent(), self._child_property_name)[ - self._child_property_name - ] - return AbstractHierarchy( - self._child_property_name, children, self.parent(), self._proxies - ).branch_children_names(child_filter, filter_type) - - def read_attribute(self, attribute: str) -> any: - """Allow to read attribute from parent without direct access - - :param attribute: The attribute to read from the parent, can be RW - :return: The data from the attribute - :raises tango.DevFailed: The exception from the DeviceProxy if raised - """ - - if not self._parent: - return None - - return getattr(self._parent, attribute) - - def state(self) -> Optional[DevState]: - """Return the state of the parent without direct access - - :return: The state of the parent if there is one - :raises tango.DevFailed: The exception from the DeviceProxy if raised - """ - - if not self._parent: - return None - - return self._parent.state() - - def walk_down(self, func, depth: int = -1): - """Execute the given function on every node in the tree downwards from the root, depth first.""" - - def _walk(children, func): - for child in children.values(): - func(child["proxy"]) - _walk(child["children"], func) - - _walk(self.children(depth), func) - - def walk_up(self, func, depth: int = -1): - """Execute the given function on every node in the tree upwards from the leaves, depth first.""" - - def _walk(children, func): - for child in reversed(children.values()): - _walk(child["children"], func) - func(child["proxy"]) - - _walk(self.children(depth), func) diff --git a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/hierarchy_device.py b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/hierarchy_device.py index 3c2e996a41d966e8b8e7631c4afaa80823139a19..a06d46fec514217845a8d17bdbb2e00d7586c885 100644 --- a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/hierarchy_device.py +++ b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/hierarchy_device.py @@ -4,34 +4,47 @@ """Abstract Hierarchy Device for PyTango devices""" import logging -from typing import Dict -from typing import List -from typing import Optional +import fnmatch +from enum import Enum +from typing import Dict, List, Optional, Callable, Union -from tango import Database -from tango import DevState -from tango import DeviceProxy +from tango import Database, DevState, DeviceProxy -from tangostationcontrol.devices.base_device_classes.hierarchy import AbstractHierarchy -from tangostationcontrol.devices.base_device_classes.hierarchy import ( - HierarchyMatchingFilter, -) +from tangostationcontrol.common.proxy import create_device_proxy logger = logging.getLogger() +class HierarchyMatchingFilter(Enum): + """Select to filter by exact match, substring or regex""" + + EXACT = 0 + FIND = 1 + REGEX = 2 + + +class NotFoundException(Exception): + """The search for a node in the hierarchy turned up empty""" + + class AbstractHierarchyDevice: - """AbstractHierarchyDevice wraps AbstractHierarchy + """AbstractHierarchyDevice :warning: Do not actually use ABC to make this an abstract class as it will cause conflicting metaclasses with PyTango Device servers - - See :py:class:`AbstractHierarchy` for implementation details, wrapping done - to prevent pollution of PyTango device namespace """ + children_type = Dict[str, Dict[str, Union[DeviceProxy, "children_type"]]] + child_filter_func_type = Callable[[str, str], bool] + child_filter_input_type = Union[str, Enum] + def __init__(self): - self._hierarchy: Optional[AbstractHierarchy] = None + self._children = {} + self._parent = None + self._proxies = {} + + def __str__(self) -> str: + return f"Hierarchy of {self._child_property_name}" @staticmethod def _find_parent(device_name: str, child_property: str): @@ -91,44 +104,355 @@ class AbstractHierarchyDevice: exc_info=True, ) - self._hierarchy = AbstractHierarchy(child_property, children, parent, proxies) + self._child_property_name = child_property + self._children = {} + self._parent = None - def children(self, depth: int = 1) -> AbstractHierarchy.children_type: - return self._hierarchy.children(depth) + # Store proxies internally upon creation and only pass references to + # them. Ensures only single instance of DeviceProxy is created per + # device. + self._proxies = proxies or {} - def children_names( + if parent: + self._parent = create_device_proxy(parent) + + if not children: + children = [] + for child in children: + self._children[child] = None + + @staticmethod + def _get_filter(filter_type: HierarchyMatchingFilter) -> child_filter_func_type: + """Get specific filter function for exact or contains matching + + These functions ensure the matching is case-insensitive + + :param exact: Bool to determine type of filter function + :return: Function to match exactly or by contains + """ + + def f_regex(name: str, filter_str: str) -> bool: + return fnmatch.fnmatch(name.casefold(), filter_str.casefold()) + + def f_exact(name: str, filter_str: str) -> bool: + return name.casefold() == filter_str.casefold() + + def f_find(name: str, filter_str: str) -> bool: + return name.casefold().find(filter_str.casefold()) >= 0 + + match = f_find + if filter_type == HierarchyMatchingFilter.EXACT: + match = f_exact + elif filter_type == HierarchyMatchingFilter.REGEX: + match = f_regex + + return match + + def _get_or_create_proxy(self, device: str) -> DeviceProxy: + """Create a proxy if it does not yet exist otherwise pass reference + + :param device: full device name + :return: Reference to DeviceProxy from internal cache + """ + + device = device.casefold() + + if not self._proxies.get(device): + self._proxies[device] = create_device_proxy(device, 30000) + + return self._proxies[device] + + def _get_children(self, child: str, depth: int) -> children_type: + """Recursively create dict structure of DeviceProxy and children + + Built a depth-first recursive dict structure terminating at + :attr:`~depth` by reading the :attr:`~self._child_property` property + of each proxy. depth may be set to -1 for indefinite recursion + + Resulting datastructure of format + _children_ = { + device_string: { + 'proxy': DeviceProxy(device_string), + 'children': _children_ + }, + ... + } + + :warning: Makes no attempt to detect cycles in the tree and if they + exist will never terminate and consume infinite memory! + :param child: full device name string from the current child + :param depth: Maximum depth to recurse to, -1 for indefinite + :return: recursive datastructure of proxies and children as described + """ + + proxy = self._get_or_create_proxy(child) + + # TODO(Corne): Assert if this value changes even if the property + # has become persistent / immutable with the original value + # for the given device. If so resolve potential issues + children = proxy.get_property(self._child_property_name)[ + self._child_property_name + ] + + if len(children) == 0 or depth == 0: + return {"proxy": proxy, "children": {}} + + # Perform depth-first recursion to build tree of children and their + # children + proxies = {} + for child in children: + child = child.casefold() + + try: + proxies[child] = self._get_children(child, depth - 1) + except Exception as exc: + raise NotFoundException( + f"Could not obtain proxy to child {child} of parent {proxy.dev_name()}" + ) from exc + + return {"proxy": proxy, "children": proxies} + + def children(self, depth: int = 1) -> children_type: + """Retrieve DeviceProxies of children up to depth + + :param depth: Maximum steps of traversing children, -1 for unlimited + :raises tango.NonDbDevice: Raised if the child device does not exist in + the tango database + :raises tango.ConnectionFailed: Raised if connecting to the tango + database failed + :raises tango.CommunicationFailed: Raised if communication with the + tango database failed + :raises tango.DevFailed: Raised if the tango database encounters an + error + :return: Dict of DeviceProxies, children and grandchildren up to + :attr:`~depth` of recursive structure _children_ = { + device_string: { + 'proxy': DeviceProxy(device_string), + 'children': _children_ + }, + ... + } + """ + + children = {} + for child in self._children: + child = child.casefold() + children[child] = self._get_children(child, depth - 1) + + return children + + def _children_names( self, child_filter_str: str, - matching_filter: HierarchyMatchingFilter = HierarchyMatchingFilter.Find, + child_filter_func: child_filter_func_type, + children: children_type, ) -> List[str]: - return self._hierarchy.children_names(child_filter_str, matching_filter) + """Return children names matching child_filter substring - def child( + :param child_filter_str: Substring of the device to retrieve device names list + for + :param children: Recursive dictionary of children of the caller hierarchy + device + """ + + device_names = [] + child_filter_str = child_filter_str.casefold() + + for name, data in children.items(): + name = name.casefold() + if child_filter_func(name, child_filter_str): + device_names.append(name) + + if not data["children"]: + continue + + device_names.extend( + self._children_names( + child_filter_str, child_filter_func, data["children"] + ) + ) + + return device_names + + def children_names( + self, + child_filter: child_filter_input_type, + matching_filter: HierarchyMatchingFilter = HierarchyMatchingFilter.FIND, + ) -> List[str]: + """Retrieve Device children names matching child_filter substring + + :param child_filter: Substring of the device to retrieve device names list + for + :param filter_type: Type of filter such as exact, substring or regex + :return A list of device names matching the child filter substring + """ + if isinstance(child_filter, Enum): + child_filter = child_filter.value + + return self._children_names( + child_filter, self._get_filter(matching_filter), self.children(depth=-1) + ) + + def _child( self, child_filter_str: str, - matching_filter: HierarchyMatchingFilter = HierarchyMatchingFilter.Find, + child_filter_func: child_filter_func_type, + children: children_type, + ) -> Optional[DeviceProxy]: + """Recurse :attr:`~children` to find device and return it + + Only returns single device or None + """ + + for name, data in children.items(): + if child_filter_func(name, child_filter_str): + return data["proxy"] + + if not data["children"]: + continue + + result = self._child(child_filter_str, child_filter_func, data["children"]) + if result: + return result + + def child( + self, + child_filter: child_filter_input_type, + matching_filter: HierarchyMatchingFilter = HierarchyMatchingFilter.FIND, ) -> Optional[DeviceProxy]: - return self._hierarchy.child(child_filter_str, matching_filter) + """Retrieve DeviceProxy of child matching full name :attr:`~filter` + + :param child_filter: Full name of the device to retrieve DeviceProxy for + :param filter_type: Type of filter such as exact, substring or regex + :raises ValueError: Raised if the child could not be found in the + hierarchy + :raises tango.NonDbDevice: Raised if the child device does not exist in + the tango database + :raises tango.ConnectionFailed: Raised if connecting to the tango + database failed + :raises tango.CommunicationFailed: Raised if communication with the + tango database failed + :raises tango.DevFailed: Raised if the tango database encounters an + error + :return: Return DeviceProxy of child device + """ + if isinstance(child_filter, Enum): + child_filter = child_filter.value + + child = self._child( + child_filter, self._get_filter(matching_filter), self.children(depth=-1) + ) + + if not child: + raise NotFoundException( + ( + f"Could not find child in {self} matching {child_filter} " + f"using filter {matching_filter.name}" + ) + ) + return child def branch_child( self, - child_filter_str: str, - matching_filter: HierarchyMatchingFilter = HierarchyMatchingFilter.Find, + child_filter: child_filter_input_type, + matching_filter: HierarchyMatchingFilter = HierarchyMatchingFilter.FIND, ) -> Optional[DeviceProxy]: - return self._hierarchy.branch_child(child_filter_str, matching_filter) + """Retrieve Device child matching child_filter substring located in + the same branch of the hierarchy + + :param child_filter: Substring of the device to retrieve device names list + for + :param matching_filter: Type of filter such as exact, substring or regex + :return A device proxy whose name matches the child_filter substring + """ + try: + return self.child(child_filter, matching_filter) + except NotFoundException: + pass + + _hierarchy = AbstractHierarchyDevice() + _hierarchy.init(self.parent(), self._child_property_name) + + return _hierarchy.branch_child(child_filter, matching_filter) def branch_children_names( self, - child_filter_str: str, - matching_filter: HierarchyMatchingFilter = HierarchyMatchingFilter.Find, + child_filter: child_filter_input_type, + matching_filter: HierarchyMatchingFilter = HierarchyMatchingFilter.FIND, ) -> List[str]: - return self._hierarchy.branch_children_names(child_filter_str, matching_filter) + """Retrieve Device children names matching child_filter substring located in + the same branch of the hierarchy + + :param child_filter: Substring of the device to retrieve device names list + for + :param filter_type: Type of filter such as exact, substring or regex + :return A list of device names matching the child filter substring + """ + if isinstance(child_filter, Enum): + child_filter = child_filter.value + + children = self.children_names(child_filter, matching_filter) - def parent(self): - return self._hierarchy.parent() + if len(children) > 0: + return children + + _hierarchy = AbstractHierarchyDevice() + _hierarchy.init(self.parent(), self._child_property_name) + + return _hierarchy.branch_children_names(child_filter, matching_filter) + + def parent(self) -> str: + """Return the parent device name. Requires the parent to be unique. + + :return: The device name of the parent if any + """ + + if self._parent: + return self._parent.dev_name().casefold() + + return None def read_parent_attribute(self, attribute: str) -> any: - return self._hierarchy.read_attribute(attribute) + """Allow to read attribute from parent without direct access + + :param attribute: The attribute to read from the parent, can be RW + :return: The data from the attribute + :raises tango.DevFailed: The exception from the DeviceProxy if raised + """ + if not self._parent: + return None + + return getattr(self._parent, attribute) + + def parent_state(self) -> Optional[DevState]: + """Return the state of the parent without direct access + + :return: The state of the parent if there is one + :raises tango.DevFailed: The exception from the DeviceProxy if raised + """ + if not self._parent: + return None + + return self._parent.state() + + def walk_down(self, func, depth: int = -1): + """Execute the given function on every node in the tree downwards from the root, + depth first.""" + + def _walk(children, func): + for child in children.values(): + func(child["proxy"]) + _walk(child["children"], func) + + _walk(self.children(depth), func) + + def walk_up(self, func, depth: int = -1): + """Execute the given function on every node in the tree upwards from the leaves, + depth first.""" + + def _walk(children, func): + for child in reversed(children.values()): + _walk(child["children"], func) + func(child["proxy"]) - def parent_state(self) -> DevState: - return self._hierarchy.state() + _walk(self.children(depth), func) diff --git a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/lofar_device.py b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/lofar_device.py index d2d6c76386ca30762849ca6463e02745b717a206..e8a27d8af843259380c8f8c7973c8acb5fda0c50 100644 --- a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/lofar_device.py +++ b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/lofar_device.py @@ -251,8 +251,8 @@ class LOFARDevice(Device): return pprint.pformat(self.control.children(-1)) @command(dtype_out=str) - def get_parents(self): - return pprint.pformat(self.control.parents(), depth=1, indent=4, width=60) + def get_parent(self): + return pprint.pformat(self.control.parent(), depth=1, indent=4, width=60) @log_exceptions() def init_device(self): diff --git a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/power_hierarchy.py b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/power_hierarchy.py index dba7167b76f78c99944a96b2525865137cd69e40..4874b48a5fe60342e85db624b6bf6330749b5b28 100644 --- a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/power_hierarchy.py +++ b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/power_hierarchy.py @@ -21,10 +21,10 @@ from tangostationcontrol.devices.device_decorators import suppress_exceptions logger = logging.getLogger() -__all__ = ["PowerHierarchy"] +__all__ = ["PowerHierarchyDevice"] -class PowerHierarchy(AbstractHierarchyDevice): +class PowerHierarchyDevice(AbstractHierarchyDevice): """Power Hierarchy""" POWER_CHILD_PROPERTY = "Power_Children" @@ -89,7 +89,7 @@ class PowerHierarchy(AbstractHierarchyDevice): device.power_hardware_on() logger.info(f"Powering on {device}: Succesful: Clock") - self._hierarchy.walk_down(boot_to_hibernate, -1) + self.walk_down(boot_to_hibernate, -1) # Return the suppressed exceptions return boot_to_hibernate.exceptions @@ -110,7 +110,7 @@ class PowerHierarchy(AbstractHierarchyDevice): device.power_hardware_on() logger.info(f"Powering on {device}: Succesful: Uniboards") - self._hierarchy.walk_down(boot_to_standby, -1) + self.walk_down(boot_to_standby, -1) # Return the suppressed exceptions return boot_to_standby.exceptions @@ -129,7 +129,7 @@ class PowerHierarchy(AbstractHierarchyDevice): device.power_hardware_off() logger.info(f"Powering off {device}: Succesful: Uniboards") - self._hierarchy.walk_up(power_off_from_standby, -1) + self.walk_up(power_off_from_standby, -1) # Return the suppressed exceptions return power_off_from_standby.exceptions @@ -160,7 +160,7 @@ class PowerHierarchy(AbstractHierarchyDevice): device.power_hardware_on() logger.info(f"Powering on {device}: Succesful: User image") - self._hierarchy.walk_down(power_on, -1) + self.walk_down(power_on, -1) # now transition to on @suppress_exceptions(self.continue_on_failure) @@ -168,7 +168,7 @@ class PowerHierarchy(AbstractHierarchyDevice): def boot_to_on(device: DeviceProxy): self._boot_device(device) - self._hierarchy.walk_down(boot_to_on, -1) + self.walk_down(boot_to_on, -1) # power on antennas (now that AntennaField is booted) @suppress_exceptions(self.continue_on_failure) @@ -180,7 +180,7 @@ class PowerHierarchy(AbstractHierarchyDevice): # TODO(JDM): Report which antennas logger.info(f"Powering on {device}: Succesful: Antennas") - self._hierarchy.walk_down(power_antennas_on, -1) + self.walk_down(power_antennas_on, -1) # Return the suppressed exceptions return ( @@ -203,7 +203,7 @@ class PowerHierarchy(AbstractHierarchyDevice): self._shutdown_device(device) - self._hierarchy.walk_up(power_off_from_on, -1) + self.walk_up(power_off_from_on, -1) # now turn off power to power-hungry hardware @suppress_exceptions(self.continue_on_failure) @@ -226,7 +226,7 @@ class PowerHierarchy(AbstractHierarchyDevice): device.power_hardware_off() logger.info(f"Powering off {device}: Succesful: Factory image") - self._hierarchy.walk_up(power_off, -1) + self.walk_up(power_off, -1) # Return the suppressed exceptions return power_off_from_on.exceptions + power_off.exceptions diff --git a/tangostationcontrol/tangostationcontrol/devices/station_manager.py b/tangostationcontrol/tangostationcontrol/devices/station_manager.py index a0845793b7f7cebd3e340ff0cd87042a37fb2f46..2d1878913478c8d637f7f4ea0d55aa2dc5264330 100644 --- a/tangostationcontrol/tangostationcontrol/devices/station_manager.py +++ b/tangostationcontrol/tangostationcontrol/devices/station_manager.py @@ -18,7 +18,7 @@ from tangostationcontrol.common.lofar_logging import exception_to_str from tangostationcontrol.common.lofar_logging import log_exceptions from tangostationcontrol.devices.base_device_classes.lofar_device import LOFARDevice from tangostationcontrol.devices.base_device_classes.power_hierarchy import ( - PowerHierarchy, + PowerHierarchyDevice, ) from tangostationcontrol.common.states import ( @@ -119,7 +119,7 @@ class StationManager(LOFARDevice): def _initialise_power_hierarchy(self): """Create and initialise the PowerHierarchy to manage the power sequence""" # create a power hierarchy device instance - self.stationmanager_ph = PowerHierarchy() + self.stationmanager_ph = PowerHierarchyDevice() self.stationmanager_ph.init( self.get_name(), continue_on_failure=self.Suppress_State_Transition_Failures ) diff --git a/tangostationcontrol/test/devices/base_device_classes/test_hierarchy.py b/tangostationcontrol/test/devices/base_device_classes/test_hierarchy.py deleted file mode 100644 index fa1bda6224918df998b50c0d1d5e07fac630d529..0000000000000000000000000000000000000000 --- a/tangostationcontrol/test/devices/base_device_classes/test_hierarchy.py +++ /dev/null @@ -1,453 +0,0 @@ -# Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy) -# SPDX-License-Identifier: Apache-2.0 - -import copy -import logging -from typing import Callable -from typing import Dict -from typing import List -from unittest.mock import Mock, patch - -from tango import DevState, DeviceProxy - -from tangostationcontrol.devices.base_device_classes import hierarchy - -from test.devices import device_base - -logger = logging.getLogger() - - -class TestAbstractHierarchy(device_base.DeviceTestCase): - class ConcreteHierarchy(hierarchy.AbstractHierarchy): - pass - - TEST_PROPERTY_NAME = "Control_Children" - - def test_create_instance(self): - """Test default values and if we can create an instance""" - - test_hierarchy = TestAbstractHierarchy.ConcreteHierarchy( - self.TEST_PROPERTY_NAME - ) - - self.assertEqual({}, test_hierarchy._children) - self.assertEqual(None, test_hierarchy._parent) - - self.assertEqual({}, test_hierarchy.children()) - self.assertEqual(None, test_hierarchy.parent()) - - def test_get_or_create_proxy_cache(self): - """Test if get_or_create_proxy caches without duplicates""" - - test = TestAbstractHierarchy.ConcreteHierarchy( - self.TEST_PROPERTY_NAME, children=None, parent=None - ) - - def test_parent_get_name(self): - """Read the name of the parent through mocking DeviceProxy.dev_name""" - - name_station = "stat/stationmanager/1" - - mock_dev_station_name = Mock() - mock_dev_station_name.dev_name.return_value = name_station - - self.device_proxy_mock["object"].side_effect = [mock_dev_station_name] - - test = TestAbstractHierarchy.ConcreteHierarchy( - self.TEST_PROPERTY_NAME, - children=None, - parent=name_station, - ) - - self.assertEqual(name_station, test.parent()) - - def test_parent_read_attribute(self): - """Read an attribute from the parent and get the mocked data""" - - attribute_value = "0.0.1" - - name_station = "stat/stationmanager/1" - # Mock DeviceProxy attribute - self.device_proxy_mock[ - "object" - ].return_value.FPGA_firmware_version_R = attribute_value - # Mock DeviceProxy dev_name - self.device_proxy_mock[ - "object" - ].return_value.dev_name.return_value = name_station - - test = TestAbstractHierarchy.ConcreteHierarchy( - self.TEST_PROPERTY_NAME, children=None, parent=name_station - ) - - self.assertEqual( - attribute_value, - test.read_attribute("FPGA_firmware_version_R"), - ) - - def test_parent_read_attribute_no_parent(self): - """Ensure that read_attribute returns None if there is no parent""" - - test = TestAbstractHierarchy.ConcreteHierarchy( - self.TEST_PROPERTY_NAME, children=None, parent=None - ) - - self.assertIsNone(test.read_attribute("FPGA_firmware_version_R")) - - def test_parent_get_state(self): - """Ensure that we can get parent state""" - - name_station = "stat/stationmanager/1" - # Mock the state - self.device_proxy_mock[ - "object" - ].return_value.state.return_value = DevState.FAULT - self.device_proxy_mock[ - "object" - ].return_value.dev_name.return_value = name_station - - test = TestAbstractHierarchy.ConcreteHierarchy( - self.TEST_PROPERTY_NAME, children=None, parent=name_station - ) - - self.assertEqual(DevState.FAULT, test.state()) - - def test_parent_get_state_no_parent(self): - """Ensure that state returns None if no parent""" - - test = TestAbstractHierarchy.ConcreteHierarchy( - self.TEST_PROPERTY_NAME, children=None, parent=None - ) - - self.assertIsNone(test.state()) - - def children_test_base( - self, - property_name: str, - direct_children: List[str], - children_properties: List[Dict[str, List[str]]], - fn: Callable[[ConcreteHierarchy], None], - ): - """Base function for testing children() method - - :param fn: Callable to perform actual tests in - """ - - test_hierarchy = TestAbstractHierarchy.ConcreteHierarchy( - property_name, direct_children, None - ) - - self.device_proxy_mock[ - "object" - ].return_value.get_property.side_effect = children_properties - - fn(test_hierarchy) - - def test_get_children_depth_1_no_filter(self): - """Test finding all children at max depth of 1""" - - depth = 1 - - test_children = ["stat/sdp/1", "stat/sdp/2"] - test_property_calls = [ - {self.TEST_PROPERTY_NAME: ["stat/xst/1"]}, # sdp/1 - {self.TEST_PROPERTY_NAME: ["stat/xstbeam/1"]}, # xst/1 - {self.TEST_PROPERTY_NAME: []}, # xstbeam,/1 - {self.TEST_PROPERTY_NAME: []}, # sdp/2 - ] - - def test_fn(test_hierarchy: TestAbstractHierarchy.ConcreteHierarchy): - test = test_hierarchy.children(depth=depth) - - # Test if all keys from test_children are in the children() dict - self.assertSetEqual(set(test_children), set(test.keys())) - - # test if for each child there are no further children - for child in test_children: - self.assertDictEqual({}, test[child]["children"]) - - self.children_test_base( - self.TEST_PROPERTY_NAME, test_children, test_property_calls, test_fn - ) - - def test_get_children_depth_2_no_filter(self): - """Test recursively finding children limited to depth 2""" - - depth = 2 - - test_children = ["stat/sdp/1", "stat/sdp/2"] - # Calls to get_property follow depth-first order - test_property_calls = [ - {self.TEST_PROPERTY_NAME: ["stat/antennafield/1"]}, # sdp/1 - {self.TEST_PROPERTY_NAME: ["stat/tilebeam/1"]}, # antennafield/1 - {self.TEST_PROPERTY_NAME: []}, # tilebeam/1 - {self.TEST_PROPERTY_NAME: []}, # sdp/2 - ] - - def test_fn(test_hierarchy: TestAbstractHierarchy.ConcreteHierarchy): - test = test_hierarchy.children(depth=depth) - - # Test that antennafield is included - self.assertIsNotNone( - test["stat/sdp/1"]["children"].get("stat/antennafield/1") - ) - - # Test that antennafield child is removed due to depth limit - self.assertDictEqual( - {}, test["stat/sdp/1"]["children"]["stat/antennafield/1"]["children"] - ) - - self.children_test_base( - self.TEST_PROPERTY_NAME, test_children, test_property_calls, test_fn - ) - - def test_child_exists(self): - """Test finding existing child""" - - test_hierarchy = TestAbstractHierarchy.ConcreteHierarchy( - self.TEST_PROPERTY_NAME, ["stat/child/1", "stat/child/2"], ["stat/parent/1"] - ) - - # try to find a child using an exact filter match - _ = test_hierarchy.child( - "stat/child/1", hierarchy.HierarchyMatchingFilter.Exact - ) - - # try to find a child using a regex - _ = test_hierarchy.child( - "stat/child/*", hierarchy.HierarchyMatchingFilter.Regex - ) - - def test_child_does_not_exist(self): - """Test finding a non-existing child to raise an exception""" - - test_hierarchy = TestAbstractHierarchy.ConcreteHierarchy( - self.TEST_PROPERTY_NAME, ["stat/child/1", "stat/child/2"], ["stat/parent/1"] - ) - - # try to find a non-existing child using an exact filter match - with self.assertRaises(hierarchy.NotFoundException): - _ = test_hierarchy.child( - "stat/notexist/1", hierarchy.HierarchyMatchingFilter.Exact - ) - - # try to find a non-existing child using a regex - with self.assertRaises(hierarchy.NotFoundException): - _ = test_hierarchy.child( - "stat/notexist/*", hierarchy.HierarchyMatchingFilter.Regex - ) - - def test_unique_parent_exists(self): - """Test finding the uniquely defined parent""" - - test_hierarchy = TestAbstractHierarchy.ConcreteHierarchy( - self.TEST_PROPERTY_NAME, ["stat/child/1", "stat/child/2"], ["stat/parent/1"] - ) - - _ = test_hierarchy.parent() - - TEST_CHILDREN_ROOT = ["stat/ccd/1", "stat/psoc/1", "stat/antennafield/1"] - - TEST_CHILDREN_ANTENNAFIELD = [ - "stat/sdp/1", - "stat/tilebeam/1", - "stat/digitalbeam/1", - "stat/aps/1", - ] - TEST_CHILDREN_SDP = ["stat/xst/1", "stat/sst/1", "stat/bst/1"] - TEST_CHILDREN_APS = [ - "stat/apsct/1", - "stat/apspu/1", - "stat/unb2/1", - "stat/recvh/1", - "stat/recvl/1", - ] - - # Calls to get_property follow depth-first order - TEST_GET_PROPERTY_CALLS = [ - {TEST_PROPERTY_NAME: []}, # cdd/1 - {TEST_PROPERTY_NAME: []}, # psoc/1 - {TEST_PROPERTY_NAME: TEST_CHILDREN_ANTENNAFIELD}, # antennafield/1 - {TEST_PROPERTY_NAME: TEST_CHILDREN_SDP}, # sdp/1 - {TEST_PROPERTY_NAME: []}, # xst/1 - {TEST_PROPERTY_NAME: []}, # sst/1 - {TEST_PROPERTY_NAME: []}, # bst/1 - {TEST_PROPERTY_NAME: []}, # tilebeam/1 - {TEST_PROPERTY_NAME: ["stat/beamlet/1"]}, # digitalbeam/1 - {TEST_PROPERTY_NAME: []}, # beamlet/1 - {TEST_PROPERTY_NAME: TEST_CHILDREN_APS}, # aps/1 - {TEST_PROPERTY_NAME: []}, # apsct/1 - {TEST_PROPERTY_NAME: []}, # apspu/1 - {TEST_PROPERTY_NAME: []}, # unb2/1 - {TEST_PROPERTY_NAME: []}, # recvh/1 - {TEST_PROPERTY_NAME: []}, # recvl/1 - ] - - def test_get_children_depth_elaborate_no_filter(self): - """Create a 3 levels deep hierarchy with ~15 devices""" - - def test_fn(test_hierarchy: TestAbstractHierarchy.ConcreteHierarchy): - test = test_hierarchy.children(depth=-1) - - # Test ccd and psoc have no children - self.assertDictEqual({}, test["stat/ccd/1"]["children"]) - self.assertDictEqual({}, test["stat/psoc/1"]["children"]) - - # Test antennafield has 4 children - self.assertEqual(4, len(test["stat/antennafield/1"]["children"])) - - # Test sdp has 3 children - self.assertEqual( - 3, - len(test["stat/antennafield/1"]["children"]["stat/sdp/1"]["children"]), - ) - - # Test all childs of sdp have no children - for sdp_child in self.TEST_CHILDREN_SDP: - self.assertDictEqual( - {}, - test["stat/antennafield/1"]["children"]["stat/sdp/1"]["children"][ - sdp_child - ]["children"], - ) - - # Test tilebeam has no children - self.assertDictEqual( - {}, - test["stat/antennafield/1"]["children"]["stat/tilebeam/1"]["children"], - ) - - # Test digitalbeam has 1 child - self.assertEqual( - 1, - len( - test["stat/antennafield/1"]["children"]["stat/digitalbeam/1"][ - "children" - ] - ), - ) - - # Test beamlet has no children - self.assertDictEqual( - {}, - test["stat/antennafield/1"]["children"]["stat/digitalbeam/1"][ - "children" - ]["stat/beamlet/1"]["children"], - ) - - # Test aps has 5 children - self.assertEqual( - 5, - len(test["stat/antennafield/1"]["children"]["stat/aps/1"]["children"]), - ) - - # Test all childs of aps have no children - for aps_child in self.TEST_CHILDREN_APS: - self.assertDictEqual( - {}, - test["stat/antennafield/1"]["children"]["stat/aps/1"]["children"][ - aps_child - ]["children"], - ) - - self.children_test_base( - self.TEST_PROPERTY_NAME, - self.TEST_CHILDREN_ROOT, - self.TEST_GET_PROPERTY_CALLS, - test_fn, - ) - - def test_get_child_filter(self): - """Test we can find every device""" - - test_hierarchy = TestAbstractHierarchy.ConcreteHierarchy( - self.TEST_PROPERTY_NAME, self.TEST_CHILDREN_ROOT, None - ) - - self.device_proxy_mock[ - "object" - ].return_value.get_property.side_effect = self.TEST_GET_PROPERTY_CALLS - - all_children = copy.copy(self.TEST_CHILDREN_ROOT) - all_children.extend(self.TEST_CHILDREN_ANTENNAFIELD) - all_children.extend(self.TEST_CHILDREN_SDP) - all_children.extend(self.TEST_CHILDREN_APS) - - # Find all proxies for each child and match that it is the same - # object as cached in `_proxies` - for child in all_children: - result = test_hierarchy.child( - child, filter_type=hierarchy.HierarchyMatchingFilter.Exact - ) - self.assertEqual(test_hierarchy._proxies[child], result) - - self.device_proxy_mock[ - "object" - ].return_value.get_property.side_effect = self.TEST_GET_PROPERTY_CALLS - - class FakeDeviceProxy: - """A stateful fake to return the right values for the right device regardless of calling order.""" - - def __init__(self, name, timeout): - self.name = name - - def dev_name(self): - return self.name - - def get_property(self, prop_name): - children = { - "1": ["1.1", "1.2"], - "2": ["2.1", "2.2"], - "2.1": ["2.1.1"], - } - return { - TestAbstractHierarchy.TEST_PROPERTY_NAME: children.get(self.name, []) - } - - @patch.object(hierarchy, "create_device_proxy", wraps=FakeDeviceProxy) - def test_walk_down(self, m_create_device_proxy): - """Test whether walking down the hierarchy tree (root -> leaves) works.""" - - test_hierarchy = TestAbstractHierarchy.ConcreteHierarchy( - self.TEST_PROPERTY_NAME, ["1", "2", "3"], None - ) - - def walker(device: DeviceProxy): - walk_order.append(device.dev_name()) - - # walk one level - walk_order = [] - test_hierarchy.walk_down(walker, depth=1) - self.assertListEqual(["1", "2", "3"], walk_order) - - # walk whole tree - walk_order = [] - test_hierarchy.walk_down(walker, depth=-1) - self.assertListEqual( - ["1", "1.1", "1.2", "2", "2.1", "2.1.1", "2.2", "3"], - walk_order, - ) - - @patch.object(hierarchy, "create_device_proxy", wraps=FakeDeviceProxy) - def test_walk_up(self, m_create_device_proxy): - """Test whether walking up the hierarchy (leaves -> root) tree works.""" - - test_hierarchy = TestAbstractHierarchy.ConcreteHierarchy( - self.TEST_PROPERTY_NAME, ["1", "2", "3"], None - ) - - def walker(device: DeviceProxy): - walk_order.append(device.dev_name()) - - # walk one level - walk_order = [] - test_hierarchy.walk_up(walker, depth=1) - self.assertListEqual(["3", "2", "1"], walk_order) - - # walk whole tree - walk_order = [] - test_hierarchy.walk_up(walker, depth=-1) - self.assertListEqual( - ["3", "2.2", "2.1.1", "2.1", "2", "1.2", "1.1", "1"], - walk_order, - ) diff --git a/tangostationcontrol/test/devices/base_device_classes/test_hierarchy_device.py b/tangostationcontrol/test/devices/base_device_classes/test_hierarchy_device.py index f32de04f7a931177f63401ec37199c57db13fe85..ce6fd9fdcbcbeab308fd9c9d706e34e35f3256f9 100644 --- a/tangostationcontrol/test/devices/base_device_classes/test_hierarchy_device.py +++ b/tangostationcontrol/test/devices/base_device_classes/test_hierarchy_device.py @@ -1,25 +1,27 @@ # Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy) # SPDX-License-Identifier: Apache-2.0 +import copy import unittest +from unittest.mock import Mock, patch +from typing import List, Dict, Callable -from tango import DeviceProxy +from test.devices import device_base + +from tango import DeviceProxy, DevState from tango.server import command from tango.server import device_property from tango.test_context import DeviceTestContext -from tangostationcontrol.devices.base_device_classes import hierarchy from tangostationcontrol.devices.base_device_classes import hierarchy_device from tangostationcontrol.devices.base_device_classes import lofar_device -from test.devices import device_base - class TestHierarchyDevice(device_base.DeviceTestCase): def setUp(self): # DeviceTestCase setUp patches lofar_device DeviceProxy super(TestHierarchyDevice, self).setUp() - self.hierarchy_mock = self.device_proxy_patch(hierarchy) + self.hierarchy_mock = self.device_proxy_patch(hierarchy_device) @unittest.mock.patch.object(hierarchy_device, "Database") def test_get_direct_child_device(self, m_database): @@ -55,3 +57,457 @@ class TestHierarchyDevice(device_base.DeviceTestCase): {TEST_PROPERTY_CHILDREN: []}, ] proxy.has_child_proxy() + + class ConcreteHierarchy(hierarchy_device.AbstractHierarchyDevice): + pass + + TEST_PROPERTY_NAME = "Control_Children" + + def test_create_instance(self): + """Test default values and if we can create an instance""" + + test_hierarchy = TestHierarchyDevice.ConcreteHierarchy() + test_hierarchy.init("MockDevice", self.TEST_PROPERTY_NAME) + + self.assertEqual({}, test_hierarchy._children) + self.assertEqual(None, test_hierarchy._parent) + + self.assertEqual({}, test_hierarchy.children()) + self.assertEqual(None, test_hierarchy.parent()) + + def test_get_or_create_proxy_cache(self): + """Test if get_or_create_proxy caches without duplicates""" + + test = TestHierarchyDevice.ConcreteHierarchy() + test.init("TestDevice", self.TEST_PROPERTY_NAME) + + def test_parent_get_name(self): + """Read the name of the parent through mocking DeviceProxy.dev_name""" + + name_station = "stat/stationmanager/1" + + mock_dev_station_name = Mock() + mock_dev_station_name.dev_name.return_value = name_station + + self.device_proxy_mock["object"].side_effect = [mock_dev_station_name] + + test = TestHierarchyDevice.ConcreteHierarchy() + test.init("TestDevice", self.TEST_PROPERTY_NAME) + test._children = None + test._parent = mock_dev_station_name + + self.assertEqual(name_station, test.parent()) + + def test_parent_read_attribute(self): + """Read an attribute from the parent and get the mocked data""" + + attribute_value = "0.0.1" + + name_station = "stat/stationmanager/1" + + mock_dev_station_name = Mock() + mock_dev_station_name.dev_name.return_value = name_station + mock_dev_station_name.FPGA_firmware_version_R = attribute_value + + test = TestHierarchyDevice.ConcreteHierarchy() + test.init("TestDevice", self.TEST_PROPERTY_NAME) + test._parent = mock_dev_station_name + + self.assertEqual( + attribute_value, + test.read_parent_attribute("FPGA_firmware_version_R"), + ) + + def test_parent_read_attribute_no_parent(self): + """Ensure that read_attribute returns None if there is no parent""" + + test = TestHierarchyDevice.ConcreteHierarchy() + test.init("TestDevice", self.TEST_PROPERTY_NAME) + + self.assertIsNone(test.read_parent_attribute("FPGA_firmware_version_R")) + + def test_parent_get_state(self): + """Ensure that we can get parent state""" + + name_station = "stat/stationmanager/1" + # Mock the state + mock_dev_station = Mock() + mock_dev_station.dev_name.return_value = name_station + mock_dev_station.state.return_value = DevState.FAULT + + test = TestHierarchyDevice.ConcreteHierarchy() + test.init("TestDevice", self.TEST_PROPERTY_NAME) + test._parent = mock_dev_station + + self.assertEqual(DevState.FAULT, test.parent_state()) + + def test_parent_get_state_no_parent(self): + """Ensure that state returns None if no parent""" + + test = TestHierarchyDevice.ConcreteHierarchy() + test.init("TestDevice", self.TEST_PROPERTY_NAME) + + self.assertIsNone(test.parent_state()) + + def children_test_base( + self, + property_name: str, + direct_children: List[str], + children_properties: List[Dict[str, List[str]]], + fn: Callable[[ConcreteHierarchy], None], + ): + """Base function for testing children() method + + :param fn: Callable to perform actual tests in + """ + + test_hierarchy = TestHierarchyDevice.ConcreteHierarchy() + test_hierarchy.init("TestDevice", property_name) + mock_children = {} + for child in direct_children: + mock_children[child] = None + test_hierarchy._children = mock_children + + self.device_proxy_mock[ + "object" + ].return_value.get_property.side_effect = children_properties + + fn(test_hierarchy) + + def test_get_children_depth_1_no_filter(self): + """Test finding all children at max depth of 1""" + + depth = 1 + + test_children = ["stat/sdp/1", "stat/sdp/2"] + test_property_calls = [ + {self.TEST_PROPERTY_NAME: ["stat/xst/1"]}, # sdp/1 + {self.TEST_PROPERTY_NAME: ["stat/xstbeam/1"]}, # xst/1 + {self.TEST_PROPERTY_NAME: []}, # xstbeam,/1 + {self.TEST_PROPERTY_NAME: []}, # sdp/2 + ] + + def test_fn(test_hierarchy: TestHierarchyDevice.ConcreteHierarchy): + test = test_hierarchy.children(depth=depth) + + # Test if all keys from test_children are in the children() dict + self.assertSetEqual(set(test_children), set(test.keys())) + + # test if for each child there are no further children + for child in test_children: + self.assertDictEqual({}, test[child]["children"]) + + self.children_test_base( + self.TEST_PROPERTY_NAME, test_children, test_property_calls, test_fn + ) + + def test_get_children_depth_2_no_filter(self): + """Test recursively finding children limited to depth 2""" + + depth = 2 + + test_children = ["stat/sdp/1", "stat/sdp/2"] + # Calls to get_property follow depth-first order + test_property_calls = [ + {self.TEST_PROPERTY_NAME: ["stat/antennafield/1"]}, # sdp/1 + {self.TEST_PROPERTY_NAME: ["stat/tilebeam/1"]}, # antennafield/1 + {self.TEST_PROPERTY_NAME: []}, # tilebeam/1 + {self.TEST_PROPERTY_NAME: []}, # sdp/2 + ] + + def test_fn(test_hierarchy: TestHierarchyDevice.ConcreteHierarchy): + test = test_hierarchy.children(depth=depth) + + # Test that antennafield is included + self.assertIsNotNone( + test["stat/sdp/1"]["children"].get("stat/antennafield/1") + ) + + # Test that antennafield child is removed due to depth limit + self.assertDictEqual( + {}, test["stat/sdp/1"]["children"]["stat/antennafield/1"]["children"] + ) + + self.children_test_base( + self.TEST_PROPERTY_NAME, test_children, test_property_calls, test_fn + ) + + def test_child_exists(self): + """Test finding existing child""" + + test_hierarchy = TestHierarchyDevice.ConcreteHierarchy() + test_hierarchy.init("TestDevice", self.TEST_PROPERTY_NAME) + mock_children = {} + for child in ["stat/child/1", "stat/child/2"]: + mock_children[child] = None + test_hierarchy._children = mock_children + mock_parent = Mock() + mock_parent.dev_name.return_value = "stat/parent/1" + test_hierarchy._parents = [mock_parent] + + # try to find a child using an exact filter match + _ = test_hierarchy.child( + "stat/child/1", hierarchy_device.HierarchyMatchingFilter.EXACT + ) + + # try to find a child using a regex + _ = test_hierarchy.child( + "stat/child/*", hierarchy_device.HierarchyMatchingFilter.REGEX + ) + + def test_child_does_not_exist(self): + """Test finding a non-existing child to raise an exception""" + + test_hierarchy = TestHierarchyDevice.ConcreteHierarchy() + test_hierarchy.init("TestDevice", self.TEST_PROPERTY_NAME) + mock_children = {} + for child in ["stat/child/1", "stat/child/2"]: + mock_children[child] = None + test_hierarchy._children = mock_children + mock_parent = Mock() + mock_parent.dev_name.return_value = "stat/parent/1" + test_hierarchy._parents = [mock_parent] + + # try to find a non-existing child using an exact filter match + with self.assertRaises(hierarchy_device.NotFoundException): + _ = test_hierarchy.child( + "stat/notexist/1", hierarchy_device.HierarchyMatchingFilter.EXACT + ) + + # try to find a non-existing child using a regex + with self.assertRaises(hierarchy_device.NotFoundException): + _ = test_hierarchy.child( + "stat/notexist/*", hierarchy_device.HierarchyMatchingFilter.REGEX + ) + + def test_unique_parent_exists(self): + """Test finding the uniquely defined parent""" + + test_hierarchy = TestHierarchyDevice.ConcreteHierarchy() + test_hierarchy.init("TestDevice", self.TEST_PROPERTY_NAME) + mock_children = {} + for child in ["stat/child/1", "stat/child/2"]: + mock_children[child] = None + test_hierarchy._children = mock_children + mock_parent = Mock() + mock_parent.dev_name.return_value = "stat/parent/1" + test_hierarchy._parents = [mock_parent] + + _ = test_hierarchy.parent() + + TEST_CHILDREN_ROOT = ["stat/ccd/1", "stat/psoc/1", "stat/antennafield/1"] + + TEST_CHILDREN_ANTENNAFIELD = [ + "stat/sdp/1", + "stat/tilebeam/1", + "stat/digitalbeam/1", + "stat/aps/1", + ] + TEST_CHILDREN_SDP = ["stat/xst/1", "stat/sst/1", "stat/bst/1"] + TEST_CHILDREN_APS = [ + "stat/apsct/1", + "stat/apspu/1", + "stat/unb2/1", + "stat/recvh/1", + "stat/recvl/1", + ] + + # Calls to get_property follow depth-first order + TEST_GET_PROPERTY_CALLS = [ + {TEST_PROPERTY_NAME: []}, # cdd/1 + {TEST_PROPERTY_NAME: []}, # psoc/1 + {TEST_PROPERTY_NAME: TEST_CHILDREN_ANTENNAFIELD}, # antennafield/1 + {TEST_PROPERTY_NAME: TEST_CHILDREN_SDP}, # sdp/1 + {TEST_PROPERTY_NAME: []}, # xst/1 + {TEST_PROPERTY_NAME: []}, # sst/1 + {TEST_PROPERTY_NAME: []}, # bst/1 + {TEST_PROPERTY_NAME: []}, # tilebeam/1 + {TEST_PROPERTY_NAME: ["stat/beamlet/1"]}, # digitalbeam/1 + {TEST_PROPERTY_NAME: []}, # beamlet/1 + {TEST_PROPERTY_NAME: TEST_CHILDREN_APS}, # aps/1 + {TEST_PROPERTY_NAME: []}, # apsct/1 + {TEST_PROPERTY_NAME: []}, # apspu/1 + {TEST_PROPERTY_NAME: []}, # unb2/1 + {TEST_PROPERTY_NAME: []}, # recvh/1 + {TEST_PROPERTY_NAME: []}, # recvl/1 + ] + + def test_get_children_depth_elaborate_no_filter(self): + """Create a 3 levels deep hierarchy with ~15 devices""" + + def test_fn(test_hierarchy: TestHierarchyDevice.ConcreteHierarchy): + test = test_hierarchy.children(depth=-1) + + # Test ccd and psoc have no children + self.assertDictEqual({}, test["stat/ccd/1"]["children"]) + self.assertDictEqual({}, test["stat/psoc/1"]["children"]) + + # Test antennafield has 4 children + self.assertEqual(4, len(test["stat/antennafield/1"]["children"])) + + # Test sdp has 3 children + self.assertEqual( + 3, + len(test["stat/antennafield/1"]["children"]["stat/sdp/1"]["children"]), + ) + + # Test all childs of sdp have no children + for sdp_child in self.TEST_CHILDREN_SDP: + self.assertDictEqual( + {}, + test["stat/antennafield/1"]["children"]["stat/sdp/1"]["children"][ + sdp_child + ]["children"], + ) + + # Test tilebeam has no children + self.assertDictEqual( + {}, + test["stat/antennafield/1"]["children"]["stat/tilebeam/1"]["children"], + ) + + # Test digitalbeam has 1 child + self.assertEqual( + 1, + len( + test["stat/antennafield/1"]["children"]["stat/digitalbeam/1"][ + "children" + ] + ), + ) + + # Test beamlet has no children + self.assertDictEqual( + {}, + test["stat/antennafield/1"]["children"]["stat/digitalbeam/1"][ + "children" + ]["stat/beamlet/1"]["children"], + ) + + # Test aps has 5 children + self.assertEqual( + 5, + len(test["stat/antennafield/1"]["children"]["stat/aps/1"]["children"]), + ) + + # Test all childs of aps have no children + for aps_child in self.TEST_CHILDREN_APS: + self.assertDictEqual( + {}, + test["stat/antennafield/1"]["children"]["stat/aps/1"]["children"][ + aps_child + ]["children"], + ) + + self.children_test_base( + self.TEST_PROPERTY_NAME, + self.TEST_CHILDREN_ROOT, + self.TEST_GET_PROPERTY_CALLS, + test_fn, + ) + + def test_get_child_filter(self): + """Test we can find every device""" + + test_hierarchy = TestHierarchyDevice.ConcreteHierarchy() + test_hierarchy.init("TestDevice", self.TEST_PROPERTY_NAME) + mock_children = {} + for child in self.TEST_CHILDREN_ROOT: + mock_children[child] = None + test_hierarchy._children = mock_children + test_hierarchy._parents = None + + self.device_proxy_mock[ + "object" + ].return_value.get_property.side_effect = self.TEST_GET_PROPERTY_CALLS + + all_children = copy.copy(self.TEST_CHILDREN_ROOT) + all_children.extend(self.TEST_CHILDREN_ANTENNAFIELD) + all_children.extend(self.TEST_CHILDREN_SDP) + all_children.extend(self.TEST_CHILDREN_APS) + + # Find all proxies for each child and match that it is the same + # object as cached in `_proxies` + for child in all_children: + result = test_hierarchy.child( + child, matching_filter=hierarchy_device.HierarchyMatchingFilter.EXACT + ) + self.assertEqual(test_hierarchy._proxies[child], result) + + self.device_proxy_mock[ + "object" + ].return_value.get_property.side_effect = self.TEST_GET_PROPERTY_CALLS + + class FakeDeviceProxy: + """A stateful fake to return the right values + for the right device regardless of calling order.""" + + def __init__(self, name, timeout): + self.name = name + + def dev_name(self): + return self.name + + def get_property(self, prop_name): + children = { + "1": ["1.1", "1.2"], + "2": ["2.1", "2.2"], + "2.1": ["2.1.1"], + } + return {TestHierarchyDevice.TEST_PROPERTY_NAME: children.get(self.name, [])} + + @patch.object(hierarchy_device, "create_device_proxy", wraps=FakeDeviceProxy) + def test_walk_down(self, m_create_device_proxy): + """Test whether walking down the hierarchy tree (root -> leaves) works.""" + + test_hierarchy = TestHierarchyDevice.ConcreteHierarchy() + test_hierarchy.init("TestDevice", self.TEST_PROPERTY_NAME) + mock_children = {} + for child in ["1", "2", "3"]: + mock_children[child] = None + test_hierarchy._children = mock_children + test_hierarchy._parents = None + + def walker(device: DeviceProxy): + walk_order.append(device.dev_name()) + + # walk one level + walk_order = [] + test_hierarchy.walk_down(walker, depth=1) + self.assertListEqual(["1", "2", "3"], walk_order) + + # walk whole tree + walk_order = [] + test_hierarchy.walk_down(walker, depth=-1) + self.assertListEqual( + ["1", "1.1", "1.2", "2", "2.1", "2.1.1", "2.2", "3"], + walk_order, + ) + + @patch.object(hierarchy_device, "create_device_proxy", wraps=FakeDeviceProxy) + def test_walk_up(self, m_create_device_proxy): + """Test whether walking up the hierarchy (leaves -> root) tree works.""" + + test_hierarchy = TestHierarchyDevice.ConcreteHierarchy() + test_hierarchy.init("TestDevice", self.TEST_PROPERTY_NAME) + mock_children = {} + for child in ["1", "2", "3"]: + mock_children[child] = None + test_hierarchy._children = mock_children + test_hierarchy._parents = None + + def walker(device: DeviceProxy): + walk_order.append(device.dev_name()) + + # walk one level + walk_order = [] + test_hierarchy.walk_up(walker, depth=1) + self.assertListEqual(["3", "2", "1"], walk_order) + + # walk whole tree + walk_order = [] + test_hierarchy.walk_up(walker, depth=-1) + self.assertListEqual( + ["3", "2.2", "2.1.1", "2.1", "2", "1.2", "1.1", "1"], + walk_order, + )