Select Git revision
opcua_client.py
-
Jan David Mol authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
opcua_client.py 9.50 KiB
from threading import Thread
import socket
import numpy
import opcua
from opcua import Client
from clients.comms_client import CommClient
__all__ = ["OPCUAConnection"]
# Lookup table from the numpy scalar type of an attribute (or the builtin
# ``str``) to the OPC-UA variant type used when writing that attribute.
numpy_to_OPCua_dict = {
    numpy.bool_: opcua.ua.VariantType.Boolean,
    numpy.int8: opcua.ua.VariantType.SByte,
    numpy.uint8: opcua.ua.VariantType.Byte,
    numpy.int16: opcua.ua.VariantType.Int16,
    numpy.uint16: opcua.ua.VariantType.UInt16,
    numpy.int32: opcua.ua.VariantType.Int32,
    numpy.uint32: opcua.ua.VariantType.UInt32,
    numpy.int64: opcua.ua.VariantType.Int64,
    numpy.uint64: opcua.ua.VariantType.UInt64,
    # NOTE(review): numpy.datetime_data is a function, not a scalar type, so
    # this key cannot match an attribute whose type is numpy.datetime64.
    # Kept as-is for backward compatibility -- confirm and switch to
    # numpy.datetime64 if DateTime attributes are actually needed.
    numpy.datetime_data: opcua.ua.VariantType.DateTime,
    numpy.float32: opcua.ua.VariantType.Float,
    # numpy.double is the same object as numpy.float64, one entry suffices
    numpy.float64: opcua.ua.VariantType.Double,
    numpy.str_: opcua.ua.VariantType.String,
    # numpy.str was only an alias for the builtin str and has been removed
    # from NumPy (1.24+), where referencing it raises AttributeError on import.
    # The builtin key below covers exactly the same lookups.
    str: opcua.ua.VariantType.String,
}
# <class 'numpy.bool_'>
class OPCUAConnection(CommClient):
    """
    OPC-UA client built on top of CommClient.

    The constructor connects to the server, determines the namespace index and
    fetches the objects node. ``setup_attribute`` binds an attribute to an
    OPC-UA node and returns its read/write functions, and ``ping`` sends a
    HELLO message to check that the connection is still alive.

    NOTE(review): running in the background and reconnecting on failure are
    presumably handled by CommClient; neither is implemented in this class.
    """

    def start(self):
        """Start the client; simply delegates to CommClient.start()."""
        super().start()

    def __init__(self, address, namespace, timeout, fault_func, streams, try_interval=2):
        """
        Create the OPC ua client and connect() to it and get the object node

        :param address: server address, handed to opcua.Client
        :param namespace: namespace URI (str, resolved on the server) or
            namespace index (int, used as is)
        :param timeout: timeout, handed to opcua.Client
        :param fault_func: callable invoked when connect() reports failure;
            also forwarded to CommClient
        :param streams: logging streams, forwarded to CommClient
        :param try_interval: forwarded to CommClient
        """
        super().__init__(fault_func, streams, try_interval)

        self.client = Client(address, timeout)

        # Explicitly connect. connect() as defined in this class either returns
        # True or raises, so this branch only triggers for an overriding
        # connect() that returns a falsy value.
        if not self.connect():
            # hardware or infra is down -- needs fixing first
            fault_func()
            return

        # determine namespace used
        self.name_space_index = self._resolve_namespace_index(namespace)

        self.obj = self.client.get_objects_node()
        self.check_nodes()

    def _resolve_namespace_index(self, namespace):
        """
        Return the namespace index for a namespace URI (str) or index (int).

        Falls back to index 2, with a warning, when the server lookup fails or
        when ``namespace`` is of an unsupported type, so that the index is
        always defined.
        """
        try:
            if isinstance(namespace, str):
                return self.client.get_namespace_index(namespace)
            if isinstance(namespace, int):
                return namespace
        except Exception:
            # fall through to the default below
            pass

        # TODO remove once SDP is fixed
        self.streams.warn_stream("Cannot determine the OPC-UA name space index. Will try and use the default = 2.")
        return 2

    def _servername(self):
        """Return the URL of the server this client talks to, as a string."""
        return self.client.server_url.geturl()

    def connect(self):
        """
        Try to connect to the client

        :return: True once connected
        :raises Exception: when the connection fails with a socket error; the
            socket error is attached as the cause
        """
        try:
            self.streams.debug_stream("Connecting to server %s", self._servername())
            self.client.connect()
            self.connected = True
            self.streams.debug_stream("Connected to %s. Initialising.", self._servername())
            return True
        except socket.error as e:
            self.streams.error_stream("Could not connect to server %s: %s", self._servername(), e)
            # interpolate the message here: Exception() does not apply %-formatting itself
            raise Exception(f"Could not connect to server {self._servername()}") from e

    def check_nodes(self):
        """
        function purely for debugging/development only. Simply lists all top level nodes and the nodes below that
        """
        for top_node in self.obj.get_children():
            print(top_node.get_browse_name())

            for child in top_node.get_children():
                try:
                    print(child.get_browse_name(), child.get_data_type_as_variant_type())
                except Exception:
                    # not every node exposes a data type; print the name only
                    print(child.get_browse_name())

    def disconnect(self):
        """
        disconnect from the client

        A failing disconnect is logged but not raised.
        """
        self.connected = False  # always force a reconnect, regardless of a successful disconnect

        try:
            self.client.disconnect()
        except Exception as e:
            self.streams.error_stream("Disconnect from OPC-UA server %s failed: %s", self._servername(), e)

    def ping(self):
        """
        ping the client to make sure the connection with the client is still functional.

        :raises Exception: when sending the HELLO message fails; the original
            error is attached as the cause
        """
        try:
            self.client.send_hello()
        except Exception as e:
            raise Exception(f"Lost connection to server {self._servername()}: {e}") from e

    def _setup_annotation(self, annotation):
        """
        Resolve an attribute annotation to the OPC-UA node it points to.

        :param annotation: either a list holding the node path, or a dict with
            the path stored under the 'path' key
        :return: the node found at that path below the objects node
        :raises Exception: when the annotation has the wrong form or the node
            cannot be retrieved from the server
        """
        if isinstance(annotation, dict):
            path = annotation.get("path")  # required
            if path is None:
                raise Exception(f"OPC-ua mapping requires a path argument in the annotation, was given: {annotation}")
        elif isinstance(annotation, list):
            path = annotation
        else:
            raise Exception(f"OPC-ua mapping requires either a list of the path or dict with the path. Was given {type(annotation)} type containing: {annotation}")

        try:
            node = self.obj.get_child(path)
        except Exception as e:
            self.streams.error_stream("Could not get node: %s on server %s: %s", path, self._servername(), e)
            raise Exception(f"Could not get node: {path} on server {self._servername()}") from e

        return node

    def setup_value_conversion(self, attribute):
        """
        gives the client access to the attribute_wrapper object in order to access all data it could potentially need.
        the OPC ua read/write functions require the dimensionality and the type to be known

        :return: tuple (dim_x, dim_y, ua_type)
        :raises KeyError: when attribute.numpy_type has no entry in numpy_to_OPCua_dict
        """
        dim_x = attribute.dim_x
        dim_y = attribute.dim_y
        ua_type = numpy_to_OPCua_dict[attribute.numpy_type]  # convert the numpy type to a corresponding UA type

        return dim_x, dim_y, ua_type

    def setup_attribute(self, annotation, attribute):
        """
        MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions

        :return: tuple (read_function, write_function)
        """
        # process the annotation
        node = self._setup_annotation(annotation)

        # get all the necessary data to set up the read/write functions from the attribute_wrapper
        dim_x, dim_y, ua_type = self.setup_value_conversion(attribute)

        # configure and return the read/write functions
        prot_attr = ProtocolAttribute(node, dim_x, dim_y, ua_type)

        try:
            # NOTE: the debug statement is best effort only. Retrieving the
            # browse name may not always work, and a failure here must never
            # break the attribute setup.
            # Read the name and type from their attributes instead of slicing
            # their str() forms, which depends on the namespace index width
            # and on how the enum is rendered.
            node_name = node.get_browse_name().Name
            self.streams.debug_stream("connected OPC ua node {} of type {} to attribute with dimensions: {} x {} ".format(node_name, ua_type.name, dim_x, dim_y))
        except Exception:
            pass

        # return the read/write functions
        return prot_attr.read_function, prot_attr.write_function
class ProtocolAttribute:
    """
    This class provides a small wrapper for the OPC ua read/write functions in order to better organise the code

    An attribute is treated as a scalar when dim_x + dim_y == 1, as a 2D array
    when dim_y != 0, and as a 1D array otherwise.
    """

    def __init__(self, node, dim_x, dim_y, ua_type):
        """
        :param node: OPC-UA node that is read from / written to
        :param dim_x: x dimension of the attribute
        :param dim_y: y dimension of the attribute (0 if not 2D)
        :param ua_type: OPC-UA variant type used when writing
        """
        self.node = node
        self.dim_y = dim_y
        self.dim_x = dim_x
        self.ua_type = ua_type

    def _is_scalar(self):
        """Return True when the dimensions describe a single value."""
        return (self.dim_x + self.dim_y) == 1

    def read_function(self):
        """
        Read_R function

        :return: the node value as is for a scalar, otherwise a numpy array;
            for a 2D attribute the flat node value is split into dim_y rows
        """
        value = self.node.get_value()

        if self._is_scalar():
            return value

        if self.dim_y != 0:
            # 2D array
            return numpy.array(numpy.split(numpy.array(value), indices_or_sections=self.dim_y))

        # 1D array
        return numpy.array(value)

    def _to_python_value(self, value):
        """Convert a value to the plain Python form that is put in the Variant."""
        if self.dim_y != 0:
            # flatten array, convert to python array
            return numpy.concatenate(value).tolist()

        if self.dim_x != 1 and isinstance(value, numpy.ndarray):
            # make sure it is a python array; isinstance also covers ndarray
            # subclasses, which would otherwise be passed on unconverted
            return value.tolist()

        return value

    def _type_mismatch_message(self, value):
        """Build the error text listing our type, the expected type and the type the server reports."""
        if isinstance(value, list):
            element_type = type(value[0]).__name__ if value else ""
            our_type = f"list({element_type}) x ({len(value)})"
        else:
            our_type = f"{type(value)}"

        if self._is_scalar():
            expected_server_type = f"{self.ua_type} (scalar)"
        else:
            expected_server_type = f"{self.ua_type} x ({self.dim_x}, {self.dim_y})"

        # these three calls query the node
        server_dtype = self.node.get_data_type_as_variant_type()
        server_dimensions = self.node.get_array_dimensions() or "???"
        actual_server_type = f"{server_dtype} {server_dimensions}"

        attribute_name = self.node.get_display_name().to_string()

        return f"Cannot write value to OPC-UA attribute '{attribute_name}': tried to convert data type {our_type} to expected server type {expected_server_type}, server reports type {actual_server_type}"

    def write_function(self, value):
        """
        write_RW function

        :param value: value to write; arrays are converted to (flat) Python lists first
        :raises TypeError: when the value cannot be converted to the variant
            type or the server reports a type mismatch
        """
        value = self._to_python_value(value)

        try:
            self.node.set_data_value(opcua.ua.uatypes.Variant(value=value, varianttype=self.ua_type))
        except (TypeError, opcua.ua.uaerrors.BadTypeMismatch) as e:
            # A type conversion went wrong or there is a type mismatch.
            #
            # This is either the conversion us -> opcua in our client, or client -> server.
            # Report all types involved to allow assessment of the location of the error.
            raise TypeError(self._type_mismatch_message(value)) from e