diff --git a/tangostationcontrol/tangostationcontrol/devices/apsct.py b/tangostationcontrol/tangostationcontrol/devices/apsct.py index d26c21dfe51438d3629ceaae5be1026e3a7e8d51..dd8bd1c2a7698d2c2481dbb52d60e99541f7e43b 100644 --- a/tangostationcontrol/tangostationcontrol/devices/apsct.py +++ b/tangostationcontrol/tangostationcontrol/devices/apsct.py @@ -84,16 +84,16 @@ class APSCT(opcua_device): # ---------- # Summarising Attributes # ---------- - APSCT_error_R = attribute(dtype=bool) + APSCT_error_R = attribute(dtype=bool, fisallowed="is_attribute_wrapper_allowed") def read_APSCT_error_R(self): - return ((self.proxy.APSCTTR_I2C_error_R > 0) + return ((self.read_attribute("APSCTTR_I2C_error_R") > 0) or self.alarm_val("APSCT_PCB_ID_R") - or (not self.proxy.APSCT_INPUT_10MHz_good_R) - or (not self.proxy.APSCT_INPUT_PPS_good_R and not self.proxy.APSCT_PPS_ignore_R) - or (not self.proxy.APSCT_PLL_160MHz_locked_R and not self.proxy.APSCT_PLL_200MHz_locked_R) - or (self.proxy.APSCT_PLL_200MHz_locked_R and self.proxy.APSCT_PLL_200MHz_error_R) - or (self.proxy.APSCT_PLL_160MHz_locked_R and self.proxy.APSCT_PLL_160MHz_error_R) + or (not self.read_attribute("APSCT_INPUT_10MHz_good_R")) + or (not self.read_attribute("APSCT_INPUT_PPS_good_R") and not self.read_attribute("APSCT_PPS_ignore_R")) + or (not self.read_attribute("APSCT_PLL_160MHz_locked_R") and not self.read_attribute("APSCT_PLL_200MHz_locked_R")) + or (self.read_attribute("APSCT_PLL_200MHz_locked_R") and self.read_attribute("APSCT_PLL_200MHz_error_R")) + or (self.read_attribute("APSCT_PLL_160MHz_locked_R") and self.read_attribute("APSCT_PLL_160MHz_error_R")) ) APSCT_TEMP_error_R = attribute(dtype=bool) @@ -108,9 +108,9 @@ class APSCT(opcua_device): or self.alarm_val("APSCT_PWR_CLKDIST2_3V3_R") or self.alarm_val("APSCT_PWR_CTRL_3V3_R") or self.alarm_val("APSCT_PWR_INPUT_3V3_R") - or (self.proxy.APSCT_PWR_PLL_160MHz_on_R and self.alarm_val("APSCT_PWR_PLL_160MHz_3V3_R")) - or (self.proxy.APSCT_PWR_PLL_200MHz_on_R and self.alarm_val("APSCT_PWR_PLL_200MHz_3V3_R")) - or (not self.proxy.APSCT_PWR_on_R) + or (self.read_attribute("APSCT_PWR_PLL_160MHz_on_R") and self.alarm_val("APSCT_PWR_PLL_160MHz_3V3_R")) + or (self.read_attribute("APSCT_PWR_PLL_200MHz_on_R") and self.alarm_val("APSCT_PWR_PLL_200MHz_3V3_R")) + or (not self.read_attribute("APSCT_PWR_on_R")) ) # -------- @@ -126,8 +126,8 @@ class APSCT(opcua_device): self.APSCT_200MHz_on() self.wait_attribute("APSCTTR_translator_busy_R", False, self.APSCT_On_Off_timeout) - if not self.proxy.APSCT_PLL_200MHz_locked_R: - if self.proxy.APSCTTR_I2C_error_R: + if not self.read_attribute("APSCT_PLL_200MHz_locked_R"): + if self.read_attribute("APSCTTR_I2C_error_R"): raise Exception("I2C is not working. Maybe power cycle subrack to restart CLK board and translator?") else: raise Exception("200MHz signal is not locked. The subrack probably do not receive clock input or the CLK PCB is broken?") diff --git a/tangostationcontrol/tangostationcontrol/devices/apspu.py b/tangostationcontrol/tangostationcontrol/devices/apspu.py index 6be213696c16efc69ca8f20319efc03cfc0f4fe7..b61b9b0e248b16953a08bad359bf47ec877b0b3d 100644 --- a/tangostationcontrol/tangostationcontrol/devices/apspu.py +++ b/tangostationcontrol/tangostationcontrol/devices/apspu.py @@ -66,15 +66,15 @@ class APSPU(opcua_device): APSPU_error_R = attribute(dtype=bool) def read_APSPU_error_R(self): - return ((self.proxy.APSPUTR_I2C_error_R > 0) + return ((self.read_attribute("APSPUTR_I2C_error_R") > 0) or self.alarm_val("APSPU_PCB_ID_R") or self.alarm_val("APSPU_FAN1_RPM_R") or self.alarm_val("APSPU_FAN2_RPM_R") or self.alarm_val("APSPU_FAN3_RPM_R")) - APSPU_IOUT_error_R = attribute(dtype=bool) - APSPU_TEMP_error_R = attribute(dtype=bool) - APSPU_VOUT_error_R = attribute(dtype=bool) + APSPU_IOUT_error_R = attribute(dtype=bool, fisallowed="is_attribute_wrapper_allowed") + APSPU_TEMP_error_R = attribute(dtype=bool, fisallowed="is_attribute_wrapper_allowed") + APSPU_VOUT_error_R = attribute(dtype=bool, fisallowed="is_attribute_wrapper_allowed") def read_APSPU_IOUT_error_R(self): return ( self.alarm_val("APSPU_LBA_IOUT_R") diff --git a/tangostationcontrol/tangostationcontrol/devices/lofar_device.py b/tangostationcontrol/tangostationcontrol/devices/lofar_device.py index 067f09538d4cb972c0b1a186e726792b69132fbd..8988f2c3fc31e5e870eb9749ab84510004544220 100644 --- a/tangostationcontrol/tangostationcontrol/devices/lofar_device.py +++ b/tangostationcontrol/tangostationcontrol/devices/lofar_device.py @@ -365,6 +365,19 @@ class lofar_device(Device, metaclass=DeviceMeta): """ Override this method to initialise any hardware after configuring it. """ pass + def read_attribute(self, attr_name): + """ Read the value of a certain attribute (directly from the hardware). """ + + # obtain the class information of this attribute, effectively equal + # to getattr(self, attr_name), but this also makes sure we actually + # address an attribute. + class_attribute = self.get_device_attr().get_attr_by_name(attr_name) + + # obtain the low-level wrapper for the read function + read_wrapper = getattr(self, f"__read_{attr_name}_wrapper__") + + # obtain the actual value + return read_wrapper(class_attribute) def wait_attribute(self, attr_name, value, timeout=10, pollperiod=0.2): """ Wait until the given attribute obtains the given value. @@ -385,7 +398,7 @@ class lofar_device(Device, metaclass=DeviceMeta): # Poll every half a second for _ in range(math.ceil(timeout/pollperiod)): - if is_correct(getattr(self.proxy, attr_name)): + if is_correct(self.read_attribute(attr_name)): return time.sleep(pollperiod) @@ -403,7 +416,7 @@ class lofar_device(Device, metaclass=DeviceMeta): is_scalar = attr_config.data_format == AttrDataFormat.SCALAR # fetch attribute value as an array - value = self.proxy.read_attribute(attr_name).value + value = self.read_attribute(attr_name) if is_scalar: value = numpy.array(value) # this stays a scalar in numpy diff --git a/tangostationcontrol/tangostationcontrol/devices/recv.py b/tangostationcontrol/tangostationcontrol/devices/recv.py index eaf0f189b53ca3f64606d196298f6c9dc27708a1..787a51df3e62ed35071cd97e66ffc1851819f74a 100644 --- a/tangostationcontrol/tangostationcontrol/devices/recv.py +++ b/tangostationcontrol/tangostationcontrol/devices/recv.py @@ -255,20 +255,20 @@ class RECV(opcua_device): RCU_LED_colour_R = attribute(dtype=(numpy.uint32,), max_dim_x=32) def read_RCU_LED_colour_R(self): - return (2 * self.proxy.RCU_LED_green_on_R + 4 * self.proxy.RCU_LED_red_on_R).astype(numpy.uint32) + return (2 * self.read_attribute("RCU_LED_green_on_R") + 4 * self.read_attribute("RCU_LED_red_on_R")).astype(numpy.uint32) - RCU_error_R = attribute(dtype=(bool,), max_dim_x=32) - ANT_error_R = attribute(dtype=((bool,),), max_dim_x=3, max_dim_y=32) + RCU_error_R = attribute(dtype=(bool,), max_dim_x=32, fisallowed="is_attribute_wrapper_allowed") + ANT_error_R = attribute(dtype=((bool,),), max_dim_x=3, max_dim_y=32, fisallowed="is_attribute_wrapper_allowed") def read_RCU_error_R(self): - return self.proxy.RCU_mask_RW & ( - (self.proxy.RECVTR_I2C_error_R > 0) + return self.read_attribute("RCU_mask_RW") & ( + (self.read_attribute("RECVTR_I2C_error_R") > 0) | self.alarm_val("RCU_PCB_ID_R") ) def read_ANT_error_R(self): - return self.proxy.ANT_mask_RW & ( - ~self.proxy.RCU_ADC_locked_R + return self.read_attribute("ANT_mask_RW") & ( + ~self.read_attribute("RCU_ADC_locked_R") ) RECV_IOUT_error_R = attribute(dtype=(bool,), max_dim_x=32) @@ -276,7 +276,7 @@ class RECV(opcua_device): RECV_VOUT_error_R = attribute(dtype=(bool,), max_dim_x=32) def read_RECV_IOUT_error_R(self): - return (self.proxy.ANT_mask_RW & ( + return (self.read_attribute("ANT_mask_RW") & ( self.alarm_val("RCU_PWR_ANT_IOUT_R") )).any(axis=1) @@ -287,15 +287,15 @@ class RECV(opcua_device): ) def read_RECV_VOUT_error_R(self): - return (self.proxy.ANT_mask_RW & ( + return (self.read_attribute("ANT_mask_RW") & ( self.alarm_val("RCU_PWR_ANT_VIN_R") | self.alarm_val("RCU_PWR_ANT_VOUT_R") - )).any(axis=1) | (self.proxy.RCU_mask_RW & ( + )).any(axis=1) | (self.read_attribute("RCU_mask_RW") & ( self.alarm_val("RCU_PWR_1V8_R") | self.alarm_val("RCU_PWR_2V5_R") | self.alarm_val("RCU_PWR_3V3_R") - | ~self.proxy.RCU_PWR_DIGITAL_on_R - | ~self.proxy.RCU_PWR_good_R + | ~self.read_attribute("RCU_PWR_DIGITAL_on_R") + | ~self.read_attribute("RCU_PWR_good_R") )) # -------- diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/sdp.py b/tangostationcontrol/tangostationcontrol/devices/sdp/sdp.py index bedae17dac39d06b39635d2ee9a2fed508fac5f4..c4dc14a165f69ce471e7c416fef14dd0771b54cb 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/sdp.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/sdp.py @@ -166,27 +166,27 @@ class SDP(opcua_device): # ---------- # Summarising Attributes # ---------- - FPGA_error_R = attribute(dtype=(bool,), max_dim_x=16) - FPGA_processing_error_R = attribute(dtype=(bool,), max_dim_x=16) - FPGA_input_error_R = attribute(dtype=(bool,), max_dim_x=16) + FPGA_error_R = attribute(dtype=(bool,), max_dim_x=16, fisallowed="is_attribute_wrapper_allowed") + FPGA_processing_error_R = attribute(dtype=(bool,), max_dim_x=16, fisallowed="is_attribute_wrapper_allowed") + FPGA_input_error_R = attribute(dtype=(bool,), max_dim_x=16, fisallowed="is_attribute_wrapper_allowed") def read_FPGA_error_R(self): - return self.proxy.TR_fpga_mask_RW & ( - self.proxy.TR_fpga_communication_error_R - | (self.proxy.FPGA_firmware_version_R != "") - | (self.proxy.FPGA_jesd204b_csr_dev_syncn_R == 0).any(axis=1) + return self.read_attribute("TR_fpga_mask_R") & ( + self.read_attribute("TR_fpga_communication_error_R") + | (self.read_attribute("FPGA_firmware_version_R") != "") + | (self.read_attribute("FPGA_jesd204b_csr_dev_syncn_R") == 0).any(axis=1) ) def read_FPGA_processing_error_R(self): - return self.proxy.TR_fpga_mask_RW & ( - ~self.proxy.FPGA_processing_enable_R - | (self.proxy.FPGA_boot_image_R == 0) + return self.read_attribute("TR_fpga_mask_R") & ( + ~self.read_attribute("FPGA_processing_enable_R") + | (self.read_attribute("FPGA_boot_image_R") == 0) ) def read_FPGA_input_error_R(self): - return self.proxy.TR_fpga_mask_RW & ( - self.proxy.FPGA_wg_enable_R.any(axis=1) - | (self.proxy.FPGA_signal_input_rms_R == 0).any(axis=1) + return self.read_attribute("TR_fpga_mask_R") & ( + self.read_attribute("FPGA_wg_enable_R").any(axis=1) + | (self.read_attribute("FPGA_signal_input_rms_R") == 0).any(axis=1) ) # -------- diff --git a/tangostationcontrol/tangostationcontrol/devices/unb2.py b/tangostationcontrol/tangostationcontrol/devices/unb2.py index 48b5c3147507cc2b939c88ac303854f823c7ecb8..614055a716ce8a527197aaad159273ebfda220ef 100644 --- a/tangostationcontrol/tangostationcontrol/devices/unb2.py +++ b/tangostationcontrol/tangostationcontrol/devices/unb2.py @@ -130,23 +130,23 @@ class UNB2(opcua_device): # ---------- # Summarising Attributes # ---------- - UNB2_error_R = attribute(dtype=(bool,), max_dim_x=2) + UNB2_error_R = attribute(dtype=(bool,), max_dim_x=2, fisallowed="is_attribute_wrapper_allowed") def read_UNB2_error_R(self): - return self.proxy.UNB2_mask_RW & ( - (self.proxy.UNB2TR_I2C_bus_error_R > 0) + return self.read_attribute("UNB2_mask_RW") & ( + (self.read_attribute("UNB2TR_I2C_bus_error_R") > 0) | self.alarm_val("UNB2_PCB_ID_R") - | (self.proxy.UNB2TR_I2C_bus_DDR4_error_R > 0).any(axis=1) - | (self.proxy.UNB2TR_I2C_bus_FPGA_PS_error_R > 0).any(axis=1) - | (self.proxy.UNB2TR_I2C_bus_QSFP_error_R > 0).any(axis=1) + | (self.read_attribute("UNB2TR_I2C_bus_DDR4_error_R") > 0).any(axis=1) + | (self.read_attribute("UNB2TR_I2C_bus_FPGA_PS_error_R") > 0).any(axis=1) + | (self.read_attribute("UNB2TR_I2C_bus_QSFP_error_R") > 0).any(axis=1) ) - UNB2_IOUT_error_R = attribute(dtype=(bool,), max_dim_x=2) - UNB2_TEMP_error_R = attribute(dtype=(bool,), max_dim_x=2) - UNB2_VOUT_error_R = attribute(dtype=(bool,), max_dim_x=2) + UNB2_IOUT_error_R = attribute(dtype=(bool,), max_dim_x=2, fisallowed="is_attribute_wrapper_allowed") + UNB2_TEMP_error_R = attribute(dtype=(bool,), max_dim_x=2, fisallowed="is_attribute_wrapper_allowed") + UNB2_VOUT_error_R = attribute(dtype=(bool,), max_dim_x=2, fisallowed="is_attribute_wrapper_allowed") def read_UNB2_IOUT_error_R(self): - return self.proxy.UNB2_mask_RW & ( + return self.read_attribute("UNB2_mask_RW") & ( self.alarm_val("UNB2_DC_DC_48V_12V_IOUT_R") | self.alarm_val("UNB2_FPGA_POL_CORE_IOUT_R").any(axis=1) | self.alarm_val("UNB2_FPGA_POL_ERAM_IOUT_R").any(axis=1) @@ -178,7 +178,7 @@ class UNB2(opcua_device): ) def read_UNB2_VOUT_error_R(self): - return self.proxy.UNB2_mask_RW & ( + return self.read_attribute("UNB2_mask_RW") & ( self.alarm_val("UNB2_DC_DC_48V_12V_VOUT_R") | self.alarm_val("UNB2_FPGA_POL_CORE_VOUT_R").any(axis=1) | self.alarm_val("UNB2_FPGA_POL_ERAM_VOUT_R").any(axis=1) diff --git a/tangostationcontrol/tangostationcontrol/test/devices/test_lofar_device.py b/tangostationcontrol/tangostationcontrol/test/devices/test_lofar_device.py new file mode 100644 index 0000000000000000000000000000000000000000..46004707ea59c681015b987ce97adb26931a189a --- /dev/null +++ b/tangostationcontrol/tangostationcontrol/test/devices/test_lofar_device.py @@ -0,0 +1,54 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR 2.0 Station Software +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +from tango.test_context import DeviceTestContext +from tango.server import attribute + +from tangostationcontrol.devices import lofar_device + +import mock + +from tangostationcontrol.test import base + +class TestLofarDevice(base.TestCase): + def setUp(self): + super(TestLofarDevice, self).setUp() + + # Patch DeviceProxy to allow making the proxies during initialisation + # that we otherwise avoid using + for device in [lofar_device]: + proxy_patcher = mock.patch.object( + device, 'DeviceProxy') + proxy_patcher.start() + self.addCleanup(proxy_patcher.stop) + + def test_read_attribute(self): + """ Test whether read_attribute really returns the attribute. """ + + class MyLofarDevice(lofar_device.lofar_device): + @attribute(dtype=float) + def A(self): + return 42.0 + + @attribute(dtype=float) + def read_attribute_A(self): + return self.read_attribute("A") + + @attribute(dtype=(float,), max_dim_x=2) + def B_array(self): + return [42.0, 43.0] + + @attribute(dtype=(float,), max_dim_x=2) + def read_attribute_B_array(self): + return self.read_attribute("B_array") + + with DeviceTestContext(MyLofarDevice, process=True, timeout=10) as proxy: + proxy.initialise() + self.assertEqual(42.0, proxy.read_attribute_A) + self.assertListEqual([42.0, 43.0], proxy.read_attribute_B_array.tolist())