diff --git a/tangostationcontrol/tangostationcontrol/clients/opcua_client.py b/tangostationcontrol/tangostationcontrol/clients/opcua_client.py index cd2671095e6d53da917d9f5ee52d186b118e541b..ce4447de3b9c8339bd36cc1e75a04ec6867b04de 100644 --- a/tangostationcontrol/tangostationcontrol/clients/opcua_client.py +++ b/tangostationcontrol/tangostationcontrol/clients/opcua_client.py @@ -27,6 +27,20 @@ numpy_to_OPCua_dict = { numpy.str: asyncua.ua.VariantType.String } +def numpy_to_opcua(numpy_val): + """ Convert a numpy type to a corresponding opcua Variant type. """ + + numpy_type = type(numpy_val) + + assert numpy_type not in [list, numpy.array], "Converting arrays not yet supported." + + try: + ua_type = numpy_to_OPCua_dict[numpy_type] + except KeyError as e: + raise TypeError(f"Could not convert {numpy_val} (type {type(numpy_val).__name__}) to an OPC UA type.") from e + + return asyncua.ua.uatypes.Variant(Value=numpy_val, VariantType=ua_type) + class OPCUAConnection(AsyncCommClient): """ Connects to OPC-UA in the foreground or background, and sends HELLO @@ -117,7 +131,7 @@ class OPCUAConnection(AsyncCommClient): return path - async def setup_attribute(self, annotation, attribute): + async def setup_protocol_attribute(self, annotation, attribute): # process the annotation path = self.get_node_path(annotation) @@ -142,6 +156,11 @@ class OPCUAConnection(AsyncCommClient): except: pass + return prot_attr + + async def setup_attribute(self, annotation, attribute): + prot_attr = await self.setup_protocol_attribute(annotation, attribute) + # Tango will call these from a separate polling thread. def read_function(): return asyncio.run_coroutine_threadsafe(prot_attr.read_function(), self.event_loop).result() @@ -153,15 +172,24 @@ class OPCUAConnection(AsyncCommClient): return read_function, write_function - async def call_method(self, method_path, *args): - node = await self.obj.get_child(method_path[:-1]) - return await node.call_method(method_path[-1], *args) + async def _call_method(self, method_path, *args): + method_path = self.get_node_path(method_path) + # convert the arguments to OPC UA types + args = [numpy_to_opcua(arg) for arg in args] - def call_method(self, method_path, *args): - method_path = self.get_node_path(method_path) + try: + # call method in its parent node + node = await self.obj.get_child(method_path[:-1]) if len(method_path) > 1 else self.obj + result = await node.call_method(method_path[-1], *args) + except Exception as e: + raise Exception(f"Calling method {method_path} failed") from e + + return result - raise NotImplementedError + + def call_method(self, method_path, *args): + return asyncio.run_coroutine_threadsafe(self._call_method(method_path, *args), self.event_loop).result() class ProtocolAttribute: diff --git a/tangostationcontrol/tangostationcontrol/devices/apsct.py b/tangostationcontrol/tangostationcontrol/devices/apsct.py index 78a0626d8a12985d944c752e544aff93209373be..b32c5d0c8b1637a99f7e429c44083246491fe332 100644 --- a/tangostationcontrol/tangostationcontrol/devices/apsct.py +++ b/tangostationcontrol/tangostationcontrol/devices/apsct.py @@ -103,32 +103,22 @@ class APSCT(opcua_device): @command() @DebugIt() @only_when_on() - def CLK_off(self): + def APSCT_off(self): """ :return:None """ - self.opcua_connection.call_method(["CLK_off"]) + self.opcua_connection.call_method(["APSCT_off"]) @command() @DebugIt() @only_when_on() - def CLK_on(self): + def APSCT_on(self): """ :return:None """ - self.opcua_connection.call_method(["CLK_on"]) - - @command() - @DebugIt() - @only_when_on() - def CLK_PLL_setup(self): - """ - - :return:None - """ - self.opcua_connection.call_method(["CLK_PLL_setup"]) + self.opcua_connection.call_method(["APSCT_on"]) # ---------- # Run server diff --git a/tangostationcontrol/tangostationcontrol/devices/lofar_device.py b/tangostationcontrol/tangostationcontrol/devices/lofar_device.py index d37d4e6acd3ea47053c02ff3c9e3e5ba36bee6d5..8886abd8a40b38df1fa9ec181783d9f4a2ff08ae 100644 --- a/tangostationcontrol/tangostationcontrol/devices/lofar_device.py +++ b/tangostationcontrol/tangostationcontrol/devices/lofar_device.py @@ -16,7 +16,6 @@ from abc import abstractmethod # PyTango imports from tango.server import Device, command, DeviceMeta, attribute from tango import AttrWriteType, DevState, DebugIt, Attribute, DeviceProxy - import time import math @@ -87,6 +86,15 @@ class lofar_device(Device, metaclass=AbstractDeviceMetas): self.set_state(DevState.OFF) self.set_status("Device is in the OFF state.") + # register a proxy to ourselves, to interact with + # our attributes and commands as a client would. + # + # this is required to get/set attributes. + # + # we cannot write directly to our attribute, as that would not + # trigger a write_{name} call. See https://www.tango-controls.org/community/forum/c/development/c/accessing-own-deviceproxy-class/?page=1#post-2021 + self.proxy = DeviceProxy(self.get_name()) + @log_exceptions() def delete_device(self): """Hook to delete resources allocated in init_device. @@ -231,12 +239,6 @@ class lofar_device(Device, metaclass=AbstractDeviceMetas): 2) Any remaining default properties are set. """ - # we cannot write directly to our attribute, as that would not - # trigger a write_{name} call. See https://www.tango-controls.org/community/forum/c/development/c/accessing-own-deviceproxy-class/?page=1#post-2021 - - # obtain a proxy to myself, to write values - proxy = DeviceProxy(self.get_name()) - # collect all attributes for which defaults are provided attributes_with_defaults = [name for name in dir(self) # collect all attribute members @@ -254,7 +256,7 @@ class lofar_device(Device, metaclass=AbstractDeviceMetas): # set the attribute to the configured default logger.debug(f"Setting attribute {name} to {default_value}") - proxy.write_attribute(name, default_value) + self.proxy.write_attribute(name, default_value) except Exception as e: # log which attribute we're addressing raise Exception(f"Cannot assign default to attribute {name}") from e @@ -281,13 +283,11 @@ class lofar_device(Device, metaclass=AbstractDeviceMetas): pollperiod: how often to check the attribute, in seconds. """ - attr = getattr(self, attr_name) - # Poll every half a second for _ in range(math.ceil(timeout/pollperiod)): - if attr != value: + if getattr(self.proxy, attr_name) != value: return time.sleep(pollperiod) - raise Exception(f"{attr} != {value} after f{timeout} seconds still.") + raise Exception(f"{attr_name} != {value} after {timeout} seconds still.") diff --git a/tangostationcontrol/tangostationcontrol/devices/recv.py b/tangostationcontrol/tangostationcontrol/devices/recv.py index a9eee4160f9b7a6f981119c58ad2e707df0cf717..a920dd59e393da0d5497700e31fb064fedae7ad9 100644 --- a/tangostationcontrol/tangostationcontrol/devices/recv.py +++ b/tangostationcontrol/tangostationcontrol/devices/recv.py @@ -57,8 +57,8 @@ class RECV(opcua_device): # ---------- # Attributes # ---------- - ANT_status_R = attribute(dtype=str, max_dim_x=3, max_dim_y=32) - RCU_LED_colour_R = attribute(dtype=numpy.uint32, max_dim_x=32, fget=lambda self: (2 * self.RCU_LED_green_on_R + 4 * self.RCU_LED_red_on_R).astype(numpy.uint32)) + ANT_status_R = attribute(dtype=(str,), max_dim_x=3, max_dim_y=32) + RCU_LED_colour_R = attribute(dtype=(numpy.uint32,), max_dim_x=32, fget=lambda self: (2 * self.proxy.RCU_LED_green_on_R + 4 * self.proxy.RCU_LED_red_on_R).astype(numpy.uint32)) ANT_mask_RW = attribute_wrapper(comms_annotation=["ANT_mask_RW" ],datatype=numpy.bool_ , dims=(3,32), access=AttrWriteType.READ_WRITE) HBAT_BF_delays_R = attribute_wrapper(comms_annotation=["HBAT_BF_delays_R" ],datatype=numpy.int64 , dims=(32,96)) @@ -131,22 +131,22 @@ class RECV(opcua_device): @command() @DebugIt() @only_when_on() - def ADC_on(self): + def RCU_DTH_off(self): """ :return:None """ - self.opcua_connection.call_method(["ADC_on"]) + self.opcua_connection.call_method(["RCU_DTH_off"]) @command() @DebugIt() @only_when_on() - def RCU_update(self): + def RCU_DTH_on(self): """ :return:None """ - self.opcua_connection.call_method(["RCU_update"]) + self.opcua_connection.call_method(["RCU_DTH_on"]) def _initialise_hardware(self): """ Initialise the RCU hardware. """ @@ -170,8 +170,8 @@ class RECV(opcua_device): This function can be used as input to modify the RCU_mask_RW. """ - rcu_mask = self.RCU_mask_RW - i2c_errors = self.RCU_I2C_STATUS_R + rcu_mask = self.proxy.RCU_mask_RW + i2c_errors = self.proxy.RCU_I2C_STATUS_R nr_rcus = len(rcu_mask) rcu_status = [""] * nr_rcus @@ -195,10 +195,10 @@ class RECV(opcua_device): This function can be used as input to modify the Ant_mask_RW. """ - ant_mask = self.ANT_mask_RW - rcu_mask = self.RCU_mask_RW - adc_lock = self.RCU_ADC_locked_R - i2c_errors = self.RCU_I2C_STATUS_R + ant_mask = self.proxy.ANT_mask_RW + rcu_mask = self.proxy.RCU_mask_RW + adc_lock = self.proxy.RCU_ADC_locked_R + i2c_errors = self.proxy.RCU_I2C_STATUS_R nr_rcus = len(ant_mask) nr_ants_per_rcu = len(ant_mask[0]) diff --git a/tangostationcontrol/tangostationcontrol/devices/unb2.py b/tangostationcontrol/tangostationcontrol/devices/unb2.py index 0076a37a25c0838f8324a409f08223fab79b92f1..9ca54c5fbc4eaaba2a205744cad23af1bb139691 100644 --- a/tangostationcontrol/tangostationcontrol/devices/unb2.py +++ b/tangostationcontrol/tangostationcontrol/devices/unb2.py @@ -12,15 +12,16 @@ """ # PyTango imports -from tango.server import run +from tango.server import run, command from tango.server import device_property, attribute -from tango import AttrWriteType +from tango import AttrWriteType, DebugIt # Additional import from tangostationcontrol.common.entrypoint import entry from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper from tangostationcontrol.devices.opcua_device import opcua_device from tangostationcontrol.common.lofar_logging import device_logging_to_python, log_exceptions +from tangostationcontrol.devices.device_decorators import only_when_on import numpy @@ -121,6 +122,26 @@ class UNB2(opcua_device): # Commands # -------- + @command() + @DebugIt() + @only_when_on() + def UNB2_off(self): + """ + + :return:None + """ + self.opcua_connection.call_method(["UNB2_off"]) + + @command() + @DebugIt() + @only_when_on() + def UNB2_on(self): + """ + + :return:None + """ + self.opcua_connection.call_method(["UNB2_on"]) + # ---------- # Run server # ---------- diff --git a/tangostationcontrol/tangostationcontrol/integration_test/client/test_opcua_client_against_server.py b/tangostationcontrol/tangostationcontrol/integration_test/client/test_opcua_client_against_server.py new file mode 100644 index 0000000000000000000000000000000000000000..bdd4e43b94b1a98596f339e220ee31116818fc37 --- /dev/null +++ b/tangostationcontrol/tangostationcontrol/integration_test/client/test_opcua_client_against_server.py @@ -0,0 +1,163 @@ +import asyncua +import numpy + +from tangostationcontrol.clients.opcua_client import OPCUAConnection + +from tangostationcontrol.integration_test import base + + +class TestClientServer(base.IntegrationAsyncTestCase): + """ Test the OPCUAConnection against an OPCUA server we instantiate ourselves. """ + + async def setup_server(self, port): + """ Setup a server on a dedicated port for the test, to allow + the tests to be run in parallel. """ + + # where we will run the server + self.endpoint = f"opc.tcp://127.0.0.1:{port}" + self.namespace = "http://example.com" + + # setup an OPC-UA server + self.server = asyncua.Server() + await self.server.init() + self.server.set_endpoint(self.endpoint) + self.server.set_server_name(f"Test server spawned by {__file__}") + + # create an interface + idx = await self.server.register_namespace(self.namespace) + obj = self.server.get_objects_node() + + # add double_R/double_RW + double_R = await obj.add_variable(idx, "double_R", 42.0) + double_RW = await obj.add_variable(idx, "double_RW", 42.0) + await double_RW.set_writable() + + # add methods + @asyncua.uamethod + def multiply(parent, x, y): + self.assertEqual(float, type(x)) + self.assertEqual(int, type(y)) + return x * y + + @asyncua.uamethod + def procedure(parent): + return + + @asyncua.uamethod + def throws(parent): + raise Exception("Expected test exception") + + multiply_method = await obj.add_method(idx, "multiply", multiply, [asyncua.ua.VariantType.Double, asyncua.ua.VariantType.Int64], [asyncua.ua.VariantType.Double]) + procedure_method = await obj.add_method(idx, "procedure", procedure, [], []) + throws_method = await obj.add_method(idx, "throws", throws, [], []) + + # run the server + await self.server.start() + + async def setUp(self): + self.server = None + + async def tearDown(self): + if self.server: + await self.server.stop() + + def fault_func(self): + raise Exception("FAULT") + + async def test_opcua_connection(self): + await self.setup_server(14840) + + test_client = OPCUAConnection(self.endpoint, self.namespace, 5, self.fault_func, self.loop) + try: + await test_client.start() + finally: + await test_client.stop() + + async def test_read_attribute(self): + await self.setup_server(14841) + + test_client = OPCUAConnection(self.endpoint, self.namespace, 5, self.fault_func, self.loop) + try: + await test_client.start() + + # setup the attribute + class attribute(object): + dim_x = 1 + dim_y = 0 + numpy_type = numpy.double + + prot_attr = await test_client.setup_protocol_attribute(["double_R"], attribute()) + + # read it from the server + self.assertEqual(42.0, await prot_attr.read_function()) + finally: + await test_client.stop() + + async def test_write_attribute(self): + await self.setup_server(14842) + + test_client = OPCUAConnection(self.endpoint, self.namespace, 5, self.fault_func, self.loop) + try: + await test_client.start() + + # setup the attribute + class attribute(object): + dim_x = 1 + dim_y = 0 + numpy_type = numpy.double + + prot_attr = await test_client.setup_protocol_attribute(["double_RW"], attribute()) + + # write it to the server and read it back to verify + await prot_attr.write_function(123.0) + + self.assertEqual(123.0, await prot_attr.read_function()) + finally: + await test_client.stop() + + async def test_method_without_args(self): + await self.setup_server(14843) + + test_client = OPCUAConnection(self.endpoint, self.namespace, 5, self.fault_func, self.loop) + try: + await test_client.start() + + self.assertEqual(None, await test_client._call_method(["procedure"])) + finally: + await test_client.stop() + + async def test_method_with_args(self): + await self.setup_server(14843) + + test_client = OPCUAConnection(self.endpoint, self.namespace, 5, self.fault_func, self.loop) + try: + await test_client.start() + + self.assertEqual(21.0, await test_client._call_method(["multiply"], numpy.double(3.0), numpy.int64(7))) + finally: + await test_client.stop() + + async def test_method_with_wrong_arg_types(self): + await self.setup_server(14844) + + test_client = OPCUAConnection(self.endpoint, self.namespace, 5, self.fault_func, self.loop) + try: + await test_client.start() + + with self.assertRaises(Exception): + # correct signature is multiply(double,int64) + _ = await test_client._call_method(["multiply"], numpy.double(3.0), numpy.double(7)) + finally: + await test_client.stop() + + async def test_errorring_method(self): + await self.setup_server(14845) + + test_client = OPCUAConnection(self.endpoint, self.namespace, 5, self.fault_func, self.loop) + try: + await test_client.start() + + with self.assertRaises(Exception): + await test_client._call_method(["throws"]) + finally: + await test_client.stop() diff --git a/tangostationcontrol/tangostationcontrol/test/clients/test_attr_wrapper.py b/tangostationcontrol/tangostationcontrol/test/clients/test_attr_wrapper.py index ef5356e6b91dc391055bb611cdfbf66e38149a7c..24ba5f506023f4260a35958cba568936cb2ad76f 100644 --- a/tangostationcontrol/tangostationcontrol/test/clients/test_attr_wrapper.py +++ b/tangostationcontrol/tangostationcontrol/test/clients/test_attr_wrapper.py @@ -13,12 +13,14 @@ from tango import DevState from tangostationcontrol.test.clients.test_client import test_client from tangostationcontrol.clients.attribute_wrapper import * from tangostationcontrol.devices.lofar_device import * +import tangostationcontrol.devices.lofar_device # Test imports from tango.test_context import DeviceTestContext from tangostationcontrol.test import base import asyncio +import mock scalar_dims = (1,) spectrum_dims = (4,) @@ -38,6 +40,11 @@ def dev_init(device): class TestAttributeTypes(base.TestCase): + def setUp(self): + # Avoid the device trying to access itself as a client + self.deviceproxy_patch = mock.patch.object(tangostationcontrol.devices.lofar_device,'DeviceProxy') + self.deviceproxy_patch.start() + self.addCleanup(self.deviceproxy_patch.stop) class str_scalar_device(lofar_device): scalar_R = attribute_wrapper(comms_annotation="str_scalar_R", datatype=numpy.str)