diff --git a/tangostationcontrol/tangostationcontrol/test/beam/test_delays.py b/tangostationcontrol/tangostationcontrol/test/beam/test_delays.py index 0a779f980eabd5cc08d95f0521e48acceefee2f5..9c99a5ac776a61334bbade193c12c425e0d92d33 100644 --- a/tangostationcontrol/tangostationcontrol/test/beam/test_delays.py +++ b/tangostationcontrol/tangostationcontrol/test/beam/test_delays.py @@ -7,6 +7,8 @@ import numpy import numpy.testing import casacore +from tangostationcontrol.common.constants import MAX_ANTENNA, N_beamlets_ctrl + from tangostationcontrol.beam.delays import Delays from tangostationcontrol.test import base @@ -179,8 +181,8 @@ class TestDelays(base.TestCase): timestamp = datetime.datetime(2022, 3, 1, 0, 0, 0) # timestamp does not actually matter, but casacore doesn't know that. d.set_measure_time(timestamp) - positions = numpy.array([[1,2,3]] * 96) - directions = numpy.array([["J2000", "0deg", "0deg"]] * 488) + positions = numpy.array([[1,2,3]] * MAX_ANTENNA) + directions = numpy.array([["J2000", "0deg", "0deg"]] * N_beamlets_ctrl) count = 10 before = time.monotonic_ns() diff --git a/tangostationcontrol/tangostationcontrol/test/common/test_baselines.py b/tangostationcontrol/tangostationcontrol/test/common/test_baselines.py index 25eb5d1dfffa2fca8748d74020893edfb17c2037..0701dee8262042bca4039d66939bd2246bb76ed9 100644 --- a/tangostationcontrol/tangostationcontrol/test/common/test_baselines.py +++ b/tangostationcontrol/tangostationcontrol/test/common/test_baselines.py @@ -8,6 +8,7 @@ # See LICENSE.txt for more info. from tangostationcontrol.common import baselines +from tangostationcontrol.common.constants import MAX_INPUTS from tangostationcontrol.test import base @@ -24,7 +25,7 @@ class TestBaselines(base.TestCase): def test_baseline_indices(self): """ Test whether baseline_from_index and baseline_index line up. """ - for major in range(192): + for major in range(MAX_INPUTS): for minor in range(major + 1): idx = baselines.baseline_index(major, minor) self.assertEqual((major, minor), baselines.baseline_from_index(idx), msg=f'baseline_index({major},{minor}) resulted in {idx}, and should match baseline_from_index({idx})') diff --git a/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py b/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py index 5e73c6ae50986b76d52c376926d623f93b07b0cb..65f2bcb543c81670a7231e1bd3fc2bd3ac4f718d 100644 --- a/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py +++ b/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py @@ -20,6 +20,7 @@ from tangostationcontrol.devices import antennafield from tangostationcontrol.devices.antennafield import AntennaToRecvMapper, AntennaQuality, AntennaUse from tangostationcontrol.test import base from tangostationcontrol.test.devices import device_base +from tangostationcontrol.common.constants import MAX_ANTENNA, N_rcu logger = logging.getLogger() @@ -37,7 +38,7 @@ class TestAntennaToRecvMapper(base.TestCase): def test_ant_read_mask_r_no_mapping(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3) - receiver_values = [[False] * 96, [False] * 96, [False] * 96] + receiver_values = [[False] * MAX_ANTENNA, [False] * MAX_ANTENNA, [False] * MAX_ANTENNA] expected = [False] * 48 actual = mapper.map_read("ANT_mask_RW", receiver_values) numpy.testing.assert_equal(expected, actual) @@ -45,7 +46,7 @@ class TestAntennaToRecvMapper(base.TestCase): def test_ant_read_mask_r_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 3) - receiver_values = [[False, True, False] + [False, False, False] * 31, [False] * 96, [False] * 96] + receiver_values = [[False, True, False] + [False, False, False] * (N_rcu - 1), [False] * MAX_ANTENNA, [False] * MAX_ANTENNA] expected = [True, False] + [False] * 46 actual = mapper.map_read("ANT_mask_RW", receiver_values) @@ -53,7 +54,7 @@ class TestAntennaToRecvMapper(base.TestCase): def test_rcu_band_select_no_mapping(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3) - receiver_values = [[0] * 96, [0] * 96, [0] * 96] + receiver_values = [[0] * MAX_ANTENNA, [0] * MAX_ANTENNA, [0] * MAX_ANTENNA] expected = [0] * 48 actual = mapper.map_read("RCU_band_select_RW", receiver_values) numpy.testing.assert_equal(expected, actual) @@ -61,16 +62,16 @@ class TestAntennaToRecvMapper(base.TestCase): def test_bf_read_delay_steps_r_no_mapping(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3) - receiver_values = [[[0] * 32] * 96, [[0] * 32] * 96, [[0] * 32] * 96] - expected = [[0] * 32] * 48 + receiver_values = [[[0] * N_rcu] * MAX_ANTENNA, [[0] * N_rcu] * MAX_ANTENNA, [[0] * N_rcu] * MAX_ANTENNA] + expected = [[0] * N_rcu] * 48 actual = mapper.map_read("HBAT_BF_delay_steps_R", receiver_values) numpy.testing.assert_equal(expected, actual) def test_bf_read_delay_steps_r_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 3) - receiver_values = [[[2] * 32, [1] * 32] + [[0] * 32] * 94, [[0] * 32] * 96, [[0] * 32] * 96] - expected = [[1] * 32, [2] * 32] + [[0] * 32] * 46 + receiver_values = [[[2] * N_rcu, [1] * N_rcu] + [[0] * N_rcu] * (MAX_ANTENNA - 2), [[0] * N_rcu] * MAX_ANTENNA, [[0] * N_rcu] * MAX_ANTENNA] + expected = [[1] * N_rcu, [2] * N_rcu] + [[0] * N_rcu] * 46 actual = mapper.map_read("HBAT_BF_delay_steps_R", receiver_values) numpy.testing.assert_equal(expected, actual) @@ -78,16 +79,16 @@ class TestAntennaToRecvMapper(base.TestCase): def test_bf_read_delay_steps_rw_no_mapping(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3) - receiver_values = [[[0] * 32] * 96, [[0] * 32] * 96, [[0] * 32] * 96] - expected = [[0] * 32] * 48 + receiver_values = [[[0] * N_rcu] * MAX_ANTENNA, [[0] * N_rcu] * MAX_ANTENNA, [[0] * N_rcu] * MAX_ANTENNA] + expected = [[0] * N_rcu] * 48 actual = mapper.map_read("HBAT_BF_delay_steps_RW", receiver_values) numpy.testing.assert_equal(expected, actual) def test_bf_read_delay_steps_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 3) - receiver_values = [[[2] * 32, [1] * 32] + [[0] * 32] * 94, [[0] * 32] * 96, [[0] * 32] * 96] - expected = [[1] * 32, [2] * 32] + [[0] * 32] * 46 + receiver_values = [[[2] * N_rcu, [1] * N_rcu] + [[0] * N_rcu] * (MAX_ANTENNA - 2), [[0] * N_rcu] * MAX_ANTENNA, [[0] * N_rcu] * MAX_ANTENNA] + expected = [[1] * N_rcu, [2] * N_rcu] + [[0] * N_rcu] * 46 actual = mapper.map_read("HBAT_BF_delay_steps_RW", receiver_values) numpy.testing.assert_equal(expected, actual) @@ -95,102 +96,102 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_read_led_on_r_unmapped(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3) - receiver_values = [[[False] * 32] * 96, [[False] * 32] * 96, [[False] * 32] * 96] - expected = [[False] * 32] * 48 + receiver_values = [[[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA] + expected = [[False] * N_rcu] * 48 actual = mapper.map_read("HBAT_LED_on_R", receiver_values) numpy.testing.assert_equal(expected, actual) def test_map_read_led_on_r_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 3) - receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * 32] * 94, [[False] * 32] * 96, [[False] * 32] * 96] + receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * N_rcu] * (MAX_ANTENNA - 2), [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA] - expected = [[True, False] * 16, [False, True] * 16] + [[False] * 32] * 46 + expected = [[True, False] * 16, [False, True] * 16] + [[False] * N_rcu] * 46 actual = mapper.map_read("HBAT_LED_on_R", receiver_values) numpy.testing.assert_equal(expected, actual) def test_map_read_led_on_rw_unmapped(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3) - receiver_values = [[[False] * 32] * 96, [[False] * 32] * 96, [[False] * 32] * 96] - expected = [[False] * 32] * 48 + receiver_values = [[[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA] + expected = [[False] * N_rcu] * 48 actual = mapper.map_read("HBAT_LED_on_RW", receiver_values) numpy.testing.assert_equal(expected, actual) def test_map_read_led_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 3) - receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * 32] * 94, [[False] * 32] * 96, [[False] * 32] * 96] + receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * N_rcu] * (MAX_ANTENNA - 2), [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA] - expected = [[True, False] * 16, [False, True] * 16] + [[False] * 32] * 46 + expected = [[True, False] * 16, [False, True] * 16] + [[False] * N_rcu] * 46 actual = mapper.map_read("HBAT_LED_on_RW", receiver_values) numpy.testing.assert_equal(expected, actual) def test_map_read_pwr_lna_on_r_unmapped(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3) - receiver_values = [[[False] * 32] * 96, [[False] * 32] * 96, [[False] * 32] * 96] - expected = [[False] * 32] * 48 + receiver_values = [[[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA] + expected = [[False] * N_rcu] * 48 actual = mapper.map_read("HBAT_PWR_LNA_on_R", receiver_values) numpy.testing.assert_equal(expected, actual) def test_map_read_pwr_lna_on_r_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 3) - receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * 32] * 94, [[False] * 32] * 96, [[False] * 32] * 96] + receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * N_rcu] * (MAX_ANTENNA - 2), [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA] - expected = [[True, False] * 16, [False, True] * 16] + [[False] * 32] * 46 + expected = [[True, False] * 16, [False, True] * 16] + [[False] * N_rcu] * 46 actual = mapper.map_read("HBAT_PWR_LNA_on_R", receiver_values) numpy.testing.assert_equal(expected, actual) def test_map_read_pwr_lna_on_rw_unmapped(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3) - receiver_values = [[[False] * 32] * 96, [[False] * 32] * 96, [[False] * 32] * 96] - expected = [[False] * 32] * 48 + receiver_values = [[[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA] + expected = [[False] * N_rcu] * 48 actual = mapper.map_read("HBAT_PWR_LNA_on_RW", receiver_values) numpy.testing.assert_equal(expected, actual) def test_map_read_pwr_lna_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 3) - receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * 32] * 94, [[False] * 32] * 96, [[False] * 32] * 96] + receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * N_rcu] * (MAX_ANTENNA - 2), [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA] - expected = [[True, False] * 16, [False, True] * 16] + [[False] * 32] * 46 + expected = [[True, False] * 16, [False, True] * 16] + [[False] * N_rcu] * 46 actual = mapper.map_read("HBAT_PWR_LNA_on_RW", receiver_values) numpy.testing.assert_equal(expected, actual) def test_map_read_pwr_on_r_unmapped(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3) - receiver_values = [[[False] * 32] * 96, [[False] * 32] * 96, [[False] * 32] * 96] - expected = [[False] * 32] * 48 + receiver_values = [[[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA] + expected = [[False] * N_rcu] * 48 actual = mapper.map_read("HBAT_PWR_on_R", receiver_values) numpy.testing.assert_equal(expected, actual) def test_map_read_pwr_on_r_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 3) - receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * 32] * 94, [[False] * 32] * 96, [[False] * 32] * 96] + receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * N_rcu] * (MAX_ANTENNA - 2), [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA] - expected = [[True, False] * 16, [False, True] * 16] + [[False] * 32] * 46 + expected = [[True, False] * 16, [False, True] * 16] + [[False] * N_rcu] * 46 actual = mapper.map_read("HBAT_PWR_on_R", receiver_values) numpy.testing.assert_equal(expected, actual) def test_map_read_pwr_on_rw_unmapped(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3) - receiver_values = [[[False] * 32] * 96, [[False] * 32] * 96, [[False] * 32] * 96] - expected = [[False] * 32] * 48 + receiver_values = [[[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA] + expected = [[False] * N_rcu] * 48 actual = mapper.map_read("HBAT_PWR_on_RW", receiver_values) numpy.testing.assert_equal(expected, actual) def test_map_read_pwr_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 3) - receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * 32] * 94, [[False] * 32] * 96, [[False] * 32] * 96] + receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * N_rcu] * (MAX_ANTENNA - 2), [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA] - expected = [[True, False] * 16, [False, True] * 16] + [[False] * 32] * 46 + expected = [[True, False] * 16, [False, True] * 16] + [[False] * N_rcu] * 46 actual = mapper.map_read("HBAT_PWR_on_RW", receiver_values) numpy.testing.assert_equal(expected, actual) @@ -202,7 +203,7 @@ class TestAntennaToRecvMapper(base.TestCase): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) set_values = [None] * 48 - expected = [[None] * 96] + expected = [[None] * MAX_ANTENNA] actual = mapper.map_write("ANT_mask_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -212,7 +213,7 @@ class TestAntennaToRecvMapper(base.TestCase): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) set_values = [None] * 48 - expected = [[None] * 96] * 2 + expected = [[None] * MAX_ANTENNA] * 2 actual = mapper.map_write("ANT_mask_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -220,7 +221,7 @@ class TestAntennaToRecvMapper(base.TestCase): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) set_values = [True, False] + [None] * 46 - expected = [[False, True] + [None] * 94] + expected = [[False, True] + [None] * (MAX_ANTENNA - 2)] actual = mapper.map_write("ANT_mask_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -228,7 +229,7 @@ class TestAntennaToRecvMapper(base.TestCase): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) set_values = [None] * 48 - expected = [[[None, None, None]] * 32] + expected = [[[None, None, None]] * N_rcu] actual = mapper.map_write("RCU_PWR_ANT_on_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -236,7 +237,7 @@ class TestAntennaToRecvMapper(base.TestCase): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) set_values = [None] * 48 - expected = [[[None, None, None]] * 32] * 2 + expected = [[[None, None, None]] * N_rcu] * 2 actual = mapper.map_write("RCU_PWR_ANT_on_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -244,7 +245,7 @@ class TestAntennaToRecvMapper(base.TestCase): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) set_values = [1, 0] + [None] * 46 - expected = [[[0, 1, None]] + [[None, None, None]] * 31] + expected = [[[0, 1, None]] + [[None, None, None]] * (N_rcu - 1)] actual = mapper.map_write("RCU_PWR_ANT_on_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -252,7 +253,7 @@ class TestAntennaToRecvMapper(base.TestCase): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) set_values = [None] * 48 - expected = [[[None, None, None]] * 32] + expected = [[[None, None, None]] * N_rcu] actual = mapper.map_write("RCU_band_select_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -260,7 +261,7 @@ class TestAntennaToRecvMapper(base.TestCase): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) set_values = [None] * 48 - expected = [[[None, None, None]] * 32] * 2 + expected = [[[None, None, None]] * N_rcu] * 2 actual = mapper.map_write("RCU_band_select_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -268,102 +269,102 @@ class TestAntennaToRecvMapper(base.TestCase): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) set_values = [1, 0] + [None] * 46 - expected = [[[0, 1, None]] + [[None, None, None]] * 31] + expected = [[[0, 1, None]] + [[None, None, None]] * (N_rcu - 1)] actual = mapper.map_write("RCU_band_select_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_bf_delay_steps_rw_no_mapping_and_one_receiver(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) - set_values = [[1] * 32] * 48 - expected = [[[None] * 32] * 96] + set_values = [[1] * N_rcu] * 48 + expected = [[[None] * N_rcu] * MAX_ANTENNA] actual = mapper.map_write("HBAT_BF_delay_steps_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_bf_delay_steps_rw_no_mapping_and_two_receivers(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) - set_values = [[1] * 32] * 48 - expected = [[[None] * 32] * 96, [[None] * 32] * 96] + set_values = [[1] * N_rcu] * 48 + expected = [[[None] * N_rcu] * MAX_ANTENNA, [[None] * N_rcu] * MAX_ANTENNA] actual = mapper.map_write("HBAT_BF_delay_steps_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_bf_delay_steps_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) - set_values = [[1] * 32, [2] * 32] + [[None] * 32] * 46 - expected = [[[2] * 32, [1] * 32] + [[None] * 32] * 94] + set_values = [[1] * N_rcu, [2] * N_rcu] + [[None] * N_rcu] * 46 + expected = [[[2] * N_rcu, [1] * N_rcu] + [[None] * N_rcu] * (MAX_ANTENNA - 2)] actual = mapper.map_write("HBAT_BF_delay_steps_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_led_on_rw_no_mapping_and_one_receiver(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) - set_values = [[None] * 32] * 48 - expected = [[[None] * 32] * 96] + set_values = [[None] * N_rcu] * 48 + expected = [[[None] * N_rcu] * MAX_ANTENNA] actual = mapper.map_write("HBAT_LED_on_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_led_on_rw_no_mapping_and_two_receivers(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) - set_values = [[None] * 32] * 48 - expected = [[[None] * 32] * 96, [[None] * 32] * 96] + set_values = [[None] * N_rcu] * 48 + expected = [[[None] * N_rcu] * MAX_ANTENNA, [[None] * N_rcu] * MAX_ANTENNA] actual = mapper.map_write("HBAT_LED_on_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_led_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) - set_values = [[False, True] * 16, [True, False] * 16] + [[None] * 32] * 46 - expected = [[[True, False] * 16, [False, True] * 16] + [[None] * 32] * 94] + set_values = [[False, True] * 16, [True, False] * 16] + [[None] * N_rcu] * 46 + expected = [[[True, False] * 16, [False, True] * 16] + [[None] * N_rcu] * (MAX_ANTENNA - 2)] actual = mapper.map_write("HBAT_LED_on_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_pwr_lna_on_rw_no_mapping_and_one_receiver(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) - set_values = [[None] * 32] * 48 - expected = [[[None] * 32] * 96] + set_values = [[None] * N_rcu] * 48 + expected = [[[None] * N_rcu] * MAX_ANTENNA] actual = mapper.map_write("HBAT_PWR_LNA_on_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_pwr_lna_on_rw_no_mapping_and_two_receivers(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) - set_values = [[None] * 32] * 48 - expected = [[[None] * 32] * 96, [[None] * 32] * 96] + set_values = [[None] * N_rcu] * 48 + expected = [[[None] * N_rcu] * MAX_ANTENNA, [[None] * N_rcu] * MAX_ANTENNA] actual = mapper.map_write("HBAT_PWR_LNA_on_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_pwr_lna_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) - set_values = [[False, True] * 16, [True, False] * 16] + [[None] * 32] * 46 - expected = [[[True, False] * 16, [False, True] * 16] + [[None] * 32] * 94] + set_values = [[False, True] * 16, [True, False] * 16] + [[None] * N_rcu] * 46 + expected = [[[True, False] * 16, [False, True] * 16] + [[None] * N_rcu] * (MAX_ANTENNA - 2)] actual = mapper.map_write("HBAT_PWR_LNA_on_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_pwr_on_rw_no_mapping_and_one_receiver(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) - set_values = [[None] * 32] * 48 - expected = [[[None] * 32] * 96] + set_values = [[None] * N_rcu] * 48 + expected = [[[None] * N_rcu] * MAX_ANTENNA] actual = mapper.map_write("HBAT_PWR_on_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_lna_on_rw_no_mapping_and_two_receivers(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) - set_values = [[None] * 32] * 48 - expected = [[[None] * 32] * 96, [[None] * 32] * 96] + set_values = [[None] * N_rcu] * 48 + expected = [[[None] * N_rcu] * MAX_ANTENNA, [[None] * N_rcu] * MAX_ANTENNA] actual = mapper.map_write("HBAT_PWR_on_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_pwr_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) - set_values = [[False, True] * 16, [True, False] * 16] + [[None] * 32] * 46 - expected = [[[True, False] * 16, [False, True] * 16] + [[None] * 32] * 94] + set_values = [[False, True] * 16, [True, False] * 16] + [[None] * N_rcu] * 46 + expected = [[[True, False] * 16, [False, True] * 16] + [[None] * N_rcu] * (MAX_ANTENNA - 2)] actual = mapper.map_write("HBAT_PWR_on_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -374,8 +375,8 @@ class TestAntennaToRecvMapper(base.TestCase): # self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1 # ) # - # merge_values = [[None] * 32] * 96 - # current_values = [[False] * 32] * 96 + # merge_values = [[None] * N_rcu] * MAX_ANTENNA + # current_values = [[False] * N_rcu] * MAX_ANTENNA # # mapper.merge_write(merge_values, current_values) # numpy.testing.assert_equal(merge_values, current_values) @@ -399,8 +400,8 @@ class TestAntennaToRecvMapper(base.TestCase): # self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1 # ) # - # merge_values = [[True] * 32] * 2 + [[None] * 32] * 94 - # current_values = [[True] * 32] * 2 + [[False] * 32] * 94 + # merge_values = [[True] * N_rcu] * 2 + [[None] * N_rcu] * (MAX_ANTENNA - 2) + # current_values = [[True] * N_rcu] * 2 + [[False] * N_rcu] * (MAX_ANTENNA - 2) # # mapper.merge_write(merge_values, current_values) # numpy.testing.assert_equal(merge_values, current_values) @@ -433,31 +434,31 @@ class TestAntennafieldDevice(device_base.DeviceTestCase): def test_read_Antenna_Quality(self): """ Verify if Antenna_Quality_R is correctly retrieved """ - antenna_qualities = numpy.array([AntennaQuality.OK] * 96) + antenna_qualities = numpy.array([AntennaQuality.OK] * MAX_ANTENNA) with DeviceTestContext(antennafield.AntennaField, properties=self.AT_PROPERTIES, process=True) as proxy: numpy.testing.assert_equal(antenna_qualities, proxy.Antenna_Quality_R) def test_read_Antenna_Use(self): """ Verify if Antenna_Use_R is correctly retrieved """ - antenna_use = numpy.array([AntennaUse.AUTO] * 96) + antenna_use = numpy.array([AntennaUse.AUTO] * MAX_ANTENNA) with DeviceTestContext(antennafield.AntennaField, properties=self.AT_PROPERTIES, process=True) as proxy: numpy.testing.assert_equal(antenna_use, proxy.Antenna_Use_R) def test_read_Antenna_Usage_Mask(self): """ Verify if Antenna_Usage_Mask_R is correctly retrieved """ - antenna_qualities = numpy.array([AntennaQuality.OK] * 96) - antenna_use = numpy.array([AntennaUse.ON] + [AntennaUse.AUTO] * 95) + antenna_qualities = numpy.array([AntennaQuality.OK] * MAX_ANTENNA) + antenna_use = numpy.array([AntennaUse.ON] + [AntennaUse.AUTO] * (MAX_ANTENNA - 1)) antenna_properties = {'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use} with DeviceTestContext(antennafield.AntennaField, properties={**self.AT_PROPERTIES, **antenna_properties}, process=True) as proxy: - numpy.testing.assert_equal(numpy.array([True] * 96), proxy.Antenna_Usage_Mask_R) + numpy.testing.assert_equal(numpy.array([True] * MAX_ANTENNA), proxy.Antenna_Usage_Mask_R) def test_read_Antenna_Usage_Mask_only_one_functioning_antenna(self): """ Verify if Antenna_Usage_Mask_R (only first antenna is OK) is correctly retrieved """ - antenna_qualities = numpy.array([AntennaQuality.OK] + [AntennaQuality.BROKEN] * 95) - antenna_use = numpy.array([AntennaUse.ON] + [AntennaUse.AUTO] * 95) + antenna_qualities = numpy.array([AntennaQuality.OK] + [AntennaQuality.BROKEN] * (MAX_ANTENNA - 1)) + antenna_use = numpy.array([AntennaUse.ON] + [AntennaUse.AUTO] * (MAX_ANTENNA - 1)) antenna_properties = {'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use} with DeviceTestContext(antennafield.AntennaField, properties={**self.AT_PROPERTIES, **antenna_properties}, process=True) as proxy: - numpy.testing.assert_equal(numpy.array([True] + [False] * 95), proxy.Antenna_Usage_Mask_R) + numpy.testing.assert_equal(numpy.array([True] + [False] * (MAX_ANTENNA - 1)), proxy.Antenna_Usage_Mask_R) def test_read_Antenna_Names(self): """ Verify if Antenna_Names_R is correctly retrieved """ @@ -476,7 +477,7 @@ class TestAntennafieldDevice(device_base.DeviceTestCase): 'RECV_devices': ['stat/RECV/1'], } - data = numpy.array([[False] * 32] * 96) + data = numpy.array([[False] * N_rcu] * MAX_ANTENNA) m_proxy.return_value = mock.Mock( read_attribute=mock.Mock( @@ -490,7 +491,7 @@ class TestAntennafieldDevice(device_base.DeviceTestCase): ) as proxy: proxy.boot() - proxy.write_attribute("HBAT_PWR_on_RW", numpy.array([[False] * 32] * 48)) + proxy.write_attribute("HBAT_PWR_on_RW", numpy.array([[False] * N_rcu] * 48)) numpy.testing.assert_equal( m_proxy.return_value.write_attribute.call_args[0][1], diff --git a/tangostationcontrol/tangostationcontrol/test/devices/test_beamlet_device.py b/tangostationcontrol/tangostationcontrol/test/devices/test_beamlet_device.py index ff43c4ada4f8f73257f32b13c3e1261afd13aa47..1432b2f715b15ff640d80ba07fda3cd0fc4f8a83 100644 --- a/tangostationcontrol/tangostationcontrol/test/devices/test_beamlet_device.py +++ b/tangostationcontrol/tangostationcontrol/test/devices/test_beamlet_device.py @@ -8,6 +8,7 @@ # See LICENSE.txt for more info. from tangostationcontrol.devices.sdp.beamlet import Beamlet +from tangostationcontrol.common.constants import CLK_200_MHZ, CLK_160_MHZ, DEFAULT_SUBBAND import numpy import numpy.testing @@ -79,7 +80,7 @@ class TestBeamletDevice(base.TestCase): ]) beamlet_frequencies = numpy.array([ - [200e6, 200e6, 200e6, 200e6, 200e6] + [CLK_200_MHZ, CLK_200_MHZ, CLK_200_MHZ, CLK_200_MHZ, CLK_200_MHZ] ]) bf_weights = Beamlet._calculate_bf_weights(delays, beamlet_frequencies) @@ -94,7 +95,7 @@ class TestBeamletDevice(base.TestCase): def test_subband_frequencies(self): subbands = numpy.array([ - [0, 1, 102], + [0, 1, DEFAULT_SUBBAND], ]) nyquist_zones_0 = numpy.zeros(subbands.shape) @@ -103,29 +104,29 @@ class TestBeamletDevice(base.TestCase): # for reference values, see https://proxy.lofar.eu/rtsm/tests/ - lba_frequencies = Beamlet._subband_frequencies(subbands, 160 * 1000000, nyquist_zones_0) + lba_frequencies = Beamlet._subband_frequencies(subbands, CLK_160_MHZ, nyquist_zones_0) self.assertAlmostEqual(lba_frequencies[0][0], 0.0000000e6) self.assertAlmostEqual(lba_frequencies[0][1], 0.1562500e6) self.assertAlmostEqual(lba_frequencies[0][2], 15.9375000e6) - lba_frequencies = Beamlet._subband_frequencies(subbands, 200 * 1000000, nyquist_zones_0) + lba_frequencies = Beamlet._subband_frequencies(subbands, CLK_200_MHZ, nyquist_zones_0) self.assertAlmostEqual(lba_frequencies[0][0], 0.0000000e6) self.assertAlmostEqual(lba_frequencies[0][1], 0.1953125e6) self.assertAlmostEqual(lba_frequencies[0][2], 19.9218750e6) # Nyquist zone 1 is not used in 160 MHz - hba_low_frequencies = Beamlet._subband_frequencies(subbands, 200 * 1000000, nyquist_zones_1) + hba_low_frequencies = Beamlet._subband_frequencies(subbands, CLK_200_MHZ, nyquist_zones_1) self.assertAlmostEqual(hba_low_frequencies[0][0], 100.0000000e6) self.assertAlmostEqual(hba_low_frequencies[0][1], 100.1953125e6) self.assertAlmostEqual(hba_low_frequencies[0][2], 119.9218750e6) - hba_high_frequencies = Beamlet._subband_frequencies(subbands, 160 * 1000000, nyquist_zones_2) + hba_high_frequencies = Beamlet._subband_frequencies(subbands, CLK_160_MHZ, nyquist_zones_2) self.assertAlmostEqual(hba_high_frequencies[0][0], 160.0000000e6) self.assertAlmostEqual(hba_high_frequencies[0][1], 160.1562500e6) self.assertAlmostEqual(hba_high_frequencies[0][2], 175.9375000e6) - hba_high_frequencies = Beamlet._subband_frequencies(subbands, 200 * 1000000, nyquist_zones_2) + hba_high_frequencies = Beamlet._subband_frequencies(subbands, CLK_200_MHZ, nyquist_zones_2) self.assertAlmostEqual(hba_high_frequencies[0][0], 200.0000000e6) self.assertAlmostEqual(hba_high_frequencies[0][1], 200.1953125e6) self.assertAlmostEqual(hba_high_frequencies[0][2], 219.9218750e6) diff --git a/tangostationcontrol/tangostationcontrol/test/devices/test_digitalbeam_device.py b/tangostationcontrol/tangostationcontrol/test/devices/test_digitalbeam_device.py index 0d19f7cab3b6284caed0e6a6e63f701957f5c582..d57f912218550b7e73181a332caa61b94e6c3914 100644 --- a/tangostationcontrol/tangostationcontrol/test/devices/test_digitalbeam_device.py +++ b/tangostationcontrol/tangostationcontrol/test/devices/test_digitalbeam_device.py @@ -15,6 +15,7 @@ import numpy # Internal regular imports from tangostationcontrol.devices.sdp import digitalbeam +from tangostationcontrol.common.constants import MAX_ANTENNA, N_beamlets_ctrl, N_xyz, N_pn # Builtin test libraries from unittest import mock @@ -40,20 +41,20 @@ class TestDigitalBeamDevice(device_base.DeviceTestCase): def test_apply_weights_disabled(self, m_proxy, m_compute, m_wait): """Verify won't overwrite digitalbeam data if no input_selected""" - input_data = numpy.array([["AZELGEO", "0deg", "90deg"]] * 488).flatten() - current_data = numpy.array([[16384] * 5856] * 16) + input_data = numpy.array([["AZELGEO", "0deg", "90deg"]] * N_beamlets_ctrl).flatten() + current_data = numpy.array([[16384] * 5856] * N_pn) m_proxy.return_value = mock.Mock( read_attribute=mock.Mock( return_value=mock.Mock(value=copy.copy(current_data)) ), - Antenna_Usage_Mask_R=numpy.array([0] * 96), + Antenna_Usage_Mask_R=numpy.array([0] * MAX_ANTENNA), Antenna_Field_Reference_ITRF_R=mock.MagicMock(), - HBAT_reference_ITRF_R=numpy.array([[0] * 3] * 96) + HBAT_reference_ITRF_R=numpy.array([[0] * N_xyz] * MAX_ANTENNA) ) new_data = numpy.array( - [[16384] * 2928 + [0] * 2928] * 16 + [[16384] * 2928 + [0] * 2928] * N_pn ) m_compute.return_value = copy.copy(new_data) @@ -62,7 +63,7 @@ class TestDigitalBeamDevice(device_base.DeviceTestCase): ) as proxy: proxy.initialise() proxy.Tracking_enabled_RW = False - proxy.input_select_RW = numpy.array([[False] * 488] * 96) + proxy.input_select_RW = numpy.array([[False] * N_beamlets_ctrl] * MAX_ANTENNA) proxy.set_pointing(input_data) @@ -78,20 +79,20 @@ class TestDigitalBeamDevice(device_base.DeviceTestCase): def test_apply_weights_enabled(self, m_proxy, m_compute, m_wait): """Verify can overwrite digitalbeam data if input_selected""" - input_data = numpy.array([["AZELGEO", "0deg", "90deg"]] * 488).flatten() - current_data = numpy.array([[16384] * 5856] * 16) + input_data = numpy.array([["AZELGEO", "0deg", "90deg"]] * N_beamlets_ctrl).flatten() + current_data = numpy.array([[16384] * 5856] * N_pn) m_proxy.return_value = mock.Mock( read_attribute=mock.Mock( return_value=mock.Mock(value=current_data) ), - Antenna_Usage_Mask_R=numpy.array([0] * 96), + Antenna_Usage_Mask_R=numpy.array([0] * MAX_ANTENNA), Antenna_Field_Reference_ITRF_R=mock.MagicMock(), - HBAT_reference_ITRF_R=numpy.array([[0] * 3] * 96) + HBAT_reference_ITRF_R=numpy.array([[0] * N_xyz] * MAX_ANTENNA) ) new_data = numpy.array( - [[16384] * 2928 + [0] * 2928] * 16 + [[16384] * 2928 + [0] * 2928] * N_pn ) m_compute.return_value = copy.copy(new_data) @@ -100,7 +101,7 @@ class TestDigitalBeamDevice(device_base.DeviceTestCase): ) as proxy: proxy.initialise() proxy.Tracking_enabled_RW = False - proxy.input_select_RW = numpy.array([[True] * 488] * 96) + proxy.input_select_RW = numpy.array([[True] * N_beamlets_ctrl] * MAX_ANTENNA) proxy.set_pointing(input_data) diff --git a/tangostationcontrol/tangostationcontrol/test/devices/test_recv_device.py b/tangostationcontrol/tangostationcontrol/test/devices/test_recv_device.py index a0f85653cd55aa2b6a60dd61c4b7194eea237f9c..49102ab0c2f3396ed9cc5e8e3a2a5acc63acbc3a 100644 --- a/tangostationcontrol/tangostationcontrol/test/devices/test_recv_device.py +++ b/tangostationcontrol/tangostationcontrol/test/devices/test_recv_device.py @@ -10,6 +10,7 @@ from tango.test_context import DeviceTestContext from tangostationcontrol.devices import recv +from tangostationcontrol.common.constants import MAX_ANTENNA, N_elements import numpy @@ -28,7 +29,7 @@ class TestRecvDevice(device_base.DeviceTestCase): def test_calculate_HBAT_bf_delay_steps(self): """Verify HBAT beamforming calculations are correctly executed""" with DeviceTestContext(recv.RECV, properties=self.RECV_PROPERTIES, process=True) as proxy: - delays = numpy.random.rand(96,16).flatten() + delays = numpy.random.rand(MAX_ANTENNA,N_elements).flatten() HBAT_bf_delay_steps = proxy.calculate_HBAT_bf_delay_steps(delays) self.assertEqual(3072, len(HBAT_bf_delay_steps)) # 96x32=3072