Skip to content
Snippets Groups Projects
Commit 2af49286 authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

Merge branch 'L2SS-1496-mapper-for-lba' into 'master'

L2SS-1496: refactor mapper for lba

Closes L2SS-1496

See merge request !728
parents ee829eb4 71e2d319
Branches
No related tags found
1 merge request!728L2SS-1496: refactor mapper for lba
......@@ -157,7 +157,7 @@ class TestCalibrationDevice(AbstractTestBases.TestDeviceBase):
def test_calibrate_recv(self):
calibration_properties = {
"Antenna_Type": ["LBA"],
"Antenna_Type": ["HBA"],
"Antenna_Cables": ["50m", "80m"] * (DEFAULT_N_HBA_TILES // 2),
"Control_to_RECV_mapping":
# [1, 0, 1, 1, 1, 2, 1, x ... 1, 47]
......
......@@ -1004,6 +1004,7 @@ class AntennaField(LOFARDevice):
recv_mapping,
self.get_mapped_dimensions("RECV"),
number_of_receivers,
antenna_type=self.read_attribute("Antenna_Type_R"),
)
def __setup_sdp_mapper(self):
......
......@@ -55,7 +55,7 @@ class MappedAttribute(attribute):
cast_type = dtype
while not type_not_sequence(cast_type):
cast_type = cast_type[0]
write_func = device.set_mapped_attribute(
_write_func = device.set_mapped_attribute(
mapping_attribute, value, cast_type, mapping_device
)
......@@ -84,6 +84,7 @@ class AntennaMapper:
self,
mapping_dict: Dict[str, numpy.ndarray],
num_devices: Optional[int] = 1,
antenna_type: str = "HBA",
) -> None:
self._mapping_dict = mapping_dict
self._num_devices = num_devices
......@@ -94,6 +95,8 @@ class AntennaMapper:
self._reshape_attributes_out = {}
self._fill_attributes_in = {}
self._empty_attributes_out = {}
self._antenna_type = antenna_type
self._pwr_mapping = []
def map_read(self, mapped_attribute: str, device_values: List[any]) -> List[any]:
"""Perform a mapped read for the attribute using the device_results
......@@ -119,7 +122,11 @@ class AntennaMapper:
)
return self._mapped_r_values(
device_values, default_values, self._value_mapper[mapped_attribute]
device_values,
default_values,
self._value_mapper[mapped_attribute],
self._antenna_type,
mapped_attribute in self._pwr_mapping,
)
def map_write(self, mapped_attribute: str, set_values: List[any]) -> List[any]:
......@@ -133,7 +140,11 @@ class AntennaMapper:
default_values = self._masked_value_mapping_write[mapped_attribute]
mapped_values = self._mapped_rw_values(
set_values, default_values, self._value_mapper[mapped_attribute]
set_values,
default_values,
self._value_mapper[mapped_attribute],
self._antenna_type,
mapped_attribute in self._pwr_mapping,
)
# Attributes which need to be reshaped with removing a part of their duplicated
......@@ -160,6 +171,8 @@ class AntennaMapper:
device_values: List[any],
default_values: List[any],
value_mapping: List[any],
antenna_type: str,
pwr_attribute: bool,
):
"""Mapping for read operation"""
mapped_values = numpy.array(default_values)
......@@ -189,6 +202,15 @@ class AntennaMapper:
(dev_power, dev_input_power),
(dev_control, dev_input_control),
) in enumerate(zip(power_mapping, control_mapping)):
if antenna_type == "LBA" and pwr_attribute:
# We should report the value of both mappings for each antenna (2 values),
# but for now we pick only one to keep the resulting array of similar
# dimension compared to HBA
if dev_power > 0 and dev_control > 0:
mapped_values[idx] = device_values[dev_power - 1][
dev_input_power
]
else:
# Insert the two values in the mapped array
if dev_power > 0:
mapped_values[idx][0] = device_values[dev_power - 1][
......@@ -206,6 +228,8 @@ class AntennaMapper:
set_values: List[any],
default_values: List[any],
value_mapping: List[any],
antenna_type: str,
pwr_attribute: bool,
):
if MappingKeys.CONTROL in list(self._mapping_dict.keys()):
mapped_values = []
......@@ -234,13 +258,18 @@ class AntennaMapper:
(dev_power, dev_input_power),
(dev_control, dev_input_control),
) in enumerate(zip(power_mapping, control_mapping)):
if antenna_type == "LBA" and pwr_attribute:
if dev_power > 0 and dev_control > 0:
mapped_values[dev_power] = set_values[idx]
else:
if dev_power > 0:
mapped_values[dev_power - 1, dev_input_power] = set_values[idx][0]
if dev_control > 0:
mapped_values[dev_control - 1, dev_input_control] = set_values[idx][
1
mapped_values[dev_power - 1, dev_input_power] = set_values[idx][
0
]
if dev_control > 0:
mapped_values[dev_control - 1, dev_input_control] = set_values[
idx
][1]
return mapped_values
def _init_reshape_out(self, mapped_attributes: List[str], device_type: str):
......@@ -296,8 +325,10 @@ class AntennaToRecvMapper(AntennaMapper):
_VALUE_MAP_NONE_96_32 = numpy.full((MAX_ANTENNA, N_elements * N_pol), None)
_VALUE_MAP_NONE_96_2 = numpy.full((MAX_ANTENNA, 2), None)
def __init__(self, mapping_dict, mapped_dimensions, number_of_receivers):
super().__init__(mapping_dict, number_of_receivers)
def __init__(
self, mapping_dict, mapped_dimensions, number_of_receivers, antenna_type
):
super().__init__(mapping_dict, number_of_receivers, antenna_type)
self._power_mapping = self._mapping_dict[MappingKeys.POWER]
self._control_mapping = self._mapping_dict[MappingKeys.CONTROL]
......@@ -315,7 +346,7 @@ class AntennaToRecvMapper(AntennaMapper):
self._mapped_attributes = list(mapped_dimensions)
self._value_mapper = self._init_value_mapper()
self._value_mapper = self._init_value_mapper(antenna_type)
self._default_value_mapping_read = {
"ANT_mask_RW": value_map_ant_bool,
......@@ -402,10 +433,10 @@ class AntennaToRecvMapper(AntennaMapper):
masked_write[attr] = AntennaToRecvMapper._VALUE_MAP_NONE_96_32
return masked_write
def _init_value_mapper(self):
def _init_value_mapper(self, antenna_type: str):
"""Create the value mapping dictionary"""
value_mapper = {}
double_mapping = [
self._double_mapping = [
"RCU_attenuator_dB_R",
"RCU_attenuator_dB_RW",
"RCU_band_select_R",
......@@ -413,11 +444,14 @@ class AntennaToRecvMapper(AntennaMapper):
"RCU_PCB_ID_R",
"RCU_PCB_version_R",
]
pwr_mapping = ["RCU_PWR_ANT_on_R", "RCU_PWR_ANT_on_RW"]
self._pwr_mapping = ["RCU_PWR_ANT_on_R", "RCU_PWR_ANT_on_RW"]
for attr in self._mapped_attributes:
if attr in double_mapping:
if attr in self._double_mapping:
value_mapper[attr] = [self._power_mapping, self._control_mapping]
elif attr in self._pwr_mapping:
if antenna_type == "LBA":
value_mapper[attr] = [self._power_mapping, self._control_mapping]
elif attr in pwr_mapping:
elif antenna_type == "HBA":
value_mapper[attr] = [self._power_mapping]
else:
value_mapper[attr] = [self._control_mapping]
......@@ -435,7 +469,8 @@ class RecvDeviceWalker(object):
self.antenna_usage_mask = antenna_usage_mask
def recv_ant_masks(self) -> numpy.ndarray:
"""Return the antenna mask for the control inputs of the antennas enabled in Antenna_Usage_Mask_R."""
"""Return the antenna mask for the control inputs of the antennas enabled
in Antenna_Usage_Mask_R."""
nr_recv_devices = max(self.control_to_recv_mapping[:, 0])
......
# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
from test import base
import numpy
from tangostationcontrol.common.constants import N_rcu
from tangostationcontrol.devices.base_device_classes.mapper import RecvDeviceWalker
class TestRecvDeviceWalker(base.TestCase):
"""Test class for RecvDeviceWalker"""
class MockDeviceProxy:
"""Mock of DeviceProxy that simulates recv.ANT_mask_RW."""
@property
def ANT_mask_RW(self):
return self.ant_mask
@ANT_mask_RW.setter
def ANT_mask_RW(self, value):
self.ant_mask = value
def __init__(self, index):
self.visited = False
self.index = index
# fill with a value we don't use, to make sure
# there will be a difference when set
self.ant_mask = numpy.array([[False, True, True]] * N_rcu)
# save the value we originally use
self.original_ant_mask = self.ant_mask
def test_recv_masks_identity_mapping(self):
"""Test whether a straight setup works."""
control_to_recv_mapping = numpy.array([[1, 0], [1, 1], [1, 2]])
antenna_usage_mask = numpy.array([True, True, True])
sut = RecvDeviceWalker(control_to_recv_mapping, antenna_usage_mask)
expected = numpy.array(
[[[True, True, True]] + [[False, False, False]] * (N_rcu - 1)]
)
numpy.testing.assert_equal(expected, sut.recv_ant_masks())
def test_recv_masks_antenna_usage_mask(self):
"""Test whether the antenna_usage_mask is respected."""
control_to_recv_mapping = numpy.array([[1, 0], [1, 1], [1, 2]])
antenna_usage_mask = numpy.array([False, True, False])
sut = RecvDeviceWalker(control_to_recv_mapping, antenna_usage_mask)
expected = numpy.array(
[[[False, True, False]] + [[False, False, False]] * (N_rcu - 1)]
)
numpy.testing.assert_equal(expected, sut.recv_ant_masks())
def test_recv_masks_control_to_recv_mapping(self):
"""Test whether control_to_recv_mapping is respected."""
control_to_recv_mapping = numpy.array([[1, 0], [2, 1], [1, 2]])
antenna_usage_mask = numpy.array([True, True, True])
sut = RecvDeviceWalker(control_to_recv_mapping, antenna_usage_mask)
expected = numpy.array(
[
[[True, False, True]] + [[False, False, False]] * (N_rcu - 1),
[[False, True, False]] + [[False, False, False]] * (N_rcu - 1),
]
)
numpy.testing.assert_equal(expected, sut.recv_ant_masks())
def test_walk_receivers(self):
"""Test walk_receivers on multiple recv_proxies."""
control_to_recv_mapping = numpy.array([[1, 0], [2, 1], [1, 2]])
antenna_usage_mask = numpy.array([True, True, True])
sut = RecvDeviceWalker(control_to_recv_mapping, antenna_usage_mask)
recv_proxies = [TestRecvDeviceWalker.MockDeviceProxy(n + 1) for n in range(2)]
def visitor(recv_proxy):
recv_proxy.visited = True
# is our mask set correctly?
if recv_proxy.index == 1:
expected = numpy.array(
[[True, False, True]] + [[False, False, False]] * (N_rcu - 1)
)
elif recv_proxy.index == 2:
expected = numpy.array(
[[False, True, False]] + [[False, False, False]] * (N_rcu - 1)
)
numpy.testing.assert_equal(expected, recv_proxy.ANT_mask_RW)
sut.walk_receivers(recv_proxies, visitor)
# make sure both recv_proxies were visited
self.assertTrue(recv_proxies[0].visited)
self.assertTrue(recv_proxies[1].visited)
# make sure both masks were restored
numpy.testing.assert_equal(
recv_proxies[0].original_ant_mask, recv_proxies[0].ant_mask
)
numpy.testing.assert_equal(
recv_proxies[1].original_ant_mask, recv_proxies[1].ant_mask
)
def test_walk_receivers_restores_mask_on_exception(self):
"""Test whether walk_receivers() also restores the recv_proxy.ANT_mask_RW
if the visitor function throws."""
control_to_recv_mapping = numpy.array([[1, 0], [2, 1], [1, 2]])
antenna_usage_mask = numpy.array([True, True, True])
sut = RecvDeviceWalker(control_to_recv_mapping, antenna_usage_mask)
recv_proxies = [TestRecvDeviceWalker.MockDeviceProxy(n + 1) for n in range(2)]
class MyException(Exception):
"""A exception noone can raise but us."""
def visitor(recv_proxy):
raise MyException("foo")
with self.assertRaises(MyException):
sut.walk_receivers(recv_proxies, visitor)
# make sure no mask was disturbed
numpy.testing.assert_equal(
recv_proxies[0].original_ant_mask, recv_proxies[0].ant_mask
)
numpy.testing.assert_equal(
recv_proxies[1].original_ant_mask, recv_proxies[1].ant_mask
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment