diff --git a/tangostationcontrol/tangostationcontrol/devices/observation.py b/tangostationcontrol/tangostationcontrol/devices/observation.py index e94a99ce6eb8f3cc0f2bdb0b5c4de8e202691d62..28e3b46ffc240e8efda47a4c0fc8ebc41f6757a4 100644 --- a/tangostationcontrol/tangostationcontrol/devices/observation.py +++ b/tangostationcontrol/tangostationcontrol/devices/observation.py @@ -82,7 +82,6 @@ class Observation(LOFARDevice): self.digitalbeam_proxy: Optional[DeviceProxy] = None self.tilebeam_proxy: Optional[DeviceProxy] = None self._observation_settings: Optional[ObservationSettings] = None - self._num_inputs: int = 0 self._antenna_mask = [] def init_device(self): @@ -133,9 +132,6 @@ class Observation(LOFARDevice): self.tilebeam_proxy = DeviceProxy(f"{util.get_ds_inst_name()}/Tilebeam/HBA") self.tilebeam_proxy.set_source(DevSource.DEV) - # Collect static information - self._num_inputs = self.digitalbeam_proxy.antenna_select_RW.shape[0] - logger.info( "The observation with ID=%s is configured." "It will begin as soon as On() is called and it is supposed to stop at %s.", @@ -186,9 +182,6 @@ class Observation(LOFARDevice): self.digitalbeam_proxy.Pointing_direction_RW = self._apply_saps_pointing( self.read_saps_pointing_R() ) - self.digitalbeam_proxy.antenna_select_RW = self._apply_saps_antenna_select( - self._observation_settings.antenna_mask - ) # Apply Tile Beam pointing direction tile_beam = self.read_tile_beam_R() @@ -357,15 +350,6 @@ class Observation(LOFARDevice): pointing_direction[first_beamlet : len(sap_pointing)] = sap_pointing return tuple(pointing_direction) - def _apply_saps_antenna_select(self, antenna_mask: list): - """Convert an array of antenna indexes into a boolean select array""" - antenna_select = numpy.array([[False] * N_beamlets_ctrl] * self._num_inputs) - first_beamlet = numpy.array(self.read_first_beamlet_R(), dtype=numpy.int64) - for a in antenna_mask: - for i in range(first_beamlet, N_beamlets_ctrl): - antenna_select[a, i] = True - return antenna_select - def _apply_observation_id(self, observation_id: numpy.int64): """Convert the observation id value into the correct format for Antennafield device""" return numpy.array([observation_id] * MAX_ANTENNA, dtype=numpy.uint32) diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py b/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py index 00fe2b9ec64f6b10c4259de3449ecb8ed769b247..f60d52464eb5aeecfa6a0147d85c7f48b18806ae 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py @@ -84,26 +84,6 @@ class DigitalBeam(BeamDevice): fget=lambda self: self._delays.get_statistic(self)["last"] or 0, ) - input_select_RW = attribute( - doc="Selection of inputs to use for forming each beamlet. \ - Allows selecting broken antennas.", - dtype=((bool,),), - max_dim_x=N_beamlets_ctrl, - max_dim_y=MAX_ANTENNA, - access=AttrWriteType.READ_WRITE, - fisallowed="is_attribute_access_allowed", - ) - - antenna_select_RW = attribute( - doc="Selection of antennas desired to use for forming each beamlet \ - (= a subset of input_select of the configured antennas). Unselects broken antennas.", - dtype=((bool,),), - max_dim_x=N_beamlets_ctrl, - max_dim_y=MAX_ANTENNA, - access=AttrWriteType.READ_WRITE, - fisallowed="is_attribute_access_allowed", - ) - nr_inputs_R = attribute( doc="Number of configured inputs from the associated antenna field.", dtype=numpy.uint32, @@ -132,43 +112,6 @@ class DigitalBeam(BeamDevice): """Return the number of controlled beamlets.""" return len([x for x in self.Beamlet_Select if x is True]) - def read_input_select_RW(self): - return self._input_select - - def write_input_select_RW(self, inputs): - self._input_select = inputs - - def read_antenna_select_RW(self): - # select only the rows from self.__input_select - # for which a mapping onto antennas is defined. - antenna_select = [[False] * N_beamlets_ctrl] * self.nr_inputs() - - for antenna_nr, (fpga_nr, input_nr) in enumerate( - self.antennafield_proxy.Antenna_to_SDP_Mapping_R - ): - if input_nr >= 0: - antenna_select[antenna_nr] = self._input_select[ - fpga_nr * A_pn + input_nr - ] - - return antenna_select - - def write_antenna_select_RW(self, antennas): - # Do not use any broken antennas, even if the user requests it. This allows the user - # to select the antennas they would like to use. - antenna_usage_mask = self.antennafield_proxy.Antenna_Usage_Mask_R - - for antenna_nr, (fpga_nr, input_nr) in enumerate( - self.antennafield_proxy.Antenna_to_SDP_Mapping_R - ): - if input_nr >= 0: - if antenna_usage_mask[antenna_nr]: - # use antenna for the beamlets as supplied by the client - self._input_select[fpga_nr * A_pn + input_nr] = antennas[antenna_nr] - else: - # do not use antenna for any beamlet - self._input_select[fpga_nr * A_pn + input_nr] = False - def read_subband_select_RW(self): """Beamlet attribute read filtered through Beamlet Select""" subband_select = self.beamlet_proxy.subband_select_RW @@ -230,18 +173,29 @@ class DigitalBeam(BeamDevice): # relative positions of each antenna self.relative_input_positions = input_itrf - reference_itrf - # use all antennas in the mapping for all beamlets, unless specified otherwise - self.write_input_select_RW( - numpy.zeros((MAX_ANTENNA, N_beamlets_ctrl), dtype=bool) - ) - self.write_antenna_select_RW( - numpy.ones((self.nr_inputs(), N_beamlets_ctrl), dtype=bool) - ) + # configure which inputs are selected to be on + self._input_select() # -------- # internal functions # -------- + def _input_select(self): + """ + Sets up the input selection + """ + + input_select = numpy.zeros((MAX_ANTENNA, N_beamlets_ctrl), dtype=bool) + + antenna_usage_mask = self.antennafield_proxy.Antenna_Usage_Mask_R + + for antenna_nr, (fpga_nr, input_nr) in enumerate( + self.antennafield_proxy.Antenna_to_SDP_Mapping_R + ): + if input_nr >= 0: + input_select[fpga_nr * A_pn + input_nr] = antenna_usage_mask[antenna_nr] + return input_select + @TimeIt() def _delays(self, pointing_direction: numpy.array, timestamp: datetime.datetime): """ @@ -334,7 +288,7 @@ class DigitalBeam(BeamDevice): beam_weights, self.beamlet_proxy, "FPGA_bf_weights_xx_yy_RW", - self._map_inputs_on_polarised_inputs(self._input_select), + self._map_inputs_on_polarised_inputs(self._input_select()), ) # Determine how far away from ideal apply time diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py index 7ff0cf7c408b4d405b89eb6461c350f687477362..ed7fd69821bdbd9d54edca82821f7a08a5f1d9e2 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py @@ -193,7 +193,12 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): self.addCleanup(TestDeviceProxy.test_device_turn_off, self.antennafield_iden) self.setup_sdp_proxy() - self.setup_antennafield_proxy(self.antenna_qualities_ok, self.antenna_use_ok) + self.antennafield_proxy = self.setup_antennafield_proxy( + self.antenna_qualities_ok, self.antenna_use_ok + ) + + # Enable all inputs + self.antennafield_proxy.Antenna_Set_RW = "ALL" self.proxy.initialise() self.proxy.Tracking_enabled_RW = False @@ -202,11 +207,6 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): all_zeros = numpy.array([[0] * 5856] * N_pn) self.beamlet_proxy.FPGA_bf_weights_xx_yy_RW = all_zeros - # Enable all inputs - self.proxy.input_select_RW = numpy.array( - [[True] * N_beamlets_ctrl] * MAX_ANTENNA - ) - self.proxy.set_pointing( numpy.array( [["AZELGEO", "0deg", "90deg"]] * self.proxy.nr_beamlets_R @@ -220,105 +220,6 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): ) ) - def test_set_pointing_masked_disable(self): - """Verify that only diabled inputs are unchanged""" - - self.addCleanup(TestDeviceProxy.test_device_turn_off, self.beamlet_iden) - self.addCleanup(TestDeviceProxy.test_device_turn_off, self.sdp_iden) - self.addCleanup(TestDeviceProxy.test_device_turn_off, self.antennafield_iden) - - self.setup_sdp_proxy() - self.setup_antennafield_proxy(self.antenna_qualities_ok, self.antenna_use_ok) - - self.proxy.initialise() - self.proxy.Tracking_enabled_RW = False - self.proxy.on() - - non_zeros = numpy.array([[N_pn] * 5856] * N_pn) - self.beamlet_proxy.FPGA_bf_weights_xx_yy_RW = non_zeros - - # Disable all inputs - self.proxy.input_select_RW = numpy.array( - [[False] * N_beamlets_ctrl] * MAX_ANTENNA - ) - - self.proxy.set_pointing( - numpy.array( - [["AZELGEO", "0deg", "90deg"]] * self.proxy.nr_beamlets_R - ).flatten() - ) - - # Verify all zeros are replaced with other values for all inputs - numpy.testing.assert_equal( - non_zeros, self.beamlet_proxy.FPGA_bf_weights_xx_yy_RW - ) - - def test_input_select_with_all_antennas_ok(self): - """Verify if input and antenna select are correctly calculated - following Antennafield.Antenna_Usage_Mask""" - - self.addCleanup(TestDeviceProxy.test_device_turn_off, self.beamlet_iden) - self.addCleanup(TestDeviceProxy.test_device_turn_off, self.sdp_iden) - self.addCleanup(TestDeviceProxy.test_device_turn_off, self.antennafield_iden) - - self.setup_sdp_proxy() - antennafield_proxy = self.setup_antennafield_proxy( - self.antenna_qualities_ok, self.antenna_use_ok - ) - numpy.testing.assert_equal( - numpy.array([True] * MAX_ANTENNA), antennafield_proxy.Antenna_Usage_Mask_R - ) - - self.proxy.boot() - - expected_input_select = numpy.array( - [[True] * N_beamlets_ctrl] * DEFAULT_N_HBA_TILES - + [[False] * N_beamlets_ctrl] * DEFAULT_N_HBA_TILES - ) # first 48 rows are True - numpy.testing.assert_equal(expected_input_select, self.proxy.input_select_RW) - expected_antenna_select = numpy.array( - [[True] * N_beamlets_ctrl] * DEFAULT_N_HBA_TILES - ) - numpy.testing.assert_equal( - expected_antenna_select, self.proxy.antenna_select_RW - ) - - def test_input_select_with_only_second_antenna_ok(self): - """Verify if input and antenna select are correctly calculated - following Antennafield.Antenna_Usage_Mask""" - - self.addCleanup(TestDeviceProxy.test_device_turn_off, self.beamlet_iden) - self.addCleanup(TestDeviceProxy.test_device_turn_off, self.sdp_iden) - self.addCleanup(TestDeviceProxy.test_device_turn_off, self.antennafield_iden) - - self.setup_sdp_proxy() - - antennafield_proxy = self.setup_antennafield_proxy( - self.antenna_qualities_only_second, self.antenna_use_ok - ) - numpy.testing.assert_equal( - numpy.array([False] + [True] + [False] * (MAX_ANTENNA - 2)), - antennafield_proxy.Antenna_Usage_Mask_R, - ) - - self.proxy.boot() - - expected_input_select = numpy.array( - [[False] * N_beamlets_ctrl] - + [[True] * N_beamlets_ctrl] - + [[False] * N_beamlets_ctrl] * (DEFAULT_N_HBA_TILES - 2) - + [[False] * N_beamlets_ctrl] * DEFAULT_N_HBA_TILES - ) # first 48 rows are True - numpy.testing.assert_equal(expected_input_select, self.proxy.input_select_RW) - expected_antenna_select = numpy.array( - [[False] * N_beamlets_ctrl] - + [[True] * N_beamlets_ctrl] - + [[False] * N_beamlets_ctrl] * (DEFAULT_N_HBA_TILES - 2) - ) - numpy.testing.assert_equal( - expected_antenna_select, self.proxy.antenna_select_RW - ) - @timeout_decorator.timeout(15) def test_beam_tracking_90_percent_interval(self): """Verify that the beam tracking operates within 95% of interval""" diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_observation.py b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_observation.py index af4982431e3a50b5dc01f765fc679b7f4f738962..00257ead770400ae9e0d09fe27b4e5401d53243b 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_observation.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_observation.py @@ -334,25 +334,6 @@ class TestDeviceObservation(AbstractTestBases.TestDeviceBase): list(digitalbeam_proxy.Pointing_direction_RW), expected_pointing ) - def test_apply_antenna_select(self): - """Test that antenna selection is correctly applied""" - digitalbeam_proxy = self.setup_digitalbeam_proxy() - default_selection = [[False] * N_beamlets_ctrl] * MAX_ANTENNA - digitalbeam_proxy.antenna_select_RW = default_selection - self.assertListEqual( - digitalbeam_proxy.antenna_select_RW.tolist()[9], default_selection[9] - ) - self.proxy.off() - self.proxy.observation_settings_RW = self.VALID_JSON - self.proxy.Initialise() - self.proxy.On() - self.assertListEqual( - digitalbeam_proxy.antenna_select_RW.tolist()[9], [True] * N_beamlets_ctrl - ) - self.assertListEqual( - digitalbeam_proxy.antenna_select_RW.tolist()[10], [False] * N_beamlets_ctrl - ) - def test_apply_tilebeam(self): # failing """Test that attribute tilebeam is correctly applied"""