Skip to content
Snippets Groups Projects
Commit ac35e001 authored by Jan David Mol's avatar Jan David Mol
Browse files

L2SS-392: Add opcua client test that talks to a server instead of mocking one. Fix call_method.

parent 0ba8eaf3
No related branches found
No related tags found
1 merge request!170L2SS-392: Support calling OPC-UA methods.
...@@ -117,7 +117,7 @@ class OPCUAConnection(AsyncCommClient): ...@@ -117,7 +117,7 @@ class OPCUAConnection(AsyncCommClient):
return path return path
async def setup_attribute(self, annotation, attribute): async def setup_protocol_attribute(self, annotation, attribute):
# process the annotation # process the annotation
path = self.get_node_path(annotation) path = self.get_node_path(annotation)
...@@ -142,6 +142,11 @@ class OPCUAConnection(AsyncCommClient): ...@@ -142,6 +142,11 @@ class OPCUAConnection(AsyncCommClient):
except: except:
pass pass
return prot_attr
async def setup_attribute(self, annotation, attribute):
prot_attr = await self.setup_protocol_attribute(annotation, attribute)
# Tango will call these from a separate polling thread. # Tango will call these from a separate polling thread.
def read_function(): def read_function():
return asyncio.run_coroutine_threadsafe(prot_attr.read_function(), self.event_loop).result() return asyncio.run_coroutine_threadsafe(prot_attr.read_function(), self.event_loop).result()
...@@ -157,11 +162,13 @@ class OPCUAConnection(AsyncCommClient): ...@@ -157,11 +162,13 @@ class OPCUAConnection(AsyncCommClient):
method_path = self.get_node_path(method_path) method_path = self.get_node_path(method_path)
try: try:
node = await self.obj.get_child(method_path[:-1]) node = await self.obj.get_child(method_path[:-1]) if len(method_path) > 1 else self.obj
result = await node.call_method(method_path[-1], *args) result = await node.call_method(method_path[-1], *args)
except Exception as e: except Exception as e:
raise Exception(f"Calling method {method_path} failed") from e raise Exception(f"Calling method {method_path} failed") from e
return result
def call_method(self, method_path, *args): def call_method(self, method_path, *args):
return asyncio.run_coroutine_threadsafe(self._call_method(method_path, *args), self.event_loop).result() return asyncio.run_coroutine_threadsafe(self._call_method(method_path, *args), self.event_loop).result()
......
import asyncua
import numpy
from clients.opcua_client import OPCUAConnection
from test import base
class TestClientServer(base.AsyncTestCase):
""" Test the OPCUAConnection against an OPCUA server we instantiate ourselves. """
async def setup_server(self, port):
""" Setup a server on a dedicated port for the test, to allow
the tests to be run in parallel. """
# where we will run the server
self.endpoint = f"opc.tcp://127.0.0.1:{port}"
self.namespace = "http://example.com"
# setup an OPC-UA server
self.server = asyncua.Server()
await self.server.init()
self.server.set_endpoint(self.endpoint)
self.server.set_server_name(f"Test server spawned by {__file__}")
# create an interface
idx = await self.server.register_namespace(self.namespace)
obj = self.server.get_objects_node()
# add double_R/double_RW
double_R = await obj.add_variable(idx, "double_R", 42.0)
double_RW = await obj.add_variable(idx, "double_RW", 42.0)
await double_RW.set_writable()
# add methods
@asyncua.uamethod
def multiply(parent, x, y):
return x * y
@asyncua.uamethod
def procedure(parent):
return
multiply_method = await obj.add_method(idx, "multiply", multiply, [asyncua.ua.VariantType.Double, asyncua.ua.VariantType.Int64], [asyncua.ua.VariantType.Double])
procedure_method = await obj.add_method(idx, "procedure", procedure, [], [])
# run the server
await self.server.start()
async def setUp(self):
self.server = None
async def tearDown(self):
if self.server:
await self.server.stop()
def fault_func(self):
raise Exception("FAULT")
async def test_opcua_connection(self):
await self.setup_server(14840)
test_client = OPCUAConnection(self.endpoint, self.namespace, 5, self.fault_func, self.loop)
try:
await test_client.start()
finally:
await test_client.stop()
async def test_read_attribute(self):
await self.setup_server(14841)
test_client = OPCUAConnection(self.endpoint, self.namespace, 5, self.fault_func, self.loop)
try:
await test_client.start()
# setup the attribute
class attribute(object):
dim_x = 1
dim_y = 0
numpy_type = numpy.double
prot_attr = await test_client.setup_protocol_attribute(["double_R"], attribute())
# read it from the server
self.assertEqual(42.0, await prot_attr.read_function())
finally:
await test_client.stop()
async def test_write_attribute(self):
await self.setup_server(14842)
test_client = OPCUAConnection(self.endpoint, self.namespace, 5, self.fault_func, self.loop)
try:
await test_client.start()
# setup the attribute
class attribute(object):
dim_x = 1
dim_y = 0
numpy_type = numpy.double
prot_attr = await test_client.setup_protocol_attribute(["double_RW"], attribute())
# write it to the server and read it back to verify
await prot_attr.write_function(123.0)
self.assertEqual(123.0, await prot_attr.read_function())
finally:
await test_client.stop()
async def test_method_without_args(self):
await self.setup_server(14843)
test_client = OPCUAConnection(self.endpoint, self.namespace, 5, self.fault_func, self.loop)
try:
await test_client.start()
self.assertEqual(None, await test_client._call_method(["procedure"]))
finally:
await test_client.stop()
async def test_method_with_args(self):
await self.setup_server(14843)
test_client = OPCUAConnection(self.endpoint, self.namespace, 5, self.fault_func, self.loop)
try:
await test_client.start()
self.assertEqual(21.0, await test_client._call_method(
["multiply"],
asyncua.ua.Variant(3.0, asyncua.ua.VariantType.Double),
asyncua.ua.Variant(7, asyncua.ua.VariantType.Int64)))
finally:
await test_client.stop()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment