Skip to content
Snippets Groups Projects
Commit c02366d5 authored by Jan David Mol's avatar Jan David Mol
Browse files

Merge branch 'L2SS-1357-expose-used-antennas' into 'master'

Resolve L2SS-1537 "Expose used antennas"

Closes L2SS-1537

See merge request !744
parents c44e135a 1b311e85
No related branches found
No related tags found
1 merge request!744Resolve L2SS-1537 "Expose used antennas"
......@@ -115,6 +115,7 @@ Next change the version in the following places:
# Release Notes
* 0.21.3 Added DigitalBeam.Antenna_Usage_Mask_R to expose antennas used in beamforming
* 0.21.2 Removed deprecated "Boot" device (use StationManager now)
* 0.21.1 Implement multi project integration downstream pipeline
* 0.21.0 Use radians instead of degrees when interpreting pointings
......
0.21.2
0.21.3
......@@ -26,6 +26,35 @@ class DigitalBeamManager(AbstractBeamManager):
self.relative_antenna_positions = None
super().__init__()
def antennas_used(self) -> list[bool]:
"""
Return the set of antennas actually used in the beamforming, after
all masks and selectionis have been applied.
"""
# Use zero weights for antennas that we should not use:
# - if the antenna has no input mapped on the SDP
# - if the antenna is bad (False in Antenna_Usage_Mask_R)
# - if the antenna is not in the antenna set (False in Antenna_Mask_RW)
antenna_usage_mask = self._device.control.read_parent_attribute(
"Antenna_Usage_Mask_R"
)
antenna_mask = self._device.read_Antenna_Mask_R()
antenna_to_sdp_mapping = self._device.control.read_parent_attribute(
"Antenna_to_SDP_Mapping_R"
)
antennas_used = [False] * len(antenna_mask)
for antenna_nr, (_fpga_nr, input_nr) in enumerate(antenna_to_sdp_mapping):
antennas_used[antenna_nr] = (
input_nr >= 0
and antenna_usage_mask[antenna_nr]
and antenna_mask[antenna_nr]
)
return antennas_used
@TimeIt()
def delays(self, pointing_direction: numpy.array, timestamp: datetime.datetime):
"""
......@@ -74,24 +103,14 @@ class DigitalBeamManager(AbstractBeamManager):
beam_weights = self._device.beamlet.calculate_bf_weights(beam_delays.flatten())
beam_weights = beam_weights.reshape(N_pn, A_pn, N_beamlets_ctrl)
# Use zero weights for antennas that we should not use:
# - if the antenna has no input mapped on the SDP
# - if the antenna is bad (False in Antenna_Usage_Mask_R)
# - if the antenna is not in the antenna set (False in Antenna_Mask_RW)
antenna_usage_mask = self._device.control.read_parent_attribute(
"Antenna_Usage_Mask_R"
)
antenna_mask = self._device.read_Antenna_Mask_R()
# Use zero weights for antennas that we should not use
antennas_used = self.antennas_used()
zeroes_for_all_beamlets = numpy.array([0] * N_beamlets_ctrl, dtype=numpy.uint32)
for antenna_nr, (fpga_nr, input_nr) in enumerate(
self._device.control.read_parent_attribute("Antenna_to_SDP_Mapping_R")
):
if (
input_nr < 0
or not antenna_usage_mask[antenna_nr]
or not antenna_mask[antenna_nr]
):
if not antennas_used[antenna_nr]:
beam_weights[fpga_nr, input_nr, :] = zeroes_for_all_beamlets
return beam_weights
......
......@@ -72,9 +72,19 @@ class DigitalBeam(BeamDevice):
max_dim_x=MAX_ANTENNA,
)
Antenna_Usage_Mask_R = attribute(
doc="Antennas used for beam forming. Excludes antennas outside "
"the antenna set, broken antennas, unconnected antennas, etc.",
dtype=(bool,),
max_dim_x=MAX_ANTENNA,
fisallowed="is_attribute_access_allowed",
fget=lambda self: self._beam_manager.antennas_used(),
)
Duration_delays_R = attribute(
access=AttrWriteType.READ,
dtype=numpy.float64,
fisallowed="is_attribute_access_allowed",
fget=lambda self: self._beam_manager.delays.get_statistic(self)["last"] or 0,
)
......
......@@ -116,3 +116,76 @@ class TestDigitalBeamManager(base.TestCase):
numpy.testing.assert_array_equal(
sut.current_pointing_error, numpy.full(1, 1.05)
)
class TestDigitalBeamManagerAntennasUsed(base.TestCase):
def _test_antennas_used(
self,
antenna_to_sdp_mapping: list[list[int]],
antenna_usage_mask: list[bool],
antenna_mask: list[bool],
expected_antennas_used: list[bool],
):
"""Generic test function."""
def read_parent_attribute(attr):
match attr:
case "Antenna_to_SDP_Mapping_R":
return antenna_to_sdp_mapping
case "Antenna_Usage_Mask_R":
return antenna_usage_mask
device_mock = MagicMock()
device_mock.read_Antenna_Mask_R = MagicMock(return_value=antenna_mask)
device_mock.control = MagicMock()
device_mock.control.read_parent_attribute = MagicMock()
device_mock.control.read_parent_attribute.side_effect = read_parent_attribute
sut = DigitalBeamManager(device_mock)
self.assertListEqual(sut.antennas_used(), expected_antennas_used)
def test_antennas_used_single_antenna(self):
# Test simple case: single antenna
self._test_antennas_used(
antenna_to_sdp_mapping=[[0, 0]],
antenna_usage_mask=[True],
antenna_mask=[True],
expected_antennas_used=[True],
)
def test_antennas_used_unconnected_antenna(self):
# Exclude antenna because it is not connected
self._test_antennas_used(
antenna_to_sdp_mapping=[[0, -1]],
antenna_usage_mask=[True],
antenna_mask=[True],
expected_antennas_used=[False],
)
def test_antennas_used_unusable_antenna(self):
# Exclude antenna because it is not to be used
self._test_antennas_used(
antenna_to_sdp_mapping=[[0, 0]],
antenna_usage_mask=[False],
antenna_mask=[True],
expected_antennas_used=[False],
)
def test_antennas_used_unselected_antenna(self):
# Exclude antenna because it is not in the selected antenna set
self._test_antennas_used(
antenna_to_sdp_mapping=[[0, 0]],
antenna_usage_mask=[True],
antenna_mask=[False],
expected_antennas_used=[False],
)
def test_antennas_used_multiple_antennas(self):
# All combinations in one go to test multiple antennas
self._test_antennas_used(
antenna_to_sdp_mapping=[[0, 0], [0, 1], [0, 2], [0, -1]],
antenna_usage_mask=[True, False, True, True],
antenna_mask=[True, True, False, True],
expected_antennas_used=[True, False, False, False],
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment