diff --git a/README.md b/README.md index e42628c4dc01fd993ee3fcf65f7f22be56cb2515..e454040c5e624063132c88639a6470f38c355c7a 100644 --- a/README.md +++ b/README.md @@ -115,6 +115,7 @@ Next change the version in the following places: # Release Notes +* 0.21.3 Added DigitalBeam.Antenna_Usage_Mask_R to expose antennas used in beamforming * 0.21.2 Removed deprecated "Boot" device (use StationManager now) * 0.21.1 Implement multi project integration downstream pipeline * 0.21.0 Use radians instead of degrees when interpreting pointings diff --git a/tangostationcontrol/VERSION b/tangostationcontrol/VERSION index 59dad104b0bad39d16c6edb862296e363e72ea8d..16eb94e711f823f8085064e79f74d9ce2382c240 100644 --- a/tangostationcontrol/VERSION +++ b/tangostationcontrol/VERSION @@ -1 +1 @@ -0.21.2 +0.21.3 diff --git a/tangostationcontrol/tangostationcontrol/beam/managers/_digitalbeam.py b/tangostationcontrol/tangostationcontrol/beam/managers/_digitalbeam.py index 1bca73797bcd39ee55065f1fe3353987a26a9007..16e38a6f2804065ecce05b01350cc7b2fe45005f 100644 --- a/tangostationcontrol/tangostationcontrol/beam/managers/_digitalbeam.py +++ b/tangostationcontrol/tangostationcontrol/beam/managers/_digitalbeam.py @@ -26,6 +26,35 @@ class DigitalBeamManager(AbstractBeamManager): self.relative_antenna_positions = None super().__init__() + def antennas_used(self) -> list[bool]: + """ + Return the set of antennas actually used in the beamforming, after + all masks and selectionis have been applied. + """ + + # Use zero weights for antennas that we should not use: + # - if the antenna has no input mapped on the SDP + # - if the antenna is bad (False in Antenna_Usage_Mask_R) + # - if the antenna is not in the antenna set (False in Antenna_Mask_RW) + antenna_usage_mask = self._device.control.read_parent_attribute( + "Antenna_Usage_Mask_R" + ) + antenna_mask = self._device.read_Antenna_Mask_R() + antenna_to_sdp_mapping = self._device.control.read_parent_attribute( + "Antenna_to_SDP_Mapping_R" + ) + + antennas_used = [False] * len(antenna_mask) + + for antenna_nr, (_fpga_nr, input_nr) in enumerate(antenna_to_sdp_mapping): + antennas_used[antenna_nr] = ( + input_nr >= 0 + and antenna_usage_mask[antenna_nr] + and antenna_mask[antenna_nr] + ) + + return antennas_used + @TimeIt() def delays(self, pointing_direction: numpy.array, timestamp: datetime.datetime): """ @@ -74,24 +103,14 @@ class DigitalBeamManager(AbstractBeamManager): beam_weights = self._device.beamlet.calculate_bf_weights(beam_delays.flatten()) beam_weights = beam_weights.reshape(N_pn, A_pn, N_beamlets_ctrl) - # Use zero weights for antennas that we should not use: - # - if the antenna has no input mapped on the SDP - # - if the antenna is bad (False in Antenna_Usage_Mask_R) - # - if the antenna is not in the antenna set (False in Antenna_Mask_RW) - antenna_usage_mask = self._device.control.read_parent_attribute( - "Antenna_Usage_Mask_R" - ) - antenna_mask = self._device.read_Antenna_Mask_R() + # Use zero weights for antennas that we should not use + antennas_used = self.antennas_used() zeroes_for_all_beamlets = numpy.array([0] * N_beamlets_ctrl, dtype=numpy.uint32) for antenna_nr, (fpga_nr, input_nr) in enumerate( self._device.control.read_parent_attribute("Antenna_to_SDP_Mapping_R") ): - if ( - input_nr < 0 - or not antenna_usage_mask[antenna_nr] - or not antenna_mask[antenna_nr] - ): + if not antennas_used[antenna_nr]: beam_weights[fpga_nr, input_nr, :] = zeroes_for_all_beamlets return beam_weights diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py b/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py index 1d2d8f77ba0c911e3aaa832e4de328207e0cd27d..022263aab3eeeac20823a37a1b50e1368deab4f9 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py @@ -72,9 +72,19 @@ class DigitalBeam(BeamDevice): max_dim_x=MAX_ANTENNA, ) + Antenna_Usage_Mask_R = attribute( + doc="Antennas used for beam forming. Excludes antennas outside " + "the antenna set, broken antennas, unconnected antennas, etc.", + dtype=(bool,), + max_dim_x=MAX_ANTENNA, + fisallowed="is_attribute_access_allowed", + fget=lambda self: self._beam_manager.antennas_used(), + ) + Duration_delays_R = attribute( access=AttrWriteType.READ, dtype=numpy.float64, + fisallowed="is_attribute_access_allowed", fget=lambda self: self._beam_manager.delays.get_statistic(self)["last"] or 0, ) diff --git a/tangostationcontrol/test/beam/managers/test_digitalbeam_manager.py b/tangostationcontrol/test/beam/managers/test_digitalbeam_manager.py index bbb360bf46e7fd9218266621093d7915c042579d..cda32bb678eb023665a85051f99d9fa43ea3fa23 100644 --- a/tangostationcontrol/test/beam/managers/test_digitalbeam_manager.py +++ b/tangostationcontrol/test/beam/managers/test_digitalbeam_manager.py @@ -116,3 +116,76 @@ class TestDigitalBeamManager(base.TestCase): numpy.testing.assert_array_equal( sut.current_pointing_error, numpy.full(1, 1.05) ) + + +class TestDigitalBeamManagerAntennasUsed(base.TestCase): + def _test_antennas_used( + self, + antenna_to_sdp_mapping: list[list[int]], + antenna_usage_mask: list[bool], + antenna_mask: list[bool], + expected_antennas_used: list[bool], + ): + """Generic test function.""" + + def read_parent_attribute(attr): + match attr: + case "Antenna_to_SDP_Mapping_R": + return antenna_to_sdp_mapping + case "Antenna_Usage_Mask_R": + return antenna_usage_mask + + device_mock = MagicMock() + device_mock.read_Antenna_Mask_R = MagicMock(return_value=antenna_mask) + + device_mock.control = MagicMock() + device_mock.control.read_parent_attribute = MagicMock() + device_mock.control.read_parent_attribute.side_effect = read_parent_attribute + + sut = DigitalBeamManager(device_mock) + self.assertListEqual(sut.antennas_used(), expected_antennas_used) + + def test_antennas_used_single_antenna(self): + # Test simple case: single antenna + self._test_antennas_used( + antenna_to_sdp_mapping=[[0, 0]], + antenna_usage_mask=[True], + antenna_mask=[True], + expected_antennas_used=[True], + ) + + def test_antennas_used_unconnected_antenna(self): + # Exclude antenna because it is not connected + self._test_antennas_used( + antenna_to_sdp_mapping=[[0, -1]], + antenna_usage_mask=[True], + antenna_mask=[True], + expected_antennas_used=[False], + ) + + def test_antennas_used_unusable_antenna(self): + # Exclude antenna because it is not to be used + self._test_antennas_used( + antenna_to_sdp_mapping=[[0, 0]], + antenna_usage_mask=[False], + antenna_mask=[True], + expected_antennas_used=[False], + ) + + def test_antennas_used_unselected_antenna(self): + # Exclude antenna because it is not in the selected antenna set + self._test_antennas_used( + antenna_to_sdp_mapping=[[0, 0]], + antenna_usage_mask=[True], + antenna_mask=[False], + expected_antennas_used=[False], + ) + + def test_antennas_used_multiple_antennas(self): + # All combinations in one go to test multiple antennas + self._test_antennas_used( + antenna_to_sdp_mapping=[[0, 0], [0, 1], [0, 2], [0, -1]], + antenna_usage_mask=[True, False, True, True], + antenna_mask=[True, True, False, True], + expected_antennas_used=[True, False, False, False], + )