diff --git a/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py b/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py index e982221b82f8d37b730fee52121c76ad55a8e68d..9ecbe0382a39417cb1478891cc44af899464d387 100644 --- a/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py +++ b/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py @@ -25,9 +25,9 @@ logger = logging.getLogger() class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): antenna_qualities_ok = numpy.array([AntennaQuality.OK] * MAX_ANTENNA) antenna_qualities_only_second = numpy.array( - [AntennaQuality.BROKEN] - + [AntennaQuality.OK] - + [AntennaQuality.BROKEN] * (MAX_ANTENNA - 2) + [AntennaQuality.BROKEN] + + [AntennaQuality.OK] + + [AntennaQuality.BROKEN] * (MAX_ANTENNA - 2) ) antenna_use_ok = numpy.array([AntennaUse.AUTO] * MAX_ANTENNA) @@ -94,18 +94,18 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): control_mapping = [[1, i] for i in range(NR_TILES)] sdp_mapping = [[i // 6, i % 6] for i in range(NR_TILES)] antennafield_proxy.put_property( - { - "Control_to_RECV_mapping": numpy.array(control_mapping).flatten(), - "Antenna_to_SDP_Mapping" : numpy.array(sdp_mapping).flatten(), - "Antenna_Quality" : antenna_qualities, - "Antenna_Use" : antenna_use, - "Antenna_Cables" : ["50m", "80m"] * (CS001_TILES // 2), - "Antenna_Sets" : ["FIRST", "ALL"], - "Antenna_Set_Masks" : [ - "1" + ("0" * (NR_TILES - 1)), - "1" * NR_TILES, - ], - } + { + "Control_to_RECV_mapping": numpy.array(control_mapping).flatten(), + "Antenna_to_SDP_Mapping": numpy.array(sdp_mapping).flatten(), + "Antenna_Quality": antenna_qualities, + "Antenna_Use": antenna_use, + "Antenna_Cables": ["50m", "80m"] * (CS001_TILES // 2), + "Antenna_Sets": ["FIRST", "ALL"], + "Antenna_Set_Masks": [ + "1" + ("0" * (NR_TILES - 1)), + "1" * NR_TILES, + ], + } ) antennafield_proxy.off() antennafield_proxy.boot() @@ -133,9 +133,9 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): # Point to Zenith self.proxy.set_pointing( - numpy.array( - [["AZELGEO", "0rad", "1.570796rad"]] * self.proxy.nr_beamlets_R - ).flatten() + numpy.array( + [["AZELGEO", "0rad", "1.570796rad"]] * self.proxy.nr_beamlets_R + ).flatten() ) # beam weights should now be non-zero, we don't actually check their values for correctness @@ -152,7 +152,7 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): FPGA_bf_weights_pp_clock160 = self.beamlet_proxy.FPGA_bf_weights_pp_RW.flatten() # Assert some values are different self.assertNotEqual( - sum(FPGA_bf_weights_pp_clock160), sum(FPGA_bf_weights_pp_clock200) + sum(FPGA_bf_weights_pp_clock160), sum(FPGA_bf_weights_pp_clock200) ) def test_pointing_to_zenith_subband_change(self): @@ -166,8 +166,8 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): self.beamlet_proxy = self.initialise_beamlet_proxy() self.beamlet_proxy.subband_select_RW = numpy.array( - list(range(317)) + [316] + list(range(318, N_beamlets_ctrl)), - dtype=numpy.uint32, + list(range(317)) + [316] + list(range(318, N_beamlets_ctrl)), + dtype=numpy.uint32, ) self.beamlet_proxy.on() @@ -177,9 +177,9 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): # Point to Zenith self.proxy.set_pointing( - numpy.array( - [["AZELGEO", "0rad", "1.570796rad"]] * self.proxy.nr_beamlets_R - ).flatten() + numpy.array( + [["AZELGEO", "0rad", "1.570796rad"]] * self.proxy.nr_beamlets_R + ).flatten() ) # Store values with first subband configuration FPGA_bf_weights_pp_subband_v1 = ( @@ -197,7 +197,7 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): ) # Assert some values are different self.assertNotEqual( - sum(FPGA_bf_weights_pp_subband_v1), sum(FPGA_bf_weights_pp_subband_v2) + sum(FPGA_bf_weights_pp_subband_v1), sum(FPGA_bf_weights_pp_subband_v2) ) def test_set_pointing_masked_enable(self): @@ -210,7 +210,7 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): self.setup_sdpfirmware_proxy() self.setup_sdp_proxy() self.antennafield_proxy = self.setup_antennafield_proxy( - self.antenna_qualities_ok, self.antenna_use_ok + self.antenna_qualities_ok, self.antenna_use_ok ) self.proxy.initialise() @@ -225,32 +225,32 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): self.beamlet_proxy.FPGA_bf_weights_pp_RW = impossible_values self.proxy.set_pointing( - numpy.array( - [["AZELGEO", "0rad", "1.570796rad"]] * self.proxy.nr_beamlets_R - ).flatten() + numpy.array( + [["AZELGEO", "0rad", "1.570796rad"]] * self.proxy.nr_beamlets_R + ).flatten() ) # Verify all impossible values are replaced with other values for all inputs # which should be non-zero for the first antenna, and zero for the rest FPGA_bf_weights_pp_RW = self.beamlet_proxy.FPGA_bf_weights_pp_RW.reshape( - (N_pn * A_pn, -1) + (N_pn * A_pn, -1) ) # first antenna should have values from the beamformer, so not 0 and not 2 self.assertTrue( - numpy.all(numpy.not_equal(0, FPGA_bf_weights_pp_RW[0, :])), - f"{FPGA_bf_weights_pp_RW}", + numpy.all(numpy.not_equal(0, FPGA_bf_weights_pp_RW[0, :])), + f"{FPGA_bf_weights_pp_RW}", ) self.assertTrue( - numpy.all(numpy.not_equal(2, FPGA_bf_weights_pp_RW[0, :])), - f"{FPGA_bf_weights_pp_RW}", + numpy.all(numpy.not_equal(2, FPGA_bf_weights_pp_RW[0, :])), + f"{FPGA_bf_weights_pp_RW}", ) # rest of the antennas should have been given a weight of 0, as they # were not in the usage mask. self.assertTrue( - numpy.all(numpy.equal(0, FPGA_bf_weights_pp_RW[1:CS001_TILES, :])), - f"{FPGA_bf_weights_pp_RW}", + numpy.all(numpy.equal(0, FPGA_bf_weights_pp_RW[1:CS001_TILES, :])), + f"{FPGA_bf_weights_pp_RW}", ) @timeout_decorator.timeout(15) @@ -270,10 +270,9 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): self.proxy.on() interval = float( - self.proxy.get_property("Beam_tracking_interval")[ - "Beam_tracking_interval"][ - 0 - ] + self.proxy.get_property("Beam_tracking_interval")["Beam_tracking_interval"][ + 0 + ] ) # Allow beam tracking time to settle @@ -284,8 +283,8 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): for _ in range(0, 5): error = self.proxy.Pointing_error_R[0] self.assertTrue( - -interval * 0.10 < error < interval * 0.10, - f"Error: {error} larger than {interval * 0.10}", + -interval * 0.10 < error < interval * 0.10, + f"Error: {error} larger than {interval * 0.10}", ) logger.info("BeamTracking error: %s", error) time.sleep(interval)