Skip to content
Snippets Groups Projects
Commit 1db6aa56 authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

Merge branch 'L2SS-1435-prevent-direct-parent-access' into 'master'

Resolve L2SS-1435 "Prevent direct parent access"

Closes L2SS-1435

See merge request !718
parents 3f600e1e 96b4be14
No related branches found
No related tags found
1 merge request!718Resolve L2SS-1435 "Prevent direct parent access"
......@@ -5,10 +5,15 @@
Power Hierarchy module integration test
"""
import logging
from tango import DevState, DeviceProxy
from integration_test import base
from integration_test.device_proxy import TestDeviceProxy
from tango import DevState, DeviceProxy
from tangostationcontrol.common.case_insensitive_string import CaseInsensitiveString
from tangostationcontrol.devices.base_device_classes.hierarchy_device import (
NotFoundException,
HierarchyMatchingFilter,
)
from tangostationcontrol.devices.base_device_classes.power_hierarchy import (
PowerHierarchyDevice,
)
......@@ -26,9 +31,12 @@ class TestPowerHierarchyDevice(base.IntegrationTestCase):
ec_name = "STAT/EC/1"
antennafield_name = "STAT/AntennaField/HBA0"
aps_l0_name = "STAT/APS/L0"
aps_l1_name = "STAT/APS/L1"
aps_h0_name = "STAT/APS/H0"
apsct_name = "STAT/APSCT/H0"
apspu_h0_name = "STAT/APSPU/H0"
apspu_l0_name = "STAT/APSPU/L0"
apspu_l1_name = "STAT/APSPU/L1"
ccd_name = "STAT/CCD/1"
pcon_name = "STAT/PCON/1"
psoc_name = "STAT/PSOC/1"
......@@ -277,3 +285,64 @@ class TestPowerHierarchyDevice(base.IntegrationTestCase):
)
self.assertListEqual([], self._unacceptable_exceptions())
def test_branch_child_deep_tree(self):
"""
Test whether Tango devices are correctly retrieved with branch_child function
"""
self.setup_all_devices()
# Create a Hierarchy Device from Device STAT/RECVH/H0
recvh_ph = PowerHierarchyDevice()
recvh_ph.init(self.recvh_name)
self.assertEqual(recvh_ph.parent(), "stat/apspu/h0")
# Branch child method must not return its direct parent
self.assertRaises(
NotFoundException,
recvh_ph.branch_child,
"*/apspu/h*",
HierarchyMatchingFilter.REGEX,
)
# Test if returns another device with the right query
self.assertEqual(
recvh_ph.branch_child("*/apspu/*", HierarchyMatchingFilter.REGEX).name(),
self.apspu_l0_name,
)
# Test if returns a device in the tree with depth > 1
self.assertEqual(
recvh_ph.branch_child("*/aps/l*", HierarchyMatchingFilter.REGEX).name(),
self.aps_l0_name,
)
def test_branch_children_names_deep_tree(self):
"""
Test whether Tango devices are correctly retrieved with
branch_children_names function
"""
self.setup_all_devices()
# Create a Hierarchy Device from Device STAT/RECVH/H0
recvh_ph = PowerHierarchyDevice()
recvh_ph.init(self.recvh_name)
self.assertEqual(recvh_ph.parent(), "stat/apspu/h0")
self.assertEqual(recvh_ph.children(), {})
# Branch_children_names method must not return its direct parent
self.assertListEqual(
recvh_ph.branch_children_names("*/apspu/h*", HierarchyMatchingFilter.REGEX),
[],
)
# Test if returns another device with the right query
self.assertListEqual(
recvh_ph.branch_children_names("*/apspu/*", HierarchyMatchingFilter.REGEX),
[
CaseInsensitiveString(self.apspu_l0_name),
CaseInsensitiveString(self.apspu_l1_name),
],
)
# Test if returns a device in the tree with depth > 1
self.assertListEqual(
recvh_ph.branch_children_names("*/aps/*", HierarchyMatchingFilter.REGEX),
[
CaseInsensitiveString(self.aps_l0_name),
CaseInsensitiveString(self.aps_l1_name),
CaseInsensitiveString(self.aps_h0_name),
],
)
......@@ -365,15 +365,44 @@ class AbstractHierarchyDevice:
:param matching_filter: Type of filter such as exact, substring or regex
:return A device proxy whose name matches the child_filter substring
"""
def _branch_child(hierarchy_dev=self, depth=0, direct_parent=""):
"""Recursive helper function of branch_child method
:param hierarchy_dev: The current tree element of the recursion
:param depth: Depth of the actual recursion
:param direct_parent: Direct parent of the first level node, which must be excluded
from the results
"""
if depth == 0:
direct_parent = hierarchy_dev.parent()
print(depth)
print(direct_parent)
print(hierarchy_dev._child_property_name)
print(hierarchy_dev._parent)
print(hierarchy_dev is None)
print(hierarchy_dev.parent())
print()
try:
return self.child(child_filter, matching_filter)
child = hierarchy_dev.child(child_filter, matching_filter)
# Prevent access to direct parent
if CaseInsensitiveString(child.dev_name()) == CaseInsensitiveString(
direct_parent
):
pass
else:
return child
except NotFoundException:
pass
_hierarchy = AbstractHierarchyDevice()
_hierarchy.init(self.parent(), self._child_property_name)
_hierarchy.init(hierarchy_dev.parent(), self._child_property_name)
# Recursive call with updated depth and direct_parent
return _branch_child(_hierarchy, depth + 1, direct_parent)
return _hierarchy.branch_child(child_filter, matching_filter)
return _branch_child()
def branch_children_names(
self,
......@@ -385,21 +414,45 @@ class AbstractHierarchyDevice:
:param child_filter: Substring of the device to retrieve device names list
for
:param filter_type: Type of filter such as exact, substring or regex
:param matching_filter: Type of filter such as exact, substring or regex
:return A list of device names matching the child filter substring
"""
if isinstance(child_filter, Enum):
child_filter = child_filter.value
children = self.children_names(child_filter, matching_filter)
def _branch_children_names(hierarchy_dev=self, depth=0, direct_parent=""):
"""Recursive helper function of branch_children method
:param hierarchy_dev: The current tree element of the recursion
:param depth: Depth of the actual recursion
:param direct_parent: Direct parent of the first level node, which must be excluded
from the results
"""
if depth == 0:
try:
direct_parent = hierarchy_dev.parent()
except NotFoundException:
return []
children = hierarchy_dev.children_names(child_filter, matching_filter)
# Prevent access to direct parent
try:
children.remove(CaseInsensitiveString(direct_parent))
except ValueError:
pass
if len(children) > 0:
return children
if len(children) == 0 and not hierarchy_dev._parent:
return []
_hierarchy = AbstractHierarchyDevice()
_hierarchy.init(self.parent(), self._child_property_name)
_hierarchy.init(hierarchy_dev.parent(), self._child_property_name)
# Recursive call with updated depth and direct_parent
return _branch_children_names(_hierarchy, depth + 1, direct_parent)
if isinstance(child_filter, Enum):
child_filter = child_filter.value
return _hierarchy.branch_children_names(child_filter, matching_filter)
return _branch_children_names()
def parent(self) -> str:
"""Return the parent device name. Requires the parent to be unique.
......
......@@ -288,24 +288,6 @@ class TestHierarchyDevice(device_base.DeviceTestCase):
"stat/notexist/*", hierarchy_device.HierarchyMatchingFilter.REGEX
)
def test_unique_parent_exists(self):
"""Test finding the uniquely defined parent"""
test_hierarchy = TestHierarchyDevice.ConcreteHierarchy()
test_hierarchy.init("TestDevice", self.TEST_PROPERTY_NAME)
mock_children = {}
for child in ["stat/child/1", "stat/child/2"]:
mock_children[child] = None
test_hierarchy._children = mock_children
mock_parent = Mock()
mock_parent.dev_name.return_value = "stat/parent/1"
test_hierarchy._parents = [mock_parent]
try:
_ = test_hierarchy.parent()
except hierarchy_device.NotFoundException:
pass
TEST_CHILDREN_ROOT = ["stat/ccd/1", "stat/psoc/1", "stat/antennafield/1"]
TEST_CHILDREN_ANTENNAFIELD = [
......@@ -524,3 +506,91 @@ class TestHierarchyDevice(device_base.DeviceTestCase):
["3", "2.2", "2.1.1", "2.1", "2", "1.2", "1.1", "1"],
walk_order,
)
@patch.object(hierarchy_device, "create_device_proxy", wraps=FakeDeviceProxy)
def test_branch_children_names(self, m_create_device_proxy):
"""Test if branch children names are correctly retrieved and direct
parent is not included in them"""
test_hierarchy = TestHierarchyDevice.ConcreteHierarchy()
test_hierarchy.init("stat/testdevice/1", self.TEST_PROPERTY_NAME)
mock_children = {}
for child in ["stat/childdevice/1", "stat/childdevice/2", "stat/childdevice/3"]:
mock_children[child] = None
test_hierarchy._children = mock_children
mock_parent = Mock()
mock_parent.dev_name.return_value = "stat/parent/1"
test_hierarchy._parent = mock_parent
children_names = test_hierarchy.branch_children_names(
"*stat*", hierarchy_device.HierarchyMatchingFilter.REGEX
)
self.assertEqual(3, len(children_names))
self.assertNotIn("stat/parent/1", children_names)
children_names = test_hierarchy.branch_children_names(
"stat/parent/*",
hierarchy_device.HierarchyMatchingFilter.REGEX,
)
self.assertNotIn("stat/parent/1", children_names)
@patch.object(hierarchy_device, "create_device_proxy", wraps=FakeDeviceProxy)
def test_branch_children_names_no_parent(self, m_create_device_proxy):
"""Test branch children names with no-parent node"""
test_hierarchy = TestHierarchyDevice.ConcreteHierarchy()
test_hierarchy.init("stat/testdevice/1", self.TEST_PROPERTY_NAME)
mock_children = {}
for child in ["stat/childdevice/1", "stat/childdevice/2", "stat/childdevice/3"]:
mock_children[child] = None
test_hierarchy._children = mock_children
children_names = test_hierarchy.branch_children_names(
"*stat*", hierarchy_device.HierarchyMatchingFilter.REGEX
)
self.assertEqual(0, len(children_names))
@patch.object(hierarchy_device, "create_device_proxy", wraps=FakeDeviceProxy)
def test_branch_child(self, m_create_device_proxy):
"""Test if branch child is correctly retrieved and direct
parent is not included"""
test_hierarchy = TestHierarchyDevice.ConcreteHierarchy()
test_hierarchy.init("stat/testdevice/1", self.TEST_PROPERTY_NAME)
mock_children = {}
for child in ["stat/child/1", "stat/child/2", "stat/child/3"]:
mock_children[child] = Mock()
mock_children[child].dev_name.return_value = child
mock_children[child].name.return_value = child
test_hierarchy._children = mock_children
mock_parent = Mock()
mock_parent.dev_name.return_value = "stat/parent/1"
test_hierarchy._parent = mock_parent
child = test_hierarchy.branch_child(
"stat/child/*", hierarchy_device.HierarchyMatchingFilter.REGEX
)
self.assertEqual("stat/child/1", child.dev_name())
self.assertRaises(
hierarchy_device.NotFoundException,
test_hierarchy.branch_child,
"stat/parent/*",
hierarchy_device.HierarchyMatchingFilter.REGEX,
)
@patch.object(hierarchy_device, "create_device_proxy", wraps=FakeDeviceProxy)
def test_branch_child_no_parent(self, m_create_device_proxy):
"""Test branch child with no-parent node"""
test_hierarchy = TestHierarchyDevice.ConcreteHierarchy()
test_hierarchy.init("stat/testdevice/1", self.TEST_PROPERTY_NAME)
mock_children = {}
for child in ["stat/child/1", "stat/child/2", "stat/child/3"]:
mock_children[child] = Mock()
mock_children[child].dev_name.return_value = child
mock_children[child].name.return_value = child
test_hierarchy._children = mock_children
self.assertRaises(
hierarchy_device.NotFoundException,
test_hierarchy.branch_child,
"stat/child/*",
hierarchy_device.HierarchyMatchingFilter.REGEX,
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment