diff --git a/tangostationcontrol/tangostationcontrol/devices/antennafield.py b/tangostationcontrol/tangostationcontrol/devices/antennafield.py index 29f17192a578a03280d44732689a7da1aa0772ff..6a2f02253a9f2306d02c41c26dcd84832dd34b1b 100644 --- a/tangostationcontrol/tangostationcontrol/devices/antennafield.py +++ b/tangostationcontrol/tangostationcontrol/devices/antennafield.py @@ -326,18 +326,6 @@ class AntennaField(lofar_device): def read_HBAT_reference_GEOHASH_R(self): return GEO_to_GEOHASH(self.read_HBAT_reference_GEO_R()) - @log_exceptions() - def configure_for_initialise(self): - super().configure_for_initialise() - self.__setup_all_receiver_proxies() - self.__setup_mapper() - - @log_exceptions() - def _initialise_hardware(self): - # Disable controlling the tiles that fall outside the mask - # WARN: Needed in configure_for_initialise but Tango does not allow to write attributes in INIT state - self.proxy.write_attribute('HBAT_ANT_mask_RW', self.read_attribute('Antenna_Usage_Mask_R')) - def __setup_all_receiver_proxies(self): self.recv_proxies = [] @@ -371,6 +359,22 @@ class AntennaField(lofar_device): for idx, recv_proxy in enumerate(self.recv_proxies): recv_proxy.write_attribute(mapped_point, mapped_value[idx]) + # -------- + # Overloaded functions + # -------- + + @log_exceptions() + def configure_for_initialise(self): + super().configure_for_initialise() + self.__setup_all_receiver_proxies() + self.__setup_mapper() + + @log_exceptions() + def _initialise_hardware(self): + # Disable controlling the tiles that fall outside the mask + # WARN: Needed in configure_for_initialise but Tango does not allow to write attributes in INIT state + self.proxy.write_attribute('HBAT_ANT_mask_RW', self.read_attribute('Antenna_Usage_Mask_R')) + # -------- # Commands # -------- diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py b/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py index 7bb139292584d117442cfff418023f100d0dcf5f..f4e7260a76416b3f67317ee4b9241c8a6de3df87 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py @@ -99,7 +99,7 @@ class DigitalBeam(beam_device): def write_antenna_select_RW(self, antennas): for input_nr, antenna_nr in enumerate(self.Input_to_Antenna_Mapping): - if antenna_nr >= 0: + if self.antennafield_proxy.Antenna_Usage_Mask_R[input_nr] and antenna_nr >= 0: self._input_select[input_nr] = antennas[antenna_nr] # ---------- @@ -120,6 +120,7 @@ class DigitalBeam(beam_device): self.antennafield_proxy = DeviceProxy(self.AntennaField_Device) self.antennafield_proxy.set_source(DevSource.DEV) + antenna_usage_mask = self.antennafield_proxy.Antenna_Usage_Mask_R self.beamlet_proxy = DeviceProxy(self.Beamlet_Device) self.beamlet_proxy.set_source(DevSource.DEV) @@ -132,7 +133,7 @@ class DigitalBeam(beam_device): # Use reference position for any missing antennas so they always get a delay of 0 input_itrf = numpy.array([reference_itrf] * self.NUM_INPUTS) for input_nr, antenna_nr in enumerate(self.Input_to_Antenna_Mapping): - if antenna_nr >= 0: + if antenna_usage_mask[input_nr] and antenna_nr >= 0: input_itrf[input_nr] = antenna_itrf[antenna_nr] # a delay calculator @@ -142,7 +143,8 @@ class DigitalBeam(beam_device): self.relative_input_positions = input_itrf - reference_itrf # use all antennas in the mapping for all beamlets, unless specified otherwise - input_select = numpy.repeat((numpy.array(self.Input_to_Antenna_Mapping) >= 0), self.NUM_BEAMLETS).reshape(self.NUM_INPUTS, self.NUM_BEAMLETS) + input_select_mask = numpy.logical_and(numpy.array(self.Input_to_Antenna_Mapping) >= 0, antenna_usage_mask) + input_select = numpy.repeat(input_select_mask, self.NUM_BEAMLETS).reshape(self.NUM_INPUTS, self.NUM_BEAMLETS) self.write_input_select_RW(input_select) # -------- diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py index d13b16a1ef4cdfbc8e2e02d22805fa92314fb3bf..f8857a0f52cb9c9e84394863b52c41e0cbf461b3 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py @@ -9,6 +9,7 @@ # See LICENSE.txt for more info. from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy +from tangostationcontrol.devices.antennafield import AntennaQuality, AntennaUse from .base import AbstractTestBases @@ -16,6 +17,10 @@ import numpy class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): + antenna_qualities_ok = numpy.array([AntennaQuality.OK] * 96) + antenna_qualities_only_second = numpy.array([AntennaQuality.BROKEN] + [AntennaQuality.OK] + [AntennaQuality.BROKEN] * 94) + antenna_use_ok = numpy.array([AntennaUse.AUTO] * 96) + def setUp(self): """Intentionally recreate the device object in each test""" super().setUp("STAT/DigitalBeam/1") @@ -36,8 +41,31 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): beamlet_proxy.warm_boot() beamlet_proxy.set_defaults() return beamlet_proxy + + def setup_sdp_proxy(self): + # setup SDP, on which this device depends + sdp_proxy = TestDeviceProxy("STAT/SDP/1") + sdp_proxy.off() + sdp_proxy.warm_boot() + sdp_proxy.set_defaults() + return sdp_proxy + + def setup_antennafield_proxy(self, antenna_qualities, antenna_use): + # setup AntennaField + NR_TILES = 48 + antennafield_proxy = TestDeviceProxy("STAT/AntennaField/1") + control_mapping = [[1,i] for i in range(NR_TILES)] + antennafield_proxy.put_property({"RECV_devices": ["STAT/RECV/1"], + "HBAT_Control_to_RECV_mapping": numpy.array(control_mapping).flatten(), + 'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use}) + antennafield_proxy.off() + antennafield_proxy.boot() + return antennafield_proxy def test_pointing_to_zenith(self): + self.setup_antennafield_proxy(self.antenna_qualities_ok, self.antenna_use_ok) + self.setup_sdp_proxy() + self.setup_recv_proxy() # Setup beamlet configuration self.beamlet_proxy.clock_RW = 200 * 1000000 self.beamlet_proxy.subband_select = list(range(488)) @@ -51,3 +79,25 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): # beam weights should now be non-zero, we don't actually check their values for correctness self.assertNotEqual(0, sum(self.beamlet_proxy.FPGA_bf_weights_xx_yy_RW.flatten())) + + def test_input_select_with_all_antennas_ok(self): + """ Verify if input and antenna select are correctly calculated following Antennafield.Antenna_Usage_Mask """ + antennafield_proxy = self.setup_antennafield_proxy(self.antenna_qualities_ok, self.antenna_use_ok) + numpy.testing.assert_equal(numpy.array([True] * 96), antennafield_proxy.Antenna_Usage_Mask_R) + self.setUp() + self.proxy.warm_boot() + expected_input_select = numpy.array([[True] * 488 ] * 48 + [[False] * 488] * 48) # first 48 rows are True + numpy.testing.assert_equal(expected_input_select, self.proxy.input_select_RW) + expected_antenna_select = numpy.array([[True] * 488 ] * 48) + numpy.testing.assert_equal(expected_antenna_select, self.proxy.antenna_select_RW) + + def test_input_select_with_only_second_antenna_ok(self): + """ Verify if input and antenna select are correctly calculated following Antennafield.Antenna_Usage_Mask """ + antennafield_proxy = self.setup_antennafield_proxy(self.antenna_qualities_only_second, self.antenna_use_ok) + numpy.testing.assert_equal(numpy.array([False] + [True] + [False] * 94), antennafield_proxy.Antenna_Usage_Mask_R) + self.setUp() + self.proxy.warm_boot() + expected_input_select = numpy.array([[False] * 488 ] + [[True] * 488] + [[False] * 488] * 46 + [[False] * 488] * 48) # first 48 rows are True + numpy.testing.assert_equal(expected_input_select, self.proxy.input_select_RW) + expected_antenna_select = numpy.array([[False] * 488 ] + [[True] * 488] + [[False] * 488] * 46) + numpy.testing.assert_equal(expected_antenna_select, self.proxy.antenna_select_RW)