Skip to content
Snippets Groups Projects
Commit 696f37fe authored by Jan David Mol's avatar Jan David Mol
Browse files

Merge branch 'master' into 'L2SS-334_2021-10-21_replace_streams_with_logger'

# Conflicts:
#   tangostationcontrol/tangostationcontrol/devices/lofar_device.py
parents 0ec06652 b55d1790
No related branches found
No related tags found
1 merge request!164Resolve L2SS-334 "2021 10 21 replace streams with logger"
......@@ -27,6 +27,20 @@ numpy_to_OPCua_dict = {
numpy.str: asyncua.ua.VariantType.String
}
def numpy_to_opcua(numpy_val):
""" Convert a numpy type to a corresponding opcua Variant type. """
numpy_type = type(numpy_val)
assert numpy_type not in [list, numpy.array], "Converting arrays not yet supported."
try:
ua_type = numpy_to_OPCua_dict[numpy_type]
except KeyError as e:
raise TypeError(f"Could not convert {numpy_val} (type {type(numpy_val).__name__}) to an OPC UA type.") from e
return asyncua.ua.uatypes.Variant(Value=numpy_val, VariantType=ua_type)
class OPCUAConnection(AsyncCommClient):
"""
Connects to OPC-UA in the foreground or background, and sends HELLO
......@@ -117,7 +131,7 @@ class OPCUAConnection(AsyncCommClient):
return path
async def setup_attribute(self, annotation, attribute):
async def setup_protocol_attribute(self, annotation, attribute):
# process the annotation
path = self.get_node_path(annotation)
......@@ -142,6 +156,11 @@ class OPCUAConnection(AsyncCommClient):
except:
pass
return prot_attr
async def setup_attribute(self, annotation, attribute):
prot_attr = await self.setup_protocol_attribute(annotation, attribute)
# Tango will call these from a separate polling thread.
def read_function():
return asyncio.run_coroutine_threadsafe(prot_attr.read_function(), self.event_loop).result()
......@@ -153,15 +172,24 @@ class OPCUAConnection(AsyncCommClient):
return read_function, write_function
async def call_method(self, method_path, *args):
node = await self.obj.get_child(method_path[:-1])
return await node.call_method(method_path[-1], *args)
async def _call_method(self, method_path, *args):
method_path = self.get_node_path(method_path)
# convert the arguments to OPC UA types
args = [numpy_to_opcua(arg) for arg in args]
def call_method(self, method_path, *args):
method_path = self.get_node_path(method_path)
try:
# call method in its parent node
node = await self.obj.get_child(method_path[:-1]) if len(method_path) > 1 else self.obj
result = await node.call_method(method_path[-1], *args)
except Exception as e:
raise Exception(f"Calling method {method_path} failed") from e
return result
raise NotImplementedError
def call_method(self, method_path, *args):
return asyncio.run_coroutine_threadsafe(self._call_method(method_path, *args), self.event_loop).result()
class ProtocolAttribute:
......
......@@ -103,32 +103,22 @@ class APSCT(opcua_device):
@command()
@DebugIt()
@only_when_on()
def CLK_off(self):
def APSCT_off(self):
"""
:return:None
"""
self.opcua_connection.call_method(["CLK_off"])
self.opcua_connection.call_method(["APSCT_off"])
@command()
@DebugIt()
@only_when_on()
def CLK_on(self):
def APSCT_on(self):
"""
:return:None
"""
self.opcua_connection.call_method(["CLK_on"])
@command()
@DebugIt()
@only_when_on()
def CLK_PLL_setup(self):
"""
:return:None
"""
self.opcua_connection.call_method(["CLK_PLL_setup"])
self.opcua_connection.call_method(["APSCT_on"])
# ----------
# Run server
......
......@@ -16,7 +16,6 @@ from abc import abstractmethod
# PyTango imports
from tango.server import Device, command, DeviceMeta, attribute
from tango import AttrWriteType, DevState, DebugIt, Attribute, DeviceProxy
import time
import math
......@@ -87,6 +86,15 @@ class lofar_device(Device, metaclass=AbstractDeviceMetas):
self.set_state(DevState.OFF)
self.set_status("Device is in the OFF state.")
# register a proxy to ourselves, to interact with
# our attributes and commands as a client would.
#
# this is required to get/set attributes.
#
# we cannot write directly to our attribute, as that would not
# trigger a write_{name} call. See https://www.tango-controls.org/community/forum/c/development/c/accessing-own-deviceproxy-class/?page=1#post-2021
self.proxy = DeviceProxy(self.get_name())
@log_exceptions()
def delete_device(self):
"""Hook to delete resources allocated in init_device.
......@@ -231,12 +239,6 @@ class lofar_device(Device, metaclass=AbstractDeviceMetas):
2) Any remaining default properties are set.
"""
# we cannot write directly to our attribute, as that would not
# trigger a write_{name} call. See https://www.tango-controls.org/community/forum/c/development/c/accessing-own-deviceproxy-class/?page=1#post-2021
# obtain a proxy to myself, to write values
proxy = DeviceProxy(self.get_name())
# collect all attributes for which defaults are provided
attributes_with_defaults = [name for name in dir(self)
# collect all attribute members
......@@ -254,7 +256,7 @@ class lofar_device(Device, metaclass=AbstractDeviceMetas):
# set the attribute to the configured default
logger.debug(f"Setting attribute {name} to {default_value}")
proxy.write_attribute(name, default_value)
self.proxy.write_attribute(name, default_value)
except Exception as e:
# log which attribute we're addressing
raise Exception(f"Cannot assign default to attribute {name}") from e
......@@ -281,13 +283,11 @@ class lofar_device(Device, metaclass=AbstractDeviceMetas):
pollperiod: how often to check the attribute, in seconds.
"""
attr = getattr(self, attr_name)
# Poll every half a second
for _ in range(math.ceil(timeout/pollperiod)):
if attr != value:
if getattr(self.proxy, attr_name) != value:
return
time.sleep(pollperiod)
raise Exception(f"{attr} != {value} after f{timeout} seconds still.")
raise Exception(f"{attr_name} != {value} after {timeout} seconds still.")
......@@ -57,8 +57,8 @@ class RECV(opcua_device):
# ----------
# Attributes
# ----------
ANT_status_R = attribute(dtype=str, max_dim_x=3, max_dim_y=32)
RCU_LED_colour_R = attribute(dtype=numpy.uint32, max_dim_x=32, fget=lambda self: (2 * self.RCU_LED_green_on_R + 4 * self.RCU_LED_red_on_R).astype(numpy.uint32))
ANT_status_R = attribute(dtype=(str,), max_dim_x=3, max_dim_y=32)
RCU_LED_colour_R = attribute(dtype=(numpy.uint32,), max_dim_x=32, fget=lambda self: (2 * self.proxy.RCU_LED_green_on_R + 4 * self.proxy.RCU_LED_red_on_R).astype(numpy.uint32))
ANT_mask_RW = attribute_wrapper(comms_annotation=["ANT_mask_RW" ],datatype=numpy.bool_ , dims=(3,32), access=AttrWriteType.READ_WRITE)
HBAT_BF_delays_R = attribute_wrapper(comms_annotation=["HBAT_BF_delays_R" ],datatype=numpy.int64 , dims=(32,96))
......@@ -131,22 +131,22 @@ class RECV(opcua_device):
@command()
@DebugIt()
@only_when_on()
def ADC_on(self):
def RCU_DTH_off(self):
"""
:return:None
"""
self.opcua_connection.call_method(["ADC_on"])
self.opcua_connection.call_method(["RCU_DTH_off"])
@command()
@DebugIt()
@only_when_on()
def RCU_update(self):
def RCU_DTH_on(self):
"""
:return:None
"""
self.opcua_connection.call_method(["RCU_update"])
self.opcua_connection.call_method(["RCU_DTH_on"])
def _initialise_hardware(self):
""" Initialise the RCU hardware. """
......@@ -170,8 +170,8 @@ class RECV(opcua_device):
This function can be used as input to modify the RCU_mask_RW. """
rcu_mask = self.RCU_mask_RW
i2c_errors = self.RCU_I2C_STATUS_R
rcu_mask = self.proxy.RCU_mask_RW
i2c_errors = self.proxy.RCU_I2C_STATUS_R
nr_rcus = len(rcu_mask)
rcu_status = [""] * nr_rcus
......@@ -195,10 +195,10 @@ class RECV(opcua_device):
This function can be used as input to modify the Ant_mask_RW. """
ant_mask = self.ANT_mask_RW
rcu_mask = self.RCU_mask_RW
adc_lock = self.RCU_ADC_locked_R
i2c_errors = self.RCU_I2C_STATUS_R
ant_mask = self.proxy.ANT_mask_RW
rcu_mask = self.proxy.RCU_mask_RW
adc_lock = self.proxy.RCU_ADC_locked_R
i2c_errors = self.proxy.RCU_I2C_STATUS_R
nr_rcus = len(ant_mask)
nr_ants_per_rcu = len(ant_mask[0])
......
......@@ -12,15 +12,16 @@
"""
# PyTango imports
from tango.server import run
from tango.server import run, command
from tango.server import device_property, attribute
from tango import AttrWriteType
from tango import AttrWriteType, DebugIt
# Additional import
from tangostationcontrol.common.entrypoint import entry
from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper
from tangostationcontrol.devices.opcua_device import opcua_device
from tangostationcontrol.common.lofar_logging import device_logging_to_python, log_exceptions
from tangostationcontrol.devices.device_decorators import only_when_on
import numpy
......@@ -121,6 +122,26 @@ class UNB2(opcua_device):
# Commands
# --------
@command()
@DebugIt()
@only_when_on()
def UNB2_off(self):
"""
:return:None
"""
self.opcua_connection.call_method(["UNB2_off"])
@command()
@DebugIt()
@only_when_on()
def UNB2_on(self):
"""
:return:None
"""
self.opcua_connection.call_method(["UNB2_on"])
# ----------
# Run server
# ----------
......
import asyncua
import numpy
from tangostationcontrol.clients.opcua_client import OPCUAConnection
from tangostationcontrol.integration_test import base
class TestClientServer(base.IntegrationAsyncTestCase):
""" Test the OPCUAConnection against an OPCUA server we instantiate ourselves. """
async def setup_server(self, port):
""" Setup a server on a dedicated port for the test, to allow
the tests to be run in parallel. """
# where we will run the server
self.endpoint = f"opc.tcp://127.0.0.1:{port}"
self.namespace = "http://example.com"
# setup an OPC-UA server
self.server = asyncua.Server()
await self.server.init()
self.server.set_endpoint(self.endpoint)
self.server.set_server_name(f"Test server spawned by {__file__}")
# create an interface
idx = await self.server.register_namespace(self.namespace)
obj = self.server.get_objects_node()
# add double_R/double_RW
double_R = await obj.add_variable(idx, "double_R", 42.0)
double_RW = await obj.add_variable(idx, "double_RW", 42.0)
await double_RW.set_writable()
# add methods
@asyncua.uamethod
def multiply(parent, x, y):
self.assertEqual(float, type(x))
self.assertEqual(int, type(y))
return x * y
@asyncua.uamethod
def procedure(parent):
return
@asyncua.uamethod
def throws(parent):
raise Exception("Expected test exception")
multiply_method = await obj.add_method(idx, "multiply", multiply, [asyncua.ua.VariantType.Double, asyncua.ua.VariantType.Int64], [asyncua.ua.VariantType.Double])
procedure_method = await obj.add_method(idx, "procedure", procedure, [], [])
throws_method = await obj.add_method(idx, "throws", throws, [], [])
# run the server
await self.server.start()
async def setUp(self):
self.server = None
async def tearDown(self):
if self.server:
await self.server.stop()
def fault_func(self):
raise Exception("FAULT")
async def test_opcua_connection(self):
await self.setup_server(14840)
test_client = OPCUAConnection(self.endpoint, self.namespace, 5, self.fault_func, self.loop)
try:
await test_client.start()
finally:
await test_client.stop()
async def test_read_attribute(self):
await self.setup_server(14841)
test_client = OPCUAConnection(self.endpoint, self.namespace, 5, self.fault_func, self.loop)
try:
await test_client.start()
# setup the attribute
class attribute(object):
dim_x = 1
dim_y = 0
numpy_type = numpy.double
prot_attr = await test_client.setup_protocol_attribute(["double_R"], attribute())
# read it from the server
self.assertEqual(42.0, await prot_attr.read_function())
finally:
await test_client.stop()
async def test_write_attribute(self):
await self.setup_server(14842)
test_client = OPCUAConnection(self.endpoint, self.namespace, 5, self.fault_func, self.loop)
try:
await test_client.start()
# setup the attribute
class attribute(object):
dim_x = 1
dim_y = 0
numpy_type = numpy.double
prot_attr = await test_client.setup_protocol_attribute(["double_RW"], attribute())
# write it to the server and read it back to verify
await prot_attr.write_function(123.0)
self.assertEqual(123.0, await prot_attr.read_function())
finally:
await test_client.stop()
async def test_method_without_args(self):
await self.setup_server(14843)
test_client = OPCUAConnection(self.endpoint, self.namespace, 5, self.fault_func, self.loop)
try:
await test_client.start()
self.assertEqual(None, await test_client._call_method(["procedure"]))
finally:
await test_client.stop()
async def test_method_with_args(self):
await self.setup_server(14843)
test_client = OPCUAConnection(self.endpoint, self.namespace, 5, self.fault_func, self.loop)
try:
await test_client.start()
self.assertEqual(21.0, await test_client._call_method(["multiply"], numpy.double(3.0), numpy.int64(7)))
finally:
await test_client.stop()
async def test_method_with_wrong_arg_types(self):
await self.setup_server(14844)
test_client = OPCUAConnection(self.endpoint, self.namespace, 5, self.fault_func, self.loop)
try:
await test_client.start()
with self.assertRaises(Exception):
# correct signature is multiply(double,int64)
_ = await test_client._call_method(["multiply"], numpy.double(3.0), numpy.double(7))
finally:
await test_client.stop()
async def test_errorring_method(self):
await self.setup_server(14845)
test_client = OPCUAConnection(self.endpoint, self.namespace, 5, self.fault_func, self.loop)
try:
await test_client.start()
with self.assertRaises(Exception):
await test_client._call_method(["throws"])
finally:
await test_client.stop()
......@@ -13,12 +13,14 @@ from tango import DevState
from tangostationcontrol.test.clients.test_client import test_client
from tangostationcontrol.clients.attribute_wrapper import *
from tangostationcontrol.devices.lofar_device import *
import tangostationcontrol.devices.lofar_device
# Test imports
from tango.test_context import DeviceTestContext
from tangostationcontrol.test import base
import asyncio
import mock
scalar_dims = (1,)
spectrum_dims = (4,)
......@@ -38,6 +40,11 @@ def dev_init(device):
class TestAttributeTypes(base.TestCase):
def setUp(self):
# Avoid the device trying to access itself as a client
self.deviceproxy_patch = mock.patch.object(tangostationcontrol.devices.lofar_device,'DeviceProxy')
self.deviceproxy_patch.start()
self.addCleanup(self.deviceproxy_patch.stop)
class str_scalar_device(lofar_device):
scalar_R = attribute_wrapper(comms_annotation="str_scalar_R", datatype=numpy.str)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment