Skip to content
Snippets Groups Projects
Commit 84f760bb authored by Hannes Feldt's avatar Hannes Feldt
Browse files

L2SS-1388: Lofar station client intermittent unit test test_lazy_write_scalar

parent 220eaa57
No related branches found
No related tags found
1 merge request!54L2SS-1388: Lofar station client intermittent unit test test_lazy_write_scalar
......@@ -105,6 +105,7 @@ tox -e debug tests.requests.test_prometheus
```
## Releasenotes
- 0.14.7 - Refactor LazyDeviceProxy
- 0.14.6 - Removed deprecated StationSSTCollector
- 0.14.5 - Added `gn_indices` and support for global node indices > 16.
- 0.14.4 - Fixed bug on `writer_version` retrieval
......
0.14.6
0.14.7
""" Enhanced interfaces towards the station's Tango devices. """
# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
# inconsistent-return-statements
# pylint: disable=R1710
import ast
from functools import lru_cache
import logging
from functools import lru_cache
import numpy
from tango import DeviceProxy, DevFailed
from tango import DevFailed, DeviceProxy
from tango import ExtractAs
logger = logging.getLogger()
class LofarDeviceProxy(DeviceProxy):
"""A LOFAR-specific tango.DeviceProxy that provides
a richer experience."""
LAZY_DEVICE_PROXY_SLOTS = ["_create_instance", "connected", "_instance"]
LAZY_DEVICE_PROXY_MEMBERS = LAZY_DEVICE_PROXY_SLOTS + ["_connect"]
def __init__(self, *args):
"""Do not connect on device construction"""
self.__dict__["dev_name"] = str(args[0])
self.dev_name = str(args[0])
self.__dict__["connected"] = False
self.connected = False
logger.info("LOFARDeviceProxy %s not connected", self.dev_name)
def __str__(self):
if not self.connected:
return f"<LOFARDeviceProxy({self.dev_name})"
return super().__str__()
class LazyDeviceProxy:
"""A wrapper class that allows the regular DeviceProxy to only open the connection
when needed and not during creation."""
__repr__ = __str__
repr = __str__
__slots__ = LAZY_DEVICE_PROXY_SLOTS
def connect(self):
def __init__(self, cls, *args, **kwargs):
self._create_instance = lambda: cls(*args, **kwargs)
self.connected = False
self._instance = None
def _connect(self):
"""Try to estabilish a connection when a device operation is called"""
if not self.connected:
try:
super().__init__(self.dev_name)
self.__dict__["connected"] = True
self._instance = self._create_instance()
self.connected = True
except DevFailed as excep:
self.__dict__["connected"] = False
except DevFailed as ex:
self.connected = False
reason = excep.args[0].desc.replace("\n", " ")
reason = ex.args[0].desc.replace("\n", " ")
logger.warning("LOFARDeviceProxy not connected: %s", reason)
def __getattribute__(self, name):
if name in LAZY_DEVICE_PROXY_MEMBERS:
return object.__getattribute__(self, name)
self._connect()
return getattr(object.__getattribute__(self, "_instance"), name)
def __delattr__(self, name):
self._connect()
delattr(object.__getattribute__(self, "_instance"), name)
def __setattr__(self, name, value):
if name in LAZY_DEVICE_PROXY_MEMBERS:
object.__setattr__(self, name, value)
else:
self._connect()
setattr(object.__getattribute__(self, "_instance"), name, value)
def __nonzero__(self):
self._connect()
return bool(object.__getattribute__(self, "_instance"))
def __str__(self):
self._connect()
return str(object.__getattribute__(self, "_instance"))
def __repr__(self):
self._connect()
return repr(object.__getattribute__(self, "_instance"))
def lazy(cls):
"""A decorator that allows the regular DeviceProxy to only open the connection
when needed and not during creation."""
return lambda *args, **kwargs: LazyDeviceProxy(cls, *args, **kwargs)
@lazy
class LofarDeviceProxy(DeviceProxy):
"""A LOFAR-specific tango.DeviceProxy that provides
a richer experience."""
@lru_cache()
def get_attribute_config(self, name):
"""Get cached attribute configurations, as they are not expected to change."""
self.connect()
if self.connected:
return super().get_attribute_config(name)
@lru_cache()
def get_attribute_shape(self, name):
"""Get the shape of the requested attribute, as a tuple."""
self.connect()
if self.connected:
config = self.get_attribute_config(name)
if config.format and config.format[0] == "(":
......@@ -81,8 +113,6 @@ class LofarDeviceProxy(DeviceProxy):
def read_attribute(self, name, extract_as=ExtractAs.Numpy):
"""Read an attribute from the server."""
self.connect()
if self.connected:
attr = super().read_attribute(name, extract_as)
# convert non-scalar values into their actual shape
......@@ -94,8 +124,6 @@ class LofarDeviceProxy(DeviceProxy):
def write_attribute(self, name, value):
"""Write an attribute to the server."""
self.connect()
if self.connected:
config = self.get_attribute_config(name)
shape = self.get_attribute_shape(name)
......
# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
from unittest import mock
from unittest.mock import MagicMock
import numpy
from tango import DevState, DevFailed
from tango.server import Device, attribute, AttrWriteType
from tango.test_context import MultiDeviceTestContext
import lofar_station_client
from lofar_station_client import devices
from lofar_station_client.devices import LofarDeviceProxy
from tests import base
......@@ -76,7 +80,7 @@ class LofarDeviceProxyTest(base.TestCase):
cls.context.start()
cls.proxy = LofarDeviceProxy(cls.context.get_device_access("STAT/MyDevice/1"))
cls.proxy.connect() # necessary in the DeviceTestContext
# cls.proxy.connect() # necessary in the DeviceTestContext
@classmethod
def tearDownClass(cls):
......@@ -139,6 +143,7 @@ class LofarDeviceProxyTest(base.TestCase):
self.assertEqual((2, 3, 4), value.shape)
@mock.patch("lofar_station_client.devices.LofarDeviceProxy")
class LazyLofarDeviceProxyTest(base.TestCase):
TEST_DEVICE_INFO = [
{
......@@ -156,44 +161,47 @@ class LazyLofarDeviceProxyTest(base.TestCase):
)
cls.context.start()
cls.proxy = LofarDeviceProxy(cls.context.get_device_access("STAT/MyDevice/2"))
cls.proxy.connected = False
cls.device_name = cls.context.get_device_access("STAT/MyDevice/2")
@classmethod
def tearDownClass(cls):
# In Python3.8+, we can use addClassCleanup instead
cls.context.stop()
@classmethod
def connect(cls):
# Simulate that a connection with the DB has been estabilished,
# i.e. Device has been created in the TangoDB
cls.proxy.connect()
def test_lazy_read_scalar(self, dev_proxy_mock):
mock = MagicMock()
mock.side_effect = DevFailed(type("", (object,), {"desc": "something"}))
dev_proxy_mock.side_effect = devices.lazy(mock)
@classmethod
def disconnect(cls):
# Simulate a disconnected device
cls.proxy.connected = False
proxy = lofar_station_client.devices.LofarDeviceProxy(self.device_name)
def test_lazy_read_scalar(self):
self.disconnect()
# DeviceProxy not yet initialised
with self.assertRaises(AttributeError):
value = self.proxy.scalar
_ = proxy.scalar
# Simulate connection with DB
self.connect()
value = self.proxy.scalar
mock.side_effect = None
mock.return_value = type("", (object,), {"scalar": True})
value = proxy.scalar
self.assertEqual(bool, type(value))
def test_lazy_write_scalar(self):
self.disconnect()
def test_lazy_write_scalar(self, dev_proxy_mock):
mock = MagicMock()
mock.side_effect = DevFailed(type("", (object,), {"desc": "something"}))
dev_proxy_mock.side_effect = devices.lazy(mock)
proxy = lofar_station_client.devices.LofarDeviceProxy(self.device_name)
with self.assertRaises(AttributeError):
self.proxy.scalar = True
proxy.scalar = True
# Simulate connection with DB
self.connect()
self.assertEqual(False, self.proxy.scalar)
self.proxy.scalar = True
self.assertEqual(True, self.proxy.scalar)
mock.side_effect = None
mock.return_value = type("", (object,), {"scalar": False})
self.assertEqual(False, proxy.scalar)
proxy.scalar = True
self.assertEqual(True, proxy.scalar)
class RecvDeviceTest(MyDevice):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment