diff --git a/tangostationcontrol/integration_test/default/devices/base_device_classes/test_power_hierarchy.py b/tangostationcontrol/integration_test/default/devices/base_device_classes/test_power_hierarchy.py index c2b8e7cf12a6f0cdd9541fb1e5abafe0cf3cfba5..d91c0a2235763337a30e6c9f2ec01e02834fc069 100644 --- a/tangostationcontrol/integration_test/default/devices/base_device_classes/test_power_hierarchy.py +++ b/tangostationcontrol/integration_test/default/devices/base_device_classes/test_power_hierarchy.py @@ -5,10 +5,15 @@ Power Hierarchy module integration test """ import logging -from tango import DevState, DeviceProxy from integration_test import base from integration_test.device_proxy import TestDeviceProxy +from tango import DevState, DeviceProxy +from tangostationcontrol.common.case_insensitive_string import CaseInsensitiveString +from tangostationcontrol.devices.base_device_classes.hierarchy_device import ( + NotFoundException, + HierarchyMatchingFilter, +) from tangostationcontrol.devices.base_device_classes.power_hierarchy import ( PowerHierarchyDevice, ) @@ -26,9 +31,12 @@ class TestPowerHierarchyDevice(base.IntegrationTestCase): ec_name = "STAT/EC/1" antennafield_name = "STAT/AntennaField/HBA0" aps_l0_name = "STAT/APS/L0" + aps_l1_name = "STAT/APS/L1" + aps_h0_name = "STAT/APS/H0" apsct_name = "STAT/APSCT/H0" apspu_h0_name = "STAT/APSPU/H0" apspu_l0_name = "STAT/APSPU/L0" + apspu_l1_name = "STAT/APSPU/L1" ccd_name = "STAT/CCD/1" pcon_name = "STAT/PCON/1" psoc_name = "STAT/PSOC/1" @@ -277,3 +285,64 @@ class TestPowerHierarchyDevice(base.IntegrationTestCase): ) self.assertListEqual([], self._unacceptable_exceptions()) + + def test_branch_child_deep_tree(self): + """ + Test whether Tango devices are correctly retrieved with branch_child function + """ + self.setup_all_devices() + # Create a Hierarchy Device from Device STAT/RECVH/H0 + recvh_ph = PowerHierarchyDevice() + recvh_ph.init(self.recvh_name) + self.assertEqual(recvh_ph.parent(), "stat/apspu/h0") + # Branch child method must not return its direct parent + self.assertRaises( + NotFoundException, + recvh_ph.branch_child, + "*/apspu/h*", + HierarchyMatchingFilter.REGEX, + ) + # Test if returns another device with the right query + self.assertEqual( + recvh_ph.branch_child("*/apspu/*", HierarchyMatchingFilter.REGEX).name(), + self.apspu_l0_name, + ) + # Test if returns a device in the tree with depth > 1 + self.assertEqual( + recvh_ph.branch_child("*/aps/l*", HierarchyMatchingFilter.REGEX).name(), + self.aps_l0_name, + ) + + def test_branch_children_names_deep_tree(self): + """ + Test whether Tango devices are correctly retrieved with + branch_children_names function + """ + self.setup_all_devices() + # Create a Hierarchy Device from Device STAT/RECVH/H0 + recvh_ph = PowerHierarchyDevice() + recvh_ph.init(self.recvh_name) + self.assertEqual(recvh_ph.parent(), "stat/apspu/h0") + self.assertEqual(recvh_ph.children(), {}) + # Branch_children_names method must not return its direct parent + self.assertListEqual( + recvh_ph.branch_children_names("*/apspu/h*", HierarchyMatchingFilter.REGEX), + [], + ) + # Test if returns another device with the right query + self.assertListEqual( + recvh_ph.branch_children_names("*/apspu/*", HierarchyMatchingFilter.REGEX), + [ + CaseInsensitiveString(self.apspu_l0_name), + CaseInsensitiveString(self.apspu_l1_name), + ], + ) + # Test if returns a device in the tree with depth > 1 + self.assertListEqual( + recvh_ph.branch_children_names("*/aps/*", HierarchyMatchingFilter.REGEX), + [ + CaseInsensitiveString(self.aps_l0_name), + CaseInsensitiveString(self.aps_l1_name), + CaseInsensitiveString(self.aps_h0_name), + ], + ) diff --git a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/hierarchy_device.py b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/hierarchy_device.py index 084eb2a86b4d243750b24e81cddaa0bc947a82b9..ece897ae66d229abd47be6a4ea962ff492496361 100644 --- a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/hierarchy_device.py +++ b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/hierarchy_device.py @@ -365,15 +365,44 @@ class AbstractHierarchyDevice: :param matching_filter: Type of filter such as exact, substring or regex :return A device proxy whose name matches the child_filter substring """ - try: - return self.child(child_filter, matching_filter) - except NotFoundException: - pass - _hierarchy = AbstractHierarchyDevice() - _hierarchy.init(self.parent(), self._child_property_name) + def _branch_child(hierarchy_dev=self, depth=0, direct_parent=""): + """Recursive helper function of branch_child method + :param hierarchy_dev: The current tree element of the recursion + :param depth: Depth of the actual recursion + :param direct_parent: Direct parent of the first level node, which must be excluded + from the results + """ + if depth == 0: + direct_parent = hierarchy_dev.parent() + + print(depth) + print(direct_parent) + print(hierarchy_dev._child_property_name) + print(hierarchy_dev._parent) + print(hierarchy_dev is None) + print(hierarchy_dev.parent()) + print() - return _hierarchy.branch_child(child_filter, matching_filter) + try: + child = hierarchy_dev.child(child_filter, matching_filter) + # Prevent access to direct parent + if CaseInsensitiveString(child.dev_name()) == CaseInsensitiveString( + direct_parent + ): + pass + else: + return child + except NotFoundException: + pass + + _hierarchy = AbstractHierarchyDevice() + _hierarchy.init(hierarchy_dev.parent(), self._child_property_name) + + # Recursive call with updated depth and direct_parent + return _branch_child(_hierarchy, depth + 1, direct_parent) + + return _branch_child() def branch_children_names( self, @@ -385,21 +414,45 @@ class AbstractHierarchyDevice: :param child_filter: Substring of the device to retrieve device names list for - :param filter_type: Type of filter such as exact, substring or regex + :param matching_filter: Type of filter such as exact, substring or regex :return A list of device names matching the child filter substring """ - if isinstance(child_filter, Enum): - child_filter = child_filter.value - children = self.children_names(child_filter, matching_filter) + def _branch_children_names(hierarchy_dev=self, depth=0, direct_parent=""): + """Recursive helper function of branch_children method + :param hierarchy_dev: The current tree element of the recursion + :param depth: Depth of the actual recursion + :param direct_parent: Direct parent of the first level node, which must be excluded + from the results + """ + if depth == 0: + try: + direct_parent = hierarchy_dev.parent() + except NotFoundException: + return [] + + children = hierarchy_dev.children_names(child_filter, matching_filter) + # Prevent access to direct parent + try: + children.remove(CaseInsensitiveString(direct_parent)) + except ValueError: + pass + + if len(children) > 0: + return children + if len(children) == 0 and not hierarchy_dev._parent: + return [] - if len(children) > 0: - return children + _hierarchy = AbstractHierarchyDevice() + _hierarchy.init(hierarchy_dev.parent(), self._child_property_name) - _hierarchy = AbstractHierarchyDevice() - _hierarchy.init(self.parent(), self._child_property_name) + # Recursive call with updated depth and direct_parent + return _branch_children_names(_hierarchy, depth + 1, direct_parent) + + if isinstance(child_filter, Enum): + child_filter = child_filter.value - return _hierarchy.branch_children_names(child_filter, matching_filter) + return _branch_children_names() def parent(self) -> str: """Return the parent device name. Requires the parent to be unique. diff --git a/tangostationcontrol/test/devices/base_device_classes/test_hierarchy_device.py b/tangostationcontrol/test/devices/base_device_classes/test_hierarchy_device.py index 713315ec380f453848876cc84d0c686641fe16a0..748996c57953d5d4a038ea43737fa5ea1e540c7f 100644 --- a/tangostationcontrol/test/devices/base_device_classes/test_hierarchy_device.py +++ b/tangostationcontrol/test/devices/base_device_classes/test_hierarchy_device.py @@ -288,24 +288,6 @@ class TestHierarchyDevice(device_base.DeviceTestCase): "stat/notexist/*", hierarchy_device.HierarchyMatchingFilter.REGEX ) - def test_unique_parent_exists(self): - """Test finding the uniquely defined parent""" - - test_hierarchy = TestHierarchyDevice.ConcreteHierarchy() - test_hierarchy.init("TestDevice", self.TEST_PROPERTY_NAME) - mock_children = {} - for child in ["stat/child/1", "stat/child/2"]: - mock_children[child] = None - test_hierarchy._children = mock_children - mock_parent = Mock() - mock_parent.dev_name.return_value = "stat/parent/1" - test_hierarchy._parents = [mock_parent] - - try: - _ = test_hierarchy.parent() - except hierarchy_device.NotFoundException: - pass - TEST_CHILDREN_ROOT = ["stat/ccd/1", "stat/psoc/1", "stat/antennafield/1"] TEST_CHILDREN_ANTENNAFIELD = [ @@ -524,3 +506,91 @@ class TestHierarchyDevice(device_base.DeviceTestCase): ["3", "2.2", "2.1.1", "2.1", "2", "1.2", "1.1", "1"], walk_order, ) + + @patch.object(hierarchy_device, "create_device_proxy", wraps=FakeDeviceProxy) + def test_branch_children_names(self, m_create_device_proxy): + """Test if branch children names are correctly retrieved and direct + parent is not included in them""" + test_hierarchy = TestHierarchyDevice.ConcreteHierarchy() + test_hierarchy.init("stat/testdevice/1", self.TEST_PROPERTY_NAME) + mock_children = {} + for child in ["stat/childdevice/1", "stat/childdevice/2", "stat/childdevice/3"]: + mock_children[child] = None + test_hierarchy._children = mock_children + mock_parent = Mock() + mock_parent.dev_name.return_value = "stat/parent/1" + test_hierarchy._parent = mock_parent + + children_names = test_hierarchy.branch_children_names( + "*stat*", hierarchy_device.HierarchyMatchingFilter.REGEX + ) + self.assertEqual(3, len(children_names)) + self.assertNotIn("stat/parent/1", children_names) + + children_names = test_hierarchy.branch_children_names( + "stat/parent/*", + hierarchy_device.HierarchyMatchingFilter.REGEX, + ) + self.assertNotIn("stat/parent/1", children_names) + + @patch.object(hierarchy_device, "create_device_proxy", wraps=FakeDeviceProxy) + def test_branch_children_names_no_parent(self, m_create_device_proxy): + """Test branch children names with no-parent node""" + test_hierarchy = TestHierarchyDevice.ConcreteHierarchy() + test_hierarchy.init("stat/testdevice/1", self.TEST_PROPERTY_NAME) + mock_children = {} + for child in ["stat/childdevice/1", "stat/childdevice/2", "stat/childdevice/3"]: + mock_children[child] = None + test_hierarchy._children = mock_children + + children_names = test_hierarchy.branch_children_names( + "*stat*", hierarchy_device.HierarchyMatchingFilter.REGEX + ) + self.assertEqual(0, len(children_names)) + + @patch.object(hierarchy_device, "create_device_proxy", wraps=FakeDeviceProxy) + def test_branch_child(self, m_create_device_proxy): + """Test if branch child is correctly retrieved and direct + parent is not included""" + test_hierarchy = TestHierarchyDevice.ConcreteHierarchy() + test_hierarchy.init("stat/testdevice/1", self.TEST_PROPERTY_NAME) + mock_children = {} + for child in ["stat/child/1", "stat/child/2", "stat/child/3"]: + mock_children[child] = Mock() + mock_children[child].dev_name.return_value = child + mock_children[child].name.return_value = child + test_hierarchy._children = mock_children + mock_parent = Mock() + mock_parent.dev_name.return_value = "stat/parent/1" + test_hierarchy._parent = mock_parent + + child = test_hierarchy.branch_child( + "stat/child/*", hierarchy_device.HierarchyMatchingFilter.REGEX + ) + self.assertEqual("stat/child/1", child.dev_name()) + + self.assertRaises( + hierarchy_device.NotFoundException, + test_hierarchy.branch_child, + "stat/parent/*", + hierarchy_device.HierarchyMatchingFilter.REGEX, + ) + + @patch.object(hierarchy_device, "create_device_proxy", wraps=FakeDeviceProxy) + def test_branch_child_no_parent(self, m_create_device_proxy): + """Test branch child with no-parent node""" + test_hierarchy = TestHierarchyDevice.ConcreteHierarchy() + test_hierarchy.init("stat/testdevice/1", self.TEST_PROPERTY_NAME) + mock_children = {} + for child in ["stat/child/1", "stat/child/2", "stat/child/3"]: + mock_children[child] = Mock() + mock_children[child].dev_name.return_value = child + mock_children[child].name.return_value = child + test_hierarchy._children = mock_children + + self.assertRaises( + hierarchy_device.NotFoundException, + test_hierarchy.branch_child, + "stat/child/*", + hierarchy_device.HierarchyMatchingFilter.REGEX, + )