Skip to content
Snippets Groups Projects
opcua_connection.py 6.42 KiB
Newer Older
from src.comms_client import *


__all__ = ["OPCUAConnection"]

# Lookup table from the numpy scalar type of an attribute to the OPC-UA
# variant type used when writing that attribute's value to the server.
# A type that is not listed here raises KeyError at lookup time.
numpy_to_OPCua_dict = {
	numpy.bool_: opcua.ua.VariantType.Boolean,
	numpy.int8: opcua.ua.VariantType.SByte,
	numpy.uint8: opcua.ua.VariantType.Byte,
	numpy.int16: opcua.ua.VariantType.Int16,
	numpy.uint16: opcua.ua.VariantType.UInt16,
	numpy.int32: opcua.ua.VariantType.Int32,
	numpy.uint32: opcua.ua.VariantType.UInt32,
	numpy.int64: opcua.ua.VariantType.Int64,
	numpy.uint64: opcua.ua.VariantType.UInt64,
	# numpy.datetime64 is the scalar *type*; numpy.datetime_data (used here
	# before) is a helper function and could never match an attribute's type.
	numpy.datetime64: opcua.ua.VariantType.DateTime,
	numpy.float32: opcua.ua.VariantType.Float,
	numpy.float64: opcua.ua.VariantType.Double,  # numpy.double is an alias of numpy.float64
	numpy.str_: opcua.ua.VariantType.String,
	# The builtin str also covers the former numpy.str alias, which was
	# deprecated in NumPy 1.20 and removed in 1.24 (AttributeError on access).
	str: opcua.ua.VariantType.String,
}

# <class 'numpy.bool_'>

