Newer
Older
from src.comms_client import *
__all__ = ["OPCUAConnection"]
numpy_to_OPCua_dict = {
numpy.bool_: opcua.ua.VariantType.Boolean,
numpy.int8: opcua.ua.VariantType.SByte,
numpy.uint8: opcua.ua.VariantType.Byte,
numpy.int16: opcua.ua.VariantType.Int16,
numpy.uint16: opcua.ua.VariantType.UInt16,
numpy.int32: opcua.ua.VariantType.Int32,
numpy.uint32: opcua.ua.VariantType.UInt32,
numpy.int64: opcua.ua.VariantType.Int64,
numpy.uint64: opcua.ua.VariantType.UInt64,
numpy.datetime_data: opcua.ua.VariantType.DateTime, # is this the right type, does it even matter?
numpy.float32: opcua.ua.VariantType.Float,
numpy.double: opcua.ua.VariantType.Double,
numpy.float64: opcua.ua.VariantType.Double,
numpy.str_: opcua.ua.VariantType.String,
numpy.str: opcua.ua.VariantType.String,
str: opcua.ua.VariantType.String
}
# <class 'numpy.bool_'>
class OPCUAConnection(CommClient):
"""
Connects to OPC-UA in the foreground or background, and sends HELLO
messages to keep a check on the connection. On connection failure, reconnects once.
"""
def start(self):
super().start()
def __init__(self, address, namespace, timeout, fault_func, streams, try_interval=2):
"""
Create the OPC ua client and connect() to it and get the object node
"""
super().__init__(fault_func, streams, try_interval)
self.client = Client(address, timeout)
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# Explicitly connect
if not self.connect():
# hardware or infra is down -- needs fixing first
fault_func()
return
# determine namespace used
try:
if type(namespace) is str:
self.name_space_index = self.client.get_namespace_index(namespace)
elif type(namespace) is int:
self.name_space_index = namespace
except Exception as e:
#TODO remove once SDP is fixed
self.streams.warn_stream("Cannot determine the OPC-UA name space index. Will try and use the default = 2.")
self.name_space_index = 2
self.obj = self.client.get_objects_node()
def _servername(self):
return self.client.server_url.geturl()
def connect(self):
"""
Try to connect to the client
"""
try:
self.streams.debug_stream("Connecting to server %s", self._servername())
self.client.connect()
self.connected = True
self.streams.debug_stream("Connected to %s. Initialising.", self._servername())
return True
except socket.error as e:
self.streams.error_stream("Could not connect to server %s: %s", self._servername(), e)
raise Exception("Could not connect to server %s", self._servername()) from e
def disconnect(self):
"""
disconnect from the client
"""
self.connected = False # always force a reconnect, regardless of a successful disconnect
try:
self.client.disconnect()
except Exception as e:
self.streams.error_stream("Disconnect from OPC-UA server %s failed: %s", self._servername(), e)
def ping(self):
"""
ping the client to make sure the connection with the client is still functional.
"""
try:
self.client.send_hello()
raise Exception("Lost connection to server %s: %s", self._servername(), e)
def _setup_annotation(self, annotation):
"""
This class's Implementation of the get_mapping function. returns the read and write functions
"""
if isinstance(annotation, dict):
# check if required path inarg is present
if annotation.get('path') is None:
raise Exception("OPC-ua mapping requires a path argument in the annotation, was given: %s", annotation)
path = annotation.get("path") # required
elif isinstance(annotation, list):
path = annotation
else:
raise Exception("OPC-ua mapping requires either a list of the path or dict with the path. Was given %s type containing: %s", type(annotation), annotation)
try:
node = self.obj.get_child(path)
except Exception as e:
self.streams.error_stream("Could not get node: %s on server %s: %s", path, self._servername(), e)
raise Exception("Could not get node: %s on server %s", path, self._servername()) from e
return node
def setup_value_conversion(self, attribute):
"""
gives the client access to the attribute_wrapper object in order to access all data it could potentially need.
the OPC ua read/write functions require the dimensionality and the type to be known
"""
dim_x = attribute.dim_x
dim_y = attribute.dim_y
ua_type = numpy_to_OPCua_dict[attribute.numpy_type] # convert the numpy type to a corresponding UA type
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
return dim_x, dim_y, ua_type
def setup_attribute(self, annotation, attribute):
"""
MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions
"""
# process the annotation
node = self._setup_annotation(annotation)
# get all the necessary data to set up the read/write functions from the attribute_wrapper
dim_x, dim_y, ua_type = self.setup_value_conversion(attribute)
# configure and return the read/write functions
prot_attr = ProtocolAttribute(node, dim_x, dim_y, ua_type)
try:
# NOTE: debug statement tries to get the qualified name, this may not always work. in that case forgo the name and just print the path
node_name = str(node.get_browse_name())[len("QualifiedName(2:"):]
self.streams.debug_stream("connected OPC ua node {} of type {} to attribute with dimensions: {} x {} ".format(str(node_name)[:len(node_name)-1], str(ua_type)[len("VariantType."):], dim_x, dim_y))
except:
pass
# return the read/write functions
return prot_attr.read_function, prot_attr.write_function
class ProtocolAttribute:
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
"""
This class provides a small wrapper for the OPC ua read/write functions in order to better organise the code
"""
def __init__(self, node, dim_x, dim_y, ua_type):
self.node = node
self.dim_y = dim_y
self.dim_x = dim_x
self.ua_type = ua_type
def read_function(self):
"""
Read_R function
"""
value = numpy.array(self.node.get_value())
if self.dim_y != 0:
value = numpy.array(numpy.split(value, indices_or_sections=self.dim_y))
else:
value = numpy.array(value)
return value
def write_function(self, value):
"""
write_RW function
"""
# set_data_value(opcua.ua.uatypes.Variant(value = value.tolist(), varianttype=opcua.ua.VariantType.Int32))
if self.dim_y != 0:
v = numpy.concatenate(value)
self.node.set_data_value(opcua.ua.uatypes.Variant(value=v.tolist(), varianttype=self.ua_type))
elif self.dim_x != 1:
self.node.set_data_value(opcua.ua.uatypes.Variant(value=value.tolist(), varianttype=self.ua_type))
else:
self.node.set_data_value(opcua.ua.uatypes.Variant(value=value, varianttype=self.ua_type))