Skip to content
Snippets Groups Projects
Commit f0dc9d24 authored by Corné Lukken's avatar Corné Lukken
Browse files

L2SS-877: Working integration tests for disabled / enabled inputs

parent e91e4b6b
No related branches found
No related tags found
1 merge request!411Resolve L2SS-877 "Read modify write digitalbeam"
......@@ -104,18 +104,15 @@ class lofar_device(Device, metaclass=DeviceMeta):
return self.get_state() in INITIALISED_STATES
# TODO(Corne): Actually implement this in L2SS-940
# TODO(Corne): Actually implement locking in L2SS-940
def atomic_read_modify_write_attribute(
self, values: List[any], proxy: DeviceProxy, attribute: str, sparse=None
):
"""Atomatically read-modify-write the attribute on the given proxy"""
current_values = proxy.read_attribute(attribute).value
logger.info("current_values")
logger.info(values)
self.merge_write(values, current_values, sparse)
# import pdb; pdb.set_trace()
proxy.write_attribute(values)
proxy.write_attribute(attribute, values)
# TODO(Corne): Update docstring in L2SS-940
def merge_write(
......@@ -154,7 +151,6 @@ class lofar_device(Device, metaclass=DeviceMeta):
def _merge_write_mask(
self, merge_values: List[any], current_values: List[any], mask: List[any]
):
# import pdb; pdb.set_trace()
for idx, value in enumerate(merge_values):
if sequence_not_str(value):
self._merge_write_mask(merge_values[idx], current_values[idx], mask[idx])
......
......@@ -232,13 +232,6 @@ class DigitalBeam(beam_device):
Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters)
"""
# import pdb; pdb.set_trace()
logger.info("beam weights")
logger.info(beam_weights)
logger.info("inputs")
logger.info(self._map_inputs_on_polarised_inputs(self._input_select))
self.atomic_read_modify_write_attribute(
beam_weights,
self.beamlet_proxy,
......
......@@ -21,22 +21,30 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase):
antenna_qualities_only_second = numpy.array([AntennaQuality.BROKEN] + [AntennaQuality.OK] + [AntennaQuality.BROKEN] * 94)
antenna_use_ok = numpy.array([AntennaUse.AUTO] * 96)
antennafield_iden = "STAT/AntennaField/1"
beamlet_iden = "STAT/Beamlet/1"
recv_iden = "STAT/RECV/1"
sdp_iden = "STAT/SDP/1"
def setUp(self):
"""Intentionally recreate the device object in each test"""
super().setUp("STAT/DigitalBeam/1")
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.beamlet_iden)
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.recv_iden)
self.recv_proxy = self.setup_recv_proxy()
self.beamlet_proxy = self.setup_beamlet_proxy()
def setup_recv_proxy(self):
recv_proxy = TestDeviceProxy("STAT/RECV/1")
recv_proxy = TestDeviceProxy(self.recv_iden)
recv_proxy.off()
recv_proxy.warm_boot()
recv_proxy.set_defaults()
return recv_proxy
def setup_beamlet_proxy(self):
beamlet_proxy = TestDeviceProxy("STAT/Beamlet/1")
beamlet_proxy = TestDeviceProxy(self.beamlet_iden)
beamlet_proxy.off()
beamlet_proxy.warm_boot()
beamlet_proxy.set_defaults()
......@@ -44,7 +52,7 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase):
def setup_sdp_proxy(self):
# setup SDP, on which this device depends
sdp_proxy = TestDeviceProxy("STAT/SDP/1")
sdp_proxy = TestDeviceProxy(self.sdp_iden)
sdp_proxy.off()
sdp_proxy.warm_boot()
sdp_proxy.set_defaults()
......@@ -53,19 +61,24 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase):
def setup_antennafield_proxy(self, antenna_qualities, antenna_use):
# setup AntennaField
NR_TILES = 48
antennafield_proxy = TestDeviceProxy("STAT/AntennaField/1")
antennafield_proxy = TestDeviceProxy(self.antennafield_iden)
control_mapping = [[1,i] for i in range(NR_TILES)]
antennafield_proxy.put_property({"RECV_devices": ["STAT/RECV/1"],
"Control_to_RECV_mapping": numpy.array(control_mapping).flatten(),
antennafield_proxy.put_property({"RECV_devices": [self.recv_iden],
"HBAT_Control_to_RECV_mapping": numpy.array(control_mapping).flatten(),
'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use})
antennafield_proxy.off()
antennafield_proxy.boot()
return antennafield_proxy
def test_pointing_to_zenith(self):
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.sdp_iden)
self.addCleanup(
TestDeviceProxy.test_device_turn_off, self.antennafield_iden
)
self.setup_antennafield_proxy(self.antenna_qualities_ok, self.antenna_use_ok)
self.setup_sdp_proxy()
self.setup_recv_proxy()
# Setup beamlet configuration
self.beamlet_proxy.clock_RW = 200 * 1000000
self.beamlet_proxy.subband_select = list(range(488))
......@@ -83,9 +96,14 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase):
def test_set_pointing_masked_enable(self):
"""Verify that only selected inputs are written"""
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.sdp_iden)
self.addCleanup(
TestDeviceProxy.test_device_turn_off, self.antennafield_iden
)
self.setup_antennafield_proxy(self.antenna_qualities_ok, self.antenna_use_ok)
self.setup_sdp_proxy()
self.setup_recv_proxy()
# Setup beamlet configuration
self.beamlet_proxy.clock_RW = 200 * 1000000
self.beamlet_proxy.subband_select = list(range(488))
......@@ -112,9 +130,14 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase):
def test_set_pointing_masked_disable(self):
"""Verify that only diabled inputs are unchanged"""
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.sdp_iden)
self.addCleanup(
TestDeviceProxy.test_device_turn_off, self.antennafield_iden
)
self.setup_antennafield_proxy(self.antenna_qualities_ok, self.antenna_use_ok)
self.setup_sdp_proxy()
self.setup_recv_proxy()
# Setup beamlet configuration
self.beamlet_proxy.clock_RW = 200 * 1000000
self.beamlet_proxy.subband_select = list(range(488))
......@@ -127,8 +150,6 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase):
self.beamlet_proxy.FPGA_bf_weights_xx_yy_RW = non_zeros
# Disable all inputs
# import pdb;
# pdb.set_trace()
self.proxy.input_select_RW = numpy.array([[False] * 488] * 96)
self.proxy.set_pointing(
......@@ -141,7 +162,12 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase):
)
def test_input_select_with_all_antennas_ok(self):
""" Verify if input and antenna select are correctly calculated following Antennafield.Antenna_Usage_Mask """
"""Verify if input and antenna select are correctly calculated following Antennafield.Antenna_Usage_Mask"""
self.addCleanup(
TestDeviceProxy.test_device_turn_off, self.antennafield_iden
)
antennafield_proxy = self.setup_antennafield_proxy(self.antenna_qualities_ok, self.antenna_use_ok)
numpy.testing.assert_equal(numpy.array([True] * 96), antennafield_proxy.Antenna_Usage_Mask_R)
self.setUp()
......@@ -152,7 +178,12 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase):
numpy.testing.assert_equal(expected_antenna_select, self.proxy.antenna_select_RW)
def test_input_select_with_only_second_antenna_ok(self):
""" Verify if input and antenna select are correctly calculated following Antennafield.Antenna_Usage_Mask """
"""Verify if input and antenna select are correctly calculated following Antennafield.Antenna_Usage_Mask"""
self.addCleanup(
TestDeviceProxy.test_device_turn_off, self.antennafield_iden
)
antennafield_proxy = self.setup_antennafield_proxy(self.antenna_qualities_only_second, self.antenna_use_ok)
numpy.testing.assert_equal(numpy.array([False] + [True] + [False] * 94), antennafield_proxy.Antenna_Usage_Mask_R)
self.setUp()
......
......@@ -22,6 +22,7 @@ class TestDeviceProxy(DeviceProxy):
# See also https://www.tango-controls.org/community/forum/c/development/python/attribute-direct-reading-from-device-when-polling-is-turned-on/
self.set_source(DevSource.DEV)
@staticmethod
def test_device_turn_off(endpoint):
d = TestDeviceProxy(endpoint)
......
......@@ -7,16 +7,23 @@
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
# Builtin regular libraries
import copy
# External regular libraries
import numpy
from tango.test_context import DeviceTestContext
# Internal regular imports
from tangostationcontrol.devices.sdp import digitalbeam
# Builtin test libraries
from unittest import mock
import unittest
# External test libraries
from tango.test_context import DeviceTestContext
# Internal test imports
from tangostationcontrol.test.devices import device_base
......@@ -26,11 +33,12 @@ class TestDigitalBeamDevice(device_base.DeviceTestCase):
# DeviceTestCase setUp patches lofar_device DeviceProxy
super(TestDigitalBeamDevice, self).setUp()
@unittest.skip("Test for manual use, enable at most one (process=false)")
@mock.patch.object(digitalbeam.DigitalBeam, "_wait_to_apply_weights")
@mock.patch.object(digitalbeam.DigitalBeam, "_compute_weights")
@mock.patch.object(digitalbeam, "DeviceProxy")
def test_apply_weights(self, m_proxy, m_compute, m_wait):
"""Verify can overwrite digitalbeam data if input_selected"""
"""Verify won't overwrite digitalbeam data if no input_selected"""
input_data = numpy.array([["AZELGEO", "0deg", "90deg"]] * 488).flatten()
current_data = numpy.array([[16384] * 5856] * 16)
......@@ -58,46 +66,45 @@ class TestDigitalBeamDevice(device_base.DeviceTestCase):
proxy.set_pointing(input_data)
# import pdb;
# pdb.set_trace()
numpy.testing.assert_equal(
m_proxy.return_value.write_attribute.call_args[0][0],
current_data
)
# @mock.patch.object(digitalbeam.DigitalBeam, "_wait_to_apply_weights")
# @mock.patch.object(digitalbeam.DigitalBeam, "_compute_weights")
# @mock.patch.object(digitalbeam, "DeviceProxy")
# def test_apply_weights(self, m_proxy, m_compute, m_wait):
# """Verify can overwrite digitalbeam data if input_selected"""
#
# input_data = numpy.array([["AZELGEO", "0deg", "90deg"]] * 488).flatten()
# current_data = numpy.array([[16384] * 5856] * 16)
#
# m_proxy.return_value = mock.Mock(
# read_attribute=mock.Mock(
# return_value=mock.Mock(value=current_data)
# ),
# Antenna_Usage_Mask_R=numpy.array([0] * 96),
# Antenna_Field_Reference_ITRF_R=mock.MagicMock(),
# HBAT_reference_ITRF_R=numpy.array([[0] * 3] * 96)
# )
#
# new_data = numpy.array(
# [[16384] * 2928 + [0] * 2928] * 16
# )
# m_compute.return_value = copy.copy(new_data)
#
# with DeviceTestContext(
# digitalbeam.DigitalBeam, process=False,
# ) as proxy:
# proxy.initialise()
# proxy.Tracking_enabled_RW = False
# proxy.input_select_RW = numpy.array([[True] * 488] * 96)
#
# proxy.set_pointing(input_data)
#
# numpy.testing.assert_equal(
# m_proxy.return_value.write_attribute.call_args[0][0],
# new_data
# )
@unittest.skip("Test for manual use, enable at most one (process=false)")
@mock.patch.object(digitalbeam.DigitalBeam, "_wait_to_apply_weights")
@mock.patch.object(digitalbeam.DigitalBeam, "_compute_weights")
@mock.patch.object(digitalbeam, "DeviceProxy")
def test_apply_weights(self, m_proxy, m_compute, m_wait):
"""Verify can overwrite digitalbeam data if input_selected"""
input_data = numpy.array([["AZELGEO", "0deg", "90deg"]] * 488).flatten()
current_data = numpy.array([[16384] * 5856] * 16)
m_proxy.return_value = mock.Mock(
read_attribute=mock.Mock(
return_value=mock.Mock(value=current_data)
),
Antenna_Usage_Mask_R=numpy.array([0] * 96),
Antenna_Field_Reference_ITRF_R=mock.MagicMock(),
HBAT_reference_ITRF_R=numpy.array([[0] * 3] * 96)
)
new_data = numpy.array(
[[16384] * 2928 + [0] * 2928] * 16
)
m_compute.return_value = copy.copy(new_data)
with DeviceTestContext(
digitalbeam.DigitalBeam, process=False,
) as proxy:
proxy.initialise()
proxy.Tracking_enabled_RW = False
proxy.input_select_RW = numpy.array([[True] * 488] * 96)
proxy.set_pointing(input_data)
numpy.testing.assert_equal(
m_proxy.return_value.write_attribute.call_args[0][0],
new_data
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment