Skip to content
Snippets Groups Projects
Commit 37c95cbf authored by Jan David Mol's avatar Jan David Mol
Browse files

Merge branch 'L2SS-705-avoid-proxy-in-read_attribute' into 'master'

L2SS-705: Read values directly from the hardware instead of through the proxy

Closes L2SS-705

See merge request !274
parents d6cd74ad ff253101
No related branches found
No related tags found
1 merge request!274L2SS-705: Read values directly from the hardware instead of through the proxy
...@@ -84,16 +84,16 @@ class APSCT(opcua_device): ...@@ -84,16 +84,16 @@ class APSCT(opcua_device):
# ---------- # ----------
# Summarising Attributes # Summarising Attributes
# ---------- # ----------
APSCT_error_R = attribute(dtype=bool) APSCT_error_R = attribute(dtype=bool, fisallowed="is_attribute_wrapper_allowed")
def read_APSCT_error_R(self): def read_APSCT_error_R(self):
return ((self.proxy.APSCTTR_I2C_error_R > 0) return ((self.read_attribute("APSCTTR_I2C_error_R") > 0)
or self.alarm_val("APSCT_PCB_ID_R") or self.alarm_val("APSCT_PCB_ID_R")
or (not self.proxy.APSCT_INPUT_10MHz_good_R) or (not self.read_attribute("APSCT_INPUT_10MHz_good_R"))
or (not self.proxy.APSCT_INPUT_PPS_good_R and not self.proxy.APSCT_PPS_ignore_R) or (not self.read_attribute("APSCT_INPUT_PPS_good_R") and not self.read_attribute("APSCT_PPS_ignore_R"))
or (not self.proxy.APSCT_PLL_160MHz_locked_R and not self.proxy.APSCT_PLL_200MHz_locked_R) or (not self.read_attribute("APSCT_PLL_160MHz_locked_R") and not self.read_attribute("APSCT_PLL_200MHz_locked_R"))
or (self.proxy.APSCT_PLL_200MHz_locked_R and self.proxy.APSCT_PLL_200MHz_error_R) or (self.read_attribute("APSCT_PLL_200MHz_locked_R") and self.read_attribute("APSCT_PLL_200MHz_error_R"))
or (self.proxy.APSCT_PLL_160MHz_locked_R and self.proxy.APSCT_PLL_160MHz_error_R) or (self.read_attribute("APSCT_PLL_160MHz_locked_R") and self.read_attribute("APSCT_PLL_160MHz_error_R"))
) )
APSCT_TEMP_error_R = attribute(dtype=bool) APSCT_TEMP_error_R = attribute(dtype=bool)
...@@ -108,9 +108,9 @@ class APSCT(opcua_device): ...@@ -108,9 +108,9 @@ class APSCT(opcua_device):
or self.alarm_val("APSCT_PWR_CLKDIST2_3V3_R") or self.alarm_val("APSCT_PWR_CLKDIST2_3V3_R")
or self.alarm_val("APSCT_PWR_CTRL_3V3_R") or self.alarm_val("APSCT_PWR_CTRL_3V3_R")
or self.alarm_val("APSCT_PWR_INPUT_3V3_R") or self.alarm_val("APSCT_PWR_INPUT_3V3_R")
or (self.proxy.APSCT_PWR_PLL_160MHz_on_R and self.alarm_val("APSCT_PWR_PLL_160MHz_3V3_R")) or (self.read_attribute("APSCT_PWR_PLL_160MHz_on_R") and self.alarm_val("APSCT_PWR_PLL_160MHz_3V3_R"))
or (self.proxy.APSCT_PWR_PLL_200MHz_on_R and self.alarm_val("APSCT_PWR_PLL_200MHz_3V3_R")) or (self.read_attribute("APSCT_PWR_PLL_200MHz_on_R") and self.alarm_val("APSCT_PWR_PLL_200MHz_3V3_R"))
or (not self.proxy.APSCT_PWR_on_R) or (not self.read_attribute("APSCT_PWR_on_R"))
) )
# -------- # --------
...@@ -126,8 +126,8 @@ class APSCT(opcua_device): ...@@ -126,8 +126,8 @@ class APSCT(opcua_device):
self.APSCT_200MHz_on() self.APSCT_200MHz_on()
self.wait_attribute("APSCTTR_translator_busy_R", False, self.APSCT_On_Off_timeout) self.wait_attribute("APSCTTR_translator_busy_R", False, self.APSCT_On_Off_timeout)
if not self.proxy.APSCT_PLL_200MHz_locked_R: if not self.read_attribute("APSCT_PLL_200MHz_locked_R"):
if self.proxy.APSCTTR_I2C_error_R: if self.read_attribute("APSCTTR_I2C_error_R"):
raise Exception("I2C is not working. Maybe power cycle subrack to restart CLK board and translator?") raise Exception("I2C is not working. Maybe power cycle subrack to restart CLK board and translator?")
else: else:
raise Exception("200MHz signal is not locked. The subrack probably do not receive clock input or the CLK PCB is broken?") raise Exception("200MHz signal is not locked. The subrack probably do not receive clock input or the CLK PCB is broken?")
......
...@@ -66,15 +66,15 @@ class APSPU(opcua_device): ...@@ -66,15 +66,15 @@ class APSPU(opcua_device):
APSPU_error_R = attribute(dtype=bool) APSPU_error_R = attribute(dtype=bool)
def read_APSPU_error_R(self): def read_APSPU_error_R(self):
return ((self.proxy.APSPUTR_I2C_error_R > 0) return ((self.read_attribute("APSPUTR_I2C_error_R") > 0)
or self.alarm_val("APSPU_PCB_ID_R") or self.alarm_val("APSPU_PCB_ID_R")
or self.alarm_val("APSPU_FAN1_RPM_R") or self.alarm_val("APSPU_FAN1_RPM_R")
or self.alarm_val("APSPU_FAN2_RPM_R") or self.alarm_val("APSPU_FAN2_RPM_R")
or self.alarm_val("APSPU_FAN3_RPM_R")) or self.alarm_val("APSPU_FAN3_RPM_R"))
APSPU_IOUT_error_R = attribute(dtype=bool) APSPU_IOUT_error_R = attribute(dtype=bool, fisallowed="is_attribute_wrapper_allowed")
APSPU_TEMP_error_R = attribute(dtype=bool) APSPU_TEMP_error_R = attribute(dtype=bool, fisallowed="is_attribute_wrapper_allowed")
APSPU_VOUT_error_R = attribute(dtype=bool) APSPU_VOUT_error_R = attribute(dtype=bool, fisallowed="is_attribute_wrapper_allowed")
def read_APSPU_IOUT_error_R(self): def read_APSPU_IOUT_error_R(self):
return ( self.alarm_val("APSPU_LBA_IOUT_R") return ( self.alarm_val("APSPU_LBA_IOUT_R")
......
...@@ -365,6 +365,19 @@ class lofar_device(Device, metaclass=DeviceMeta): ...@@ -365,6 +365,19 @@ class lofar_device(Device, metaclass=DeviceMeta):
""" Override this method to initialise any hardware after configuring it. """ """ Override this method to initialise any hardware after configuring it. """
pass pass
def read_attribute(self, attr_name):
""" Read the value of a certain attribute (directly from the hardware). """
# obtain the class information of this attribute, effectively equal
# to getattr(self, attr_name), but this also makes sure we actually
# address an attribute.
class_attribute = self.get_device_attr().get_attr_by_name(attr_name)
# obtain the low-level wrapper for the read function
read_wrapper = getattr(self, f"__read_{attr_name}_wrapper__")
# obtain the actual value
return read_wrapper(class_attribute)
def wait_attribute(self, attr_name, value, timeout=10, pollperiod=0.2): def wait_attribute(self, attr_name, value, timeout=10, pollperiod=0.2):
""" Wait until the given attribute obtains the given value. """ Wait until the given attribute obtains the given value.
...@@ -385,7 +398,7 @@ class lofar_device(Device, metaclass=DeviceMeta): ...@@ -385,7 +398,7 @@ class lofar_device(Device, metaclass=DeviceMeta):
# Poll every half a second # Poll every half a second
for _ in range(math.ceil(timeout/pollperiod)): for _ in range(math.ceil(timeout/pollperiod)):
if is_correct(getattr(self.proxy, attr_name)): if is_correct(self.read_attribute(attr_name)):
return return
time.sleep(pollperiod) time.sleep(pollperiod)
...@@ -403,7 +416,7 @@ class lofar_device(Device, metaclass=DeviceMeta): ...@@ -403,7 +416,7 @@ class lofar_device(Device, metaclass=DeviceMeta):
is_scalar = attr_config.data_format == AttrDataFormat.SCALAR is_scalar = attr_config.data_format == AttrDataFormat.SCALAR
# fetch attribute value as an array # fetch attribute value as an array
value = self.proxy.read_attribute(attr_name).value value = self.read_attribute(attr_name)
if is_scalar: if is_scalar:
value = numpy.array(value) # this stays a scalar in numpy value = numpy.array(value) # this stays a scalar in numpy
......
...@@ -255,20 +255,20 @@ class RECV(opcua_device): ...@@ -255,20 +255,20 @@ class RECV(opcua_device):
RCU_LED_colour_R = attribute(dtype=(numpy.uint32,), max_dim_x=32) RCU_LED_colour_R = attribute(dtype=(numpy.uint32,), max_dim_x=32)
def read_RCU_LED_colour_R(self): def read_RCU_LED_colour_R(self):
return (2 * self.proxy.RCU_LED_green_on_R + 4 * self.proxy.RCU_LED_red_on_R).astype(numpy.uint32) return (2 * self.read_attribute("RCU_LED_green_on_R") + 4 * self.read_attribute("RCU_LED_red_on_R")).astype(numpy.uint32)
RCU_error_R = attribute(dtype=(bool,), max_dim_x=32) RCU_error_R = attribute(dtype=(bool,), max_dim_x=32, fisallowed="is_attribute_wrapper_allowed")
ANT_error_R = attribute(dtype=((bool,),), max_dim_x=3, max_dim_y=32) ANT_error_R = attribute(dtype=((bool,),), max_dim_x=3, max_dim_y=32, fisallowed="is_attribute_wrapper_allowed")
def read_RCU_error_R(self): def read_RCU_error_R(self):
return self.proxy.RCU_mask_RW & ( return self.read_attribute("RCU_mask_RW") & (
(self.proxy.RECVTR_I2C_error_R > 0) (self.read_attribute("RECVTR_I2C_error_R") > 0)
| self.alarm_val("RCU_PCB_ID_R") | self.alarm_val("RCU_PCB_ID_R")
) )
def read_ANT_error_R(self): def read_ANT_error_R(self):
return self.proxy.ANT_mask_RW & ( return self.read_attribute("ANT_mask_RW") & (
~self.proxy.RCU_ADC_locked_R ~self.read_attribute("RCU_ADC_locked_R")
) )
RECV_IOUT_error_R = attribute(dtype=(bool,), max_dim_x=32) RECV_IOUT_error_R = attribute(dtype=(bool,), max_dim_x=32)
...@@ -276,7 +276,7 @@ class RECV(opcua_device): ...@@ -276,7 +276,7 @@ class RECV(opcua_device):
RECV_VOUT_error_R = attribute(dtype=(bool,), max_dim_x=32) RECV_VOUT_error_R = attribute(dtype=(bool,), max_dim_x=32)
def read_RECV_IOUT_error_R(self): def read_RECV_IOUT_error_R(self):
return (self.proxy.ANT_mask_RW & ( return (self.read_attribute("ANT_mask_RW") & (
self.alarm_val("RCU_PWR_ANT_IOUT_R") self.alarm_val("RCU_PWR_ANT_IOUT_R")
)).any(axis=1) )).any(axis=1)
...@@ -287,15 +287,15 @@ class RECV(opcua_device): ...@@ -287,15 +287,15 @@ class RECV(opcua_device):
) )
def read_RECV_VOUT_error_R(self): def read_RECV_VOUT_error_R(self):
return (self.proxy.ANT_mask_RW & ( return (self.read_attribute("ANT_mask_RW") & (
self.alarm_val("RCU_PWR_ANT_VIN_R") self.alarm_val("RCU_PWR_ANT_VIN_R")
| self.alarm_val("RCU_PWR_ANT_VOUT_R") | self.alarm_val("RCU_PWR_ANT_VOUT_R")
)).any(axis=1) | (self.proxy.RCU_mask_RW & ( )).any(axis=1) | (self.read_attribute("RCU_mask_RW") & (
self.alarm_val("RCU_PWR_1V8_R") self.alarm_val("RCU_PWR_1V8_R")
| self.alarm_val("RCU_PWR_2V5_R") | self.alarm_val("RCU_PWR_2V5_R")
| self.alarm_val("RCU_PWR_3V3_R") | self.alarm_val("RCU_PWR_3V3_R")
| ~self.proxy.RCU_PWR_DIGITAL_on_R | ~self.read_attribute("RCU_PWR_DIGITAL_on_R")
| ~self.proxy.RCU_PWR_good_R | ~self.read_attribute("RCU_PWR_good_R")
)) ))
# -------- # --------
......
...@@ -166,27 +166,27 @@ class SDP(opcua_device): ...@@ -166,27 +166,27 @@ class SDP(opcua_device):
# ---------- # ----------
# Summarising Attributes # Summarising Attributes
# ---------- # ----------
FPGA_error_R = attribute(dtype=(bool,), max_dim_x=16) FPGA_error_R = attribute(dtype=(bool,), max_dim_x=16, fisallowed="is_attribute_wrapper_allowed")
FPGA_processing_error_R = attribute(dtype=(bool,), max_dim_x=16) FPGA_processing_error_R = attribute(dtype=(bool,), max_dim_x=16, fisallowed="is_attribute_wrapper_allowed")
FPGA_input_error_R = attribute(dtype=(bool,), max_dim_x=16) FPGA_input_error_R = attribute(dtype=(bool,), max_dim_x=16, fisallowed="is_attribute_wrapper_allowed")
def read_FPGA_error_R(self): def read_FPGA_error_R(self):
return self.proxy.TR_fpga_mask_RW & ( return self.read_attribute("TR_fpga_mask_R") & (
self.proxy.TR_fpga_communication_error_R self.read_attribute("TR_fpga_communication_error_R")
| (self.proxy.FPGA_firmware_version_R != "") | (self.read_attribute("FPGA_firmware_version_R") != "")
| (self.proxy.FPGA_jesd204b_csr_dev_syncn_R == 0).any(axis=1) | (self.read_attribute("FPGA_jesd204b_csr_dev_syncn_R") == 0).any(axis=1)
) )
def read_FPGA_processing_error_R(self): def read_FPGA_processing_error_R(self):
return self.proxy.TR_fpga_mask_RW & ( return self.read_attribute("TR_fpga_mask_R") & (
~self.proxy.FPGA_processing_enable_R ~self.read_attribute("FPGA_processing_enable_R")
| (self.proxy.FPGA_boot_image_R == 0) | (self.read_attribute("FPGA_boot_image_R") == 0)
) )
def read_FPGA_input_error_R(self): def read_FPGA_input_error_R(self):
return self.proxy.TR_fpga_mask_RW & ( return self.read_attribute("TR_fpga_mask_R") & (
self.proxy.FPGA_wg_enable_R.any(axis=1) self.read_attribute("FPGA_wg_enable_R").any(axis=1)
| (self.proxy.FPGA_signal_input_rms_R == 0).any(axis=1) | (self.read_attribute("FPGA_signal_input_rms_R") == 0).any(axis=1)
) )
# -------- # --------
......
...@@ -130,23 +130,23 @@ class UNB2(opcua_device): ...@@ -130,23 +130,23 @@ class UNB2(opcua_device):
# ---------- # ----------
# Summarising Attributes # Summarising Attributes
# ---------- # ----------
UNB2_error_R = attribute(dtype=(bool,), max_dim_x=2) UNB2_error_R = attribute(dtype=(bool,), max_dim_x=2, fisallowed="is_attribute_wrapper_allowed")
def read_UNB2_error_R(self): def read_UNB2_error_R(self):
return self.proxy.UNB2_mask_RW & ( return self.read_attribute("UNB2_mask_RW") & (
(self.proxy.UNB2TR_I2C_bus_error_R > 0) (self.read_attribute("UNB2TR_I2C_bus_error_R") > 0)
| self.alarm_val("UNB2_PCB_ID_R") | self.alarm_val("UNB2_PCB_ID_R")
| (self.proxy.UNB2TR_I2C_bus_DDR4_error_R > 0).any(axis=1) | (self.read_attribute("UNB2TR_I2C_bus_DDR4_error_R") > 0).any(axis=1)
| (self.proxy.UNB2TR_I2C_bus_FPGA_PS_error_R > 0).any(axis=1) | (self.read_attribute("UNB2TR_I2C_bus_FPGA_PS_error_R") > 0).any(axis=1)
| (self.proxy.UNB2TR_I2C_bus_QSFP_error_R > 0).any(axis=1) | (self.read_attribute("UNB2TR_I2C_bus_QSFP_error_R") > 0).any(axis=1)
) )
UNB2_IOUT_error_R = attribute(dtype=(bool,), max_dim_x=2) UNB2_IOUT_error_R = attribute(dtype=(bool,), max_dim_x=2, fisallowed="is_attribute_wrapper_allowed")
UNB2_TEMP_error_R = attribute(dtype=(bool,), max_dim_x=2) UNB2_TEMP_error_R = attribute(dtype=(bool,), max_dim_x=2, fisallowed="is_attribute_wrapper_allowed")
UNB2_VOUT_error_R = attribute(dtype=(bool,), max_dim_x=2) UNB2_VOUT_error_R = attribute(dtype=(bool,), max_dim_x=2, fisallowed="is_attribute_wrapper_allowed")
def read_UNB2_IOUT_error_R(self): def read_UNB2_IOUT_error_R(self):
return self.proxy.UNB2_mask_RW & ( return self.read_attribute("UNB2_mask_RW") & (
self.alarm_val("UNB2_DC_DC_48V_12V_IOUT_R") self.alarm_val("UNB2_DC_DC_48V_12V_IOUT_R")
| self.alarm_val("UNB2_FPGA_POL_CORE_IOUT_R").any(axis=1) | self.alarm_val("UNB2_FPGA_POL_CORE_IOUT_R").any(axis=1)
| self.alarm_val("UNB2_FPGA_POL_ERAM_IOUT_R").any(axis=1) | self.alarm_val("UNB2_FPGA_POL_ERAM_IOUT_R").any(axis=1)
...@@ -178,7 +178,7 @@ class UNB2(opcua_device): ...@@ -178,7 +178,7 @@ class UNB2(opcua_device):
) )
def read_UNB2_VOUT_error_R(self): def read_UNB2_VOUT_error_R(self):
return self.proxy.UNB2_mask_RW & ( return self.read_attribute("UNB2_mask_RW") & (
self.alarm_val("UNB2_DC_DC_48V_12V_VOUT_R") self.alarm_val("UNB2_DC_DC_48V_12V_VOUT_R")
| self.alarm_val("UNB2_FPGA_POL_CORE_VOUT_R").any(axis=1) | self.alarm_val("UNB2_FPGA_POL_CORE_VOUT_R").any(axis=1)
| self.alarm_val("UNB2_FPGA_POL_ERAM_VOUT_R").any(axis=1) | self.alarm_val("UNB2_FPGA_POL_ERAM_VOUT_R").any(axis=1)
......
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR 2.0 Station Software
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from tango.test_context import DeviceTestContext
from tango.server import attribute
from tangostationcontrol.devices import lofar_device
import mock
from tangostationcontrol.test import base
class TestLofarDevice(base.TestCase):
def setUp(self):
super(TestLofarDevice, self).setUp()
# Patch DeviceProxy to allow making the proxies during initialisation
# that we otherwise avoid using
for device in [lofar_device]:
proxy_patcher = mock.patch.object(
device, 'DeviceProxy')
proxy_patcher.start()
self.addCleanup(proxy_patcher.stop)
def test_read_attribute(self):
""" Test whether read_attribute really returns the attribute. """
class MyLofarDevice(lofar_device.lofar_device):
@attribute(dtype=float)
def A(self):
return 42.0
@attribute(dtype=float)
def read_attribute_A(self):
return self.read_attribute("A")
@attribute(dtype=(float,), max_dim_x=2)
def B_array(self):
return [42.0, 43.0]
@attribute(dtype=(float,), max_dim_x=2)
def read_attribute_B_array(self):
return self.read_attribute("B_array")
with DeviceTestContext(MyLofarDevice, process=True, timeout=10) as proxy:
proxy.initialise()
self.assertEqual(42.0, proxy.read_attribute_A)
self.assertListEqual([42.0, 43.0], proxy.read_attribute_B_array.tolist())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment