Skip to content
Snippets Groups Projects
Commit 6d263fbc authored by Jan David Mol's avatar Jan David Mol
Browse files

Merge branch 'L2SS-1265_remove_antenna_select_and_input_select' into 'master'

L2SS-1265 remove antenna_select and input_select

Closes L2SS-1265

See merge request !551
parents 955aff9f 932a4bea
No related branches found
No related tags found
1 merge request!551L2SS-1265 remove antenna_select and input_select
......@@ -82,7 +82,6 @@ class Observation(LOFARDevice):
self.digitalbeam_proxy: Optional[DeviceProxy] = None
self.tilebeam_proxy: Optional[DeviceProxy] = None
self._observation_settings: Optional[ObservationSettings] = None
self._num_inputs: int = 0
self._antenna_mask = []
def init_device(self):
......@@ -133,9 +132,6 @@ class Observation(LOFARDevice):
self.tilebeam_proxy = DeviceProxy(f"{util.get_ds_inst_name()}/Tilebeam/HBA")
self.tilebeam_proxy.set_source(DevSource.DEV)
# Collect static information
self._num_inputs = self.digitalbeam_proxy.antenna_select_RW.shape[0]
logger.info(
"The observation with ID=%s is configured."
"It will begin as soon as On() is called and it is supposed to stop at %s.",
......@@ -186,9 +182,6 @@ class Observation(LOFARDevice):
self.digitalbeam_proxy.Pointing_direction_RW = self._apply_saps_pointing(
self.read_saps_pointing_R()
)
self.digitalbeam_proxy.antenna_select_RW = self._apply_saps_antenna_select(
self._observation_settings.antenna_mask
)
# Apply Tile Beam pointing direction
tile_beam = self.read_tile_beam_R()
......@@ -357,15 +350,6 @@ class Observation(LOFARDevice):
pointing_direction[first_beamlet : len(sap_pointing)] = sap_pointing
return tuple(pointing_direction)
def _apply_saps_antenna_select(self, antenna_mask: list):
"""Convert an array of antenna indexes into a boolean select array"""
antenna_select = numpy.array([[False] * N_beamlets_ctrl] * self._num_inputs)
first_beamlet = numpy.array(self.read_first_beamlet_R(), dtype=numpy.int64)
for a in antenna_mask:
for i in range(first_beamlet, N_beamlets_ctrl):
antenna_select[a, i] = True
return antenna_select
def _apply_observation_id(self, observation_id: numpy.int64):
"""Convert the observation id value into the correct format for Antennafield device"""
return numpy.array([observation_id] * MAX_ANTENNA, dtype=numpy.uint32)
......
......@@ -84,26 +84,6 @@ class DigitalBeam(BeamDevice):
fget=lambda self: self._delays.get_statistic(self)["last"] or 0,
)
input_select_RW = attribute(
doc="Selection of inputs to use for forming each beamlet. \
Allows selecting broken antennas.",
dtype=((bool,),),
max_dim_x=N_beamlets_ctrl,
max_dim_y=MAX_ANTENNA,
access=AttrWriteType.READ_WRITE,
fisallowed="is_attribute_access_allowed",
)
antenna_select_RW = attribute(
doc="Selection of antennas desired to use for forming each beamlet \
(= a subset of input_select of the configured antennas). Unselects broken antennas.",
dtype=((bool,),),
max_dim_x=N_beamlets_ctrl,
max_dim_y=MAX_ANTENNA,
access=AttrWriteType.READ_WRITE,
fisallowed="is_attribute_access_allowed",
)
nr_inputs_R = attribute(
doc="Number of configured inputs from the associated antenna field.",
dtype=numpy.uint32,
......@@ -132,43 +112,6 @@ class DigitalBeam(BeamDevice):
"""Return the number of controlled beamlets."""
return len([x for x in self.Beamlet_Select if x is True])
def read_input_select_RW(self):
return self._input_select
def write_input_select_RW(self, inputs):
self._input_select = inputs
def read_antenna_select_RW(self):
# select only the rows from self.__input_select
# for which a mapping onto antennas is defined.
antenna_select = [[False] * N_beamlets_ctrl] * self.nr_inputs()
for antenna_nr, (fpga_nr, input_nr) in enumerate(
self.antennafield_proxy.Antenna_to_SDP_Mapping_R
):
if input_nr >= 0:
antenna_select[antenna_nr] = self._input_select[
fpga_nr * A_pn + input_nr
]
return antenna_select
def write_antenna_select_RW(self, antennas):
# Do not use any broken antennas, even if the user requests it. This allows the user
# to select the antennas they would like to use.
antenna_usage_mask = self.antennafield_proxy.Antenna_Usage_Mask_R
for antenna_nr, (fpga_nr, input_nr) in enumerate(
self.antennafield_proxy.Antenna_to_SDP_Mapping_R
):
if input_nr >= 0:
if antenna_usage_mask[antenna_nr]:
# use antenna for the beamlets as supplied by the client
self._input_select[fpga_nr * A_pn + input_nr] = antennas[antenna_nr]
else:
# do not use antenna for any beamlet
self._input_select[fpga_nr * A_pn + input_nr] = False
def read_subband_select_RW(self):
"""Beamlet attribute read filtered through Beamlet Select"""
subband_select = self.beamlet_proxy.subband_select_RW
......@@ -230,18 +173,29 @@ class DigitalBeam(BeamDevice):
# relative positions of each antenna
self.relative_input_positions = input_itrf - reference_itrf
# use all antennas in the mapping for all beamlets, unless specified otherwise
self.write_input_select_RW(
numpy.zeros((MAX_ANTENNA, N_beamlets_ctrl), dtype=bool)
)
self.write_antenna_select_RW(
numpy.ones((self.nr_inputs(), N_beamlets_ctrl), dtype=bool)
)
# configure which inputs are selected to be on
self._input_select()
# --------
# internal functions
# --------
def _input_select(self):
"""
Sets up the input selection
"""
input_select = numpy.zeros((MAX_ANTENNA, N_beamlets_ctrl), dtype=bool)
antenna_usage_mask = self.antennafield_proxy.Antenna_Usage_Mask_R
for antenna_nr, (fpga_nr, input_nr) in enumerate(
self.antennafield_proxy.Antenna_to_SDP_Mapping_R
):
if input_nr >= 0:
input_select[fpga_nr * A_pn + input_nr] = antenna_usage_mask[antenna_nr]
return input_select
@TimeIt()
def _delays(self, pointing_direction: numpy.array, timestamp: datetime.datetime):
"""
......@@ -334,7 +288,7 @@ class DigitalBeam(BeamDevice):
beam_weights,
self.beamlet_proxy,
"FPGA_bf_weights_xx_yy_RW",
self._map_inputs_on_polarised_inputs(self._input_select),
self._map_inputs_on_polarised_inputs(self._input_select()),
)
# Determine how far away from ideal apply time
......
......@@ -193,7 +193,12 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase):
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.antennafield_iden)
self.setup_sdp_proxy()
self.setup_antennafield_proxy(self.antenna_qualities_ok, self.antenna_use_ok)
self.antennafield_proxy = self.setup_antennafield_proxy(
self.antenna_qualities_ok, self.antenna_use_ok
)
# Enable all inputs
self.antennafield_proxy.Antenna_Set_RW = "ALL"
self.proxy.initialise()
self.proxy.Tracking_enabled_RW = False
......@@ -202,11 +207,6 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase):
all_zeros = numpy.array([[0] * 5856] * N_pn)
self.beamlet_proxy.FPGA_bf_weights_xx_yy_RW = all_zeros
# Enable all inputs
self.proxy.input_select_RW = numpy.array(
[[True] * N_beamlets_ctrl] * MAX_ANTENNA
)
self.proxy.set_pointing(
numpy.array(
[["AZELGEO", "0deg", "90deg"]] * self.proxy.nr_beamlets_R
......@@ -220,105 +220,6 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase):
)
)
def test_set_pointing_masked_disable(self):
"""Verify that only diabled inputs are unchanged"""
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.beamlet_iden)
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.sdp_iden)
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.antennafield_iden)
self.setup_sdp_proxy()
self.setup_antennafield_proxy(self.antenna_qualities_ok, self.antenna_use_ok)
self.proxy.initialise()
self.proxy.Tracking_enabled_RW = False
self.proxy.on()
non_zeros = numpy.array([[N_pn] * 5856] * N_pn)
self.beamlet_proxy.FPGA_bf_weights_xx_yy_RW = non_zeros
# Disable all inputs
self.proxy.input_select_RW = numpy.array(
[[False] * N_beamlets_ctrl] * MAX_ANTENNA
)
self.proxy.set_pointing(
numpy.array(
[["AZELGEO", "0deg", "90deg"]] * self.proxy.nr_beamlets_R
).flatten()
)
# Verify all zeros are replaced with other values for all inputs
numpy.testing.assert_equal(
non_zeros, self.beamlet_proxy.FPGA_bf_weights_xx_yy_RW
)
def test_input_select_with_all_antennas_ok(self):
"""Verify if input and antenna select are correctly calculated
following Antennafield.Antenna_Usage_Mask"""
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.beamlet_iden)
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.sdp_iden)
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.antennafield_iden)
self.setup_sdp_proxy()
antennafield_proxy = self.setup_antennafield_proxy(
self.antenna_qualities_ok, self.antenna_use_ok
)
numpy.testing.assert_equal(
numpy.array([True] * MAX_ANTENNA), antennafield_proxy.Antenna_Usage_Mask_R
)
self.proxy.boot()
expected_input_select = numpy.array(
[[True] * N_beamlets_ctrl] * DEFAULT_N_HBA_TILES
+ [[False] * N_beamlets_ctrl] * DEFAULT_N_HBA_TILES
) # first 48 rows are True
numpy.testing.assert_equal(expected_input_select, self.proxy.input_select_RW)
expected_antenna_select = numpy.array(
[[True] * N_beamlets_ctrl] * DEFAULT_N_HBA_TILES
)
numpy.testing.assert_equal(
expected_antenna_select, self.proxy.antenna_select_RW
)
def test_input_select_with_only_second_antenna_ok(self):
"""Verify if input and antenna select are correctly calculated
following Antennafield.Antenna_Usage_Mask"""
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.beamlet_iden)
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.sdp_iden)
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.antennafield_iden)
self.setup_sdp_proxy()
antennafield_proxy = self.setup_antennafield_proxy(
self.antenna_qualities_only_second, self.antenna_use_ok
)
numpy.testing.assert_equal(
numpy.array([False] + [True] + [False] * (MAX_ANTENNA - 2)),
antennafield_proxy.Antenna_Usage_Mask_R,
)
self.proxy.boot()
expected_input_select = numpy.array(
[[False] * N_beamlets_ctrl]
+ [[True] * N_beamlets_ctrl]
+ [[False] * N_beamlets_ctrl] * (DEFAULT_N_HBA_TILES - 2)
+ [[False] * N_beamlets_ctrl] * DEFAULT_N_HBA_TILES
) # first 48 rows are True
numpy.testing.assert_equal(expected_input_select, self.proxy.input_select_RW)
expected_antenna_select = numpy.array(
[[False] * N_beamlets_ctrl]
+ [[True] * N_beamlets_ctrl]
+ [[False] * N_beamlets_ctrl] * (DEFAULT_N_HBA_TILES - 2)
)
numpy.testing.assert_equal(
expected_antenna_select, self.proxy.antenna_select_RW
)
@timeout_decorator.timeout(15)
def test_beam_tracking_90_percent_interval(self):
"""Verify that the beam tracking operates within 95% of interval"""
......
......@@ -334,25 +334,6 @@ class TestDeviceObservation(AbstractTestBases.TestDeviceBase):
list(digitalbeam_proxy.Pointing_direction_RW), expected_pointing
)
def test_apply_antenna_select(self):
"""Test that antenna selection is correctly applied"""
digitalbeam_proxy = self.setup_digitalbeam_proxy()
default_selection = [[False] * N_beamlets_ctrl] * MAX_ANTENNA
digitalbeam_proxy.antenna_select_RW = default_selection
self.assertListEqual(
digitalbeam_proxy.antenna_select_RW.tolist()[9], default_selection[9]
)
self.proxy.off()
self.proxy.observation_settings_RW = self.VALID_JSON
self.proxy.Initialise()
self.proxy.On()
self.assertListEqual(
digitalbeam_proxy.antenna_select_RW.tolist()[9], [True] * N_beamlets_ctrl
)
self.assertListEqual(
digitalbeam_proxy.antenna_select_RW.tolist()[10], [False] * N_beamlets_ctrl
)
def test_apply_tilebeam(self):
# failing
"""Test that attribute tilebeam is correctly applied"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment