Select Git revision
test_devices.py
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_devices.py 11.33 KiB
# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
from enum import Enum
from unittest import mock
from unittest.mock import MagicMock
import numpy
from tango import DevState, DevFailed
from tango.server import Device, attribute, AttrWriteType
from tango.test_context import MultiDeviceTestContext
import lofar_station_client
from lofar_station_client import devices
from lofar_station_client.devices import LofarDeviceProxy
from lofar_station_client.dts.constants import N_pn, S_pn, CST_N_SUB, CST_FS
from tests import base
class MyDevice(Device):
A = attribute(
dtype=((bool,),),
max_dim_x=4,
max_dim_y=6,
format="(2,3,4)",
access=AttrWriteType.READ_WRITE,
)
scalar = attribute(dtype=bool, access=AttrWriteType.READ_WRITE)
spectrum = attribute(dtype=(bool,), max_dim_x=2, access=AttrWriteType.READ_WRITE)
image = attribute(
dtype=((bool,),), max_dim_x=2, max_dim_y=3, access=AttrWriteType.READ_WRITE
)
def init_device(self):
self.value_A = numpy.zeros((6, 4), dtype=bool)
self.value_scalar = False
self.value_spectrum = [False, False]
self.value_image = [[False, False]] * 3
def read_scalar(self):
return self.value_scalar
def write_scalar(self, value):
self.value_scalar = value
def read_spectrum(self):
return self.value_spectrum
def write_spectrum(self, value):
self.value_spectrum = value
def read_image(self):
return self.value_image
def write_image(self, value):
self.value_image = value
def read_A(self):
return self.value_A
def write_A(self, value):
self.value_A = value
class LofarDeviceProxyTest(base.TestCase):
TEST_DEVICE_INFO = [
{
"class": MyDevice,
"devices": [{"name": "STAT/MyDevice/1", "properties": {}, "memorized": {}}],
}
]
@classmethod
def setUpClass(cls):
# setting up the TestContext takes ~1 second, so do it only once
cls.context = MultiDeviceTestContext(
cls.TEST_DEVICE_INFO,
process=True,
)
cls.context.start()
cls.proxy = LofarDeviceProxy(cls.context.get_device_access("STAT/MyDevice/1"))
# cls.proxy.connect() # necessary in the DeviceTestContext
@classmethod
def tearDownClass(cls):
# In Python3.8+, we can use addClassCleanup instead
cls.context.stop()
def test_read_scalar(self):
value = self.proxy.scalar
self.assertEqual(bool, type(value))
def test_write_scalar(self):
self.proxy.scalar = True
def test_read_spectrum(self):
value = self.proxy.spectrum
self.assertEqual((2,), value.shape)
self.assertEqual(numpy.bool_, type(value[0]))
def test_write_spectrum(self):
self.proxy.spectrum = [True, False]
def test_read_image(self):
value = self.proxy.image
self.assertEqual((3, 2), value.shape)
self.assertEqual(numpy.bool_, type(value[0, 0]))
def test_write_image(self):
self.proxy.image = [[True, False]] * 3
def test_write_3D_attribute_lists(self):
self.proxy.A = [
[True, True, True, True],
[True, True, True, True],
[True, True, True, True],
], [
[False, False, False, False],
[False, False, False, False],
[False, False, False, False],
]
def test_write_3D_attribute_numpy(self):
self.proxy.A = numpy.zeros((2, 3, 4), dtype=bool)
def test_write_3D_attribute_numpy_with_too_few_dimensions(self):
# write a 2D shape
with self.assertRaises(ValueError):
self.proxy.A = numpy.zeros((2, 12), dtype=bool)
def test_write_3D_attribute_numpy_with_too_many_dimension(self):
# write a 4D shape
with self.assertRaises(ValueError):
self.proxy.A = numpy.zeros((2, 3, 2, 2), dtype=bool)
def test_read_3D_attribute(self):
value = self.proxy.A
self.assertEqual((2, 3, 4), value.shape)
@mock.patch("lofar_station_client.devices.LofarDeviceProxy")
class LazyLofarDeviceProxyTest(base.TestCase):
TEST_DEVICE_INFO = [
{
"class": MyDevice,
"devices": [{"name": "STAT/MyDevice/2", "properties": {}, "memorized": {}}],
}
]
@classmethod
def setUpClass(cls):
# setting up the TestContext takes ~1 second, so do it only once
cls.context = MultiDeviceTestContext(
cls.TEST_DEVICE_INFO,
process=True,
)
cls.context.start()
cls.device_name = cls.context.get_device_access("STAT/MyDevice/2")
@classmethod
def tearDownClass(cls):
# In Python3.8+, we can use addClassCleanup instead
cls.context.stop()
def test_lazy_read_scalar(self, dev_proxy_mock):
mock = MagicMock()
mock.side_effect = DevFailed(type("", (object,), {"desc": "something"}))
dev_proxy_mock.side_effect = devices.lazy(mock)
proxy = lofar_station_client.devices.LofarDeviceProxy(self.device_name)
# DeviceProxy not yet initialised
with self.assertRaises(AttributeError):
_ = proxy.scalar
# Simulate connection with DB
mock.side_effect = None
mock.return_value = type("", (object,), {"scalar": True})
value = proxy.scalar
self.assertEqual(bool, type(value))
def test_lazy_write_scalar(self, dev_proxy_mock):
mock = MagicMock()
mock.side_effect = DevFailed(type("", (object,), {"desc": "something"}))
dev_proxy_mock.side_effect = devices.lazy(mock)
proxy = lofar_station_client.devices.LofarDeviceProxy(self.device_name)
with self.assertRaises(AttributeError):
proxy.scalar = True
# Simulate connection with DB
mock.side_effect = None
mock.return_value = type("", (object,), {"scalar": False})
self.assertEqual(False, proxy.scalar)
proxy.scalar = True
self.assertEqual(True, proxy.scalar)
class RecvDeviceTest(MyDevice):
RCU_attenuator_dB_R = attribute(
dtype=((numpy.int64,),),
max_dim_x=3,
max_dim_y=32,
access=AttrWriteType.READ,
)
RCU_band_select_R = attribute(
dtype=((numpy.int64,),),
max_dim_x=3,
max_dim_y=32,
access=AttrWriteType.READ,
)
RCU_DTH_on_R = attribute(
dtype=((bool,),),
max_dim_x=3,
max_dim_y=32,
access=AttrWriteType.READ,
)
def init_device(self):
self.RCU_attenuator_dB_R = numpy.zeros((32, 3), dtype=numpy.int64)
self.RCU_band_select_R = numpy.zeros((32, 3), dtype=numpy.int64)
self.RCU_DTH_on_R = numpy.zeros((32, 3), dtype=bool)
super().init_device()
def read_RCU_attenuator_dB_R(self):
return self.RCU_attenuator_dB_R
def read_RCU_band_select_R(self):
return self.RCU_band_select_R
def read_RCU_DTH_on_R(self):
return self.RCU_DTH_on_R
class RecvDeviceProxyTest(base.TestCase):
TEST_DEVICE_INFO = [
{
"class": RecvDeviceTest,
"devices": [{"name": "stat/recv/1", "properties": {}, "memorized": {}}],
}
]
@classmethod
def setUpClass(cls):
# setting up the TestContext takes ~1 second, so do it only once
cls.context = MultiDeviceTestContext(
cls.TEST_DEVICE_INFO,
process=True,
)
cls.context.start()
cls.proxy = LofarDeviceProxy(cls.context.get_device_access("stat/recv/1"))
@classmethod
def tearDownClass(cls):
# In Python3.8+, we can use addClassCleanup instead
cls.context.stop()
@classmethod
def state(cls):
return cls.state
@classmethod
def off(cls):
cls.state = DevState.OFF
@classmethod
def on(cls):
cls.state = DevState.ON
class FakeAntennaFieldDeviceProxy:
"""DeviceProxy that mocks access to an AntennaField device."""
class AntennaStatus(Enum):
OK = 0
nr_antennas_R = 3
Antenna_to_SDP_Mapping_R = [[0, 0], [0, 1], [0, 2]]
Antenna_Names_R = ["Aap", "Noot", "Mies"]
Antenna_Reference_ITRF_R = [[0, 0, 0]] * nr_antennas_R
Antenna_Status_R = [AntennaStatus.OK] * nr_antennas_R
Antenna_Usage_Mask_R = [True] * nr_antennas_R
Frequency_Band_RW = ["HBA_110_190"] * nr_antennas_R
RCU_attenuator_dB_R = [0, 1, 2]
RCU_band_select_R = [1] * nr_antennas_R
RCU_DTH_on_R = [False] * nr_antennas_R
RCU_DTH_freq_R = [0.0] * nr_antennas_R
RCU_PCB_ID_R = [[1, 1]] * 48
RCU_PCB_version_R = [["version", "version"]] * 48
HBAT_PWR_on_R = []
def __init__(self, name):
self._name = name
def name(self):
return self._name
def dev_name(self):
return self._name.lower()
class FakeOffAntennaFieldDeviceProxy:
"""DeviceProxy that mocks access to an Off AntennaField device."""
def __init__(self, name):
self._name = name
def name(self):
return self._name
def dev_name(self):
return self._name.lower()
def __getattr__(self, attrname):
raise DevFailed("Device is off")
class FakeSDPFirmwareDeviceProxy:
"""DeviceProxy that mocks access to an SDPFirmware device."""
FPGA_firmware_version_R = ["firmware version"] * N_pn
FPGA_hardware_version_R = ["hardware version"] * N_pn
nr_signal_inputs_R = 192
first_signal_input_index_R = 0
def __init__(self, name):
self._name = name
def name(self):
return self._name
def __getattr__(self, attrname):
return getattr(self, attrname)
class FakeSDPDeviceProxy:
"""DeviceProxy that mocks access to an SDP device."""
nyquist_zone_RW = numpy.array([[0] * S_pn] * N_pn, dtype=numpy.int64)
FPGA_spectral_inversion_R = numpy.array([[False] * S_pn] * N_pn, dtype=bool)
subband_frequency_R = numpy.array(
[[sb * CST_FS / CST_N_SUB for sb in range(CST_N_SUB)]] * (S_pn * N_pn),
dtype=numpy.float64,
)
def __init__(self, name):
self._name = name
def name(self):
return self._name
def __getattr__(self, attrname):
return getattr(self, attrname)
class FakeTileBeamDeviceProxy:
"""DeviceProxy that mocks access to a TileBeam device."""
N_tiles = 48
Pointing_direction_str_R = ["J2000 (0deg, 0deg)"] * N_tiles
Tracking_enabled_R = [True] * N_tiles
def __init__(self, name):
self._name = name
def name(self):
return self._name
def __getattr__(self, attrname):
return getattr(self, attrname)
class FakeDigitalBeamDeviceProxy:
"""DeviceProxy that mocks access to an DigitalBeam device."""
N_beamlets_ctrl = 488
subband_select_RW = list(range(0, N_beamlets_ctrl))
Pointing_direction_str_R = ["J2000 (0deg, 0deg)"] * N_beamlets_ctrl
Tracking_enabled_R = [True] * N_beamlets_ctrl
def __init__(self, name):
self._name = name
def name(self):
return self._name
def __getattr__(self, attrname):
return getattr(self, attrname)
class FakeStationManagerDeviceProxy:
"""DeviceProxy that mocks access to a StationManager device."""
station_name_R = "DevStation"
def __init__(self, name):
self._name = name
def name(self):
return self._name
def __getattr__(self, attrname):
return getattr(self, attrname)