Skip to content
Snippets Groups Projects
Commit 72e5e493 authored by Jan David Mol's avatar Jan David Mol
Browse files

L2SS-669: Convert incoming unicode from OPC-UA to ?

parent 0ae5f21c
No related branches found
No related tags found
1 merge request!260L2SS-669: Convert incoming unicode from OPC-UA to ?
...@@ -243,6 +243,21 @@ class ProtocolAttribute: ...@@ -243,6 +243,21 @@ class ProtocolAttribute:
value = await self.node.get_value() value = await self.node.get_value()
try: try:
# Pytango strings are Latin-1, and will crash on receiving Unicode strings
# (see https://gitlab.com/tango-controls/pytango/-/issues/72)
# So we explicitly convert to Latin-1 and replace errors with '?'.
#
# Note that PyTango also accepts byte arrays as strings, so we don't
# have to actually decode() the result.
def fix_string(s):
return s.encode('latin-1',errors="replace").decode('latin-1')
if type(value) == list and len(value) > 0 and type(value[0]) == str:
value = [fix_string(v) for v in value]
elif type(value) == str:
value = fix_string(value)
if self.dim_y + self.dim_x == 1: if self.dim_y + self.dim_x == 1:
# scalar # scalar
return value return value
......
...@@ -178,6 +178,35 @@ class TestOPCua(base.AsyncTestCase): ...@@ -178,6 +178,35 @@ class TestOPCua(base.AsyncTestCase):
comp = val == get_test_value() comp = val == get_test_value()
self.assertTrue(comp.all(), "Read value unequal to expected value: \n\t{} \n\t{}".format(val, get_test_value())) self.assertTrue(comp.all(), "Read value unequal to expected value: \n\t{} \n\t{}".format(val, get_test_value()))
async def test_read_unicode(self):
"""
Test whether unicode characters are replaced by '?'.
"""
# test 0-2 dimensions of strings
for dims in range(0,2):
# wrap a value in the current number of dimensions
def wrap_dims(x):
if dims == 0:
return x
elif dims == 1:
return [x]
elif dims == 2:
return [[x]]
# return a constructed value with unicode
async def get_value():
return wrap_dims(b'foo \xef\xbf\xbd bar'.decode('utf-8'))
m_node = asynctest.asynctest.CoroutineMock()
m_node.get_value = get_value
# create the ProtocolAttribute to test
test = opcua_client.ProtocolAttribute(m_node, 1, 0, opcua_client.numpy_to_OPCua_dict[str])
# check if unicode is replaced by ?
val = await test.read_function()
self.assertEqual(wrap_dims("foo ? bar"), val)
def test_type_map(self): def test_type_map(self):
for numpy_type, opcua_type in opcua_client.numpy_to_OPCua_dict.items(): for numpy_type, opcua_type in opcua_client.numpy_to_OPCua_dict.items():
# derive a default value that can get lost in a type translation # derive a default value that can get lost in a type translation
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment