Skip to content
Snippets Groups Projects
Commit 74acc067 authored by Jan David Mol's avatar Jan David Mol
Browse files

L2SS-412: Use asynctest.TestCase to simplify asyncio tests, move event_loop...

L2SS-412: Use asynctest.TestCase to simplify asyncio tests, move event_loop out of ProtocolAttribute
parent 17f19999
No related branches found
No related tags found
1 merge request!142L2SS-412: Use asyncio for opcua and other clients
......@@ -121,7 +121,7 @@ class OPCUAConnection(AsyncCommClient):
ua_type = numpy_to_OPCua_dict[attribute.numpy_type] # convert the numpy type to a corresponding UA type
# configure and return the read/write functions
prot_attr = ProtocolAttribute(node, dim_x, dim_y, ua_type, self.event_loop)
prot_attr = ProtocolAttribute(node, dim_x, dim_y, ua_type)
try:
# NOTE: debug statement tries to get the qualified name, this may not always work. in that case forgo the name and just print the path
......@@ -129,8 +129,16 @@ class OPCUAConnection(AsyncCommClient):
self.streams.debug_stream("connected OPC ua node {} of type {} to attribute with dimensions: {} x {} ".format(str(node_name)[:len(node_name)-1], str(ua_type)[len("VariantType."):], dim_x, dim_y))
except:
pass
# Tango will call these from a separate polling thread.
def read_function():
asyncio.run_coroutine_threadsafe(prot_attr.read_function(), self.event_loop)
def write_function(value):
asyncio.run_coroutine_threadsafe(prot_attr.write_function(value), self.event_loop)
# return the read/write functions
return prot_attr.read_function, prot_attr.write_function
return read_function, write_function
async def call_method(self, method_path, *args):
......@@ -143,23 +151,18 @@ class ProtocolAttribute:
This class provides a small wrapper for the OPC ua read/write functions in order to better organise the code
"""
def __init__(self, node, dim_x, dim_y, ua_type, event_loop):
def __init__(self, node, dim_x, dim_y, ua_type):
self.node = node
self.dim_y = dim_y
self.dim_x = dim_x
self.ua_type = ua_type
self.event_loop = event_loop
async def _read_value(self):
return await self.node.get_value()
def read_function(self):
async def read_function(self):
"""
Read_R function
"""
future = asyncio.run_coroutine_threadsafe(self._read_value(), self.event_loop)
value = future.result()
value = await self.node.get_value()
try:
if self.dim_y + self.dim_x == 1:
......@@ -178,7 +181,18 @@ class ProtocolAttribute:
raise ValueError(f"Failed to parse atribute value retrieved from OPC-UA: {value}") from e
async def _write_value(self, value):
async def write_function(self, value):
"""
write_RW function
"""
if self.dim_y != 0:
# flatten array, convert to python array
value = numpy.concatenate(value).tolist()
elif self.dim_x != 1:
# make sure it is a python array
value = value.tolist() if type(value) == numpy.ndarray else value
try:
await self.node.set_data_value(asyncua.ua.uatypes.Variant(Value=value, VariantType=self.ua_type))
except (TypeError, asyncua.ua.uaerrors.BadTypeMismatch) as e:
......@@ -212,19 +226,3 @@ class ProtocolAttribute:
attribute_name = (await self.node.read_display_name()).to_string()
raise TypeError(f"Cannot write value to OPC-UA attribute '{attribute_name}': tried to convert data type {our_type} to expected server type {expected_server_type}, server reports type {actual_server_type}") from e
def write_function(self, value):
"""
write_RW function
"""
if self.dim_y != 0:
# flatten array, convert to python array
value = numpy.concatenate(value).tolist()
elif self.dim_x != 1:
# make sure it is a python array
value = value.tolist() if type(value) == numpy.ndarray else value
future = asyncio.run_coroutine_threadsafe(self._write_value(value), self.event_loop)
_ = future.result()
......@@ -9,6 +9,7 @@
import unittest
import testscenarios
import asynctest
class BaseTestCase(testscenarios.WithScenarios, unittest.TestCase):
......@@ -23,3 +24,10 @@ class TestCase(BaseTestCase):
def setUp(self):
super().setUp()
class AsyncTestCase(BaseTestCase):
"""Test case base class for all asyncio unit tests."""
def setUp(self):
super().setUp()
import numpy
from clients.opcua_client import OPCUAConnection
from clients import opcua_client, comms_client
from clients import opcua_client
import asyncua
import io
......@@ -39,20 +39,10 @@ image_shape = (2, 3)
dimension_tests = [scalar_shape, spectrum_shape, image_shape]
class TestOPCua(base.TestCase):
def setUp(self):
self.comms_client = comms_client.AsyncCommClient()
self.event_loop = self.comms_client.event_loop
def tearDown(self):
# must explicitl delete, or python will mess up the descruction
# of the event loop
del self.event_loop
del self.comms_client
class TestOPCua(base.AsyncTestCase):
@asynctest.patch.object(OPCUAConnection, "ping")
@asynctest.patch.object(opcua_client, "Client")
def test_opcua_connection(self, m_opc_client, m_ping):
async def test_opcua_connection(self, m_opc_client, m_ping):
"""
This tests verifies whether the correct connection steps happen. It checks whether we can init an OPCUAConnection object
Whether we can set the namespace, and the OPCua client.
......@@ -65,8 +55,7 @@ class TestOPCua(base.TestCase):
m_opc_client_members.send_hello = asynctest.asynctest.CoroutineMock()
m_opc_client.return_value = m_opc_client_members
async def run_test():
test_client = OPCUAConnection("opc.tcp://localhost:4874/freeopcua/server/", "http://lofar.eu", 5, mock.Mock(), self.event_loop)
test_client = OPCUAConnection("opc.tcp://localhost:4874/freeopcua/server/", "http://lofar.eu", 5, mock.Mock(), self.loop)
try:
await test_client.start()
......@@ -78,13 +67,11 @@ class TestOPCua(base.TestCase):
finally:
await test_client.stop()
asyncio.run_coroutine_threadsafe(run_test(), self.event_loop).result()
@asynctest.patch.object(OPCUAConnection, "ping")
@asynctest.patch.object(opcua_client, "Client")
@mock.patch.object(opcua_client, 'ProtocolAttribute')
def test_opcua_attr_setup(self, m_protocol_attr, m_opc_client, m_ping):
async def test_opcua_attr_setup(self, m_protocol_attr, m_opc_client, m_ping):
"""
This tests covers the correct creation of read/write functions.
In normal circumstances called by he attribute wrapper.
......@@ -103,7 +90,6 @@ class TestOPCua(base.TestCase):
m_opc_client_members.get_objects_node = asynctest.Mock(return_value=m_objects_node)
m_opc_client.return_value = m_opc_client_members
async def run_test():
for i in attr_test_types:
class mock_attr:
def __init__(self, dtype, x, y):
......@@ -125,7 +111,7 @@ class TestOPCua(base.TestCase):
# pretend like there is a running OPCua server with a node that has this name
m_annotation = ["2:PCC", f"2:testNode_{str(i.numpy_type)}_{str(dim_x)}_{str(dim_y)}"]
test_client = OPCUAConnection("opc.tcp://localhost:4874/freeopcua/server/", "http://lofar.eu", 5, mock.Mock(), self.event_loop)
test_client = OPCUAConnection("opc.tcp://localhost:4874/freeopcua/server/", "http://lofar.eu", 5, mock.Mock(), self.loop)
try:
await test_client.start()
await test_client.setup_attribute(m_annotation, m_attribute)
......@@ -134,12 +120,6 @@ class TestOPCua(base.TestCase):
# success if there are no errors.
await test_client.stop()
asyncio.run_coroutine_threadsafe(run_test(), self.event_loop).result()
def test_protocol_attr(self):
"""
This tests finding an OPCua node and returning a valid object with read/write functions.
......@@ -160,7 +140,7 @@ class TestOPCua(base.TestCase):
dims = (j[1], j[0])
ua_type = opcua_client.numpy_to_OPCua_dict[i.numpy_type]
test = opcua_client.ProtocolAttribute(node, dims[0], dims[1], ua_type, self.event_loop)
test = opcua_client.ProtocolAttribute(node, dims[0], dims[1], ua_type)
print(test.dim_y, test.dim_x, test.ua_type)
"""
......@@ -173,7 +153,7 @@ class TestOPCua(base.TestCase):
self.assertTrue(hasattr(test, "write_function"), f"No write function found")
self.assertTrue(hasattr(test, "read_function"), f"No read function found")
def test_read(self):
async def test_read(self):
"""
This tests the read functions.
"""
......@@ -189,11 +169,11 @@ class TestOPCua(base.TestCase):
m_node = asynctest.asynctest.CoroutineMock()
if len(j) == 1:
test = opcua_client.ProtocolAttribute(m_node, j[0], 0, opcua_client.numpy_to_OPCua_dict[i.numpy_type], self.event_loop)
test = opcua_client.ProtocolAttribute(m_node, j[0], 0, opcua_client.numpy_to_OPCua_dict[i.numpy_type])
else:
test = opcua_client.ProtocolAttribute(m_node, j[1], j[0], opcua_client.numpy_to_OPCua_dict[i.numpy_type], self.event_loop)
test = opcua_client.ProtocolAttribute(m_node, j[1], j[0], opcua_client.numpy_to_OPCua_dict[i.numpy_type])
m_node.get_value = get_flat_value
val = test.read_function()
val = await test.read_function()
comp = val == get_test_value()
self.assertTrue(comp.all(), "Read value unequal to expected value: \n\t{} \n\t{}".format(val, get_test_value()))
......@@ -233,7 +213,7 @@ class TestOPCua(base.TestCase):
def test_write(self):
async def test_write(self):
"""
Test the writing of values by instantiating a ProtocolAttribute attribute, and calling the write function.
but the opcua function that writes to the server has been changed to the compare_values function.
......@@ -258,9 +238,9 @@ class TestOPCua(base.TestCase):
# create the protocolattribute
if len(j) == 1:
test = opcua_client.ProtocolAttribute(m_node, j[0], 0, opcua_client.numpy_to_OPCua_dict[i.numpy_type], self.event_loop)
test = opcua_client.ProtocolAttribute(m_node, j[0], 0, opcua_client.numpy_to_OPCua_dict[i.numpy_type], self.loop)
else:
test = opcua_client.ProtocolAttribute(m_node, j[1], j[0], opcua_client.numpy_to_OPCua_dict[i.numpy_type], self.event_loop)
test = opcua_client.ProtocolAttribute(m_node, j[1], j[0], opcua_client.numpy_to_OPCua_dict[i.numpy_type], self.loop)
# comparison function that replaces `set_data_value` inside the attributes write function
async def compare_values(val):
......@@ -278,4 +258,4 @@ class TestOPCua(base.TestCase):
m_node.set_data_value = compare_values
# call the write function with the test values
test.write_function(get_test_value())
await test.write_function(get_test_value())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment