From 72e5e493c9acb68e48d04c06cf9478e2864bb951 Mon Sep 17 00:00:00 2001 From: Jan David Mol <mol@astron.nl> Date: Wed, 2 Mar 2022 21:33:12 +0100 Subject: [PATCH] L2SS-669: Convert incoming unicode from OPC-UA to ? --- .../clients/opcua_client.py | 15 ++++++++++ .../test/clients/test_opcua_client.py | 29 +++++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/tangostationcontrol/tangostationcontrol/clients/opcua_client.py b/tangostationcontrol/tangostationcontrol/clients/opcua_client.py index 073933ad7..f94ec4e16 100644 --- a/tangostationcontrol/tangostationcontrol/clients/opcua_client.py +++ b/tangostationcontrol/tangostationcontrol/clients/opcua_client.py @@ -243,6 +243,21 @@ class ProtocolAttribute: value = await self.node.get_value() try: + # Pytango strings are Latin-1, and will crash on receiving Unicode strings + # (see https://gitlab.com/tango-controls/pytango/-/issues/72) + # So we explicitly convert to Latin-1 and replace errors with '?'. + # + # Note that PyTango also accepts byte arrays as strings, so we don't + # have to actually decode() the result. + + def fix_string(s): + return s.encode('latin-1',errors="replace").decode('latin-1') + + if type(value) == list and len(value) > 0 and type(value[0]) == str: + value = [fix_string(v) for v in value] + elif type(value) == str: + value = fix_string(value) + if self.dim_y + self.dim_x == 1: # scalar return value diff --git a/tangostationcontrol/tangostationcontrol/test/clients/test_opcua_client.py b/tangostationcontrol/tangostationcontrol/test/clients/test_opcua_client.py index 25968dede..4582c6088 100644 --- a/tangostationcontrol/tangostationcontrol/test/clients/test_opcua_client.py +++ b/tangostationcontrol/tangostationcontrol/test/clients/test_opcua_client.py @@ -178,6 +178,35 @@ class TestOPCua(base.AsyncTestCase): comp = val == get_test_value() self.assertTrue(comp.all(), "Read value unequal to expected value: \n\t{} \n\t{}".format(val, get_test_value())) + async def test_read_unicode(self): + """ + Test whether unicode characters are replaced by '?'. + """ + # test 0-2 dimensions of strings + for dims in range(0,2): + # wrap a value in the current number of dimensions + def wrap_dims(x): + if dims == 0: + return x + elif dims == 1: + return [x] + elif dims == 2: + return [[x]] + + # return a constructed value with unicode + async def get_value(): + return wrap_dims(b'foo \xef\xbf\xbd bar'.decode('utf-8')) + + m_node = asynctest.asynctest.CoroutineMock() + m_node.get_value = get_value + + # create the ProtocolAttribute to test + test = opcua_client.ProtocolAttribute(m_node, 1, 0, opcua_client.numpy_to_OPCua_dict[str]) + + # check if unicode is replaced by ? + val = await test.read_function() + self.assertEqual(wrap_dims("foo ? bar"), val) + def test_type_map(self): for numpy_type, opcua_type in opcua_client.numpy_to_OPCua_dict.items(): # derive a default value that can get lost in a type translation -- GitLab