From f0dc9d242fc11d3297c64462d58ba6d67809efff Mon Sep 17 00:00:00 2001 From: lukken <lukken@astron.nl> Date: Tue, 13 Sep 2022 09:06:38 +0000 Subject: [PATCH] L2SS-877: Working integration tests for disabled / enabled inputs --- .../devices/lofar_device.py | 8 +- .../devices/sdp/digitalbeam.py | 7 -- .../devices/test_device_digitalbeam.py | 57 +++++++++--- .../integration_test/device_proxy.py | 1 + .../test/devices/test_digitalbeam_device.py | 89 ++++++++++--------- 5 files changed, 95 insertions(+), 67 deletions(-) diff --git a/tangostationcontrol/tangostationcontrol/devices/lofar_device.py b/tangostationcontrol/tangostationcontrol/devices/lofar_device.py index baf2c9abc..0e9b99ef3 100644 --- a/tangostationcontrol/tangostationcontrol/devices/lofar_device.py +++ b/tangostationcontrol/tangostationcontrol/devices/lofar_device.py @@ -104,18 +104,15 @@ class lofar_device(Device, metaclass=DeviceMeta): return self.get_state() in INITIALISED_STATES - # TODO(Corne): Actually implement this in L2SS-940 + # TODO(Corne): Actually implement locking in L2SS-940 def atomic_read_modify_write_attribute( self, values: List[any], proxy: DeviceProxy, attribute: str, sparse=None ): """Atomatically read-modify-write the attribute on the given proxy""" current_values = proxy.read_attribute(attribute).value - logger.info("current_values") - logger.info(values) self.merge_write(values, current_values, sparse) - # import pdb; pdb.set_trace() - proxy.write_attribute(values) + proxy.write_attribute(attribute, values) # TODO(Corne): Update docstring in L2SS-940 def merge_write( @@ -154,7 +151,6 @@ class lofar_device(Device, metaclass=DeviceMeta): def _merge_write_mask( self, merge_values: List[any], current_values: List[any], mask: List[any] ): - # import pdb; pdb.set_trace() for idx, value in enumerate(merge_values): if sequence_not_str(value): self._merge_write_mask(merge_values[idx], current_values[idx], mask[idx]) diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py b/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py index 376a98343..a310eb5e4 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py @@ -232,13 +232,6 @@ class DigitalBeam(beam_device): Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters) """ - # import pdb; pdb.set_trace() - - logger.info("beam weights") - logger.info(beam_weights) - - logger.info("inputs") - logger.info(self._map_inputs_on_polarised_inputs(self._input_select)) self.atomic_read_modify_write_attribute( beam_weights, self.beamlet_proxy, diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py index e15214ea4..d4ba9f993 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py @@ -21,22 +21,30 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): antenna_qualities_only_second = numpy.array([AntennaQuality.BROKEN] + [AntennaQuality.OK] + [AntennaQuality.BROKEN] * 94) antenna_use_ok = numpy.array([AntennaUse.AUTO] * 96) + antennafield_iden = "STAT/AntennaField/1" + beamlet_iden = "STAT/Beamlet/1" + recv_iden = "STAT/RECV/1" + sdp_iden = "STAT/SDP/1" + def setUp(self): """Intentionally recreate the device object in each test""" super().setUp("STAT/DigitalBeam/1") + self.addCleanup(TestDeviceProxy.test_device_turn_off, self.beamlet_iden) + self.addCleanup(TestDeviceProxy.test_device_turn_off, self.recv_iden) + self.recv_proxy = self.setup_recv_proxy() self.beamlet_proxy = self.setup_beamlet_proxy() def setup_recv_proxy(self): - recv_proxy = TestDeviceProxy("STAT/RECV/1") + recv_proxy = TestDeviceProxy(self.recv_iden) recv_proxy.off() recv_proxy.warm_boot() recv_proxy.set_defaults() return recv_proxy def setup_beamlet_proxy(self): - beamlet_proxy = TestDeviceProxy("STAT/Beamlet/1") + beamlet_proxy = TestDeviceProxy(self.beamlet_iden) beamlet_proxy.off() beamlet_proxy.warm_boot() beamlet_proxy.set_defaults() @@ -44,7 +52,7 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): def setup_sdp_proxy(self): # setup SDP, on which this device depends - sdp_proxy = TestDeviceProxy("STAT/SDP/1") + sdp_proxy = TestDeviceProxy(self.sdp_iden) sdp_proxy.off() sdp_proxy.warm_boot() sdp_proxy.set_defaults() @@ -53,19 +61,24 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): def setup_antennafield_proxy(self, antenna_qualities, antenna_use): # setup AntennaField NR_TILES = 48 - antennafield_proxy = TestDeviceProxy("STAT/AntennaField/1") + antennafield_proxy = TestDeviceProxy(self.antennafield_iden) control_mapping = [[1,i] for i in range(NR_TILES)] - antennafield_proxy.put_property({"RECV_devices": ["STAT/RECV/1"], - "Control_to_RECV_mapping": numpy.array(control_mapping).flatten(), + antennafield_proxy.put_property({"RECV_devices": [self.recv_iden], + "HBAT_Control_to_RECV_mapping": numpy.array(control_mapping).flatten(), 'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use}) antennafield_proxy.off() antennafield_proxy.boot() return antennafield_proxy def test_pointing_to_zenith(self): + self.addCleanup(TestDeviceProxy.test_device_turn_off, self.sdp_iden) + self.addCleanup( + TestDeviceProxy.test_device_turn_off, self.antennafield_iden + ) + self.setup_antennafield_proxy(self.antenna_qualities_ok, self.antenna_use_ok) self.setup_sdp_proxy() - self.setup_recv_proxy() + # Setup beamlet configuration self.beamlet_proxy.clock_RW = 200 * 1000000 self.beamlet_proxy.subband_select = list(range(488)) @@ -83,9 +96,14 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): def test_set_pointing_masked_enable(self): """Verify that only selected inputs are written""" + self.addCleanup(TestDeviceProxy.test_device_turn_off, self.sdp_iden) + self.addCleanup( + TestDeviceProxy.test_device_turn_off, self.antennafield_iden + ) + self.setup_antennafield_proxy(self.antenna_qualities_ok, self.antenna_use_ok) self.setup_sdp_proxy() - self.setup_recv_proxy() + # Setup beamlet configuration self.beamlet_proxy.clock_RW = 200 * 1000000 self.beamlet_proxy.subband_select = list(range(488)) @@ -112,9 +130,14 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): def test_set_pointing_masked_disable(self): """Verify that only diabled inputs are unchanged""" + self.addCleanup(TestDeviceProxy.test_device_turn_off, self.sdp_iden) + self.addCleanup( + TestDeviceProxy.test_device_turn_off, self.antennafield_iden + ) + self.setup_antennafield_proxy(self.antenna_qualities_ok, self.antenna_use_ok) self.setup_sdp_proxy() - self.setup_recv_proxy() + # Setup beamlet configuration self.beamlet_proxy.clock_RW = 200 * 1000000 self.beamlet_proxy.subband_select = list(range(488)) @@ -127,8 +150,6 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): self.beamlet_proxy.FPGA_bf_weights_xx_yy_RW = non_zeros # Disable all inputs - # import pdb; - # pdb.set_trace() self.proxy.input_select_RW = numpy.array([[False] * 488] * 96) self.proxy.set_pointing( @@ -141,7 +162,12 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): ) def test_input_select_with_all_antennas_ok(self): - """ Verify if input and antenna select are correctly calculated following Antennafield.Antenna_Usage_Mask """ + """Verify if input and antenna select are correctly calculated following Antennafield.Antenna_Usage_Mask""" + + self.addCleanup( + TestDeviceProxy.test_device_turn_off, self.antennafield_iden + ) + antennafield_proxy = self.setup_antennafield_proxy(self.antenna_qualities_ok, self.antenna_use_ok) numpy.testing.assert_equal(numpy.array([True] * 96), antennafield_proxy.Antenna_Usage_Mask_R) self.setUp() @@ -152,7 +178,12 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): numpy.testing.assert_equal(expected_antenna_select, self.proxy.antenna_select_RW) def test_input_select_with_only_second_antenna_ok(self): - """ Verify if input and antenna select are correctly calculated following Antennafield.Antenna_Usage_Mask """ + """Verify if input and antenna select are correctly calculated following Antennafield.Antenna_Usage_Mask""" + + self.addCleanup( + TestDeviceProxy.test_device_turn_off, self.antennafield_iden + ) + antennafield_proxy = self.setup_antennafield_proxy(self.antenna_qualities_only_second, self.antenna_use_ok) numpy.testing.assert_equal(numpy.array([False] + [True] + [False] * 94), antennafield_proxy.Antenna_Usage_Mask_R) self.setUp() diff --git a/tangostationcontrol/tangostationcontrol/integration_test/device_proxy.py b/tangostationcontrol/tangostationcontrol/integration_test/device_proxy.py index 93f84dc98..4c519a411 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/device_proxy.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/device_proxy.py @@ -22,6 +22,7 @@ class TestDeviceProxy(DeviceProxy): # See also https://www.tango-controls.org/community/forum/c/development/python/attribute-direct-reading-from-device-when-polling-is-turned-on/ self.set_source(DevSource.DEV) + @staticmethod def test_device_turn_off(endpoint): d = TestDeviceProxy(endpoint) diff --git a/tangostationcontrol/tangostationcontrol/test/devices/test_digitalbeam_device.py b/tangostationcontrol/tangostationcontrol/test/devices/test_digitalbeam_device.py index 7bc2a5a7b..07407c802 100644 --- a/tangostationcontrol/tangostationcontrol/test/devices/test_digitalbeam_device.py +++ b/tangostationcontrol/tangostationcontrol/test/devices/test_digitalbeam_device.py @@ -7,16 +7,23 @@ # Distributed under the terms of the APACHE license. # See LICENSE.txt for more info. +# Builtin regular libraries import copy +# External regular libraries import numpy -from tango.test_context import DeviceTestContext - +# Internal regular imports from tangostationcontrol.devices.sdp import digitalbeam +# Builtin test libraries from unittest import mock +import unittest +# External test libraries +from tango.test_context import DeviceTestContext + +# Internal test imports from tangostationcontrol.test.devices import device_base @@ -26,11 +33,12 @@ class TestDigitalBeamDevice(device_base.DeviceTestCase): # DeviceTestCase setUp patches lofar_device DeviceProxy super(TestDigitalBeamDevice, self).setUp() + @unittest.skip("Test for manual use, enable at most one (process=false)") @mock.patch.object(digitalbeam.DigitalBeam, "_wait_to_apply_weights") @mock.patch.object(digitalbeam.DigitalBeam, "_compute_weights") @mock.patch.object(digitalbeam, "DeviceProxy") def test_apply_weights(self, m_proxy, m_compute, m_wait): - """Verify can overwrite digitalbeam data if input_selected""" + """Verify won't overwrite digitalbeam data if no input_selected""" input_data = numpy.array([["AZELGEO", "0deg", "90deg"]] * 488).flatten() current_data = numpy.array([[16384] * 5856] * 16) @@ -58,46 +66,45 @@ class TestDigitalBeamDevice(device_base.DeviceTestCase): proxy.set_pointing(input_data) - # import pdb; - # pdb.set_trace() numpy.testing.assert_equal( m_proxy.return_value.write_attribute.call_args[0][0], current_data ) - # @mock.patch.object(digitalbeam.DigitalBeam, "_wait_to_apply_weights") - # @mock.patch.object(digitalbeam.DigitalBeam, "_compute_weights") - # @mock.patch.object(digitalbeam, "DeviceProxy") - # def test_apply_weights(self, m_proxy, m_compute, m_wait): - # """Verify can overwrite digitalbeam data if input_selected""" - # - # input_data = numpy.array([["AZELGEO", "0deg", "90deg"]] * 488).flatten() - # current_data = numpy.array([[16384] * 5856] * 16) - # - # m_proxy.return_value = mock.Mock( - # read_attribute=mock.Mock( - # return_value=mock.Mock(value=current_data) - # ), - # Antenna_Usage_Mask_R=numpy.array([0] * 96), - # Antenna_Field_Reference_ITRF_R=mock.MagicMock(), - # HBAT_reference_ITRF_R=numpy.array([[0] * 3] * 96) - # ) - # - # new_data = numpy.array( - # [[16384] * 2928 + [0] * 2928] * 16 - # ) - # m_compute.return_value = copy.copy(new_data) - # - # with DeviceTestContext( - # digitalbeam.DigitalBeam, process=False, - # ) as proxy: - # proxy.initialise() - # proxy.Tracking_enabled_RW = False - # proxy.input_select_RW = numpy.array([[True] * 488] * 96) - # - # proxy.set_pointing(input_data) - # - # numpy.testing.assert_equal( - # m_proxy.return_value.write_attribute.call_args[0][0], - # new_data - # ) + @unittest.skip("Test for manual use, enable at most one (process=false)") + @mock.patch.object(digitalbeam.DigitalBeam, "_wait_to_apply_weights") + @mock.patch.object(digitalbeam.DigitalBeam, "_compute_weights") + @mock.patch.object(digitalbeam, "DeviceProxy") + def test_apply_weights(self, m_proxy, m_compute, m_wait): + """Verify can overwrite digitalbeam data if input_selected""" + + input_data = numpy.array([["AZELGEO", "0deg", "90deg"]] * 488).flatten() + current_data = numpy.array([[16384] * 5856] * 16) + + m_proxy.return_value = mock.Mock( + read_attribute=mock.Mock( + return_value=mock.Mock(value=current_data) + ), + Antenna_Usage_Mask_R=numpy.array([0] * 96), + Antenna_Field_Reference_ITRF_R=mock.MagicMock(), + HBAT_reference_ITRF_R=numpy.array([[0] * 3] * 96) + ) + + new_data = numpy.array( + [[16384] * 2928 + [0] * 2928] * 16 + ) + m_compute.return_value = copy.copy(new_data) + + with DeviceTestContext( + digitalbeam.DigitalBeam, process=False, + ) as proxy: + proxy.initialise() + proxy.Tracking_enabled_RW = False + proxy.input_select_RW = numpy.array([[True] * 488] * 96) + + proxy.set_pointing(input_data) + + numpy.testing.assert_equal( + m_proxy.return_value.write_attribute.call_args[0][0], + new_data + ) -- GitLab