Skip to content
Snippets Groups Projects
Select Git revision
  • 85306c706e7c01a0bf1869c29f5ed8c7a90fec9d
  • main default protected
  • tickets/156
  • tickets/86
  • add_to_basket
5 results

zooniverse.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_lofar_device.py 12.72 KiB
    # Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy)
    # SPDX-License-Identifier: Apache-2.0
    
    import time
    from unittest import mock, IsolatedAsyncioTestCase
    from unittest.mock import ANY
    
    import numpy
    from tango import AttrWriteType
    from tango import DevFailed
    from tango import DevSource
    from tango import DevVarBooleanArray
    from tango.server import attribute
    from tango.server import command
    from tango.server import Device
    from tango.server import device_property
    from tango.test_context import DeviceTestContext
    
    from tangostationcontrol.metrics import AttributeMetric
    from tangostationcontrol.devices.base_device_classes import lofar_device
    from tangostationcontrol.devices.base_device_classes.async_device import AsyncDevice
    
    from test.devices import device_base
    
    
    class TestAttributePoller(IsolatedAsyncioTestCase):
        class DeviceMockBase(Device):
            """Bare-bones Device exposing what AttributePoller and the metrics use."""

            def __init__(self):
                # NOTE(review): Device.__init__ is not invoked here; presumably on
                # purpose, so the mock needs no Tango infrastructure -- confirm.
                self.metric_labels = dict(label_name="label_value")

            def get_name(self):
                """Return the fixed name of this mock device."""
                return "DeviceMock"

            def is_attribute_access_allowed(self, _):
                """Report attribute access as allowed, whatever the argument is."""
                return True
    
        async def test_poll_updates_metrics(self):
            """Verify that poll() reads the attribute and stores its value in the metric."""

            class DeviceMock(self.DeviceMockBase):
                """Mock device that counts how often its attribute is read."""

                def __init__(self):
                    super().__init__()
                    self.attr_read_counter = 0

                async def async_read_attribute(self, _):
                    self.attr_read_counter += 1
                    return 42.0

            mock_device = DeviceMock()
            poller = lofar_device.AttributePoller(mock_device)

            # attach a metric to attribute "A"
            attr_metric = AttributeMetric(
                "metric", "description", mock_device.metric_labels
            )
            poller.register("A", attr_metric, False)

            # a single poll round ...
            await poller.poll()

            # ... must have read the attribute exactly once
            self.assertEqual(1, mock_device.attr_read_counter)

            # ... and must have stored the value that was read in the metric
            collected = attr_metric.metric.collect()[0].samples
            self.assertEqual(1, len(collected))
            self.assertEqual(42.0, collected[0].value)
    
        async def test_poll_stale_removes_metric(self):
            """Verify that poll() drops the metric sample once the attribute is unreadable."""

            class DeviceMock(self.DeviceMockBase):
                """Mock device whose attribute read can be switched to fail."""

                def __init__(self):
                    super().__init__()
                    self.stale = False

                async def async_read_attribute(self, _):
                    if not self.stale:
                        return 42.0

                    # pretend the connection to the OPC-UA translator was lost
                    raise ConnectionError()

            mock_device = DeviceMock()
            poller = lofar_device.AttributePoller(mock_device)

            # attach a metric to attribute "A"
            attr_metric = AttributeMetric(
                "metric", "description", mock_device.metric_labels
            )
            poller.register("A", attr_metric, False)

            # first poll succeeds, so the metric must carry one sample
            await poller.poll()
            collected = attr_metric.metric.collect()[0].samples
            self.assertEqual(1, len(collected))

            # second poll fails to read the attribute ...
            mock_device.stale = True
            await poller.poll()

            # ... so the sample must be gone from the metric
            collected = attr_metric.metric.collect()[0].samples
            self.assertEqual(0, len(collected))
    
    
    class TestLofarDevice(device_base.DeviceTestCase):
        def setUp(self):
            """Run the base fixture and select the device class under test."""
            # The DeviceTestCase setUp patches the DeviceProxy of lofar_device
            super().setUp()

            self.test_device = lofar_device.LOFARDevice
    
        def test_get_properties(self):
            """Check get_properties() for code defaults, configured and unset values."""
            # the command below runs with the device as ``self``, so keep a
            # handle on this test case for the assertions
            test_case = self

            class MyLofarDevice(self.test_device):
                A = device_property(dtype=float, mandatory=False, default_value=1.0)
                B = device_property(dtype=float, mandatory=False, default_value=2.0)
                C = device_property(dtype=float, mandatory=False)

                @command()
                def test(self):
                    found = self.get_properties()

                    expectations = (
                        ("A", 1.0),  # value from code
                        ("B", 42.0),  # value from database
                        ("C", None),  # value from neither
                    )
                    for name, wanted in expectations:
                        test_case.assertEqual(found[name], wanted)

            context = DeviceTestContext(
                MyLofarDevice, process=False, timeout=10, properties={"B": 42.0}
            )
            with context as proxy:
                proxy.test()
    
        def test_get_defaults(self):
            """Check get_defaults() for scalar, 2D and unknown attribute defaults."""
            # the command below runs with the device as ``self``, so keep a
            # handle on this test case for the assertions
            test_case = self

            class MyLofarDevice(self.test_device):
                def init_device(self):
                    self._A = 42
                    self._B = numpy.array((2, 3), dtype=int)

                @attribute(dtype=int)
                def A(self):
                    return self._A

                @attribute(dtype=((int,),), max_dim_x=3, max_dim_y=2)
                def B(self):
                    return self._B

                @command()
                def test(self):
                    # scalar attribute: the default is passed through as is
                    scalar_defaults = self.get_defaults({"A_default": 4})
                    test_case.assertListEqual([("A", 4)], scalar_defaults)

                    # 2D attribute: properties are 1D, so the default should
                    # come back reshaped to the attribute's dimensions
                    matrix_defaults = self.get_defaults(
                        {"B_default": [1, 2, 3, 4, 5, 6]}
                    )
                    test_case.assertListEqual([("B", ANY)], matrix_defaults)
                    test_case.assertListEqual(
                        [[1, 2, 3], [4, 5, 6]], matrix_defaults[0][1].tolist()
                    )

                    # a default for something that is not an attribute is ignored
                    unknown_defaults = self.get_defaults({"C_default": 4})
                    test_case.assertListEqual([], unknown_defaults)

            context = DeviceTestContext(MyLofarDevice, process=False, timeout=10)
            with context as proxy:
                proxy.test()
    
        def test_read_attribute(self):
            """Test whether read_attribute really returns the attribute.

            Skipped when the device under test is an AsyncDevice.
            """

            if issubclass(self.test_device, AsyncDevice):
                # AsyncDevices replace read_attribute with async_read_attribute,
                # and have their own tests. Report this as a skip instead of
                # returning silently, so the test is not counted as passed
                # although nothing was verified.
                self.skipTest("AsyncDevice has its own read_attribute tests")

            class MyLofarDevice(self.test_device):
                @attribute(dtype=float)
                def A(self):
                    return 42.0

                # exposes the result of read_attribute("A") to the client
                @attribute(dtype=float)
                def read_attribute_A(self):
                    return self.read_attribute("A")

                @attribute(dtype=(float,), max_dim_x=2)
                def B_array(self):
                    return [42.0, 43.0]

                # exposes the result of read_attribute("B_array") to the client
                @attribute(dtype=(float,), max_dim_x=2)
                def read_attribute_B_array(self):
                    return self.read_attribute("B_array")

            with DeviceTestContext(MyLofarDevice, process=False, timeout=10) as proxy:
                proxy.initialise()
                self.assertEqual(42.0, proxy.read_attribute_A)
                self.assertListEqual([42.0, 43.0], proxy.read_attribute_B_array.tolist())
    
        def test_poll_attribute(self):
            """Test whether poll_attributes really polls registered attributes.

            Skipped when the device under test is an AsyncDevice.
            """

            if issubclass(self.test_device, AsyncDevice):
                # AsyncDevices replace init_device (etc) with an async version,
                # and have their own tests. Report this as a skip instead of
                # returning silently, so the test is not counted as passed
                # although nothing was verified.
                self.skipTest("AsyncDevice has its own polling tests")

            class MyLofarDevice(self.test_device):
                def init_device(self):
                    super().init_device()

                    self._A_read_counter = 0

                    # have attribute A polled, without a metric attached
                    self.attribute_poller.register("A", None, False)

                @attribute(dtype=float)
                def A(self):
                    # count every read so the test can detect polling
                    self._A_read_counter += 1
                    return 42.0

                @attribute(dtype=int)
                def A_read_counter(self):
                    return self._A_read_counter

            with DeviceTestContext(MyLofarDevice, process=False, timeout=10) as proxy:
                # make sure we don't get a cached result if
                # poll_attributes was already called by Tango's polling loop.
                proxy.set_source(DevSource.DEV)

                # turn device ON to allow polling
                proxy.initialise()
                proxy.on()

                # force poll
                proxy.poll_attributes()

                # check whether A was read. It could have been read by a periodic
                # call from poll_attributes, but that's what we're testing so it's ok.
                self.assertGreaterEqual(proxy.A_read_counter, 1)
    
        def test_attributes_polled(self):
            """Test whether attributes are actually polled.

            Skipped when the device under test is an AsyncDevice.
            """

            if issubclass(self.test_device, AsyncDevice):
                # AsyncDevices replace init_device (etc) with an async version,
                # and have their own tests. Report this as a skip instead of
                # returning silently, so the test is not counted as passed
                # although nothing was verified.
                self.skipTest("AsyncDevice has its own polling tests")

            class MyLofarDevice(self.test_device):
                def init_device(self):
                    super().init_device()

                    self._A_read_counter = 0

                    # have attribute A polled, without a metric attached
                    self.attribute_poller.register("A", None, False)

                @attribute(dtype=float)
                def A(self):
                    # count every read so the test can detect polling
                    self._A_read_counter += 1
                    return 42.0

                @attribute(dtype=int)
                def A_read_counter(self):
                    return self._A_read_counter

                # The first (and possibly only) call to poll()
                # is done when the device is still in the INIT state, because
                # that is when polling is set up.
                #
                # So we make sure polling is allowed then already.
                def is_attribute_access_allowed(self, _):
                    return True

            with DeviceTestContext(MyLofarDevice, process=False, timeout=10) as proxy:
                # make sure we don't get a cached result if
                # poll_attributes was already called by Tango's polling loop.
                proxy.set_source(DevSource.DEV)

                # turn device ON to allow polling
                proxy.initialise()
                proxy.on()

                # wait until read counter increments due to polling,
                # giving up after roughly TIMEOUT seconds
                TIMEOUT = 20
                SLEEP_TIME = 0.1
                for _ in range(int(TIMEOUT / SLEEP_TIME)):
                    if proxy.A_read_counter > 0:
                        break

                    time.sleep(SLEEP_TIME)

                # turn device OFF to force polling to be terminated
                proxy.off()

                # check whether A was read. It could have been read by a periodic
                # call from poll_attributes, but that's what we're testing so it's ok.
                self.assertGreaterEqual(proxy.A_read_counter, 1)
    
        def test_disable_state_transitions(self):
            """Check that power_hardware_off() fails after off() and after Fault()."""
            context = DeviceTestContext(self.test_device, process=False, timeout=10)
            with context as proxy:
                # first scenario: directly after switching the device off
                proxy.off()
                with self.assertRaises(DevFailed):
                    proxy.power_hardware_off()

                # second scenario: device booted and then put into fault
                proxy.boot()
                proxy.Fault()
                with self.assertRaises(DevFailed):
                    proxy.power_hardware_off()
    
        def test_atomic_read_modify_write(self):
            """Exercise atomic_read_modify_write_attribute on a boolean array attribute."""

            class AttributeLofarDevice(self.test_device):
                BOOL_ARRAY_DIM = 32

                # Demo only: attribute state should not live in class variables
                _bool_array = [False] * BOOL_ARRAY_DIM
                bool_array = attribute(
                    dtype=(bool,),
                    max_dim_x=BOOL_ARRAY_DIM,
                    access=AttrWriteType.READ_WRITE,
                    fget="get_bool_array",
                    fset="set_bool_array",
                )

                def get_bool_array(self):
                    return self._bool_array

                def set_bool_array(self, bool_array):
                    self._bool_array = bool_array

                @command(dtype_in=DevVarBooleanArray)
                def do_read_modify_write(self, values: numpy.array):
                    half = self.BOOL_ARRAY_DIM // 2

                    # lofar_device.proxy will be patched, so a mocked proxy is
                    # handed in: reads return the current state, writes are
                    # recorded by a separate mock
                    write_mock = mock.Mock()
                    current_state = mock.Mock(value=numpy.array(self._bool_array))
                    proxy_mock = mock.Mock(
                        read_attribute=mock.Mock(return_value=current_state),
                        write_attribute=write_mock,
                    )

                    alternating = numpy.array([True, False] * half)
                    self.atomic_read_modify_write_attribute(
                        values,
                        proxy_mock,
                        "bool_array",
                        alternating,
                    )

                    # Emulate the write: adopt the second positional argument
                    # that was passed to the mocked write_attribute
                    self._bool_array = write_mock.call_args.args[1]

            with DeviceTestContext(AttributeLofarDevice, process=False) as proxy:
                dim = AttributeLofarDevice.BOOL_ARRAY_DIM
                expected = [True, False] * (dim // 2)

                proxy.initialise()
                proxy.do_read_modify_write([True] * dim)

                numpy.testing.assert_array_equal(expected, proxy.bool_array)