class OPCUAConnection(CommClient):
	"""
	  Connects to OPC-UA in the foreground or background, and sends HELLO
	  messages to keep a check on the connection. On connection failure, reconnects once.

	  NOTE(review): the background/reconnect behaviour is presumably provided by the
	  CommClient base class, which is not visible in this file -- confirm there.
	"""

	def start(self):
		"""
		Start the client by delegating to the base class implementation.
		"""
		super().start()

	def __init__(self, address, namespace, timeout, on_func, fault_func, streams, try_interval=2):
		"""
		Create the OPC ua client and connect() to it and get the object node

		address, timeout: passed on to the OPC-UA Client constructor.
		namespace: either the namespace name (str), which is resolved to its index
			through the server, or the namespace index itself (int).
		on_func, fault_func, streams, try_interval: passed on to the base class.
			fault_func is additionally called here if connect() reports failure
			by returning a false value.

		If the namespace index cannot be determined, a warning is logged and the
		index falls back to 2.
		"""
		super().__init__(on_func, fault_func, streams, try_interval)

		self.client = Client(address, timeout)

		# Explicitly connect
		if not self.connect():
			# hardware or infra is down -- needs fixing first
			fault_func()
			return

		# determine namespace used
		try:
			if isinstance(namespace, str):
				self.name_space_index = self.client.get_namespace_index(namespace)
			elif isinstance(namespace, int):
				self.name_space_index = namespace
			else:
				# route unsupported types to the fallback below instead of
				# leaving name_space_index unset
				raise TypeError("namespace must be of type str or int, but is of type {}".format(type(namespace).__name__))
		except Exception:
			#TODO remove once SDP is fixed
			self.streams.warn_stream("Cannot determine the OPC-UA name space index.  Will try and use the default = 2.")
			self.name_space_index = 2

		self.obj = self.client.get_objects_node()

	def _servername(self):
		"""
		Return the URL of the server this client talks to, as a string.
		"""
		return self.client.server_url.geturl()

	def connect(self):
		"""
		Try to connect to the client

		Returns True once the connection is established. On failure the error
		is logged and an Exception (chained to the original error) is raised.
		"""

		try:
			self.streams.debug_stream("Connecting to server %s", self._servername())
			self.client.connect()
			self.connected = True
			self.streams.debug_stream("Connected to %s. Initialising.", self._servername())
			# __init__ tests the result of connect(), so success must be truthy
			return True
		except Exception as e:
			#TODO
			self.streams.error_stream("Could not connect to server %s: %s", self._servername(), e)
			raise Exception("Could not connect to server {}".format(self._servername())) from e


	def disconnect(self):
		"""
		disconnect from the client

		A failing disconnect is logged but not raised.
		"""
		self.connected = False  # always force a reconnect, regardless of a successful disconnect

		try:
			self.client.disconnect()
		except Exception as e:
			self.streams.error_stream("Disconnect from OPC-UA server %s failed: %s", self._servername(), e)

	def ping(self):
		"""
		ping the client to make sure the connection with the client is still functional.

		Raises an Exception (chained to the original error) if the HELLO message fails.
		"""
		try:
			self.client.send_hello()
		except Exception as e:
			# Exception() does not interpolate logging-style arguments, so format explicitly
			raise Exception("Lost connection to server {}: {}".format(self._servername(), e)) from e

	def _setup_annotation(self, annotation):
		"""
		Resolve an attribute annotation to the OPC-UA node it refers to.

		annotation: either a list holding the node path, or a dict with the node
			path stored under the 'path' key.

		Returns the node found by looking up the path below the objects node.
		Raises an Exception if the annotation has an unsupported form, lacks the
		path, or if the node cannot be retrieved from the server.
		"""

		if isinstance(annotation, dict):
			path = annotation.get("path")  # required

			# check if required path inarg is present
			if path is None:
				raise Exception("OPC-ua mapping requires a path argument in the annotation, was given: {}".format(annotation))
		elif isinstance(annotation, list):
			path = annotation
		else:
			raise Exception("OPC-ua mapping requires either a list of the path or dict with the path. Was given {} type containing: {}".format(type(annotation), annotation))

		try:
			node = self.obj.get_child(path)
		except Exception as e:
			self.streams.error_stream("Could not get node: %s on server %s: %s", path, self._servername(), e)
			raise Exception("Could not get node: {} on server {}".format(path, self._servername())) from e

		return node

	def setup_value_conversion(self, attribute):
		"""
		gives the client access to the attribute_wrapper object in order to access all data it could potentially need.
		the OPC ua read/write functions require the dimensionality and the type to be known

		Returns the tuple (dim_x, dim_y, ua_type). Raises KeyError if the
		attribute's numpy type has no entry in numpy_to_OPCua_dict.
		"""

		dim_x = attribute.dim_x
		dim_y = attribute.dim_y
		ua_type = numpy_to_OPCua_dict[attribute.numpy_type]	 # convert the numpy type to a corresponding UA type

		return dim_x, dim_y, ua_type

	def setup_attribute(self, annotation, attribute):
		"""
		MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions
		"""

		# process the annotation
		node = self._setup_annotation(annotation)

		# get all the necessary data to set up the read/write functions from the attribute_wrapper
		dim_x, dim_y, ua_type = self.setup_value_conversion(attribute)

		# configure and return the read/write functions
		prot_attr = ProtocolAttribute(node, dim_x, dim_y, ua_type)

		try:
			# NOTE: debug statement tries to get the qualified name, this may not always work.
			# The slicing strips a "QualifiedName(2:" prefix and the closing ")" from the string form.
			node_name = str(node.get_browse_name())[len("QualifiedName(2:"):]
			self.streams.debug_stream("connected OPC ua node {} of type {} to attribute with dimensions: {} x {} ".format(str(node_name)[:len(node_name)-1], str(ua_type)[len("VariantType."):], dim_x, dim_y))
		except Exception:
			# the debug output is best-effort only; it must never break attribute setup.
			# (KeyboardInterrupt/SystemExit are deliberately no longer swallowed.)
			pass

		# return the read/write functions
		return prot_attr.read_function, prot_attr.write_function


class ProtocolAttribute:
	"""
	Bundles an OPC ua node with the dimensions and UA type of one attribute,
	and offers the matching read and write functions for that node.
	"""

	def __init__(self, node, dim_x, dim_y, ua_type):
		self.node = node
		self.dim_x = dim_x
		self.dim_y = dim_y
		self.ua_type = ua_type

	def read_function(self):
		"""
		Read the node's value and return it as a numpy array.

		When dim_y is non-zero the value is split into dim_y equal sections,
		giving a two-dimensional result.
		"""
		raw = numpy.array(self.node.get_value())

		if self.dim_y == 0:
			return numpy.array(raw)

		rows = numpy.split(raw, indices_or_sections=self.dim_y)
		return numpy.array(rows)

	def write_function(self, value):
		"""
		Write value to the node as a Variant of this attribute's UA type.

		Two-dimensional values (dim_y non-zero) are concatenated into one flat
		list, other non-scalar values (dim_x other than 1) are converted to a
		list, and scalars are passed on unchanged.
		"""
		if self.dim_y != 0:
			payload = numpy.concatenate(value).tolist()
		elif self.dim_x != 1:
			payload = value.tolist()
		else:
			payload = value

		variant = opcua.ua.uatypes.Variant(value=payload, varianttype=self.ua_type)
		self.node.set_data_value(variant)