From 3574d2af6702eaff794b1e1a79c190d70aaccfa4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Corn=C3=A9=20Lukken?= <lukken@astron.nl> Date: Mon, 31 Jul 2023 11:27:52 +0000 Subject: [PATCH] L2SS-1436: Revert support for multiple parents --- README.md | 1 + tangostationcontrol/VERSION | 2 +- .../test_power_hierarchy.py | 4 +- .../beam/managers/_digitalbeam.py | 10 +-- .../beam/managers/_tilebeam.py | 12 +-- .../devices/base_device_classes/hierarchy.py | 77 +++++-------------- .../base_device_classes/hierarchy_device.py | 27 +++---- .../tangostationcontrol/devices/sdp/bst.py | 1 - .../devices/sdp/digitalbeam.py | 10 +-- .../tangostationcontrol/devices/sdp/sdp.py | 6 +- .../tangostationcontrol/devices/sdp/sst.py | 1 - .../tangostationcontrol/devices/sdp/xst.py | 1 - .../tangostationcontrol/devices/tilebeam.py | 8 +- .../beam/managers/test_digitalbeam_manager.py | 2 +- .../beam/managers/test_tilebeam_manager.py | 4 +- .../base_device_classes/test_hierarchy.py | 70 ++++------------- 16 files changed, 69 insertions(+), 167 deletions(-) diff --git a/README.md b/README.md index eab36569f..33801c770 100644 --- a/README.md +++ b/README.md @@ -115,6 +115,7 @@ Next change the version in the following places: # Release Notes +* 0.20.2 Support only one parent in hierarchies * 0.20.1 Create an abstract AntennaMapper class which implements behavior of both AntennaToSdpMapper and AntennaToRecvMapper * 0.20.0 Complete implementation of station-state transitions in StationManager device. diff --git a/tangostationcontrol/VERSION b/tangostationcontrol/VERSION index 847e9aef6..727d97b9b 100644 --- a/tangostationcontrol/VERSION +++ b/tangostationcontrol/VERSION @@ -1 +1 @@ -0.20.1 +0.20.2 diff --git a/tangostationcontrol/integration_test/default/devices/base_device_classes/test_power_hierarchy.py b/tangostationcontrol/integration_test/default/devices/base_device_classes/test_power_hierarchy.py index 20716f4be..531458412 100644 --- a/tangostationcontrol/integration_test/default/devices/base_device_classes/test_power_hierarchy.py +++ b/tangostationcontrol/integration_test/default/devices/base_device_classes/test_power_hierarchy.py @@ -124,10 +124,10 @@ class TestPowerHierarchy(base.IntegrationTestCase): # Check if PSOC retrieves correctly its parent state (StationManager -> ON) psoc_ph = PowerHierarchy() psoc_ph.init(self.psoc_name) - self.assertEqual(psoc_ph.parent_state(self.stationmanager_name), DevState.ON) + self.assertEqual(psoc_ph.parent_state(), DevState.ON) # Check if child reads correctly a parent attribute self.assertEqual( - psoc_ph.read_parent_attribute(self.stationmanager_name, "Station_Name_R"), + psoc_ph.read_parent_attribute("Station_Name_R"), "DevStation", ) diff --git a/tangostationcontrol/tangostationcontrol/beam/managers/_digitalbeam.py b/tangostationcontrol/tangostationcontrol/beam/managers/_digitalbeam.py index 94406e155..1bca73797 100644 --- a/tangostationcontrol/tangostationcontrol/beam/managers/_digitalbeam.py +++ b/tangostationcontrol/tangostationcontrol/beam/managers/_digitalbeam.py @@ -65,9 +65,7 @@ class DigitalBeamManager(AbstractBeamManager): # delay can accomplish that. We zero out weights later. beam_delays = numpy.zeros((N_pn, A_pn, N_beamlets_ctrl), dtype=numpy.float32) for antenna_nr, (fpga_nr, input_nr) in enumerate( - self._device.control.read_parent_attribute( - self._device.control.parent(), "Antenna_to_SDP_Mapping_R" - ) + self._device.control.read_parent_attribute("Antenna_to_SDP_Mapping_R") ): if input_nr >= 0: beam_delays[fpga_nr, input_nr, :] = antenna_delays[antenna_nr, :] @@ -81,15 +79,13 @@ class DigitalBeamManager(AbstractBeamManager): # - if the antenna is bad (False in Antenna_Usage_Mask_R) # - if the antenna is not in the antenna set (False in Antenna_Mask_RW) antenna_usage_mask = self._device.control.read_parent_attribute( - self._device.control.parent(), "Antenna_Usage_Mask_R" + "Antenna_Usage_Mask_R" ) antenna_mask = self._device.read_Antenna_Mask_R() zeroes_for_all_beamlets = numpy.array([0] * N_beamlets_ctrl, dtype=numpy.uint32) for antenna_nr, (fpga_nr, input_nr) in enumerate( - self._device.control.read_parent_attribute( - self._device.control.parent(), "Antenna_to_SDP_Mapping_R" - ) + self._device.control.read_parent_attribute("Antenna_to_SDP_Mapping_R") ): if ( input_nr < 0 diff --git a/tangostationcontrol/tangostationcontrol/beam/managers/_tilebeam.py b/tangostationcontrol/tangostationcontrol/beam/managers/_tilebeam.py index 79b08e515..a18614eb7 100644 --- a/tangostationcontrol/tangostationcontrol/beam/managers/_tilebeam.py +++ b/tangostationcontrol/tangostationcontrol/beam/managers/_tilebeam.py @@ -67,9 +67,7 @@ class TileBeamManager(AbstractBeamManager): (self.nr_tiles, N_elements * N_pol), dtype=numpy.int64 ) control_mapping = numpy.reshape( - self.device.control.read_parent_attribute( - self.device.control.parent(), "Control_to_RECV_mapping_R" - ), + self.device.control.read_parent_attribute("Control_to_RECV_mapping_R"), (-1, N_pol), ) @@ -107,12 +105,8 @@ class TileBeamManager(AbstractBeamManager): timestamp: datetime.datetime, bf_delay_steps: numpy.array, ): - parent = self.device.control.parent() - control_mapping = numpy.reshape( - self.device.control.read_parent_attribute( - parent, "Control_to_RECV_mapping_R" - ), + self.device.control.read_parent_attribute("Control_to_RECV_mapping_R"), (-1, N_pol), ) @@ -149,7 +143,7 @@ class TileBeamManager(AbstractBeamManager): # Record where we now point to, now that we've updated the weights. # Only the entries within the mask have been updated - mask = self.device.control.read_parent_attribute(parent, "ANT_mask_RW") + mask = self.device.control.read_parent_attribute("ANT_mask_RW") for rcu in range(self.nr_tiles): if mask[rcu]: self.current_pointing_direction[rcu] = pointing_direction[rcu] diff --git a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/hierarchy.py b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/hierarchy.py index c233be8c0..7b11cf11e 100644 --- a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/hierarchy.py +++ b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/hierarchy.py @@ -47,7 +47,7 @@ class AbstractHierarchy(ABC): self, child_property_name: str, children: List[str] = None, - parents: List[str] = None, + parent: str = None, proxies: Optional[Dict[str, DeviceProxy]] = None, ): """Construct the hierarchy and provide protected access @@ -55,7 +55,7 @@ class AbstractHierarchy(ABC): :param child_property_name: The name of the PyTango property to identify children from your own direct (grand)children :param children: Your direct children, if any, can be empty list - :param parents: Your direct parents, if any, can be None + :param parent: Your direct parent, if any, can be None :param proxies: Pass reference to DeviceProxy cache dictionary, if None it will be created. This can be used to ensure single DeviceProxy instances when devices implement _multiple_ @@ -64,17 +64,15 @@ class AbstractHierarchy(ABC): self._child_property_name = child_property_name self._children = {} - self._parents = [] + self._parent = None # Store proxies internally upon creation and only pass references to # them. Ensures only single instance of DeviceProxy is created per # device. self._proxies = proxies or {} - if not parents: - parents = [] - for parent in parents: - self._parents.append(create_device_proxy(parent)) + if parent: + self._parent = create_device_proxy(parent) if not children: children = [] @@ -84,31 +82,16 @@ class AbstractHierarchy(ABC): def __str__(self) -> str: return f"Hierarchy of {self._child_property_name}" - def parents(self) -> List[str]: - """Return the parents device names if there are parents - - :return: The device names of each parent or None - """ - - if not self._parents: - return [] - - return [parent.dev_name().casefold() for parent in self._parents] - - def parent(self) -> List[str]: + def parent(self) -> str: """Return the parent device name. Requires the parent to be unique. - :return: The device name of the parent + :return: The device name of the parent if any """ - _parents = self.parents() - - if len(_parents) != 1: - raise NotFoundException( - f"Could not find unique parent in {self}, found {_parents}" - ) + if self._parent: + return self._parent.dev_name().casefold() - return _parents[0] + return None @staticmethod def _get_filter(filter_type: HierarchyMatchingFilter) -> child_filter_func_type: @@ -350,11 +333,11 @@ class AbstractHierarchy(ABC): pass db = tango.Database() - children = db.get_device_property(self.parents()[0], self._child_property_name)[ + children = db.get_device_property(self.parent(), self._child_property_name)[ self._child_property_name ] return AbstractHierarchy( - self._child_property_name, children, self.parents()[1:], self._proxies + self._child_property_name, children, self.parent(), self._proxies ).branch_child(child_filter, filter_type) def branch_children_names( @@ -379,57 +362,37 @@ class AbstractHierarchy(ABC): return children db = tango.Database() - children = db.get_device_property(self.parents()[0], self._child_property_name)[ + children = db.get_device_property(self.parent(), self._child_property_name)[ self._child_property_name ] return AbstractHierarchy( - self._child_property_name, children, self.parents()[1:], self._proxies + self._child_property_name, children, self.parent(), self._proxies ).branch_children_names(child_filter, filter_type) - def _match_parent(self, parent_name: str): - """Try to find an exact match for one of our parents - - :param parent_name: The full name of the parent to read the attribute from - :return: Return DeviceProxy of parent if it exists - """ - - _parent = None - for parent in self._parents: - if parent.dev_name().casefold() == parent_name.casefold(): - _parent = parent - break - return _parent - - def read_attribute(self, parent_name: str, attribute: str) -> any: + def read_attribute(self, attribute: str) -> any: """Allow to read attribute from parent without direct access - :param parent_name: The full name of the parent to read the attribute from :param attribute: The attribute to read from the parent, can be RW :return: The data from the attribute :raises tango.DevFailed: The exception from the DeviceProxy if raised """ - _parent = self._match_parent(parent_name) - - if not _parent: + if not self._parent: return None - return getattr(_parent, attribute) + return getattr(self._parent, attribute) - def state(self, parent_name: str) -> Optional[DevState]: + def state(self) -> Optional[DevState]: """Return the state of the parent without direct access - :param parent_name: The full name of the parent to read the attribute from :return: The state of the parent if there is one :raises tango.DevFailed: The exception from the DeviceProxy if raised """ - _parent = self._match_parent(parent_name) - - if not _parent: + if not self._parent: return None - return _parent.state() + return self._parent.state() def walk_down(self, func, depth: int = -1): """Execute the given function on every node in the tree downwards from the root, depth first.""" diff --git a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/hierarchy_device.py b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/hierarchy_device.py index 30f485b90..3c2e996a4 100644 --- a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/hierarchy_device.py +++ b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/hierarchy_device.py @@ -34,14 +34,14 @@ class AbstractHierarchyDevice: self._hierarchy: Optional[AbstractHierarchy] = None @staticmethod - def _find_parents(device_name: str, child_property: str): - """Find all parents for the given device_name and child property""" + def _find_parent(device_name: str, child_property: str): + """Find parent for the given device_name and child property""" + db = Database() # Get servers: ['antennafield/STAT', 'digitalbeam/STAT', ...] servers = db.get_server_list() - parents = [] devices = [] # Find each device through devices per server: ['STAT/antennafield/HBA', ... ] @@ -54,9 +54,9 @@ class AbstractHierarchyDevice: children = db.get_device_property(device, child_property)[child_property] for child in children: if child.casefold() == device_name.casefold(): - parents.append(device) + return device - return parents + return None def init( self, @@ -80,9 +80,9 @@ class AbstractHierarchyDevice: db = Database() children = db.get_device_property(device_name, child_property)[child_property] - parents = self._find_parents(device_name, child_property) + parent = self._find_parent(device_name, child_property) - if not children and not parents: + if not children and not parent: logger.warning( "Device: %s has empty hierarchy, %s property is empty and there are no " "parents", @@ -91,7 +91,7 @@ class AbstractHierarchyDevice: exc_info=True, ) - self._hierarchy = AbstractHierarchy(child_property, children, parents, proxies) + self._hierarchy = AbstractHierarchy(child_property, children, parent, proxies) def children(self, depth: int = 1) -> AbstractHierarchy.children_type: return self._hierarchy.children(depth) @@ -127,11 +127,8 @@ class AbstractHierarchyDevice: def parent(self): return self._hierarchy.parent() - def parents(self) -> List[str]: - return self._hierarchy.parents() - - def read_parent_attribute(self, parent_name: str, attribute: str) -> any: - return self._hierarchy.read_attribute(parent_name, attribute) + def read_parent_attribute(self, attribute: str) -> any: + return self._hierarchy.read_attribute(attribute) - def parent_state(self, parent_name: str) -> DevState: - return self._hierarchy.state(parent_name) + def parent_state(self) -> DevState: + return self._hierarchy.state() diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/bst.py b/tangostationcontrol/tangostationcontrol/devices/sdp/bst.py index fba9cb892..48f0c19e5 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/bst.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/bst.py @@ -167,7 +167,6 @@ class BST(Statistics): def read_FPGA_processing_error_R(self): return self.control.read_parent_attribute( - self.control.parent(), "TR_fpga_mask_RW", ) & (~self.read_attribute("FPGA_bst_offload_enable_R")) diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py b/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py index a0882c92c..622630998 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py @@ -120,7 +120,7 @@ class DigitalBeam(BeamDevice): def nr_antennas(self): """Return the number of configured antennas.""" - return self.control.read_parent_attribute(self.parent, "nr_antennas_R") + return self.control.read_parent_attribute("nr_antennas_R") def nr_beamlets(self): """Return the number of controlled beamlets.""" @@ -130,12 +130,12 @@ class DigitalBeam(BeamDevice): def antenna_setlist(self): """Return the string representation of officially offered set of antennas""" - return self.control.read_parent_attribute(self.parent, "Antenna_Sets_R") + return self.control.read_parent_attribute("Antenna_Sets_R") def antenna_set_masks(self): """Return string encoding of the corresponding antenna masks for the antenna field""" - return self.control.read_parent_attribute(self.parent, "Antenna_Set_Masks_R") + return self.control.read_parent_attribute("Antenna_Set_Masks_R") def read_Antenna_Set_RW(self): return self._antenna_set @@ -188,10 +188,10 @@ class DigitalBeam(BeamDevice): # Retrieve positions from RECV device reference_itrf = self.control.read_parent_attribute( - self.parent, "Antenna_Field_Reference_ITRF_R" + "Antenna_Field_Reference_ITRF_R" ) antenna_itrf = self.control.read_parent_attribute( - self.parent, "Antenna_Reference_ITRF_R" + "Antenna_Reference_ITRF_R" ).reshape(-1, N_xyz) # a delay calculator diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/sdp.py b/tangostationcontrol/tangostationcontrol/devices/sdp/sdp.py index 6aacc0efa..c18dfc89d 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/sdp.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/sdp.py @@ -541,7 +541,6 @@ class SDP(OPCUADevice): # We can only return a single value, so we assume the FPGA is configured coherently. # Which is something that is to be checked by an independent monitoring system anyway. mask = self.control.read_parent_attribute( - self.control.parent(), "TR_fpga_mask_RW", ) clocks = self.read_attribute("FPGA_pps_expected_cnt_RW") @@ -603,18 +602,15 @@ class SDP(OPCUADevice): ) def read_FPGA_processing_error_R(self): - parent_dev = self.control.parent() return self.control.read_parent_attribute( - parent_dev, "TR_fpga_mask_R", ) & ( ~self.read_attribute("FPGA_processing_enable_R") - | (self.control.read_parent_attribute(parent_dev, "FPGA_boot_image_R") <= 0) + | (self.control.read_parent_attribute("FPGA_boot_image_R") <= 0) ) def read_FPGA_input_error_R(self): return self.control.read_parent_attribute( - self.control.parent(), "TR_fpga_mask_R", ) & ( self.read_attribute("FPGA_wg_enable_R").any(axis=1) diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/sst.py b/tangostationcontrol/tangostationcontrol/devices/sdp/sst.py index 15b4f9d6e..90500b9cf 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/sst.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/sst.py @@ -201,7 +201,6 @@ class SST(Statistics): def read_FPGA_processing_error_R(self): return self.control.read_parent_attribute( - self.control.parent(), "TR_fpga_mask_RW", ) & (~self.read_attribute("FPGA_sst_offload_enable_R")) diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/xst.py b/tangostationcontrol/tangostationcontrol/devices/sdp/xst.py index cfae51f00..731408378 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/xst.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/xst.py @@ -711,7 +711,6 @@ class XST(Statistics): def read_FPGA_processing_error_R(self): return self.control.read_parent_attribute( - self.control.parent(), "TR_fpga_mask_RW", ) & ( ~self.read_attribute("FPGA_xst_offload_enable_R") diff --git a/tangostationcontrol/tangostationcontrol/devices/tilebeam.py b/tangostationcontrol/tangostationcontrol/devices/tilebeam.py index 3870aadbb..80dad2763 100644 --- a/tangostationcontrol/tangostationcontrol/devices/tilebeam.py +++ b/tangostationcontrol/tangostationcontrol/devices/tilebeam.py @@ -43,20 +43,18 @@ class TileBeam(BeamDevice): @log_exceptions() def configure_for_initialise(self): - parent = self.control.parent() - # We maintain the same number of tiles as the AntennaField self._beam_manager.nr_tiles = self.control.read_parent_attribute( - parent, "nr_antennas_R" + "nr_antennas_R" ) super().configure_for_initialise(self._beam_manager.nr_tiles) # Retrieve positions from AntennaField device antenna_reference_itrf = self.control.read_parent_attribute( - parent, "Antenna_Reference_itrf_R" + "Antenna_Reference_itrf_R" ) hbat_antenna_itrf_offsets = self.control.read_parent_attribute( - parent, "HBAT_antenna_itrf_offsets_R" + "HBAT_antenna_itrf_offsets_R" ).reshape(self._beam_manager.nr_tiles, N_elements, N_xyz) # a delay calculator for each tile diff --git a/tangostationcontrol/test/beam/managers/test_digitalbeam_manager.py b/tangostationcontrol/test/beam/managers/test_digitalbeam_manager.py index e5b186ab6..bbb360bf4 100644 --- a/tangostationcontrol/test/beam/managers/test_digitalbeam_manager.py +++ b/tangostationcontrol/test/beam/managers/test_digitalbeam_manager.py @@ -44,7 +44,7 @@ class TestDigitalBeamManager(base.TestCase): dt = datetime.datetime.now() pointing_direction = numpy.full((N_beamlets_ctrl, N_point_prop), 1) - def read_parent_attribute(parent, attr): + def read_parent_attribute(attr): match attr: case "Antenna_to_SDP_Mapping_R": return [[0, 0], [0, 1], [0, 2]] diff --git a/tangostationcontrol/test/beam/managers/test_tilebeam_manager.py b/tangostationcontrol/test/beam/managers/test_tilebeam_manager.py index 37b0a3450..faffaae60 100644 --- a/tangostationcontrol/test/beam/managers/test_tilebeam_manager.py +++ b/tangostationcontrol/test/beam/managers/test_tilebeam_manager.py @@ -53,7 +53,7 @@ class TestTileBeamManager(base.TestCase): dt = datetime.datetime.now() pointing_direction = numpy.full(1, 1) - def read_parent_attribute(parent, attr): + def read_parent_attribute(attr): match attr: case "Control_to_RECV_mapping_R": return numpy.array([[1, x] for x in range(0, DEFAULT_N_HBA_TILES)]) @@ -96,7 +96,7 @@ class TestTileBeamManager(base.TestCase): [numpy.full(N_elements, i) for i in range(DEFAULT_N_HBA_TILES)] * N_pol ) - def read_parent_attribute(parent, attr): + def read_parent_attribute(attr): match attr: case "Control_to_RECV_mapping_R": return numpy.array([[1, x] for x in range(0, DEFAULT_N_HBA_TILES)]) diff --git a/tangostationcontrol/test/devices/base_device_classes/test_hierarchy.py b/tangostationcontrol/test/devices/base_device_classes/test_hierarchy.py index ce120eec6..fa1bda622 100644 --- a/tangostationcontrol/test/devices/base_device_classes/test_hierarchy.py +++ b/tangostationcontrol/test/devices/base_device_classes/test_hierarchy.py @@ -31,51 +31,35 @@ class TestAbstractHierarchy(device_base.DeviceTestCase): ) self.assertEqual({}, test_hierarchy._children) - self.assertEqual([], test_hierarchy._parents) + self.assertEqual(None, test_hierarchy._parent) self.assertEqual({}, test_hierarchy.children()) - self.assertEqual([], test_hierarchy.parents()) + self.assertEqual(None, test_hierarchy.parent()) def test_get_or_create_proxy_cache(self): """Test if get_or_create_proxy caches without duplicates""" test = TestAbstractHierarchy.ConcreteHierarchy( - self.TEST_PROPERTY_NAME, children=None, parents=None + self.TEST_PROPERTY_NAME, children=None, parent=None ) - def test_parents_get_name(self): + def test_parent_get_name(self): """Read the name of the parent through mocking DeviceProxy.dev_name""" name_station = "stat/stationmanager/1" - name_antennafield = "stat/antennafield/1" mock_dev_station_name = Mock() mock_dev_station_name.dev_name.return_value = name_station - mock_dev_field_name = Mock() - mock_dev_field_name.dev_name.return_value = name_antennafield - self.device_proxy_mock["object"].side_effect = [ - mock_dev_station_name, - mock_dev_field_name, - ] + self.device_proxy_mock["object"].side_effect = [mock_dev_station_name] test = TestAbstractHierarchy.ConcreteHierarchy( self.TEST_PROPERTY_NAME, children=None, - parents=[name_station, name_antennafield], - ) - - self.assertEqual(name_station, test.parents()[0]) - self.assertEqual(name_antennafield, test.parents()[1]) - - def test_parent_get_name_no_parents(self): - """Read the name of the parent when there is no parent""" - - test = TestAbstractHierarchy.ConcreteHierarchy( - self.TEST_PROPERTY_NAME, children=None + parent=name_station, ) - self.assertEqual([], test.parents()) + self.assertEqual(name_station, test.parent()) def test_parent_read_attribute(self): """Read an attribute from the parent and get the mocked data""" @@ -93,22 +77,22 @@ class TestAbstractHierarchy(device_base.DeviceTestCase): ].return_value.dev_name.return_value = name_station test = TestAbstractHierarchy.ConcreteHierarchy( - self.TEST_PROPERTY_NAME, children=None, parents=[name_station] + self.TEST_PROPERTY_NAME, children=None, parent=name_station ) self.assertEqual( attribute_value, - test.read_attribute(name_station, "FPGA_firmware_version_R"), + test.read_attribute("FPGA_firmware_version_R"), ) def test_parent_read_attribute_no_parent(self): """Ensure that read_attribute returns None if there is no parent""" test = TestAbstractHierarchy.ConcreteHierarchy( - self.TEST_PROPERTY_NAME, children=None, parents=None + self.TEST_PROPERTY_NAME, children=None, parent=None ) - self.assertIsNone(test.read_attribute("parent", "FPGA_firmware_version_R")) + self.assertIsNone(test.read_attribute("FPGA_firmware_version_R")) def test_parent_get_state(self): """Ensure that we can get parent state""" @@ -123,19 +107,19 @@ class TestAbstractHierarchy(device_base.DeviceTestCase): ].return_value.dev_name.return_value = name_station test = TestAbstractHierarchy.ConcreteHierarchy( - self.TEST_PROPERTY_NAME, children=None, parents=[name_station] + self.TEST_PROPERTY_NAME, children=None, parent=name_station ) - self.assertEqual(DevState.FAULT, test.state(name_station)) + self.assertEqual(DevState.FAULT, test.state()) def test_parent_get_state_no_parent(self): """Ensure that state returns None if no parent""" test = TestAbstractHierarchy.ConcreteHierarchy( - self.TEST_PROPERTY_NAME, children=None, parents=None + self.TEST_PROPERTY_NAME, children=None, parent=None ) - self.assertIsNone(test.state("parent")) + self.assertIsNone(test.state()) def children_test_base( self, @@ -262,30 +246,6 @@ class TestAbstractHierarchy(device_base.DeviceTestCase): _ = test_hierarchy.parent() - def test_parent_does_not_exist(self): - """Test finding a non-existing parent to raise an exception""" - - test_hierarchy = TestAbstractHierarchy.ConcreteHierarchy( - self.TEST_PROPERTY_NAME, ["stat/child/1", "stat/child/2"], [] - ) - - # try to find a non-existing parent - with self.assertRaises(hierarchy.NotFoundException): - _ = test_hierarchy.parent() - - def test_parent_not_unique(self): - """Test finding a unique parent when there are multiple to raise an exception""" - - test_hierarchy = TestAbstractHierarchy.ConcreteHierarchy( - self.TEST_PROPERTY_NAME, - ["stat/child/1", "stat/child/2"], - ["stat/parent/1", "stat/parent/2"], - ) - - # try to find a unique parent - with self.assertRaises(hierarchy.NotFoundException): - _ = test_hierarchy.parent() - TEST_CHILDREN_ROOT = ["stat/ccd/1", "stat/psoc/1", "stat/antennafield/1"] TEST_CHILDREN_ANTENNAFIELD = [ -- GitLab