diff --git a/tangostationcontrol/tangostationcontrol/common/constants.py b/tangostationcontrol/tangostationcontrol/common/constants.py index 8a2dbb7bd75b281e411c3a73cc9686306f6d364b..0abc42d5a29108f80953ac5b2c6366d4bfaa64d0 100644 --- a/tangostationcontrol/tangostationcontrol/common/constants.py +++ b/tangostationcontrol/tangostationcontrol/common/constants.py @@ -94,3 +94,6 @@ MAX_ETH_FRAME_SIZE = 9000 # The default polling period for polled attributes DEFAULT_POLLING_PERIOD = 1000 + +# amount of tiles in a HBA +N_HBA_TILES = 48 \ No newline at end of file diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/client/test_tcp_replicator.py b/tangostationcontrol/tangostationcontrol/integration_test/default/client/test_tcp_replicator.py index faab4d9c3e9cd0a8a25d5c3e8900c481b5e8bbdf..c63a8ec5d41a0f325cab3db5548a6d018c99cb1f 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/client/test_tcp_replicator.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/client/test_tcp_replicator.py @@ -15,6 +15,7 @@ import sys import timeout_decorator from tangostationcontrol.clients.tcp_replicator import TCPReplicator +from tangostationcontrol.common.constants import MAX_ETH_FRAME_SIZE from tangostationcontrol.integration_test import base @@ -78,7 +79,7 @@ class TestTCPReplicator(base.IntegrationTestCase): replicator.join() - self.assertEqual(b'', s.recv(9000)) + self.assertEqual(b'', s.recv(MAX_ETH_FRAME_SIZE)) @timeout_decorator.timeout(15) def test_start_connect_receive(self): diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_antennafield.py b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_antennafield.py index 24e2c1191425cf80389490eeb34fcc2ec45414e5..efaecda61681913095e215a07248835f921caf91 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_antennafield.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_antennafield.py @@ -13,7 +13,7 @@ import numpy from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy from tangostationcontrol.devices.antennafield import AntennaQuality, AntennaUse from .base import AbstractTestBases - +from tangostationcontrol.common.constants import N_elements, MAX_ANTENNA, N_pol, N_rcu, N_rcu_inp, N_HBA_TILES class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase): @@ -21,7 +21,7 @@ class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase): super().setUp("STAT/AntennaField/1") self.proxy.put_property({ "RECV_devices": ["STAT/RECV/1"], - "Power_to_RECV_mapping": [1, 1, 1, 0] + [-1] * 92 + "Power_to_RECV_mapping": [1, 1, 1, 0] + [-1] * (MAX_ANTENNA - 4) }) self.recv_proxy = self.setup_recv_proxy() self.sdp_proxy = self.setup_sdp_proxy() @@ -33,8 +33,8 @@ class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase): def restore_antennafield(self): self.proxy.put_property({ "RECV_devices": ["STAT/RECV/1"], - "Power_to_RECV_mapping": [-1, -1] * 48, - "Control_to_RECV_mapping": [-1, -1] * 48 + "Power_to_RECV_mapping": [-1, -1] * N_HBA_TILES, + "Control_to_RECV_mapping": [-1, -1] * N_HBA_TILES }) @staticmethod @@ -71,20 +71,20 @@ class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase): antennafield_proxy = self.proxy numpy.testing.assert_equal( - numpy.array([True] * 96), self.recv_proxy.ANT_mask_RW + numpy.array([True] * MAX_ANTENNA), self.recv_proxy.ANT_mask_RW ) - antenna_qualities = numpy.array([AntennaQuality.OK] * 96) - antenna_use = numpy.array([AntennaUse.ON] + [AntennaUse.AUTO] * 95) + antenna_qualities = numpy.array([AntennaQuality.OK] * MAX_ANTENNA) + antenna_use = numpy.array([AntennaUse.ON] + [AntennaUse.AUTO] * (MAX_ANTENNA - 1)) antenna_properties = { 'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use } mapping_properties = { "RECV_devices": ["STAT/RECV/1"], - "Power_to_RECV_mapping": [-1, -1] * 48, + "Power_to_RECV_mapping": [-1, -1] * N_HBA_TILES, # Two inputs of recv device connected, only defined for 48 inputs # each pair is one input - "Control_to_RECV_mapping": [1, 0 , 1, 1] + [-1, -1] * 46 + "Control_to_RECV_mapping": [1, 0 , 1, 1] + [-1, -1] * (N_HBA_TILES - 2) } antennafield_proxy.off() antennafield_proxy.put_property(antenna_properties) @@ -93,19 +93,19 @@ class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase): # Verify all antennas are indicated to work numpy.testing.assert_equal( - numpy.array([True] * 96), antennafield_proxy.Antenna_Usage_Mask_R + numpy.array([True] * MAX_ANTENNA), antennafield_proxy.Antenna_Usage_Mask_R ) # Verify only connected inputs + Antenna_Usage_Mask_R are true # As well as dimensions of ANT_mask_RW must match control mapping numpy.testing.assert_equal( - numpy.array([True] * 2 + [False] * 46), + numpy.array([True] * 2 + [False] * (N_HBA_TILES - 2)), antennafield_proxy.ANT_mask_RW ) # Verify recv proxy values unaffected as default for ANT_mask_RW is true numpy.testing.assert_equal( - numpy.array([True] * 2 + [True] * 94), + numpy.array([True] * 2 + [True] * (MAX_ANTENNA - 2)), self.recv_proxy.ANT_mask_RW ) @@ -115,8 +115,8 @@ class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase): antennafield_proxy = self.proxy # Broken antennas except second - antenna_qualities = numpy.array([AntennaQuality.BROKEN] + [AntennaQuality.OK] + [AntennaQuality.BROKEN] * 94) - antenna_use = numpy.array([AntennaUse.AUTO] * 96) + antenna_qualities = numpy.array([AntennaQuality.BROKEN] + [AntennaQuality.OK] + [AntennaQuality.BROKEN] * (MAX_ANTENNA - 2)) + antenna_use = numpy.array([AntennaUse.AUTO] * MAX_ANTENNA) antenna_properties = { 'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use } @@ -124,10 +124,10 @@ class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase): # Configure control mapping to control all 96 inputs of recv device mapping_properties = { "RECV_devices": ["STAT/RECV/1"], - "Power_to_RECV_mapping": [-1, -1] * 48, + "Power_to_RECV_mapping": [-1, -1] * N_HBA_TILES, "Control_to_RECV_mapping": # [1, 0, 1, 1, 1, 2, 1, x ... 1, 95] - numpy.array([[1, x] for x in range(0, 96)]).flatten() + numpy.array([[1, x] for x in range(0, MAX_ANTENNA)]).flatten() } # Cycle device and set properties @@ -138,18 +138,18 @@ class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase): # Antenna_Usage_Mask_R should be false except one numpy.testing.assert_equal( - numpy.array([False] + [True] + [False] * 94), + numpy.array([False] + [True] + [False] * (MAX_ANTENNA - 2)), antennafield_proxy.Antenna_Usage_Mask_R ) # device.boot() writes Antenna_Usage_Mask_R to ANT_mask_RW numpy.testing.assert_equal( - numpy.array([False] + [True] + [False] * 94), + numpy.array([False] + [True] + [False] * (MAX_ANTENNA - 2)), antennafield_proxy.ANT_mask_RW ) # ANT_mask_RW on antennafield writes to configured recv devices for all # mapped inputs numpy.testing.assert_equal( - numpy.array([False] + [True] + [False] * 94), + numpy.array([False] + [True] + [False] * (MAX_ANTENNA - 2)), self.recv_proxy.ANT_mask_RW ) @@ -159,8 +159,8 @@ class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase): # Connect recv/1 device but no control inputs mapping_properties = { "RECV_devices": ["STAT/RECV/1"], - "Power_to_RECV_mapping": [-1, -1] * 48, - "Control_to_RECV_mapping": [-1, -1] * 48 + "Power_to_RECV_mapping": [-1, -1] * N_HBA_TILES, + "Control_to_RECV_mapping": [-1, -1] * N_HBA_TILES } # Cycle device an put properties @@ -170,11 +170,11 @@ class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase): antennafield_proxy.boot() # Set HBAT_PWR_on_RW to false on recv device and read results - self.recv_proxy.write_attribute("HBAT_PWR_on_RW", [[False] * 32] * 96) + self.recv_proxy.write_attribute("HBAT_PWR_on_RW", [[False] * N_elements * N_pol] * MAX_ANTENNA) current_values = self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value # write true through antennafield - antennafield_proxy.write_attribute("HBAT_PWR_on_RW", [[True] * 32] * 48) + antennafield_proxy.write_attribute("HBAT_PWR_on_RW", [[True] * N_elements * N_pol] * N_HBA_TILES) # Test that original recv values for HBAT_PWR_on_RW match current numpy.testing.assert_equal( current_values, @@ -189,9 +189,9 @@ class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase): mapping_properties = { "RECV_devices": ["STAT/RECV/1"], - "Power_to_RECV_mapping": [-1, -1] * 48, + "Power_to_RECV_mapping": [-1, -1] * N_HBA_TILES, # Each pair is one mapping so 2 inputs are connected - "Control_to_RECV_mapping": [1, 0, 1, 1] + [-1, -1] * 46 + "Control_to_RECV_mapping": [1, 0, 1, 1] + [-1, -1] * (N_HBA_TILES - 2) } antennafield_proxy = self.proxy @@ -199,20 +199,20 @@ class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase): antennafield_proxy.put_property(mapping_properties) antennafield_proxy.boot() - self.recv_proxy.write_attribute("HBAT_PWR_on_RW", [[False] * 32] * 96) + self.recv_proxy.write_attribute("HBAT_PWR_on_RW", [[False] * N_elements * N_pol] * MAX_ANTENNA) try: antennafield_proxy.write_attribute( - "HBAT_PWR_on_RW", [[True] * 32] * 48 + "HBAT_PWR_on_RW", [[True] * N_elements * N_pol] * N_HBA_TILES ) numpy.testing.assert_equal( - numpy.array([[True] * 32] * 2 + [[False] * 32] * 94), + numpy.array([[True] * N_elements * N_pol] * 2 + [[False] * N_elements * N_pol] * (MAX_ANTENNA - 2)), self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value ) finally: # Always restore recv again self.recv_proxy.write_attribute( - "HBAT_PWR_on_RW", [[False] * 32] * 96 + "HBAT_PWR_on_RW", [[False] * N_elements * N_pol] * MAX_ANTENNA ) # Verify device did not enter FAULT state @@ -223,10 +223,10 @@ class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase): mapping_properties = { "RECV_devices": ["STAT/RECV/1"], - "Power_to_RECV_mapping": [-1, -1] * 48, + "Power_to_RECV_mapping": [-1, -1] * N_HBA_TILES, "Control_to_RECV_mapping": # [1, 0, 1, 1, 1, 2, 1, x ... 1, 95] - numpy.array([[1, x] for x in range(0, 96)]).flatten() + numpy.array([[1, x] for x in range(0, MAX_ANTENNA)]).flatten() } antennafield_proxy = self.proxy @@ -234,20 +234,20 @@ class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase): antennafield_proxy.put_property(mapping_properties) antennafield_proxy.boot() - self.recv_proxy.write_attribute("HBAT_PWR_on_RW", [[False] * 32] * 96) + self.recv_proxy.write_attribute("HBAT_PWR_on_RW", [[False] * N_elements * N_pol] * MAX_ANTENNA) try: antennafield_proxy.write_attribute( - "HBAT_PWR_on_RW", [[True] * 32] * 96 + "HBAT_PWR_on_RW", [[True] * N_elements * N_pol] * MAX_ANTENNA ) numpy.testing.assert_equal( - numpy.array([[True] * 32] * 96), + numpy.array([[True] * N_elements * N_pol] * MAX_ANTENNA), self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value ) finally: # Always restore recv again self.recv_proxy.write_attribute( - "HBAT_PWR_on_RW", [[False] * 32] * 96 + "HBAT_PWR_on_RW", [[False] * N_elements * N_pol] * MAX_ANTENNA ) # Verify device did not enter FAULT state @@ -258,10 +258,10 @@ class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase): mapping_properties = { "RECV_devices": ["STAT/RECV/1"], - "Power_to_RECV_mapping": [-1, -1] * 48, + "Power_to_RECV_mapping": [-1, -1] * N_HBA_TILES, "Control_to_RECV_mapping": # [1, 0, 1, 1, 1, 2, 1, x ... 1, 95] - numpy.array([[1, x] for x in range(0, 96)]).flatten() + numpy.array([[1, x] for x in range(0, MAX_ANTENNA)]).flatten() } antennafield_proxy = self.proxy @@ -269,20 +269,20 @@ class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase): antennafield_proxy.put_property(mapping_properties) antennafield_proxy.boot() - self.recv_proxy.write_attribute("RCU_band_select_RW", [[False] * 3] * 32) + self.recv_proxy.write_attribute("RCU_band_select_RW", [[False] * N_rcu_inp] * N_rcu) try: antennafield_proxy.write_attribute( - "RCU_band_select_RW", [True] * 96 + "RCU_band_select_RW", [True] * MAX_ANTENNA ) numpy.testing.assert_equal( - numpy.array([[True] * 3] * 32), + numpy.array([[True] * N_rcu_inp] * N_rcu), self.recv_proxy.read_attribute("RCU_band_select_RW").value ) finally: # Always restore recv again self.recv_proxy.write_attribute( - "RCU_band_select_RW", [[False] * 3] * 32 + "RCU_band_select_RW", [[False] * N_rcu_inp] * N_rcu ) # Verify device did not enter FAULT state diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_beamlet.py b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_beamlet.py index 6aa67918d4b76867109234478c281ad68789da08..d16cbb9abbdd75f443ac9e89367b00cf2e7ae082 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_beamlet.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_beamlet.py @@ -10,6 +10,7 @@ from .base import AbstractTestBases from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy +from tangostationcontrol.common.constants import N_beamlets_ctrl, S_pn, CLK_200_MHZ, CLK_160_MHZ, MAX_INPUTS, N_pn from tango import DevState @@ -30,7 +31,7 @@ class TestDeviceBeamlet(AbstractTestBases.TestDeviceBase): super().test_device_read_all_attributes() - def setup_sdp(self, antenna_type="HBA", clock=200_000_000): + def setup_sdp(self, antenna_type="HBA", clock=CLK_200_MHZ): # setup SDP, on which this device depends sdp_proxy = TestDeviceProxy("STAT/SDP/1") sdp_proxy.off() @@ -38,7 +39,7 @@ class TestDeviceBeamlet(AbstractTestBases.TestDeviceBase): sdp_proxy.set_defaults() # setup the frequencies as expected in the test - sdp_proxy.antenna_type_RW = [[antenna_type] * 12] * 16 + sdp_proxy.antenna_type_RW = [[antenna_type] * S_pn] * N_pn sdp_proxy.clock_RW = clock return sdp_proxy @@ -53,14 +54,14 @@ class TestDeviceBeamlet(AbstractTestBases.TestDeviceBase): # The subband frequency of HBA subband 0 is 200 MHz, # so its period is 5 ns. A delay of 2.5e-9 seconds is thus half a period, # and result in a 180 degree phase offset. - delays = numpy.array([[2.5e-9] * 192] * 488) + delays = numpy.array([[2.5e-9] * MAX_INPUTS] * N_beamlets_ctrl) calculated_bf_weights = self.proxy.calculate_bf_weights(delays.flatten()) # With a unit weight of 2**14, we thus expect beamformer weights of -2**14 + 0j, # which is 49152 when read as an uint32. self.assertEqual(-2**14, c_short(49152).value) # check our calculations - expected_bf_weights = numpy.array([49152] * 192 * 488, dtype=numpy.uint32) + expected_bf_weights = numpy.array([49152] * MAX_INPUTS * N_beamlets_ctrl, dtype=numpy.uint32) numpy.testing.assert_almost_equal(expected_bf_weights, calculated_bf_weights) @@ -72,17 +73,17 @@ class TestDeviceBeamlet(AbstractTestBases.TestDeviceBase): self.proxy.off() self.proxy.initialise() self.assertEqual(DevState.STANDBY, self.proxy.state()) - self.proxy.subband_select_RW = [10] * 488 + self.proxy.subband_select_RW = [10] * N_beamlets_ctrl self.proxy.on() self.assertEqual(DevState.ON, self.proxy.state()) # The subband frequency of HBA subband 10 is 201953125 Hz # so its period is 4.95 ns ca, half period is 2.4758e-9 - delays = numpy.array([[2.4758e-9] * 192] * 488) + delays = numpy.array([[2.4758e-9] * MAX_INPUTS] * N_beamlets_ctrl) calculated_bf_weights_subband_10 = self.proxy.calculate_bf_weights(delays.flatten()) self.assertEqual(-2**14, c_short(49152).value) # check our calculations - expected_bf_weights_10 = numpy.array([49152] * 192 * 488, dtype=numpy.uint32) + expected_bf_weights_10 = numpy.array([49152] * MAX_INPUTS * N_beamlets_ctrl, dtype=numpy.uint32) numpy.testing.assert_almost_equal(expected_bf_weights_10, calculated_bf_weights_subband_10) def test_sdp_clock_change(self): @@ -90,21 +91,21 @@ class TestDeviceBeamlet(AbstractTestBases.TestDeviceBase): sdp_proxy = self.setup_sdp() self.proxy.initialise() - self.proxy.subband_select_RW = numpy.array(list(range(317)) + [316] + list(range(318,488)), dtype=numpy.uint32) + self.proxy.subband_select_RW = numpy.array(list(range(317)) + [316] + list(range(318,N_beamlets_ctrl)), dtype=numpy.uint32) self.proxy.on() # any non-zero delay should result in different weights for different clocks - delays = numpy.array([[2.5e-9] * 192] * 488) + delays = numpy.array([[2.5e-9] * MAX_INPUTS] * N_beamlets_ctrl) - sdp_proxy.clock_RW = 200 * 1000000 + sdp_proxy.clock_RW = CLK_200_MHZ time.sleep(1) # wait for beamlet device to process change event calculated_bf_weights_200 = self.proxy.calculate_bf_weights(delays.flatten()) - sdp_proxy.clock_RW = 160 * 1000000 + sdp_proxy.clock_RW = CLK_160_MHZ time.sleep(1) # wait for beamlet device to process change event calculated_bf_weights_160 = self.proxy.calculate_bf_weights(delays.flatten()) - sdp_proxy.clock_RW = 200 * 1000000 + sdp_proxy.clock_RW = CLK_200_MHZ time.sleep(1) # wait for beamlet device to process change event calculated_bf_weights_200_v2 = self.proxy.calculate_bf_weights(delays.flatten()) @@ -115,12 +116,12 @@ class TestDeviceBeamlet(AbstractTestBases.TestDeviceBase): # change subbands self.proxy.off() self.proxy.initialise() - self.proxy.subband_select_RW = [317] * 488 + self.proxy.subband_select_RW = [317] * N_beamlets_ctrl self.proxy.on() calculated_bf_weights_200_v3 = self.proxy.calculate_bf_weights(delays.flatten()) self.assertTrue((calculated_bf_weights_200_v2 != calculated_bf_weights_200_v3).all()) - sdp_proxy.clock_RW = 160 * 1000000 + sdp_proxy.clock_RW = CLK_160_MHZ time.sleep(1) # wait for beamlet device to process change event calculated_bf_weights_160_v2 = self.proxy.calculate_bf_weights(delays.flatten()) self.assertTrue((calculated_bf_weights_160 != calculated_bf_weights_160_v2).all()) diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_boot.py b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_boot.py index e9fb0f16e30cedd18a2dc0dc4df2f09fcb0ab213..7bd56cba750ac29bce04b45f01872b15671ff2a3 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_boot.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_boot.py @@ -13,6 +13,7 @@ from tango import DevState from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy from tangostationcontrol.integration_test import base +from tangostationcontrol.common.constants import DEFAULT_POLLING_PERIOD class TestDeviceBoot(base.IntegrationTestCase): @@ -29,7 +30,7 @@ class TestDeviceBoot(base.IntegrationTestCase): """Test if we can reinitialise the station""" # This attribute needs to be polled for the TemperatureManager test to succesfully initialise - TestDeviceProxy("STAT/RECV/1").poll_attribute("HBAT_LED_on_RW", 1000) + TestDeviceProxy("STAT/RECV/1").poll_attribute("HBAT_LED_on_RW", DEFAULT_POLLING_PERIOD) self.proxy.reboot() diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py index 9a7d22306e6aa842e8976a37c7a1d2b8e1923c1e..5cd8c534db104c7ee6c975ff43f5e397cad69cdb 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py @@ -10,6 +10,7 @@ from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy from tangostationcontrol.devices.antennafield import AntennaQuality, AntennaUse +from tangostationcontrol.common.constants import MAX_ANTENNA, N_beamlets_ctrl, N_pn, CLK_200_MHZ, CLK_160_MHZ, N_HBA_TILES from .base import AbstractTestBases @@ -18,9 +19,9 @@ import time class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): - antenna_qualities_ok = numpy.array([AntennaQuality.OK] * 96) - antenna_qualities_only_second = numpy.array([AntennaQuality.BROKEN] + [AntennaQuality.OK] + [AntennaQuality.BROKEN] * 94) - antenna_use_ok = numpy.array([AntennaUse.AUTO] * 96) + antenna_qualities_ok = numpy.array([AntennaQuality.OK] * MAX_ANTENNA) + antenna_qualities_only_second = numpy.array([AntennaQuality.BROKEN] + [AntennaQuality.OK] + [AntennaQuality.BROKEN] * (MAX_ANTENNA - 2)) + antenna_use_ok = numpy.array([AntennaUse.AUTO] * MAX_ANTENNA) antennafield_iden = "STAT/AntennaField/1" beamlet_iden = "STAT/Beamlet/1" @@ -67,7 +68,7 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): def setup_antennafield_proxy(self, antenna_qualities, antenna_use): # setup AntennaField - NR_TILES = 48 + NR_TILES = N_HBA_TILES antennafield_proxy = TestDeviceProxy(self.antennafield_iden) control_mapping = [[1,i] for i in range(NR_TILES)] antennafield_proxy.put_property({ @@ -94,7 +95,7 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): self.beamlet_proxy.on() # Set first (default) clock configuration - self.sdp_proxy.clock_RW = 200 * 1000000 + self.sdp_proxy.clock_RW = CLK_200_MHZ time.sleep(1) self.proxy.initialise() @@ -102,7 +103,7 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): self.proxy.on() # Point to Zenith - self.proxy.set_pointing(numpy.array([["AZELGEO", "0deg", "90deg"]] * 488).flatten()) + self.proxy.set_pointing(numpy.array([["AZELGEO", "0deg", "90deg"]] * N_beamlets_ctrl).flatten()) # beam weights should now be non-zero, we don't actually check their values for correctness FPGA_bf_weights_xx_yy_clock200 = self.beamlet_proxy.FPGA_bf_weights_xx_yy_RW.flatten() @@ -112,7 +113,7 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): self.beamlet_proxy.on() # Change clock configuration - self.sdp_proxy.clock_RW = 160 * 1000000 + self.sdp_proxy.clock_RW = CLK_160_MHZ time.sleep(1) FPGA_bf_weights_xx_yy_clock160 = self.beamlet_proxy.FPGA_bf_weights_xx_yy_RW.flatten() @@ -130,7 +131,7 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): self.setup_antennafield_proxy(self.antenna_qualities_ok, self.antenna_use_ok) self.beamlet_proxy = self.initialise_beamlet_proxy() - self.beamlet_proxy.subband_select_RW = numpy.array(list(range(317)) + [316] + list(range(318,488)), dtype=numpy.uint32) + self.beamlet_proxy.subband_select_RW = numpy.array(list(range(317)) + [316] + list(range(318,N_beamlets_ctrl)), dtype=numpy.uint32) self.beamlet_proxy.on() self.proxy.initialise() @@ -138,13 +139,13 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): self.proxy.on() # Point to Zenith - self.proxy.set_pointing(numpy.array([["AZELGEO", "0deg", "90deg"]] * 488).flatten()) + self.proxy.set_pointing(numpy.array([["AZELGEO", "0deg", "90deg"]] * N_beamlets_ctrl).flatten()) # Store values with first subband configuration FPGA_bf_weights_xx_yy_subband_v1 = self.beamlet_proxy.FPGA_bf_weights_xx_yy_RW.flatten() # Restart beamlet proxy self.beamlet_proxy = self.initialise_beamlet_proxy() - self.beamlet_proxy.subband_select_RW = [317] * 488 + self.beamlet_proxy.subband_select_RW = [317] * N_beamlets_ctrl self.beamlet_proxy.on() # Store values with second subband configuration @@ -167,14 +168,14 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): self.proxy.Tracking_enabled_RW = False self.proxy.on() - all_zeros = numpy.array([[0] * 5856] * 16) + all_zeros = numpy.array([[0] * 5856] * N_pn) self.beamlet_proxy.FPGA_bf_weights_xx_yy_RW = all_zeros # Enable all inputs - self.proxy.input_select_RW = numpy.array([[True] * 488] * 96) + self.proxy.input_select_RW = numpy.array([[True] * N_beamlets_ctrl] * MAX_ANTENNA) self.proxy.set_pointing( - numpy.array([["AZELGEO", "0deg", "90deg"]] * 488).flatten() + numpy.array([["AZELGEO", "0deg", "90deg"]] * N_beamlets_ctrl).flatten() ) # Verify all zeros are replaced with other values for all inputs @@ -197,14 +198,14 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): self.proxy.Tracking_enabled_RW = False self.proxy.on() - non_zeros = numpy.array([[16] * 5856] * 16) + non_zeros = numpy.array([[N_pn] * 5856] * N_pn) self.beamlet_proxy.FPGA_bf_weights_xx_yy_RW = non_zeros # Disable all inputs - self.proxy.input_select_RW = numpy.array([[False] * 488] * 96) + self.proxy.input_select_RW = numpy.array([[False] * N_beamlets_ctrl] * MAX_ANTENNA) self.proxy.set_pointing( - numpy.array([["AZELGEO", "0deg", "90deg"]] * 488).flatten() + numpy.array([["AZELGEO", "0deg", "90deg"]] * N_beamlets_ctrl).flatten() ) # Verify all zeros are replaced with other values for all inputs @@ -220,12 +221,12 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): ) antennafield_proxy = self.setup_antennafield_proxy(self.antenna_qualities_ok, self.antenna_use_ok) - numpy.testing.assert_equal(numpy.array([True] * 96), antennafield_proxy.Antenna_Usage_Mask_R) + numpy.testing.assert_equal(numpy.array([True] * MAX_ANTENNA), antennafield_proxy.Antenna_Usage_Mask_R) self.setUp() self.proxy.warm_boot() - expected_input_select = numpy.array([[True] * 488 ] * 48 + [[False] * 488] * 48) # first 48 rows are True + expected_input_select = numpy.array([[True] * N_beamlets_ctrl ] * N_HBA_TILES + [[False] * N_beamlets_ctrl] * N_HBA_TILES) # first 48 rows are True numpy.testing.assert_equal(expected_input_select, self.proxy.input_select_RW) - expected_antenna_select = numpy.array([[True] * 488 ] * 48) + expected_antenna_select = numpy.array([[True] * N_beamlets_ctrl ] * N_HBA_TILES) numpy.testing.assert_equal(expected_antenna_select, self.proxy.antenna_select_RW) def test_input_select_with_only_second_antenna_ok(self): @@ -236,10 +237,10 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): ) antennafield_proxy = self.setup_antennafield_proxy(self.antenna_qualities_only_second, self.antenna_use_ok) - numpy.testing.assert_equal(numpy.array([False] + [True] + [False] * 94), antennafield_proxy.Antenna_Usage_Mask_R) + numpy.testing.assert_equal(numpy.array([False] + [True] + [False] * (MAX_ANTENNA - 2)), antennafield_proxy.Antenna_Usage_Mask_R) self.setUp() self.proxy.warm_boot() - expected_input_select = numpy.array([[False] * 488 ] + [[True] * 488] + [[False] * 488] * 46 + [[False] * 488] * 48) # first 48 rows are True + expected_input_select = numpy.array([[False] * N_beamlets_ctrl ] + [[True] * N_beamlets_ctrl] + [[False] * N_beamlets_ctrl] * (N_HBA_TILES - 2) + [[False] * N_beamlets_ctrl] * N_HBA_TILES) # first 48 rows are True numpy.testing.assert_equal(expected_input_select, self.proxy.input_select_RW) - expected_antenna_select = numpy.array([[False] * 488 ] + [[True] * 488] + [[False] * 488] * 46) + expected_antenna_select = numpy.array([[False] * N_beamlets_ctrl ] + [[True] * N_beamlets_ctrl] + [[False] * N_beamlets_ctrl] * (N_HBA_TILES - 2)) numpy.testing.assert_equal(expected_antenna_select, self.proxy.antenna_select_RW) diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_observation.py b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_observation.py index 46c77246eb5fd876079998c341bc7b4693d48ec3..968394d39a909b8d08391c11f87857ec29d5fda1 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_observation.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_observation.py @@ -15,13 +15,11 @@ from tango import DevState, DevFailed from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy from tangostationcontrol.test.devices.test_observation_base import TestObservationBase from tangostationcontrol.devices.antennafield import AntennaQuality, AntennaUse +from tangostationcontrol.common.constants import N_beamlets_ctrl, MAX_ANTENNA, N_HBA_TILES from .base import AbstractTestBases class TestDeviceObservation(AbstractTestBases.TestDeviceBase): - NUM_TILES = 48 - NUM_BEAMLETS_CTRL = 488 - NUM_INPUTS = 96 ANTENNA_TO_SDP_MAPPING = [ "0", "0", "0", "1", "0", "2", "0", "3", "0", "4", "0", "5", "1", "0", "1", "1", "1", "2", "1", "3", "1", "4", "1", "5", @@ -61,9 +59,9 @@ class TestDeviceObservation(AbstractTestBases.TestDeviceBase): def setup_antennafield_proxy(self): # setup AntennaField antennafield_proxy = TestDeviceProxy("STAT/AntennaField/1") - control_mapping = [[1,i] for i in range(self.NUM_TILES)] - antenna_qualities = numpy.array([AntennaQuality.OK] * 96) - antenna_use = numpy.array([AntennaUse.AUTO] * 96) + control_mapping = [[1,i] for i in range(N_HBA_TILES)] + antenna_qualities = numpy.array([AntennaQuality.OK] * MAX_ANTENNA) + antenna_use = numpy.array([AntennaUse.AUTO] * MAX_ANTENNA) antennafield_proxy.put_property({"RECV_devices": ["STAT/RECV/1"], "Control_to_RECV_mapping": numpy.array(control_mapping).flatten(), "Antenna_to_SDP_Mapping": self.ANTENNA_TO_SDP_MAPPING, @@ -94,7 +92,7 @@ class TestDeviceObservation(AbstractTestBases.TestDeviceBase): tilebeam_proxy.off() tilebeam_proxy.warm_boot() tilebeam_proxy.set_defaults() - return tilebeam_proxy + return tilebeam_proxy def test_init_valid(self): """Initialize an observation with valid JSON""" @@ -158,10 +156,10 @@ class TestDeviceObservation(AbstractTestBases.TestDeviceBase): """Test that attributes antenna_mask and filter are correctly applied""" self.setup_recv_proxy() antennafield_proxy = self.setup_antennafield_proxy() - antennafield_proxy.ANT_mask_RW = [True] * 48 # set all masks to True - self.assertListEqual(antennafield_proxy.ANT_mask_RW.tolist(), [True] * 48) - antennafield_proxy.RCU_band_select_RW = [0] * 48 - self.assertListEqual(antennafield_proxy.RCU_band_select_RW.tolist(), [0] * 48) + antennafield_proxy.ANT_mask_RW = [True] * N_HBA_TILES # set all masks to True + self.assertListEqual(antennafield_proxy.ANT_mask_RW.tolist(), [True] * N_HBA_TILES) + antennafield_proxy.RCU_band_select_RW = [0] * N_HBA_TILES + self.assertListEqual(antennafield_proxy.RCU_band_select_RW.tolist(), [0] * N_HBA_TILES) self.proxy.off() self.proxy.observation_settings_RW = self.VALID_JSON self.proxy.Initialise() @@ -174,46 +172,46 @@ class TestDeviceObservation(AbstractTestBases.TestDeviceBase): def test_apply_subbands(self): """Test that attribute sap subbands is correctly applied""" beamlet_proxy = self.setup_beamlet_proxy() - subband_select = [0] * self.NUM_BEAMLETS_CTRL + subband_select = [0] * N_beamlets_ctrl beamlet_proxy.subband_select_RW = subband_select self.assertListEqual(beamlet_proxy.subband_select_RW.tolist(), subband_select) self.proxy.off() self.proxy.observation_settings_RW = self.VALID_JSON self.proxy.Initialise() self.proxy.On() - expected_subbands = [10,20,30] + [0] * (self.NUM_BEAMLETS_CTRL-3) + expected_subbands = [10,20,30] + [0] * (N_beamlets_ctrl-3) self.assertListEqual(beamlet_proxy.subband_select_RW.tolist(), expected_subbands) def test_apply_pointing(self): """Test that attribute sap pointing is correctly applied""" digitalbeam_proxy = self.setup_digitalbeam_proxy() - default_pointing = [("AZELGEO","0deg","90deg")]*488 + default_pointing = [("AZELGEO","0deg","90deg")]*N_beamlets_ctrl digitalbeam_proxy.Pointing_direction_RW = default_pointing self.assertListEqual(list(digitalbeam_proxy.Pointing_direction_RW), default_pointing) self.proxy.off() self.proxy.observation_settings_RW = self.VALID_JSON self.proxy.Initialise() self.proxy.On() - expected_pointing = [("J2000","1.5deg","0deg")] + [("AZELGEO","0deg","90deg")] * 487 + expected_pointing = [("J2000","1.5deg","0deg")] + [("AZELGEO","0deg","90deg")] * (N_beamlets_ctrl - 1) self.assertListEqual(list(digitalbeam_proxy.Pointing_direction_RW), expected_pointing) def test_apply_antenna_select(self): """Test that antenna selection is correctly applied""" digitalbeam_proxy = self.setup_digitalbeam_proxy() - default_selection = [[False] * self.NUM_BEAMLETS_CTRL] * self.NUM_INPUTS + default_selection = [[False] * N_beamlets_ctrl] * MAX_ANTENNA digitalbeam_proxy.antenna_select_RW = default_selection self.assertListEqual(digitalbeam_proxy.antenna_select_RW.tolist()[9], default_selection[9]) self.proxy.off() self.proxy.observation_settings_RW = self.VALID_JSON self.proxy.Initialise() self.proxy.On() - self.assertListEqual(digitalbeam_proxy.antenna_select_RW.tolist()[9], [True] * self.NUM_BEAMLETS_CTRL) - self.assertListEqual(digitalbeam_proxy.antenna_select_RW.tolist()[10], [False] * self.NUM_BEAMLETS_CTRL) + self.assertListEqual(digitalbeam_proxy.antenna_select_RW.tolist()[9], [True] * N_beamlets_ctrl) + self.assertListEqual(digitalbeam_proxy.antenna_select_RW.tolist()[10], [False] * N_beamlets_ctrl) def test_apply_tilebeam(self): """Test that attribute tilebeam is correctly applied""" tilebeam_proxy = self.setup_tilebeam_proxy() - pointing_direction = [("J2000","0deg","0deg")] * self.NUM_TILES + pointing_direction = [("J2000","0deg","0deg")] * N_HBA_TILES tilebeam_proxy.Pointing_direction_RW = pointing_direction self.assertListEqual(list(tilebeam_proxy.Pointing_direction_RW[0]), ["J2000","0deg","0deg"]) self.proxy.off() diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_observation_control.py b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_observation_control.py index ac325e0cbcc9f5dd663ce21c104461516819a9cf..9f851600ae350c8a9116be67ce511910f28886be 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_observation_control.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_observation_control.py @@ -19,11 +19,10 @@ from tangostationcontrol.test.devices.test_observation_base import TestObservati from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy from .base import AbstractTestBases +from tangostationcontrol.common.constants import N_HBA_TILES + class TestObservationControlDevice(AbstractTestBases.TestDeviceBase): - NUM_TILES = 48 - NUM_BEAMLETS_CTRL = 488 - NUM_INPUTS = 96 ANTENNA_TO_SDP_MAPPING = [ "0", "0", "0", "1", "0", "2", "0", "3", "0", "4", "0", "5", "1", "0", "1", "1", "1", "2", "1", "3", "1", "4", "1", "5", @@ -63,7 +62,7 @@ class TestObservationControlDevice(AbstractTestBases.TestDeviceBase): def setup_antennafield_proxy(self): # setup AntennaField antennafield_proxy = TestDeviceProxy("STAT/AntennaField/1") - control_mapping = [[1,i] for i in range(self.NUM_TILES)] + control_mapping = [[1,i] for i in range(N_HBA_TILES)] antennafield_proxy.put_property({"RECV_devices": ["STAT/RECV/1"], "Power_to_RECV_mapping": numpy.array(control_mapping).flatten(), "Antenna_to_SDP_Mapping": self.ANTENNA_TO_SDP_MAPPING}) diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_sdp.py b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_sdp.py index 570d8ce6cee1bc2f1ab833c9d93acdd11cc270f7..12b1d031b83b1aca9c4e6fc5dd163fe5cab9613f 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_sdp.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_sdp.py @@ -8,7 +8,7 @@ # See LICENSE.txt for more info. from .base import AbstractTestBases - +from tangostationcontrol.common.constants import N_pn class TestDeviceSDP(AbstractTestBases.TestDeviceBase): @@ -21,4 +21,4 @@ class TestDeviceSDP(AbstractTestBases.TestDeviceBase): self.proxy.warm_boot() - self.assertListEqual([True]*16, list(self.proxy.TR_fpga_communication_error_R)) + self.assertListEqual([True]*N_pn, list(self.proxy.TR_fpga_communication_error_R)) diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_temperature_manager.py b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_temperature_manager.py index 44dcfdcf1e02afb14ac80de0d79416a46144fc8e..84323e433ddb303af4de85210218179565b85044 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_temperature_manager.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_temperature_manager.py @@ -11,6 +11,8 @@ from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy from tango._tango import DevState from tango import DeviceProxy +from tangostationcontrol.common.constants import DEFAULT_POLLING_PERIOD, N_elements, N_pol, N_rcu, N_rcu_inp + import time import logging @@ -33,7 +35,7 @@ class TestDeviceTemperatureManager(AbstractTestBases.TestDeviceBase): recv_proxy.initialise() recv_proxy.on() - recv_proxy.poll_attribute("HBAT_LED_on_RW", 1000) + recv_proxy.poll_attribute("HBAT_LED_on_RW", DEFAULT_POLLING_PERIOD) self.assertTrue(recv_proxy.is_attribute_polled(f"HBAT_LED_on_RW")) return recv_proxy @@ -69,12 +71,12 @@ class TestDeviceTemperatureManager(AbstractTestBases.TestDeviceBase): self.assertEqual(self.proxy.get_property('Shutdown_Device_List')['Shutdown_Device_List'][0], "STAT/SDP/1") # Here we trigger our own change event by just using an RW attribute - self.recv_proxy.HBAT_LED_on_RW = [[False] * 32] * 96 + self.recv_proxy.HBAT_LED_on_RW = [[False] * N_elements, N_pol] * N_rcu * N_rcu_inp time.sleep(2) self.assertFalse(self.proxy.is_alarming_R) - self.recv_proxy.HBAT_LED_on_RW = [[True] * 32] * 96 + self.recv_proxy.HBAT_LED_on_RW = [[True] * N_elements, N_pol] * N_rcu * N_rcu_inp time.sleep(2) # the TEMP_MANAGER_is_alarming_R should now be True, since it should have detected the temperature alarm. diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_tilebeam.py b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_tilebeam.py index c474e6628d695d4fd29eba7e0fed04e296477a8a..e272a25becf5a98ed507368e72c8a9f5c586cb62 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_tilebeam.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_tilebeam.py @@ -14,6 +14,7 @@ import json from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy from tangostationcontrol.devices.antennafield import AntennaQuality, AntennaUse +from tangostationcontrol.common.constants import N_HBA_TILES, MAX_ANTENNA, N_elements, N_pol from .base import AbstractTestBases @@ -26,10 +27,8 @@ class NumpyEncoder(json.JSONEncoder): class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase): - # The AntennaField is setup with self.NR_TILES tiles in the test configuration - NR_TILES = 48 - POINTING_DIRECTION = numpy.array([["J2000","0deg","0deg"]] * NR_TILES).flatten() + POINTING_DIRECTION = numpy.array([["J2000","0deg","0deg"]] * N_HBA_TILES).flatten() def setUp(self): super().setUp("STAT/TileBeam/1") @@ -45,9 +44,9 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase): def setup_antennafield_proxy(self): # setup AntennaField antennafield_proxy = TestDeviceProxy("STAT/AntennaField/1") - control_mapping = [[1,i] for i in range(self.NR_TILES)] - antenna_qualities = numpy.array([AntennaQuality.OK] * 96) - antenna_use = numpy.array([AntennaUse.AUTO] * 96) + control_mapping = [[1,i] for i in range(N_HBA_TILES)] + antenna_qualities = numpy.array([AntennaQuality.OK] * MAX_ANTENNA) + antenna_use = numpy.array([AntennaUse.AUTO] * MAX_ANTENNA) antennafield_proxy.put_property({"RECV_devices": ["STAT/RECV/1"], "Control_to_RECV_mapping": numpy.array(control_mapping).flatten(), 'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use}) @@ -55,7 +54,7 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase): antennafield_proxy.boot() # check if AntennaField really exposes the expected number of tiles - self.assertEqual(self.NR_TILES, antennafield_proxy.nr_antennas_R) + self.assertEqual(N_HBA_TILES, antennafield_proxy.nr_antennas_R) return antennafield_proxy def test_delays_dims(self): @@ -68,7 +67,7 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase): # verify delays method returns the correct dimensions delays = self.proxy.delays(self.POINTING_DIRECTION) - self.assertEqual(self.NR_TILES*16, len(delays)) + self.assertEqual(N_HBA_TILES*N_elements, len(delays)) def test_set_pointing(self): """Verify if set pointing procedure is correctly executed""" @@ -102,11 +101,11 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase): self.proxy.Tracking_enabled_RW = False # Point to Zenith - self.proxy.set_pointing(numpy.array([["AZELGEO","0deg","90deg"]] * self.NR_TILES).flatten()) + self.proxy.set_pointing(numpy.array([["AZELGEO","0deg","90deg"]] * N_HBA_TILES).flatten()) calculated_HBAT_delay_steps = numpy.array(antennafield_proxy.read_attribute('HBAT_BF_delay_steps_RW').value) - expected_HBAT_delay_steps = numpy.array([[15] * 32] * self.NR_TILES, dtype=numpy.int64) + expected_HBAT_delay_steps = numpy.array([[15] * N_elements * N_pol] * N_HBA_TILES, dtype=numpy.int64) numpy.testing.assert_equal(calculated_HBAT_delay_steps, expected_HBAT_delay_steps) @@ -119,7 +118,7 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase): self.proxy.Tracking_enabled_RW = False # point at north on the horizon - self.proxy.set_pointing(["AZELGEO","0deg","0deg"] * self.NR_TILES) + self.proxy.set_pointing(["AZELGEO","0deg","0deg"] * N_HBA_TILES) # obtain delays of the X polarisation of all the elements of the first tile north_beam_delay_steps = antennafield_proxy.HBAT_BF_delay_steps_RW[0].reshape(4,4,2)[:,:,0] @@ -129,7 +128,7 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase): for angle in (90,180,270): # point at angle degrees (90=E, 180=S, 270=W) - self.proxy.set_pointing(["AZELGEO",f"{angle}deg","0deg"] * self.NR_TILES) + self.proxy.set_pointing(["AZELGEO",f"{angle}deg","0deg"] * N_HBA_TILES) # obtain delays of the X polarisation of all the elements of the first tile angled_beam_delay_steps = antennafield_proxy.HBAT_BF_delay_steps_RW[0].reshape(4,4,2)[:,:,0] @@ -147,7 +146,7 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase): self.proxy.Tracking_enabled_RW = False # Point to LOFAR 1 ref pointing (0.929342, 0.952579, J2000) - pointings = numpy.array([["J2000", "0.929342rad", "0.952579rad"]] * self.NR_TILES).flatten() + pointings = numpy.array([["J2000", "0.929342rad", "0.952579rad"]] * N_HBA_TILES).flatten() # Need to set the time to '2022-01-18 11:19:35' timestamp = datetime.datetime(2022, 1, 18, 11, 19, 35).timestamp() @@ -159,7 +158,7 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase): json_string = json.dumps(parameters, cls=NumpyEncoder) self.proxy.set_pointing_for_specific_time(json_string) - calculated_HBAT_delay_steps = numpy.array(antennafield_proxy.read_attribute('HBAT_BF_delay_steps_RW').value) # dims (self.NR_TILES, 32) + calculated_HBAT_delay_steps = numpy.array(antennafield_proxy.read_attribute('HBAT_BF_delay_steps_RW').value) # dims (N_HBA_TILES, 32) # Check all delay steps are zero with small margin # [24, 25, 27, 28, 17, 18, 20, 21, 10, 11, 13, 14, 3, 4, 5, 7] These are the real values from LOFAR. @@ -168,7 +167,7 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase): # they can be explained. expected_HBAT_delay_steps = numpy.repeat(numpy.array([24, 25, 27, 29, 17, 18, 20, 21, 10, 11, 13, 14, 3, 4, 5, 7], dtype=numpy.int64), 2) numpy.testing.assert_equal(calculated_HBAT_delay_steps[0], expected_HBAT_delay_steps) - numpy.testing.assert_equal(calculated_HBAT_delay_steps[self.NR_TILES - 1], expected_HBAT_delay_steps) + numpy.testing.assert_equal(calculated_HBAT_delay_steps[N_HBA_TILES - 1], expected_HBAT_delay_steps) def test_tilebeam_tracking(self): self.setup_recv_proxy() @@ -179,7 +178,7 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase): self.assertTrue(self.proxy.Tracking_enabled_R) # point somewhere - new_pointings = [("J2000",f"{tile}deg","0deg") for tile in range(self.NR_TILES)] + new_pointings = [("J2000",f"{tile}deg","0deg") for tile in range(N_HBA_TILES)] self.proxy.Pointing_direction_RW = new_pointings # check pointing diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_lofar_device.py b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_lofar_device.py index e1d3256b937f9d859b108ef76db3cf8cc9f861a8..96a7d0e4d7582fb3d639ae2c436c4a5cdefb19d3 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_lofar_device.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_lofar_device.py @@ -9,8 +9,10 @@ import time from tango import DevState -from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy +from tangostationcontrol.common.constants import DEFAULT_POLLING_PERIOD + +from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy from tangostationcontrol.integration_test import base @@ -31,7 +33,7 @@ class TestProxyAttributeAccess(base.IntegrationTestCase): # make sure the attribute is polled, so we force proxies to access the poll first if self.proxy.is_attribute_polled(self.ATTRIBUTE_NAME): self.proxy.stop_poll_attribute(self.ATTRIBUTE_NAME) - self.proxy.poll_attribute(self.ATTRIBUTE_NAME, 1000) + self.proxy.poll_attribute(self.ATTRIBUTE_NAME, DEFAULT_POLLING_PERIOD) def dont_poll_attribute(self): # make sure the attribute is NOT polled, so we force proxies to access the device diff --git a/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py b/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py index 65f2bcb543c81670a7231e1bd3fc2bd3ac4f718d..d14a36871e6ba9cd28491549a587508a9102b1be 100644 --- a/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py +++ b/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py @@ -20,7 +20,7 @@ from tangostationcontrol.devices import antennafield from tangostationcontrol.devices.antennafield import AntennaToRecvMapper, AntennaQuality, AntennaUse from tangostationcontrol.test import base from tangostationcontrol.test.devices import device_base -from tangostationcontrol.common.constants import MAX_ANTENNA, N_rcu +from tangostationcontrol.common.constants import MAX_ANTENNA, N_rcu, N_HBA_TILES logger = logging.getLogger() @@ -28,18 +28,18 @@ logger = logging.getLogger() class TestAntennaToRecvMapper(base.TestCase): # A mapping where Antennas are all not mapped to power RCUs - POWER_NOT_CONNECTED = [[-1, -1]] * 48 + POWER_NOT_CONNECTED = [[-1, -1]] * N_HBA_TILES # A mapping where Antennas are all not mapped to control RCUs - CONTROL_NOT_CONNECTED = [[-1, -1]] * 48 + CONTROL_NOT_CONNECTED = [[-1, -1]] * N_HBA_TILES # A mapping where first two Antennas are mapped on the first Receiver. # The first Antenna control line on RCU 1 and the second Antenna control line on RCU 0. - CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1 = [[1, 1], [1, 0]] + [[-1, -1]] * 46 + CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1 = [[1, 1], [1, 0]] + [[-1, -1]] * (N_HBA_TILES - 2) def test_ant_read_mask_r_no_mapping(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3) receiver_values = [[False] * MAX_ANTENNA, [False] * MAX_ANTENNA, [False] * MAX_ANTENNA] - expected = [False] * 48 + expected = [False] * N_HBA_TILES actual = mapper.map_read("ANT_mask_RW", receiver_values) numpy.testing.assert_equal(expected, actual) @@ -47,7 +47,7 @@ class TestAntennaToRecvMapper(base.TestCase): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 3) receiver_values = [[False, True, False] + [False, False, False] * (N_rcu - 1), [False] * MAX_ANTENNA, [False] * MAX_ANTENNA] - expected = [True, False] + [False] * 46 + expected = [True, False] + [False] * (N_HBA_TILES - 2) actual = mapper.map_read("ANT_mask_RW", receiver_values) numpy.testing.assert_equal(expected, actual) @@ -55,7 +55,7 @@ class TestAntennaToRecvMapper(base.TestCase): def test_rcu_band_select_no_mapping(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3) receiver_values = [[0] * MAX_ANTENNA, [0] * MAX_ANTENNA, [0] * MAX_ANTENNA] - expected = [0] * 48 + expected = [0] * N_HBA_TILES actual = mapper.map_read("RCU_band_select_RW", receiver_values) numpy.testing.assert_equal(expected, actual) @@ -63,7 +63,7 @@ class TestAntennaToRecvMapper(base.TestCase): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3) receiver_values = [[[0] * N_rcu] * MAX_ANTENNA, [[0] * N_rcu] * MAX_ANTENNA, [[0] * N_rcu] * MAX_ANTENNA] - expected = [[0] * N_rcu] * 48 + expected = [[0] * N_rcu] * N_HBA_TILES actual = mapper.map_read("HBAT_BF_delay_steps_R", receiver_values) numpy.testing.assert_equal(expected, actual) @@ -71,7 +71,7 @@ class TestAntennaToRecvMapper(base.TestCase): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 3) receiver_values = [[[2] * N_rcu, [1] * N_rcu] + [[0] * N_rcu] * (MAX_ANTENNA - 2), [[0] * N_rcu] * MAX_ANTENNA, [[0] * N_rcu] * MAX_ANTENNA] - expected = [[1] * N_rcu, [2] * N_rcu] + [[0] * N_rcu] * 46 + expected = [[1] * N_rcu, [2] * N_rcu] + [[0] * N_rcu] * (N_HBA_TILES - 2) actual = mapper.map_read("HBAT_BF_delay_steps_R", receiver_values) numpy.testing.assert_equal(expected, actual) @@ -80,7 +80,7 @@ class TestAntennaToRecvMapper(base.TestCase): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3) receiver_values = [[[0] * N_rcu] * MAX_ANTENNA, [[0] * N_rcu] * MAX_ANTENNA, [[0] * N_rcu] * MAX_ANTENNA] - expected = [[0] * N_rcu] * 48 + expected = [[0] * N_rcu] * N_HBA_TILES actual = mapper.map_read("HBAT_BF_delay_steps_RW", receiver_values) numpy.testing.assert_equal(expected, actual) @@ -88,7 +88,7 @@ class TestAntennaToRecvMapper(base.TestCase): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 3) receiver_values = [[[2] * N_rcu, [1] * N_rcu] + [[0] * N_rcu] * (MAX_ANTENNA - 2), [[0] * N_rcu] * MAX_ANTENNA, [[0] * N_rcu] * MAX_ANTENNA] - expected = [[1] * N_rcu, [2] * N_rcu] + [[0] * N_rcu] * 46 + expected = [[1] * N_rcu, [2] * N_rcu] + [[0] * N_rcu] * (N_HBA_TILES - 2) actual = mapper.map_read("HBAT_BF_delay_steps_RW", receiver_values) numpy.testing.assert_equal(expected, actual) @@ -97,7 +97,7 @@ class TestAntennaToRecvMapper(base.TestCase): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3) receiver_values = [[[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA] - expected = [[False] * N_rcu] * 48 + expected = [[False] * N_rcu] * N_HBA_TILES actual = mapper.map_read("HBAT_LED_on_R", receiver_values) numpy.testing.assert_equal(expected, actual) @@ -106,7 +106,7 @@ class TestAntennaToRecvMapper(base.TestCase): receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * N_rcu] * (MAX_ANTENNA - 2), [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA] - expected = [[True, False] * 16, [False, True] * 16] + [[False] * N_rcu] * 46 + expected = [[True, False] * 16, [False, True] * 16] + [[False] * N_rcu] * (N_HBA_TILES - 2) actual = mapper.map_read("HBAT_LED_on_R", receiver_values) numpy.testing.assert_equal(expected, actual) @@ -114,7 +114,7 @@ class TestAntennaToRecvMapper(base.TestCase): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3) receiver_values = [[[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA] - expected = [[False] * N_rcu] * 48 + expected = [[False] * N_rcu] * N_HBA_TILES actual = mapper.map_read("HBAT_LED_on_RW", receiver_values) numpy.testing.assert_equal(expected, actual) @@ -123,7 +123,7 @@ class TestAntennaToRecvMapper(base.TestCase): receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * N_rcu] * (MAX_ANTENNA - 2), [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA] - expected = [[True, False] * 16, [False, True] * 16] + [[False] * N_rcu] * 46 + expected = [[True, False] * 16, [False, True] * 16] + [[False] * N_rcu] * (N_HBA_TILES - 2) actual = mapper.map_read("HBAT_LED_on_RW", receiver_values) numpy.testing.assert_equal(expected, actual) @@ -131,7 +131,7 @@ class TestAntennaToRecvMapper(base.TestCase): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3) receiver_values = [[[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA] - expected = [[False] * N_rcu] * 48 + expected = [[False] * N_rcu] * N_HBA_TILES actual = mapper.map_read("HBAT_PWR_LNA_on_R", receiver_values) numpy.testing.assert_equal(expected, actual) @@ -140,7 +140,7 @@ class TestAntennaToRecvMapper(base.TestCase): receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * N_rcu] * (MAX_ANTENNA - 2), [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA] - expected = [[True, False] * 16, [False, True] * 16] + [[False] * N_rcu] * 46 + expected = [[True, False] * 16, [False, True] * 16] + [[False] * N_rcu] * (N_HBA_TILES - 2) actual = mapper.map_read("HBAT_PWR_LNA_on_R", receiver_values) numpy.testing.assert_equal(expected, actual) @@ -148,7 +148,7 @@ class TestAntennaToRecvMapper(base.TestCase): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3) receiver_values = [[[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA] - expected = [[False] * N_rcu] * 48 + expected = [[False] * N_rcu] * N_HBA_TILES actual = mapper.map_read("HBAT_PWR_LNA_on_RW", receiver_values) numpy.testing.assert_equal(expected, actual) @@ -157,7 +157,7 @@ class TestAntennaToRecvMapper(base.TestCase): receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * N_rcu] * (MAX_ANTENNA - 2), [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA] - expected = [[True, False] * 16, [False, True] * 16] + [[False] * N_rcu] * 46 + expected = [[True, False] * 16, [False, True] * 16] + [[False] * N_rcu] * (N_HBA_TILES - 2) actual = mapper.map_read("HBAT_PWR_LNA_on_RW", receiver_values) numpy.testing.assert_equal(expected, actual) @@ -165,7 +165,7 @@ class TestAntennaToRecvMapper(base.TestCase): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3) receiver_values = [[[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA] - expected = [[False] * N_rcu] * 48 + expected = [[False] * N_rcu] * N_HBA_TILES actual = mapper.map_read("HBAT_PWR_on_R", receiver_values) numpy.testing.assert_equal(expected, actual) @@ -174,7 +174,7 @@ class TestAntennaToRecvMapper(base.TestCase): receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * N_rcu] * (MAX_ANTENNA - 2), [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA] - expected = [[True, False] * 16, [False, True] * 16] + [[False] * N_rcu] * 46 + expected = [[True, False] * 16, [False, True] * 16] + [[False] * N_rcu] * (N_HBA_TILES - 2) actual = mapper.map_read("HBAT_PWR_on_R", receiver_values) numpy.testing.assert_equal(expected, actual) @@ -182,7 +182,7 @@ class TestAntennaToRecvMapper(base.TestCase): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3) receiver_values = [[[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA] - expected = [[False] * N_rcu] * 48 + expected = [[False] * N_rcu] * N_HBA_TILES actual = mapper.map_read("HBAT_PWR_on_RW", receiver_values) numpy.testing.assert_equal(expected, actual) @@ -191,7 +191,7 @@ class TestAntennaToRecvMapper(base.TestCase): receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * N_rcu] * (MAX_ANTENNA - 2), [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA] - expected = [[True, False] * 16, [False, True] * 16] + [[False] * N_rcu] * 46 + expected = [[True, False] * 16, [False, True] * 16] + [[False] * N_rcu] * (N_HBA_TILES - 2) actual = mapper.map_read("HBAT_PWR_on_RW", receiver_values) numpy.testing.assert_equal(expected, actual) @@ -202,7 +202,7 @@ class TestAntennaToRecvMapper(base.TestCase): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) - set_values = [None] * 48 + set_values = [None] * N_HBA_TILES expected = [[None] * MAX_ANTENNA] actual = mapper.map_write("ANT_mask_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -212,7 +212,7 @@ class TestAntennaToRecvMapper(base.TestCase): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) - set_values = [None] * 48 + set_values = [None] * N_HBA_TILES expected = [[None] * MAX_ANTENNA] * 2 actual = mapper.map_write("ANT_mask_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -220,7 +220,7 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_write_ant_mask_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) - set_values = [True, False] + [None] * 46 + set_values = [True, False] + [None] * (N_HBA_TILES - 2) expected = [[False, True] + [None] * (MAX_ANTENNA - 2)] actual = mapper.map_write("ANT_mask_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -228,7 +228,7 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_write_rcu_pwr_ant_on_no_mapping_and_one_receiver(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) - set_values = [None] * 48 + set_values = [None] * N_HBA_TILES expected = [[[None, None, None]] * N_rcu] actual = mapper.map_write("RCU_PWR_ANT_on_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -236,7 +236,7 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_write_rcu_pwr_ant_on_no_mapping_and_two_receivers(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) - set_values = [None] * 48 + set_values = [None] * N_HBA_TILES expected = [[[None, None, None]] * N_rcu] * 2 actual = mapper.map_write("RCU_PWR_ANT_on_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -244,7 +244,7 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_write_rcu_pwr_ant_on_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) - set_values = [1, 0] + [None] * 46 + set_values = [1, 0] + [None] * (N_HBA_TILES - 2) expected = [[[0, 1, None]] + [[None, None, None]] * (N_rcu - 1)] actual = mapper.map_write("RCU_PWR_ANT_on_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -252,7 +252,7 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_write_rcu_band_select_no_mapping_and_one_receiver(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) - set_values = [None] * 48 + set_values = [None] * N_HBA_TILES expected = [[[None, None, None]] * N_rcu] actual = mapper.map_write("RCU_band_select_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -260,7 +260,7 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_write_rcu_band_select_no_mapping_and_two_receivers(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) - set_values = [None] * 48 + set_values = [None] * N_HBA_TILES expected = [[[None, None, None]] * N_rcu] * 2 actual = mapper.map_write("RCU_band_select_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -268,7 +268,7 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_write_rcu_band_select_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) - set_values = [1, 0] + [None] * 46 + set_values = [1, 0] + [None] * (N_HBA_TILES - 2) expected = [[[0, 1, None]] + [[None, None, None]] * (N_rcu - 1)] actual = mapper.map_write("RCU_band_select_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -276,7 +276,7 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_write_bf_delay_steps_rw_no_mapping_and_one_receiver(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) - set_values = [[1] * N_rcu] * 48 + set_values = [[1] * N_rcu] * N_HBA_TILES expected = [[[None] * N_rcu] * MAX_ANTENNA] actual = mapper.map_write("HBAT_BF_delay_steps_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -284,7 +284,7 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_write_bf_delay_steps_rw_no_mapping_and_two_receivers(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) - set_values = [[1] * N_rcu] * 48 + set_values = [[1] * N_rcu] * N_HBA_TILES expected = [[[None] * N_rcu] * MAX_ANTENNA, [[None] * N_rcu] * MAX_ANTENNA] actual = mapper.map_write("HBAT_BF_delay_steps_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -292,7 +292,7 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_write_bf_delay_steps_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) - set_values = [[1] * N_rcu, [2] * N_rcu] + [[None] * N_rcu] * 46 + set_values = [[1] * N_rcu, [2] * N_rcu] + [[None] * N_rcu] * (N_HBA_TILES - 2) expected = [[[2] * N_rcu, [1] * N_rcu] + [[None] * N_rcu] * (MAX_ANTENNA - 2)] actual = mapper.map_write("HBAT_BF_delay_steps_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -300,7 +300,7 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_write_led_on_rw_no_mapping_and_one_receiver(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) - set_values = [[None] * N_rcu] * 48 + set_values = [[None] * N_rcu] * N_HBA_TILES expected = [[[None] * N_rcu] * MAX_ANTENNA] actual = mapper.map_write("HBAT_LED_on_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -308,7 +308,7 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_write_led_on_rw_no_mapping_and_two_receivers(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) - set_values = [[None] * N_rcu] * 48 + set_values = [[None] * N_rcu] * N_HBA_TILES expected = [[[None] * N_rcu] * MAX_ANTENNA, [[None] * N_rcu] * MAX_ANTENNA] actual = mapper.map_write("HBAT_LED_on_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -316,7 +316,7 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_write_led_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) - set_values = [[False, True] * 16, [True, False] * 16] + [[None] * N_rcu] * 46 + set_values = [[False, True] * 16, [True, False] * 16] + [[None] * N_rcu] * (N_HBA_TILES - 2) expected = [[[True, False] * 16, [False, True] * 16] + [[None] * N_rcu] * (MAX_ANTENNA - 2)] actual = mapper.map_write("HBAT_LED_on_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -324,7 +324,7 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_write_pwr_lna_on_rw_no_mapping_and_one_receiver(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) - set_values = [[None] * N_rcu] * 48 + set_values = [[None] * N_rcu] * N_HBA_TILES expected = [[[None] * N_rcu] * MAX_ANTENNA] actual = mapper.map_write("HBAT_PWR_LNA_on_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -332,7 +332,7 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_write_pwr_lna_on_rw_no_mapping_and_two_receivers(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) - set_values = [[None] * N_rcu] * 48 + set_values = [[None] * N_rcu] * N_HBA_TILES expected = [[[None] * N_rcu] * MAX_ANTENNA, [[None] * N_rcu] * MAX_ANTENNA] actual = mapper.map_write("HBAT_PWR_LNA_on_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -340,14 +340,14 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_write_pwr_lna_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) - set_values = [[False, True] * 16, [True, False] * 16] + [[None] * N_rcu] * 46 + set_values = [[False, True] * 16, [True, False] * 16] + [[None] * N_rcu] * (N_HBA_TILES - 2) expected = [[[True, False] * 16, [False, True] * 16] + [[None] * N_rcu] * (MAX_ANTENNA - 2)] actual = mapper.map_write("HBAT_PWR_LNA_on_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_pwr_on_rw_no_mapping_and_one_receiver(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) - set_values = [[None] * N_rcu] * 48 + set_values = [[None] * N_rcu] * N_HBA_TILES expected = [[[None] * N_rcu] * MAX_ANTENNA] actual = mapper.map_write("HBAT_PWR_on_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -355,7 +355,7 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_write_lna_on_rw_no_mapping_and_two_receivers(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) - set_values = [[None] * N_rcu] * 48 + set_values = [[None] * N_rcu] * N_HBA_TILES expected = [[[None] * N_rcu] * MAX_ANTENNA, [[None] * N_rcu] * MAX_ANTENNA] actual = mapper.map_write("HBAT_PWR_on_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -363,7 +363,7 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_write_pwr_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) - set_values = [[False, True] * 16, [True, False] * 16] + [[None] * N_rcu] * 46 + set_values = [[False, True] * 16, [True, False] * 16] + [[None] * N_rcu] * (N_HBA_TILES - 2) expected = [[[True, False] * 16, [False, True] * 16] + [[None] * N_rcu] * (MAX_ANTENNA - 2)] actual = mapper.map_write("HBAT_PWR_on_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -491,7 +491,7 @@ class TestAntennafieldDevice(device_base.DeviceTestCase): ) as proxy: proxy.boot() - proxy.write_attribute("HBAT_PWR_on_RW", numpy.array([[False] * N_rcu] * 48)) + proxy.write_attribute("HBAT_PWR_on_RW", numpy.array([[False] * N_rcu] * N_HBA_TILES)) numpy.testing.assert_equal( m_proxy.return_value.write_attribute.call_args[0][1],