Skip to content
Snippets Groups Projects
Select Git revision
  • 439cab712ae87f843d4d646895683398e88cee3d
  • master default protected
  • control-single-hba-and-lba
  • L2SS-1957-remove-pcon-control
  • stabilise-landing-page
  • all-stations-lofar2
  • L2SS-2357-fix-ruff
  • v0.39.7-backports
  • Move-sdptr-to-v1.5.0
  • fix-build-ubuntu
  • tokens-in-env-files
  • fix-build
  • L2SS-2214-deploy-cdb
  • fix-missing-init
  • add-power-hardware-apply
  • L2SS-2129-Add-Subrack-Routine
  • Also-listen-internal-to-rpc
  • fix-build-dind
  • L2SS-2153--Improve-Error-Handling
  • L2SS-2153-Add-Grpc-Gateway-support
  • L2SS-1970-apsct-lol
  • v0.52.3 protected
  • v0.52.3dev0 protected
  • 0.53.1dev0
  • v0.52.2-rc3 protected
  • v0.52.2-rc2 protected
  • v0.52.2-rc1 protected
  • v0.52.1.1 protected
  • v0.52.1 protected
  • v0.52.1-rc1 protected
  • v0.51.9-6 protected
  • v0.51.9-5 protected
  • v0.51.9-4 protected
  • v0.51.9-3 protected
  • v0.51.9-2 protected
  • v0.51.9-1 protected
  • v0.51.9 protected
  • v0.51.8 protected
  • v0.39.15-wsrttwo protected
  • v0.39.15-wsrt protected
  • v0.39.14-wsrt protected
41 results

opcua_client.py

Blame
  • opcua_client.py 9.50 KiB
    from threading import Thread
    import socket
    import numpy
    import opcua
    from opcua import Client
    
    from clients.comms_client import CommClient
    
    __all__ = ["OPCUAConnection"]
    
    # Lookup table from the numpy type of an attribute to the OPC-UA variant
    # type used when writing that attribute to the server.
    numpy_to_OPCua_dict = {
        numpy.bool_: opcua.ua.VariantType.Boolean,
        numpy.int8: opcua.ua.VariantType.SByte,
        numpy.uint8: opcua.ua.VariantType.Byte,
        numpy.int16: opcua.ua.VariantType.Int16,
        numpy.uint16: opcua.ua.VariantType.UInt16,
        numpy.int32: opcua.ua.VariantType.Int32,
        numpy.uint32: opcua.ua.VariantType.UInt32,
        numpy.int64: opcua.ua.VariantType.Int64,
        numpy.uint64: opcua.ua.VariantType.UInt64,
        # NOTE(review): numpy.datetime_data is a function, not a scalar type, so
        # this entry can only match if an attribute uses that function object as
        # its type. numpy.datetime64 was presumably intended -- confirm.
        numpy.datetime_data: opcua.ua.VariantType.DateTime,
        numpy.float32: opcua.ua.VariantType.Float,
        # numpy.double and numpy.float64 are the same type; both spellings are kept for readability.
        numpy.double: opcua.ua.VariantType.Double,
        numpy.float64: opcua.ua.VariantType.Double,
        numpy.str_: opcua.ua.VariantType.String,
        # The deprecated alias numpy.str (identical to the builtin str) was
        # removed in NumPy 1.24; the builtin key below covers it.
        str: opcua.ua.VariantType.String,
    }
    
    # <class 'numpy.bool_'>
    
    class OPCUAConnection(CommClient):
        """
        OPC-UA client connection built on top of CommClient.

        Connects to an OPC-UA server, sends HELLO messages (see ping()) to check
        on the connection, and hands out read/write functions for individual
        OPC-UA nodes (see setup_attribute()).

        NOTE(review): running in the foreground/background and reconnecting on
        connection failure are presumably implemented by the CommClient base
        class -- confirm there.
        """

        def start(self):
            """
            Start the client; simply defers to CommClient.start().
            """
            super().start()

        def __init__(self, address, namespace, timeout, fault_func, streams, try_interval=2):
            """
            Create the OPC ua client, connect() to the server and get the objects node.

            address:      passed to opcua.Client as the server address.
            namespace:    either the name of the namespace (str), which is looked up
                          on the server, or the namespace index itself (int).
            timeout:      passed to opcua.Client.
            fault_func:   passed to CommClient; also called here if connect() reports failure.
            streams:      passed to CommClient; used for logging as self.streams.
            try_interval: passed to CommClient.
            """
            super().__init__(fault_func, streams, try_interval)

            self.client = Client(address, timeout)

            # Explicitly connect.
            # NOTE: connect() as implemented below either returns True or raises,
            # so this branch only triggers if connect() is changed/overridden to
            # return a falsy value.
            if not self.connect():
                # hardware or infra is down -- needs fixing first
                fault_func()
                return

            # determine namespace used
            try:
                if isinstance(namespace, str):
                    self.name_space_index = self.client.get_namespace_index(namespace)
                elif isinstance(namespace, int):
                    self.name_space_index = namespace
                else:
                    # previously left name_space_index unset; now handled like a failed lookup
                    raise TypeError(f"namespace must be a str or int, got {type(namespace).__name__}")
            except Exception:
                #TODO remove once SDP is fixed
                self.streams.warn_stream("Cannot determine the OPC-UA name space index.  Will try and use the default = 2.")
                self.name_space_index = 2

            self.obj = self.client.get_objects_node()
            self.check_nodes()

        def _servername(self):
            """
            Return the URL of the server this client is configured for, as a string.
            """
            return self.client.server_url.geturl()

        def connect(self):
            """
            Try to connect to the server.

            Returns True on success. A socket error is logged and re-raised as an
            Exception (chained to the socket error); other errors propagate as-is.
            """

            try:
                self.streams.debug_stream("Connecting to server %s", self._servername())
                self.client.connect()
                self.connected = True
                self.streams.debug_stream("Connected to %s. Initialising.", self._servername())
                return True
            except socket.error as e:
                self.streams.error_stream("Could not connect to server %s: %s", self._servername(), e)
                # Exception() does not apply %-formatting itself, so build the message here
                raise Exception(f"Could not connect to server {self._servername()}") from e

        def check_nodes(self):
            """
            function purely for debugging/development only. Simply lists all top level nodes and the nodes below that
            """

            for top_node in self.obj.get_children():
                print(top_node.get_browse_name())
                for child in top_node.get_children():
                    try:
                        print(child.get_browse_name(), child.get_data_type_as_variant_type())
                    except Exception:
                        # not every node has a data type; print just its name instead
                        print(child.get_browse_name())

        def disconnect(self):
            """
            Disconnect from the server. Failures are logged, not raised.
            """
            self.connected = False  # always force a reconnect, regardless of a successful disconnect

            try:
                self.client.disconnect()
            except Exception as e:
                self.streams.error_stream("Disconnect from OPC-UA server %s failed: %s", self._servername(), e)

        def ping(self):
            """
            Send a HELLO message to the server to make sure the connection is still functional.

            Raises an Exception (chained to the original error) if the message cannot be sent.
            """
            try:
                self.client.send_hello()
            except Exception as e:
                raise Exception(f"Lost connection to server {self._servername()}: {e}") from e

        def _setup_annotation(self, annotation):
            """
            Resolve an attribute annotation to the OPC-UA node it refers to.

            annotation: either a list describing the path of the node below the
                        objects node, or a dict holding that list under the key 'path'.

            Returns the node found at that path. Raises an Exception if the
            annotation is malformed or the node cannot be retrieved.
            """

            if isinstance(annotation, dict):
                # check if required path inarg is present
                path = annotation.get("path")
                if path is None:
                    raise Exception(f"OPC-ua mapping requires a path argument in the annotation, was given: {annotation}")
            elif isinstance(annotation, list):
                path = annotation
            else:
                raise Exception(f"OPC-ua mapping requires either a list of the path or dict with the path. Was given {type(annotation)} type containing: {annotation}")

            try:
                node = self.obj.get_child(path)
            except Exception as e:
                self.streams.error_stream("Could not get node: %s on server %s: %s", path, self._servername(), e)
                raise Exception(f"Could not get node: {path} on server {self._servername()}") from e

            return node

        def setup_value_conversion(self, attribute):
            """
            gives the client access to the attribute_wrapper object in order to access all data it could potentially need.
            the OPC ua read/write functions require the dimensionality and the type to be known

            Returns (dim_x, dim_y, ua_type). Raises KeyError if the numpy type of
            the attribute has no entry in numpy_to_OPCua_dict.
            """

            dim_x = attribute.dim_x
            dim_y = attribute.dim_y
            ua_type = numpy_to_OPCua_dict[attribute.numpy_type]  # convert the numpy type to a corresponding UA type

            return dim_x, dim_y, ua_type

        def setup_attribute(self, annotation, attribute):
            """
            MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions
            """

            # process the annotation
            node = self._setup_annotation(annotation)

            # get all the necessary data to set up the read/write functions from the attribute_wrapper
            dim_x, dim_y, ua_type = self.setup_value_conversion(attribute)

            # configure and return the read/write functions
            prot_attr = ProtocolAttribute(node, dim_x, dim_y, ua_type)

            try:
                # NOTE: debug statement tries to get the qualified name, this may not always work. in that case forgo the debug statement
                node_name = str(node.get_browse_name())[len("QualifiedName(2:"):]
                self.streams.debug_stream("connected OPC ua node {} of type {} to attribute with dimensions: {} x {} ".format(node_name[:-1], str(ua_type)[len("VariantType."):], dim_x, dim_y))
            except Exception:
                # purely informational; failing to log must not prevent the attribute from being set up
                pass

            # return the read/write functions
            return prot_attr.read_function, prot_attr.write_function
    
    
    class ProtocolAttribute:
        """
        This class provides a small wrapper for the OPC ua read/write functions in order to better organise the code
        """

        def __init__(self, node, dim_x, dim_y, ua_type):
            """
            node:    OPC-UA node to read from / write to.
            dim_x:   size of the first dimension (1 together with dim_y == 0 denotes a scalar).
            dim_y:   size of the second dimension, 0 if the attribute is not 2D.
            ua_type: OPC-UA variant type used when writing values.
            """
            self.node = node
            self.dim_y = dim_y
            self.dim_x = dim_x
            self.ua_type = ua_type

        def read_function(self):
            """
            Read_R function

            Returns the node's value as-is for a scalar, as a numpy array of
            dim_y equally sized rows for a 2D attribute, and as a 1D numpy
            array otherwise.
            """
            value = self.node.get_value()

            if self.dim_y + self.dim_x == 1:
                # scalar
                return value

            if self.dim_y != 0:
                # 2D array: the value is a flat sequence, split it into dim_y rows
                return numpy.array(numpy.split(numpy.array(value), indices_or_sections=self.dim_y))

            # 1D array
            return numpy.array(value)

        def write_function(self, value):
            """
            write_RW function

            Flattens/convert the value to a python list where needed and writes
            it to the node using the configured variant type.

            Raises TypeError (chained to the original error) if the value cannot
            be converted or the server reports a type mismatch.
            """

            if self.dim_y != 0:
                # flatten array, convert to python array
                value = numpy.concatenate(value).tolist()
            elif self.dim_x != 1:
                # make sure it is a python array
                value = value.tolist() if isinstance(value, numpy.ndarray) else value

            try:
                self.node.set_data_value(opcua.ua.uatypes.Variant(value=value, varianttype=self.ua_type))
            except (TypeError, opcua.ua.uaerrors.BadTypeMismatch) as e:
                # A type conversion went wrong or there is a type mismatch.
                #
                # This is either the conversion us -> opcua in our client, or client -> server.
                # Report all types involved to allow assessment of the location of the error.
                raise TypeError(self._describe_type_mismatch(value)) from e

        def _describe_type_mismatch(self, value):
            """
            Build the error message for a failed write of value, listing our type,
            the type we expected the server to have, and the type the server reports.
            """
            if isinstance(value, list):
                our_type = "list({dtype}) x ({dimensions})".format(
                    dtype=(type(value[0]).__name__ if value else ""),
                    dimensions=len(value))
            else:
                our_type = "{dtype}".format(
                    dtype=type(value))

            is_scalar = (self.dim_x + self.dim_y) == 1

            if is_scalar:
                expected_server_type = "{dtype} (scalar)".format(
                    dtype=self.ua_type)
            else:
                expected_server_type = "{dtype} x ({dim_x}, {dim_y})".format(
                    dtype=self.ua_type,
                    dim_x=self.dim_x,
                    dim_y=self.dim_y)

            # The lookups below query the node again; if they fail as well, do not
            # let that failure mask the type error we are trying to report.
            try:
                actual_server_type = "{dtype} {dimensions}".format(
                    dtype=self.node.get_data_type_as_variant_type(),
                    dimensions=(self.node.get_array_dimensions() or "???"))
            except Exception:
                actual_server_type = "???"

            try:
                attribute_name = self.node.get_display_name().to_string()
            except Exception:
                attribute_name = "???"

            return f"Cannot write value to OPC-UA attribute '{attribute_name}': tried to convert data type {our_type} to expected server type {expected_server_type}, server reports type {actual_server_type}"