diff --git a/tangostationcontrol/integration_test/default/devices/test_device_calibration.py b/tangostationcontrol/integration_test/default/devices/test_device_calibration.py index 12fd195830600f310d03562cf0420831cd8acc84..a724432974572deee74a613b1ec20df5bd51af9c 100644 --- a/tangostationcontrol/integration_test/default/devices/test_device_calibration.py +++ b/tangostationcontrol/integration_test/default/devices/test_device_calibration.py @@ -157,7 +157,7 @@ class TestCalibrationDevice(AbstractTestBases.TestDeviceBase): def test_calibrate_recv(self): calibration_properties = { - "Antenna_Type": ["LBA"], + "Antenna_Type": ["HBA"], "Antenna_Cables": ["50m", "80m"] * (DEFAULT_N_HBA_TILES // 2), "Control_to_RECV_mapping": # [1, 0, 1, 1, 1, 2, 1, x ... 1, 47] diff --git a/tangostationcontrol/tangostationcontrol/devices/antennafield.py b/tangostationcontrol/tangostationcontrol/devices/antennafield.py index 4c1cb078051576c7de02b1ea0b8e32e37ae17d51..1ccc569736748090541d6ca3c6eeb3de5d24ce2b 100644 --- a/tangostationcontrol/tangostationcontrol/devices/antennafield.py +++ b/tangostationcontrol/tangostationcontrol/devices/antennafield.py @@ -1004,6 +1004,7 @@ class AntennaField(LOFARDevice): recv_mapping, self.get_mapped_dimensions("RECV"), number_of_receivers, + antenna_type=self.read_attribute("Antenna_Type_R"), ) def __setup_sdp_mapper(self): diff --git a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/mapper.py b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/mapper.py index 1af7d34c06f5a59725d1a04a483a0e542902e04d..9e3bd89d0431a118416866d2ca014c4284008469 100644 --- a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/mapper.py +++ b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/mapper.py @@ -55,7 +55,7 @@ class MappedAttribute(attribute): cast_type = dtype while not type_not_sequence(cast_type): cast_type = cast_type[0] - write_func = device.set_mapped_attribute( + _write_func = device.set_mapped_attribute( mapping_attribute, value, cast_type, mapping_device ) @@ -84,6 +84,7 @@ class AntennaMapper: self, mapping_dict: Dict[str, numpy.ndarray], num_devices: Optional[int] = 1, + antenna_type: str = "HBA", ) -> None: self._mapping_dict = mapping_dict self._num_devices = num_devices @@ -94,6 +95,8 @@ class AntennaMapper: self._reshape_attributes_out = {} self._fill_attributes_in = {} self._empty_attributes_out = {} + self._antenna_type = antenna_type + self._pwr_mapping = [] def map_read(self, mapped_attribute: str, device_values: List[any]) -> List[any]: """Perform a mapped read for the attribute using the device_results @@ -119,7 +122,11 @@ class AntennaMapper: ) return self._mapped_r_values( - device_values, default_values, self._value_mapper[mapped_attribute] + device_values, + default_values, + self._value_mapper[mapped_attribute], + self._antenna_type, + mapped_attribute in self._pwr_mapping, ) def map_write(self, mapped_attribute: str, set_values: List[any]) -> List[any]: @@ -133,7 +140,11 @@ class AntennaMapper: default_values = self._masked_value_mapping_write[mapped_attribute] mapped_values = self._mapped_rw_values( - set_values, default_values, self._value_mapper[mapped_attribute] + set_values, + default_values, + self._value_mapper[mapped_attribute], + self._antenna_type, + mapped_attribute in self._pwr_mapping, ) # Attributes which need to be reshaped with removing a part of their duplicated @@ -160,6 +171,8 @@ class AntennaMapper: device_values: List[any], default_values: List[any], value_mapping: List[any], + antenna_type: str, + pwr_attribute: bool, ): """Mapping for read operation""" mapped_values = numpy.array(default_values) @@ -189,15 +202,24 @@ class AntennaMapper: (dev_power, dev_input_power), (dev_control, dev_input_control), ) in enumerate(zip(power_mapping, control_mapping)): - # Insert the two values in the mapped array - if dev_power > 0: - mapped_values[idx][0] = device_values[dev_power - 1][ - dev_input_power - ] - if dev_control > 0: - mapped_values[idx][1] = device_values[dev_control - 1][ - dev_input_control - ] + if antenna_type == "LBA" and pwr_attribute: + # We should report the value of both mappings for each antenna (2 values), + # but for now we pick only one to keep the resulting array of similar + # dimension compared to HBA + if dev_power > 0 and dev_control > 0: + mapped_values[idx] = device_values[dev_power - 1][ + dev_input_power + ] + else: + # Insert the two values in the mapped array + if dev_power > 0: + mapped_values[idx][0] = device_values[dev_power - 1][ + dev_input_power + ] + if dev_control > 0: + mapped_values[idx][1] = device_values[dev_control - 1][ + dev_input_control + ] return mapped_values @@ -206,6 +228,8 @@ class AntennaMapper: set_values: List[any], default_values: List[any], value_mapping: List[any], + antenna_type: str, + pwr_attribute: bool, ): if MappingKeys.CONTROL in list(self._mapping_dict.keys()): mapped_values = [] @@ -234,13 +258,18 @@ class AntennaMapper: (dev_power, dev_input_power), (dev_control, dev_input_control), ) in enumerate(zip(power_mapping, control_mapping)): - if dev_power > 0: - mapped_values[dev_power - 1, dev_input_power] = set_values[idx][0] - if dev_control > 0: - mapped_values[dev_control - 1, dev_input_control] = set_values[idx][ - 1 - ] - + if antenna_type == "LBA" and pwr_attribute: + if dev_power > 0 and dev_control > 0: + mapped_values[dev_power] = set_values[idx] + else: + if dev_power > 0: + mapped_values[dev_power - 1, dev_input_power] = set_values[idx][ + 0 + ] + if dev_control > 0: + mapped_values[dev_control - 1, dev_input_control] = set_values[ + idx + ][1] return mapped_values def _init_reshape_out(self, mapped_attributes: List[str], device_type: str): @@ -296,8 +325,10 @@ class AntennaToRecvMapper(AntennaMapper): _VALUE_MAP_NONE_96_32 = numpy.full((MAX_ANTENNA, N_elements * N_pol), None) _VALUE_MAP_NONE_96_2 = numpy.full((MAX_ANTENNA, 2), None) - def __init__(self, mapping_dict, mapped_dimensions, number_of_receivers): - super().__init__(mapping_dict, number_of_receivers) + def __init__( + self, mapping_dict, mapped_dimensions, number_of_receivers, antenna_type + ): + super().__init__(mapping_dict, number_of_receivers, antenna_type) self._power_mapping = self._mapping_dict[MappingKeys.POWER] self._control_mapping = self._mapping_dict[MappingKeys.CONTROL] @@ -315,7 +346,7 @@ class AntennaToRecvMapper(AntennaMapper): self._mapped_attributes = list(mapped_dimensions) - self._value_mapper = self._init_value_mapper() + self._value_mapper = self._init_value_mapper(antenna_type) self._default_value_mapping_read = { "ANT_mask_RW": value_map_ant_bool, @@ -402,10 +433,10 @@ class AntennaToRecvMapper(AntennaMapper): masked_write[attr] = AntennaToRecvMapper._VALUE_MAP_NONE_96_32 return masked_write - def _init_value_mapper(self): + def _init_value_mapper(self, antenna_type: str): """Create the value mapping dictionary""" value_mapper = {} - double_mapping = [ + self._double_mapping = [ "RCU_attenuator_dB_R", "RCU_attenuator_dB_RW", "RCU_band_select_R", @@ -413,12 +444,15 @@ class AntennaToRecvMapper(AntennaMapper): "RCU_PCB_ID_R", "RCU_PCB_version_R", ] - pwr_mapping = ["RCU_PWR_ANT_on_R", "RCU_PWR_ANT_on_RW"] + self._pwr_mapping = ["RCU_PWR_ANT_on_R", "RCU_PWR_ANT_on_RW"] for attr in self._mapped_attributes: - if attr in double_mapping: + if attr in self._double_mapping: value_mapper[attr] = [self._power_mapping, self._control_mapping] - elif attr in pwr_mapping: - value_mapper[attr] = [self._power_mapping] + elif attr in self._pwr_mapping: + if antenna_type == "LBA": + value_mapper[attr] = [self._power_mapping, self._control_mapping] + elif antenna_type == "HBA": + value_mapper[attr] = [self._power_mapping] else: value_mapper[attr] = [self._control_mapping] return value_mapper @@ -435,7 +469,8 @@ class RecvDeviceWalker(object): self.antenna_usage_mask = antenna_usage_mask def recv_ant_masks(self) -> numpy.ndarray: - """Return the antenna mask for the control inputs of the antennas enabled in Antenna_Usage_Mask_R.""" + """Return the antenna mask for the control inputs of the antennas enabled + in Antenna_Usage_Mask_R.""" nr_recv_devices = max(self.control_to_recv_mapping[:, 0]) diff --git a/tangostationcontrol/test/devices/base_device_classes/test_mapper.py b/tangostationcontrol/test/devices/base_device_classes/test_mapper.py new file mode 100644 index 0000000000000000000000000000000000000000..9dc78b7f2c4a5263a18548e74646d97b115ad1cc --- /dev/null +++ b/tangostationcontrol/test/devices/base_device_classes/test_mapper.py @@ -0,0 +1,142 @@ +# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy) +# SPDX-License-Identifier: Apache-2.0 + +from test import base +import numpy + +from tangostationcontrol.common.constants import N_rcu +from tangostationcontrol.devices.base_device_classes.mapper import RecvDeviceWalker + + +class TestRecvDeviceWalker(base.TestCase): + """Test class for RecvDeviceWalker""" + + class MockDeviceProxy: + """Mock of DeviceProxy that simulates recv.ANT_mask_RW.""" + + @property + def ANT_mask_RW(self): + return self.ant_mask + + @ANT_mask_RW.setter + def ANT_mask_RW(self, value): + self.ant_mask = value + + def __init__(self, index): + self.visited = False + self.index = index + + # fill with a value we don't use, to make sure + # there will be a difference when set + self.ant_mask = numpy.array([[False, True, True]] * N_rcu) + + # save the value we originally use + self.original_ant_mask = self.ant_mask + + def test_recv_masks_identity_mapping(self): + """Test whether a straight setup works.""" + + control_to_recv_mapping = numpy.array([[1, 0], [1, 1], [1, 2]]) + antenna_usage_mask = numpy.array([True, True, True]) + + sut = RecvDeviceWalker(control_to_recv_mapping, antenna_usage_mask) + + expected = numpy.array( + [[[True, True, True]] + [[False, False, False]] * (N_rcu - 1)] + ) + numpy.testing.assert_equal(expected, sut.recv_ant_masks()) + + def test_recv_masks_antenna_usage_mask(self): + """Test whether the antenna_usage_mask is respected.""" + + control_to_recv_mapping = numpy.array([[1, 0], [1, 1], [1, 2]]) + antenna_usage_mask = numpy.array([False, True, False]) + + sut = RecvDeviceWalker(control_to_recv_mapping, antenna_usage_mask) + + expected = numpy.array( + [[[False, True, False]] + [[False, False, False]] * (N_rcu - 1)] + ) + numpy.testing.assert_equal(expected, sut.recv_ant_masks()) + + def test_recv_masks_control_to_recv_mapping(self): + """Test whether control_to_recv_mapping is respected.""" + + control_to_recv_mapping = numpy.array([[1, 0], [2, 1], [1, 2]]) + antenna_usage_mask = numpy.array([True, True, True]) + + sut = RecvDeviceWalker(control_to_recv_mapping, antenna_usage_mask) + + expected = numpy.array( + [ + [[True, False, True]] + [[False, False, False]] * (N_rcu - 1), + [[False, True, False]] + [[False, False, False]] * (N_rcu - 1), + ] + ) + numpy.testing.assert_equal(expected, sut.recv_ant_masks()) + + def test_walk_receivers(self): + """Test walk_receivers on multiple recv_proxies.""" + + control_to_recv_mapping = numpy.array([[1, 0], [2, 1], [1, 2]]) + antenna_usage_mask = numpy.array([True, True, True]) + + sut = RecvDeviceWalker(control_to_recv_mapping, antenna_usage_mask) + + recv_proxies = [TestRecvDeviceWalker.MockDeviceProxy(n + 1) for n in range(2)] + + def visitor(recv_proxy): + recv_proxy.visited = True + + # is our mask set correctly? + if recv_proxy.index == 1: + expected = numpy.array( + [[True, False, True]] + [[False, False, False]] * (N_rcu - 1) + ) + elif recv_proxy.index == 2: + expected = numpy.array( + [[False, True, False]] + [[False, False, False]] * (N_rcu - 1) + ) + + numpy.testing.assert_equal(expected, recv_proxy.ANT_mask_RW) + + sut.walk_receivers(recv_proxies, visitor) + + # make sure both recv_proxies were visited + self.assertTrue(recv_proxies[0].visited) + self.assertTrue(recv_proxies[1].visited) + + # make sure both masks were restored + numpy.testing.assert_equal( + recv_proxies[0].original_ant_mask, recv_proxies[0].ant_mask + ) + numpy.testing.assert_equal( + recv_proxies[1].original_ant_mask, recv_proxies[1].ant_mask + ) + + def test_walk_receivers_restores_mask_on_exception(self): + """Test whether walk_receivers() also restores the recv_proxy.ANT_mask_RW + if the visitor function throws.""" + control_to_recv_mapping = numpy.array([[1, 0], [2, 1], [1, 2]]) + antenna_usage_mask = numpy.array([True, True, True]) + + sut = RecvDeviceWalker(control_to_recv_mapping, antenna_usage_mask) + + recv_proxies = [TestRecvDeviceWalker.MockDeviceProxy(n + 1) for n in range(2)] + + class MyException(Exception): + """A exception noone can raise but us.""" + + def visitor(recv_proxy): + raise MyException("foo") + + with self.assertRaises(MyException): + sut.walk_receivers(recv_proxies, visitor) + + # make sure no mask was disturbed + numpy.testing.assert_equal( + recv_proxies[0].original_ant_mask, recv_proxies[0].ant_mask + ) + numpy.testing.assert_equal( + recv_proxies[1].original_ant_mask, recv_proxies[1].ant_mask + ) diff --git a/tangostationcontrol/test/devices/test_antennafield_device.py b/tangostationcontrol/test/devices/test_antennafield_device.py index 0ce70d37a2b9af165372f63fc2bed2ef9e370ae7..685e3fceaa17a95bd06160ec468f6e57489c64e0 100644 --- a/tangostationcontrol/test/devices/test_antennafield_device.py +++ b/tangostationcontrol/test/devices/test_antennafield_device.py @@ -30,7 +30,6 @@ from tangostationcontrol.devices.base_device_classes.mapper import ( MappingKeys, AntennaToRecvMapper, AntennaToSdpMapper, - RecvDeviceWalker, ) logger = logging.getLogger() @@ -38,6 +37,8 @@ logger = logging.getLogger() SDP_MAPPED_ATTRS = AntennaField.get_mapped_dimensions("SDP") RECV_MAPPED_ATTRS = AntennaField.get_mapped_dimensions("RECV") +DEFAULT_N_LBA = DEFAULT_N_HBA_TILES # 48 antennas + class TestAntennaToSdpMapper(base.TestCase): """Test class for AntennaToSDPMapper""" @@ -151,7 +152,106 @@ class TestAntennaToSdpMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) -class TestAntennaToRecvMapper(base.TestCase): +class TestLBAToRecvMapper(base.TestCase): + """Test class for AntennaToRecvMapper with LBA AntennaType""" + + # A mapping where Antennas are all not mapped to power RCUs + POWER_NOT_CONNECTED = [[-1, -1]] * 2 * DEFAULT_N_LBA + # A mapping where Antennas are all not mapped to control RCUs + CONTROL_NOT_CONNECTED = [[-1, -1]] * 2 * DEFAULT_N_LBA + # A mapping where first two Antennas are mapped on the first Receiver. + # The first Antenna control line on RCU 1 and the second Antenna control line + # on RCU 0. + POWER_LBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1 = ( + [[1, 1], [1, 0]] + + [[-1, -1]] * (DEFAULT_N_LBA - 2) + + [[2, 1], [2, 0]] + + [[-1, -1]] * (DEFAULT_N_LBA - 2) + ) + CONTROL_LBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1 = ( + POWER_LBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1 + ) + ANTENNA_TYPE = "LBA" + + def test_map_write_rcu_pwr_ant_on_no_mapping_and_one_receiver(self): + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper( + recv_mapping, + RECV_MAPPED_ATTRS, + 1, + self.ANTENNA_TYPE, + ) + + set_values = [None] * DEFAULT_N_LBA + expected = [[[None, None, None]] * N_rcu] + actual = mapper.map_write("RCU_PWR_ANT_on_RW", set_values) + numpy.testing.assert_equal(expected, actual) + + def test_map_write_rcu_pwr_ant_on_no_mapping_and_two_receivers(self): + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 2, self.ANTENNA_TYPE + ) + + set_values = [None] * DEFAULT_N_LBA + expected = [[[None, None, None]] * N_rcu] * 2 + actual = mapper.map_write("RCU_PWR_ANT_on_RW", set_values) + numpy.testing.assert_equal(expected, actual) + + def test_map_write_rcu_pwr_ant_on_hba_0_and_1_on_rcu_1_and_0_of_recv_1_and_no_ctrl( + self, + ): + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_LBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, + } + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 1, self.ANTENNA_TYPE + ) + + set_values = [1, 0] + [None] * (DEFAULT_N_LBA - 2) + expected = [[[None, None, None]] + [[None, None, None]] * (N_rcu - 1)] + actual = mapper.map_write("RCU_PWR_ANT_on_RW", set_values) + numpy.testing.assert_equal(expected, actual) + + def test_map_read_rcu_pwr_ant_on_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_LBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, + MappingKeys.POWER: self.POWER_LBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, + } + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 2, self.ANTENNA_TYPE + ) + + receiver_values = [[False, True] + [False] * (MAX_ANTENNA - 2)] * 2 + expected = ([True, False] + [False] * (int(MAX_ANTENNA / 2) - 2)) * 2 + actual = mapper.map_read("RCU_PWR_ANT_on_R", receiver_values) + numpy.testing.assert_equal(expected, actual) + + def test_map_write_rcu_pwr_ant_on_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_LBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, + } + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 1, self.ANTENNA_TYPE + ) + + set_values = [1, 0] + [None] * (DEFAULT_N_LBA - 2) + expected = [[[None, None, None]] * N_rcu] + actual = mapper.map_write("RCU_PWR_ANT_on_RW", set_values) + numpy.testing.assert_equal(expected, actual) + + +class TestHBAToRecvMapper(base.TestCase): + """Test class for AntennaToRecvMapper with HBA AntennaType""" + # A mapping where Antennas are all not mapped to power RCUs POWER_NOT_CONNECTED = [[-1, -1]] * DEFAULT_N_HBA_TILES # A mapping where Antennas are all not mapped to control RCUs @@ -168,13 +268,16 @@ class TestAntennaToRecvMapper(base.TestCase): CONTROL_HBA_0_AND_1_ON_RCU_3_AND_2_OF_RECV_1 = [[1, 3], [1, 2]] + [[-1, -1]] * ( DEFAULT_N_HBA_TILES - 2 ) + ANTENNA_TYPE = "HBA" def test_ant_read_mask_r_no_mapping(self): recv_mapping = { MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 3, self.ANTENNA_TYPE + ) receiver_values = [ [False] * MAX_ANTENNA, @@ -190,7 +293,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 3, self.ANTENNA_TYPE + ) receiver_values = [ [False, True, False] + [False, False, False] * (N_rcu - 1), @@ -207,7 +312,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 3, self.ANTENNA_TYPE + ) receiver_values = [[0] * MAX_ANTENNA, [0] * MAX_ANTENNA, [0] * MAX_ANTENNA] expected = [[0, 0]] * DEFAULT_N_HBA_TILES actual = mapper.map_read("RCU_band_select_RW", receiver_values) @@ -218,7 +325,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 3, self.ANTENNA_TYPE + ) receiver_values = [ [[0] * N_rcu] * MAX_ANTENNA, @@ -234,7 +343,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 3, self.ANTENNA_TYPE + ) receiver_values = [ [[2] * N_rcu, [1] * N_rcu] + [[0] * N_rcu] * (MAX_ANTENNA - 2), @@ -253,7 +364,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 3, self.ANTENNA_TYPE + ) receiver_values = [ [[0] * N_rcu] * MAX_ANTENNA, @@ -269,7 +382,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 3, self.ANTENNA_TYPE + ) receiver_values = [ [[2] * N_rcu, [1] * N_rcu] + [[0] * N_rcu] * (MAX_ANTENNA - 2), @@ -288,7 +403,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 3, self.ANTENNA_TYPE + ) receiver_values = [ [[False] * N_rcu] * MAX_ANTENNA, @@ -304,7 +421,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 3, self.ANTENNA_TYPE + ) receiver_values = [ [[False, True] * 16, [True, False] * 16] @@ -324,7 +443,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 3, self.ANTENNA_TYPE + ) receiver_values = [ [[False] * N_rcu] * MAX_ANTENNA, @@ -340,7 +461,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 3, self.ANTENNA_TYPE + ) receiver_values = [ [[False, True] * 16, [True, False] * 16] @@ -360,7 +483,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 3, self.ANTENNA_TYPE + ) receiver_values = [ [[False] * N_rcu] * MAX_ANTENNA, @@ -376,7 +501,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 3, self.ANTENNA_TYPE + ) receiver_values = [ [[False, True] * 16, [True, False] * 16] @@ -396,7 +523,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 3, self.ANTENNA_TYPE + ) receiver_values = [ [[False] * N_rcu] * MAX_ANTENNA, @@ -412,7 +541,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 3, self.ANTENNA_TYPE + ) receiver_values = [ [[False, True] * 16, [True, False] * 16] @@ -432,7 +563,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 3, self.ANTENNA_TYPE + ) receiver_values = [ [[False] * N_rcu] * MAX_ANTENNA, @@ -448,7 +581,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 3, self.ANTENNA_TYPE + ) receiver_values = [ [[False, True] * 16, [True, False] * 16] @@ -468,7 +603,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 3, self.ANTENNA_TYPE + ) receiver_values = [ [[False] * N_rcu] * MAX_ANTENNA, @@ -484,7 +621,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 3, self.ANTENNA_TYPE + ) receiver_values = [ [[False, True] * 16, [True, False] * 16] @@ -505,7 +644,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 3, self.ANTENNA_TYPE + ) receiver_values = [list(range(32)), [0] * 32, [0] * 32] expected = [[0] * 2] * DEFAULT_N_HBA_TILES actual = mapper.map_read("RCU_PCB_ID_R", receiver_values) @@ -518,7 +659,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_3_AND_2_OF_RECV_1, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 3, self.ANTENNA_TYPE + ) receiver_values = [list(range(32)), [0] * 32, [0] * 32] expected = [[0, 3], [0, 2]] + [[0, 0]] * (DEFAULT_N_HBA_TILES - 2) actual = mapper.map_read("RCU_PCB_ID_R", receiver_values) @@ -531,7 +674,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 3, self.ANTENNA_TYPE + ) receiver_values = [list(range(32)), [0] * 32, [0] * 32] expected = [[1, 0], [0, 0]] + [[0, 0]] * (DEFAULT_N_HBA_TILES - 2) actual = mapper.map_read("RCU_PCB_ID_R", receiver_values) @@ -544,7 +689,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_3_AND_2_OF_RECV_1, MappingKeys.POWER: self.POWER_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 3, self.ANTENNA_TYPE + ) receiver_values = [list(range(32)), [0] * 32, [0] * 32] expected = [[1, 3], [0, 2]] + [[0, 0]] * (DEFAULT_N_HBA_TILES - 2) actual = mapper.map_read("RCU_PCB_ID_R", receiver_values) @@ -556,7 +703,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 3, self.ANTENNA_TYPE + ) receiver_values = [ list(range(MAX_ANTENNA)), [0] * MAX_ANTENNA, @@ -573,7 +722,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 1, self.ANTENNA_TYPE + ) receiver_values = [list(range(MAX_ANTENNA))] expected = [[0, 1], [0, 0]] + [[0, 0]] * (DEFAULT_N_HBA_TILES - 2) actual = mapper.map_read("RCU_attenuator_dB_R", receiver_values) @@ -586,7 +737,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 1, self.ANTENNA_TYPE + ) receiver_values = [list(range(MAX_ANTENNA))] expected = [[1, 0], [0, 0]] + [[0, 0]] * (DEFAULT_N_HBA_TILES - 2) actual = mapper.map_read("RCU_attenuator_dB_R", receiver_values) @@ -599,7 +752,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_3_AND_2_OF_RECV_1, MappingKeys.POWER: self.POWER_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 1, self.ANTENNA_TYPE + ) receiver_values = [list(range(MAX_ANTENNA))] expected = [[1, 3], [0, 2]] + [[0, 0]] * (DEFAULT_N_HBA_TILES - 2) actual = mapper.map_read("RCU_attenuator_dB_R", receiver_values) @@ -613,7 +768,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 1, self.ANTENNA_TYPE + ) set_values = [None] * DEFAULT_N_HBA_TILES expected = [[[None, None, None]] * N_rcu] @@ -626,7 +783,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 2) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 2, self.ANTENNA_TYPE + ) set_values = [None] * DEFAULT_N_HBA_TILES expected = [[[None, None, None]] * N_rcu] * 2 @@ -638,7 +797,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 1, self.ANTENNA_TYPE + ) set_values = [True, False] + [None] * (DEFAULT_N_HBA_TILES - 2) expected = [[[False, True, None]] + [[None, None, None]] * (N_rcu - 1)] @@ -650,7 +811,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 1, self.ANTENNA_TYPE + ) set_values = [None] * DEFAULT_N_HBA_TILES expected = [[[None, None, None]] * N_rcu] @@ -662,7 +825,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 2) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 2, self.ANTENNA_TYPE + ) set_values = [None] * DEFAULT_N_HBA_TILES expected = [[[None, None, None]] * N_rcu] * 2 @@ -674,7 +839,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 1, self.ANTENNA_TYPE + ) set_values = [1, 0] + [None] * (DEFAULT_N_HBA_TILES - 2) expected = [[[0, 1, None]] + [[None, None, None]] * (N_rcu - 1)] @@ -686,7 +853,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 1, self.ANTENNA_TYPE + ) set_values = [None] * DEFAULT_N_HBA_TILES expected = [[[None, None, None]] * N_rcu] @@ -698,7 +867,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 2) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 2, self.ANTENNA_TYPE + ) set_values = [None] * DEFAULT_N_HBA_TILES expected = [[[None, None, None]] * N_rcu] * 2 @@ -710,7 +881,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 1, self.ANTENNA_TYPE + ) set_values = [[1, 1], [0, 0]] + [[None, None]] * (DEFAULT_N_HBA_TILES - 2) expected = [[[0, 1, None]] + [[None, None, None]] * (N_rcu - 1)] @@ -722,7 +895,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 1, self.ANTENNA_TYPE + ) set_values = [[1] * N_rcu] * DEFAULT_N_HBA_TILES expected = [[[None] * N_rcu] * MAX_ANTENNA] @@ -734,7 +909,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 2) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 2, self.ANTENNA_TYPE + ) set_values = [[1] * N_rcu] * DEFAULT_N_HBA_TILES expected = [[[None] * N_rcu] * MAX_ANTENNA, [[None] * N_rcu] * MAX_ANTENNA] @@ -746,7 +923,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 1, self.ANTENNA_TYPE + ) set_values = [[1] * N_rcu, [2] * N_rcu] + [[None] * N_rcu] * ( DEFAULT_N_HBA_TILES - 2 @@ -760,7 +939,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 1, self.ANTENNA_TYPE + ) set_values = [[None] * N_rcu] * DEFAULT_N_HBA_TILES expected = [[[None] * N_rcu] * MAX_ANTENNA] @@ -772,7 +953,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 2) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 2, self.ANTENNA_TYPE + ) set_values = [[None] * N_rcu] * DEFAULT_N_HBA_TILES expected = [[[None] * N_rcu] * MAX_ANTENNA, [[None] * N_rcu] * MAX_ANTENNA] @@ -784,7 +967,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 1, self.ANTENNA_TYPE + ) set_values = [[False, True] * 16, [True, False] * 16] + [[None] * N_rcu] * ( DEFAULT_N_HBA_TILES - 2 @@ -801,7 +986,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 1, self.ANTENNA_TYPE + ) set_values = [[None] * N_rcu] * DEFAULT_N_HBA_TILES expected = [[[None] * N_rcu] * MAX_ANTENNA] @@ -813,7 +1000,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 2) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 2, self.ANTENNA_TYPE + ) set_values = [[None] * N_rcu] * DEFAULT_N_HBA_TILES expected = [[[None] * N_rcu] * MAX_ANTENNA, [[None] * N_rcu] * MAX_ANTENNA] @@ -825,7 +1014,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 1, self.ANTENNA_TYPE + ) set_values = [[False, True] * 16, [True, False] * 16] + [[None] * N_rcu] * ( DEFAULT_N_HBA_TILES - 2 @@ -842,7 +1033,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 1, self.ANTENNA_TYPE + ) set_values = [[None] * N_rcu] * DEFAULT_N_HBA_TILES expected = [[[None] * N_rcu] * MAX_ANTENNA] actual = mapper.map_write("HBAT_PWR_on_RW", set_values) @@ -853,7 +1046,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 2) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 2, self.ANTENNA_TYPE + ) set_values = [[None] * N_rcu] * DEFAULT_N_HBA_TILES expected = [[[None] * N_rcu] * MAX_ANTENNA, [[None] * N_rcu] * MAX_ANTENNA] @@ -865,7 +1060,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 1, self.ANTENNA_TYPE + ) set_values = [[False, True] * 16, [True, False] * 16] + [[None] * N_rcu] * ( DEFAULT_N_HBA_TILES - 2 @@ -883,7 +1080,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 1, self.ANTENNA_TYPE + ) set_values = [[1, 1]] * DEFAULT_N_HBA_TILES expected = [[[None] * N_rcu_inp] * N_rcu] @@ -898,7 +1097,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, MappingKeys.POWER: self.POWER_NOT_CONNECTED, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 1, self.ANTENNA_TYPE + ) set_values = [[1, 2], [3, 4]] + [[0, 0]] * (DEFAULT_N_HBA_TILES - 2) expected = [[[4, 2, None]] + [[None] * N_rcu_inp] * (N_rcu - 1)] @@ -913,7 +1114,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, MappingKeys.POWER: self.POWER_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 1, self.ANTENNA_TYPE + ) set_values = set_values = [[1, 2], [3, 4]] + [[0, 0]] * ( DEFAULT_N_HBA_TILES - 2 @@ -930,7 +1133,9 @@ class TestAntennaToRecvMapper(base.TestCase): MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_3_AND_2_OF_RECV_1, MappingKeys.POWER: self.POWER_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, } - mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) + mapper = AntennaToRecvMapper( + recv_mapping, RECV_MAPPED_ATTRS, 1, self.ANTENNA_TYPE + ) set_values = set_values = [[1, 2], [3, 4]] + [[0, 0]] * ( DEFAULT_N_HBA_TILES - 2 @@ -942,6 +1147,8 @@ class TestAntennaToRecvMapper(base.TestCase): class TestAntennafieldDevice(device_base.DeviceTestCase): + """Test class for AntennaField device""" + # some dummy values for mandatory properties AT_PROPERTIES = { "OPC_Server_Name": "example.com", @@ -954,11 +1161,16 @@ class TestAntennafieldDevice(device_base.DeviceTestCase): ANTENNA_PROPERTIES = { "Antenna_Sets": ["INNER", "OUTER", "SPARSE_EVEN", "SPARSE_ODD", "ALL"], "Antenna_Set_Masks": [ - "111111111111111111111111111111111111111111111111000000000000000000000000000000000000000000000000", - "000000000000000000000000000000000000000000000000111111111111111111111111111111111111111111111111", - "101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010", - "010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101", - "111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111", + "111111111111111111111111111111111111111111111111" + + "000000000000000000000000000000000000000000000000", + "000000000000000000000000000000000000000000000000" + + "111111111111111111111111111111111111111111111111", + "101010101010101010101010101010101010101010101010" + + "101010101010101010101010101010101010101010101010", + "010101010101010101010101010101010101010101010101" + + "010101010101010101010101010101010101010101010101", + "111111111111111111111111111111111111111111111111" + + "111111111111111111111111111111111111111111111111", ], } @@ -1107,137 +1319,3 @@ class TestAntennafieldDevice(device_base.DeviceTestCase): ) as proxy: expected = [True] * int(MAX_ANTENNA / 2) + [False] * int(MAX_ANTENNA / 2) numpy.testing.assert_equal(expected, proxy.antenna_set_to_mask("INNER")) - - -class TestRecvDeviceWalker(base.TestCase): - class MockDeviceProxy: - """Mock of DeviceProxy that simulates recv.ANT_mask_RW.""" - - @property - def ANT_mask_RW(self): - return self.ant_mask - - @ANT_mask_RW.setter - def ANT_mask_RW(self, value): - self.ant_mask = value - - def __init__(self, index): - self.visited = False - self.index = index - - # fill with a value we don't use, to make sure - # there will be a difference when set - self.ant_mask = numpy.array([[False, True, True]] * N_rcu) - - # save the value we originally use - self.original_ant_mask = self.ant_mask - - def test_recv_masks_identity_mapping(self): - """Test whether a straight setup works.""" - - control_to_recv_mapping = numpy.array([[1, 0], [1, 1], [1, 2]]) - antenna_usage_mask = numpy.array([True, True, True]) - - sut = RecvDeviceWalker(control_to_recv_mapping, antenna_usage_mask) - - expected = numpy.array( - [[[True, True, True]] + [[False, False, False]] * (N_rcu - 1)] - ) - numpy.testing.assert_equal(expected, sut.recv_ant_masks()) - - def test_recv_masks_antenna_usage_mask(self): - """Test whether the antenna_usage_mask is respected.""" - - control_to_recv_mapping = numpy.array([[1, 0], [1, 1], [1, 2]]) - antenna_usage_mask = numpy.array([False, True, False]) - - sut = RecvDeviceWalker(control_to_recv_mapping, antenna_usage_mask) - - expected = numpy.array( - [[[False, True, False]] + [[False, False, False]] * (N_rcu - 1)] - ) - numpy.testing.assert_equal(expected, sut.recv_ant_masks()) - - def test_recv_masks_control_to_recv_mapping(self): - """Test whether control_to_recv_mapping is respected.""" - - control_to_recv_mapping = numpy.array([[1, 0], [2, 1], [1, 2]]) - antenna_usage_mask = numpy.array([True, True, True]) - - sut = RecvDeviceWalker(control_to_recv_mapping, antenna_usage_mask) - - expected = numpy.array( - [ - [[True, False, True]] + [[False, False, False]] * (N_rcu - 1), - [[False, True, False]] + [[False, False, False]] * (N_rcu - 1), - ] - ) - numpy.testing.assert_equal(expected, sut.recv_ant_masks()) - - def test_walk_receivers(self): - """Test walk_receivers on multiple recv_proxies.""" - - control_to_recv_mapping = numpy.array([[1, 0], [2, 1], [1, 2]]) - antenna_usage_mask = numpy.array([True, True, True]) - - sut = RecvDeviceWalker(control_to_recv_mapping, antenna_usage_mask) - - recv_proxies = [TestRecvDeviceWalker.MockDeviceProxy(n + 1) for n in range(2)] - - def visitor(recv_proxy): - recv_proxy.visited = True - - # is our mask set correctly? - if recv_proxy.index == 1: - expected = numpy.array( - [[True, False, True]] + [[False, False, False]] * (N_rcu - 1) - ) - elif recv_proxy.index == 2: - expected = numpy.array( - [[False, True, False]] + [[False, False, False]] * (N_rcu - 1) - ) - - numpy.testing.assert_equal(expected, recv_proxy.ANT_mask_RW) - - sut.walk_receivers(recv_proxies, visitor) - - # make sure both recv_proxies were visited - self.assertTrue(recv_proxies[0].visited) - self.assertTrue(recv_proxies[1].visited) - - # make sure both masks were restored - numpy.testing.assert_equal( - recv_proxies[0].original_ant_mask, recv_proxies[0].ant_mask - ) - numpy.testing.assert_equal( - recv_proxies[1].original_ant_mask, recv_proxies[1].ant_mask - ) - - def test_walk_receivers_restores_mask_on_exception(self): - """Test whether walk_receivers() also restores the recv_proxy.ANT_mask_RW - if the visitor function throws.""" - control_to_recv_mapping = numpy.array([[1, 0], [2, 1], [1, 2]]) - antenna_usage_mask = numpy.array([True, True, True]) - - sut = RecvDeviceWalker(control_to_recv_mapping, antenna_usage_mask) - - recv_proxies = [TestRecvDeviceWalker.MockDeviceProxy(n + 1) for n in range(2)] - - class MyException(Exception): - """A exception noone can raise but us.""" - - pass - - def visitor(recv_proxy): - raise MyException("foo") - - with self.assertRaises(MyException): - sut.walk_receivers(recv_proxies, visitor) - - # make sure no mask was disturbed - numpy.testing.assert_equal( - recv_proxies[0].original_ant_mask, recv_proxies[0].ant_mask - ) - numpy.testing.assert_equal( - recv_proxies[1].original_ant_mask, recv_proxies[1].ant_mask - )