Select Git revision
plot_Ateamclipper.cwl
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_lofar_device.py 12.72 KiB
# Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
import time
from unittest import mock, IsolatedAsyncioTestCase
from unittest.mock import ANY
import numpy
from tango import AttrWriteType
from tango import DevFailed
from tango import DevSource
from tango import DevVarBooleanArray
from tango.server import attribute
from tango.server import command
from tango.server import Device
from tango.server import device_property
from tango.test_context import DeviceTestContext
from tangostationcontrol.metrics import AttributeMetric
from tangostationcontrol.devices.base_device_classes import lofar_device
from tangostationcontrol.devices.base_device_classes.async_device import AsyncDevice
from test.devices import device_base
class TestAttributePoller(IsolatedAsyncioTestCase):
class DeviceMockBase(Device):
"""Device providing the necessary interface for AttributePoller and metrics."""
def __init__(self):
self.metric_labels = {"label_name": "label_value"}
def get_name(self):
return "DeviceMock"
def is_attribute_access_allowed(self, _):
return True
async def test_poll_updates_metrics(self):
"""Does poll() really read the attribute and update the metric?"""
class DeviceMock(self.DeviceMockBase):
def __init__(self):
super().__init__()
self.attr_read_counter = 0
async def async_read_attribute(self, _):
self.attr_read_counter += 1
return 42.0
device = DeviceMock()
ap = lofar_device.AttributePoller(device)
# register attribute with metric
metric = AttributeMetric("metric", "description", device.metric_labels)
ap.register("A", metric, False)
# poll
await ap.poll()
# check whether attribute was read
self.assertEqual(1, device.attr_read_counter)
# check whether metric got updated
samples = metric.metric.collect()[0]
self.assertEqual(1, len(samples.samples))
self.assertEqual(42.0, samples.samples[0].value)
async def test_poll_stale_removes_metric(self):
"""Does poll() really remove the metric if the attribute cannot be read?"""
class DeviceMock(self.DeviceMockBase):
def __init__(self):
super().__init__()
self.stale = False
async def async_read_attribute(self, _):
if self.stale:
# claim we lost connnection to the OPC-UA translator
raise ConnectionError()
return 42.0
device = DeviceMock()
ap = lofar_device.AttributePoller(device)
# register attribute with metric
metric = AttributeMetric("metric", "description", device.metric_labels)
ap.register("A", metric, False)
# poll
await ap.poll()
# check whether metric got updated
samples = metric.metric.collect()[0]
self.assertEqual(1, len(samples.samples))
# make attribute stale & poll again
device.stale = True
await ap.poll()
# check whether metric got removed
samples = metric.metric.collect()[0]
self.assertEqual(0, len(samples.samples))
class TestLofarDevice(device_base.DeviceTestCase):
def setUp(self):
# DeviceTestCase setUp patches lofar_device DeviceProxy
super(TestLofarDevice, self).setUp()
self.test_device = lofar_device.LOFARDevice
def test_get_properties(self):
# allow to explicitly refer to the test
testcase = self
class MyLofarDevice(self.test_device):
A = device_property(dtype=float, mandatory=False, default_value=1.0)
B = device_property(dtype=float, mandatory=False, default_value=2.0)
C = device_property(dtype=float, mandatory=False)
@command()
def test(self):
properties = self.get_properties()
testcase.assertEqual(properties["A"], 1.0) # value from code
testcase.assertEqual(properties["B"], 42.0) # value from database
testcase.assertEqual(properties["C"], None) # value from neither
with DeviceTestContext(
MyLofarDevice, process=False, timeout=10, properties={"B": 42.0}
) as proxy:
proxy.test()
def test_get_defaults(self):
# allow to explicitly refer to the test
testcase = self
class MyLofarDevice(self.test_device):
def init_device(self):
self._A = 42
self._B = numpy.array((2, 3), dtype=int)
@attribute(dtype=int)
def A(self):
return self._A
@attribute(dtype=((int,),), max_dim_x=3, max_dim_y=2)
def B(self):
return self._B
@command()
def test(self):
# test with a singular value
properties = {"A_default": 4}
defaults = self.get_defaults(properties)
testcase.assertListEqual([("A", 4)], defaults)
# test with a 2D array (should get a reshape as properties are 1D)
properties = {"B_default": [1, 2, 3, 4, 5, 6]}
defaults = self.get_defaults(properties)
testcase.assertListEqual([("B", ANY)], defaults)
testcase.assertListEqual(
[[1, 2, 3], [4, 5, 6]], defaults[0][1].tolist()
)
# test with a default for a non-existing property
properties = {"C_default": 4}
defaults = self.get_defaults(properties)
testcase.assertListEqual([], defaults)
with DeviceTestContext(MyLofarDevice, process=False, timeout=10) as proxy:
proxy.test()
def test_read_attribute(self):
"""Test whether read_attribute really returns the attribute."""
if issubclass(self.test_device, AsyncDevice):
# AsyncDevices replace read_attribute with async_read_attibute,
# and has its own tests.
return
class MyLofarDevice(self.test_device):
@attribute(dtype=float)
def A(self):
return 42.0
@attribute(dtype=float)
def read_attribute_A(self):
return self.read_attribute("A")
@attribute(dtype=(float,), max_dim_x=2)
def B_array(self):
return [42.0, 43.0]
@attribute(dtype=(float,), max_dim_x=2)
def read_attribute_B_array(self):
return self.read_attribute("B_array")
with DeviceTestContext(MyLofarDevice, process=False, timeout=10) as proxy:
proxy.initialise()
self.assertEqual(42.0, proxy.read_attribute_A)
self.assertListEqual([42.0, 43.0], proxy.read_attribute_B_array.tolist())
def test_poll_attribute(self):
"""Test whether poll_attribute really polls registered attributes."""
if issubclass(self.test_device, AsyncDevice):
# AsyncDevices replace init_device (etc) with an async version,
# and has its own tests.
return
class MyLofarDevice(self.test_device):
def init_device(self):
super().init_device()
self._A_read_counter = 0
self.attribute_poller.register("A", None, False)
@attribute(dtype=float)
def A(self):
self._A_read_counter += 1
return 42.0
@attribute(dtype=int)
def A_read_counter(self):
return self._A_read_counter
with DeviceTestContext(MyLofarDevice, process=False, timeout=10) as proxy:
# make sure we don't get a cached result if the
# poll_attribute was already called by Tango's polling loop.
proxy.set_source(DevSource.DEV)
# turn device ON to allow polling
proxy.initialise()
proxy.on()
# force poll
proxy.poll_attributes()
# check whether A was read. It could have been read by a periodic
# call from poll_attributes, but that's what we're testing so it's ok.
self.assertGreaterEqual(proxy.A_read_counter, 1)
def test_attributes_polled(self):
"""Test whether attributes are actually polled."""
if issubclass(self.test_device, AsyncDevice):
# AsyncDevices replace init_device (etc) with an async version,
# and has its own tests.
return
class MyLofarDevice(self.test_device):
def init_device(self):
super().init_device()
self._A_read_counter = 0
self.attribute_poller.register("A", None, False)
@attribute(dtype=float)
def A(self):
self._A_read_counter += 1
return 42.0
@attribute(dtype=int)
def A_read_counter(self):
return self._A_read_counter
# The first (and possibly only) call to poll()
# is done when the device is still in the INIT state, because
# that is when polling is set up.
#
# So we make sure polling is allowed then already.
def is_attribute_access_allowed(self, _):
return True
with DeviceTestContext(MyLofarDevice, process=False, timeout=10) as proxy:
# make sure we don't get a cached result if the
# poll_attribute was already called by Tango's polling loop.
proxy.set_source(DevSource.DEV)
# turn device ON to allow polling
proxy.initialise()
proxy.on()
# wait until read counter increments due to polling
TIMEOUT = 20
SLEEP_TIME = 0.1
for _ in range(int(TIMEOUT / SLEEP_TIME)):
if proxy.A_read_counter > 0:
break
time.sleep(SLEEP_TIME)
# turn device OFF to force polling to be terminated
proxy.off()
# check whether A was read. It could have been read by a periodic
# call from poll_attributes, but that's what we're testing so it's ok.
self.assertGreaterEqual(proxy.A_read_counter, 1)
def test_disable_state_transitions(self):
with DeviceTestContext(self.test_device, process=False, timeout=10) as proxy:
proxy.off()
with self.assertRaises(DevFailed):
proxy.power_hardware_off()
proxy.boot()
proxy.Fault()
with self.assertRaises(DevFailed):
proxy.power_hardware_off()
def test_atomic_read_modify_write(self):
"""Test atomic read modify write for attribute"""
class AttributeLofarDevice(self.test_device):
BOOL_ARRAY_DIM = 32
# Just for demo, do not use class variables to store attribute state
_bool_array = [False] * BOOL_ARRAY_DIM
bool_array = attribute(
dtype=(bool,),
max_dim_x=BOOL_ARRAY_DIM,
access=AttrWriteType.READ_WRITE,
fget="get_bool_array",
fset="set_bool_array",
)
def get_bool_array(self):
return self._bool_array
def set_bool_array(self, bool_array):
self._bool_array = bool_array
@command(dtype_in=DevVarBooleanArray)
def do_read_modify_write(self, values: numpy.array):
bool_array_half = int(self.BOOL_ARRAY_DIM / 2)
# We have to mock the proxy because lofar_device.proxy will be
# patched
t_write = mock.Mock()
t_proxy = mock.Mock(
read_attribute=mock.Mock(
return_value=mock.Mock(value=numpy.array(self._bool_array))
),
write_attribute=t_write,
)
self.atomic_read_modify_write_attribute(
values,
t_proxy,
"bool_array",
numpy.array([True, False] * bool_array_half),
)
# Fake the write, extract the call argument from t_write mock
self._bool_array = t_write.call_args[0][1]
with DeviceTestContext(AttributeLofarDevice, process=False) as proxy:
bool_array_half = int(AttributeLofarDevice.BOOL_ARRAY_DIM / 2)
excepted_result = [True, False] * bool_array_half
proxy.initialise()
proxy.do_read_modify_write([True] * AttributeLofarDevice.BOOL_ARRAY_DIM)
numpy.testing.assert_array_equal(excepted_result, proxy.bool_array)