Skip to content
Snippets Groups Projects
opcua_connection.py 7.36 KiB
Newer Older
from src.comms_client import *

__all__ = ["OPCUAConnection"]

numpy_to_OPCua_dict = {
Thomas Juerges's avatar
Thomas Juerges committed
    numpy.bool_: opcua.ua.VariantType.Boolean,
    numpy.int8: opcua.ua.VariantType.SByte,
    numpy.uint8: opcua.ua.VariantType.Byte,
    numpy.int16: opcua.ua.VariantType.Int16,
    numpy.uint16: opcua.ua.VariantType.UInt16,
    numpy.int32: opcua.ua.VariantType.Int32,
    numpy.uint32: opcua.ua.VariantType.UInt32,
    numpy.int64: opcua.ua.VariantType.Int64,
    numpy.uint64: opcua.ua.VariantType.UInt64,
    numpy.datetime_data: opcua.ua.VariantType.DateTime, # is this the right type, does it even matter?
    numpy.float32: opcua.ua.VariantType.Float,
    numpy.double: opcua.ua.VariantType.Double,
    numpy.float64: opcua.ua.VariantType.Double,
    numpy.str_: opcua.ua.VariantType.String,
    numpy.str: opcua.ua.VariantType.String,
    str: opcua.ua.VariantType.String
}

# <class 'numpy.bool_'>

class OPCUAConnection(CommClient):
Thomas Juerges's avatar
Thomas Juerges committed
    """
      Connects to OPC-UA in the foreground or background, and sends HELLO
      messages to keep a check on the connection. On connection failure, reconnects once.
    """

    def start(self):
        super().start()

    def __init__(self, address, namespace, timeout, fault_func, streams, try_interval=2):
Thomas Juerges's avatar
Thomas Juerges committed
        """
        Create the OPC ua client and connect() to it and get the object node
        """
        super().__init__(fault_func, streams, try_interval)
        self.client = Client(address, timeout)
Thomas Juerges's avatar
Thomas Juerges committed

        # Explicitly connect
        if not self.connect():
            # hardware or infra is down -- needs fixing first
            fault_func()
            return

        # determine namespace used
        try:
            if type(namespace) is str:
                self.name_space_index = self.client.get_namespace_index(namespace)
            elif type(namespace) is int:
                self.name_space_index = namespace

        except Exception as e:
            #TODO remove once SDP is fixed
            self.streams.warn_stream("Cannot determine the OPC-UA name space index.  Will try and use the default = 2.")
            self.name_space_index = 2

        self.obj = self.client.get_objects_node()

    def _servername(self):
        return self.client.server_url.geturl()

    def connect(self):
        """
        Try to connect to the client
        """

        try:
            self.streams.debug_stream("Connecting to server %s", self._servername())
            self.client.connect()
            self.connected = True
            self.streams.debug_stream("Connected to %s. Initialising.", self._servername())
            return True
        except socket.error as e:
            self.streams.error_stream("Could not connect to server %s: %s", self._servername(), e)
            raise Exception("Could not connect to server %s", self._servername()) from e
Thomas Juerges's avatar
Thomas Juerges committed


    def disconnect(self):
        """
        disconnect from the client
        """
        self.connected = False  # always force a reconnect, regardless of a successful disconnect

        try:
            self.client.disconnect()
        except Exception as e:
            self.streams.error_stream("Disconnect from OPC-UA server %s failed: %s", self._servername(), e)
Thomas Juerges's avatar
Thomas Juerges committed

    def ping(self):
        """
        ping the client to make sure the connection with the client is still functional.
        """
        try:
Thomas Juerges's avatar
Thomas Juerges committed
        except Exception as e:
            raise Exception("Lost connection to server %s: %s", self._servername(), e)
Thomas Juerges's avatar
Thomas Juerges committed

    def _setup_annotation(self, annotation):
        """
        This class's Implementation of the get_mapping function. returns the read and write functions
        """

        if isinstance(annotation, dict):
            # check if required path inarg is present
            if annotation.get('path') is None:
                raise Exception("OPC-ua mapping requires a path argument in the annotation, was given: %s", annotation)

            path = annotation.get("path")  # required
        elif isinstance(annotation, list):
            path = annotation
        else:
            raise Exception("OPC-ua mapping requires either a list of the path or dict with the path. Was given %s type containing: %s", type(annotation), annotation)
Thomas Juerges's avatar
Thomas Juerges committed

        try:
            node = self.obj.get_child(path)
        except Exception as e:
            self.streams.error_stream("Could not get node: %s on server %s: %s", path, self._servername(), e)
            raise Exception("Could not get node: %s on server %s", path, self._servername()) from e
Thomas Juerges's avatar
Thomas Juerges committed

        return node

    def setup_value_conversion(self, attribute):
        """
        gives the client access to the attribute_wrapper object in order to access all data it could potentially need.
        the OPC ua read/write functions require the dimensionality and the type to be known
        """

        dim_x = attribute.dim_x
        dim_y = attribute.dim_y
        ua_type = numpy_to_OPCua_dict[attribute.numpy_type]	 # convert the numpy type to a corresponding UA type
Thomas Juerges's avatar
Thomas Juerges committed

        return dim_x, dim_y, ua_type

    def setup_attribute(self, annotation, attribute):
        """
        MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions
        """

        # process the annotation
        node = self._setup_annotation(annotation)

        # get all the necessary data to set up the read/write functions from the attribute_wrapper
        dim_x, dim_y, ua_type = self.setup_value_conversion(attribute)

        # configure and return the read/write functions
        prot_attr = ProtocolAttribute(node, dim_x, dim_y, ua_type)

        try:
            # NOTE: debug statement tries to get the qualified name, this may not always work. in that case forgo the name and just print the path
            node_name = str(node.get_browse_name())[len("QualifiedName(2:"):]
            self.streams.debug_stream("connected OPC ua node {} of type {} to attribute with dimensions: {} x {} ".format(str(node_name)[:len(node_name)-1], str(ua_type)[len("VariantType."):], dim_x, dim_y))
        except:
            pass

        # return the read/write functions
        return prot_attr.read_function, prot_attr.write_function
Thomas Juerges's avatar
Thomas Juerges committed
    """
    This class provides a small wrapper for the OPC ua read/write functions in order to better organise the code
    """

    def __init__(self, node, dim_x, dim_y, ua_type):
        self.node = node
        self.dim_y = dim_y
        self.dim_x = dim_x
        self.ua_type = ua_type

    def read_function(self):
        """
        Read_R function
        """
        value = numpy.array(self.node.get_value())

        if self.dim_y != 0:
            value = numpy.array(numpy.split(value, indices_or_sections=self.dim_y))
        else:
            value = numpy.array(value)
        return value

    def write_function(self, value):
        """
        write_RW function
        """
        # set_data_value(opcua.ua.uatypes.Variant(value = value.tolist(), varianttype=opcua.ua.VariantType.Int32))

        if self.dim_y != 0:
            v = numpy.concatenate(value)
            self.node.set_data_value(opcua.ua.uatypes.Variant(value=v.tolist(), varianttype=self.ua_type))

        elif self.dim_x != 1:
            self.node.set_data_value(opcua.ua.uatypes.Variant(value=value.tolist(), varianttype=self.ua_type))
        else:
            self.node.set_data_value(opcua.ua.uatypes.Variant(value=value, varianttype=self.ua_type))