Skip to content
Snippets Groups Projects
Select Git revision
  • 1b77101e470f368a97983bb8fb8525fc50b33c2e
  • master default protected
  • fix-64-bit
  • L2SDP-261
  • 2021-04-14T14.09.01_sdptr
  • 2021-04-14T13.43.34_sdptr
6 results

fpga.h

Blame
  • Forked from LOFAR2.0 / sdptr
    Source project has a limited visibility.
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    opcua_connection.py 7.46 KiB
    from threading import Thread
    import socket
    from src.comms_client import CommClient
    import numpy
    import opcua
    from opcua import Client
    
    __all__ = ["OPCUAConnection"]
    
    numpy_to_OPCua_dict = {
        numpy.bool_: opcua.ua.VariantType.Boolean,
        numpy.int8: opcua.ua.VariantType.SByte,
        numpy.uint8: opcua.ua.VariantType.Byte,
        numpy.int16: opcua.ua.VariantType.Int16,
        numpy.uint16: opcua.ua.VariantType.UInt16,
        numpy.int32: opcua.ua.VariantType.Int32,
        numpy.uint32: opcua.ua.VariantType.UInt32,
        numpy.int64: opcua.ua.VariantType.Int64,
        numpy.uint64: opcua.ua.VariantType.UInt64,
        numpy.datetime_data: opcua.ua.VariantType.DateTime, # is this the right type, does it even matter?
        numpy.float32: opcua.ua.VariantType.Float,
        numpy.double: opcua.ua.VariantType.Double,
        numpy.float64: opcua.ua.VariantType.Double,
        numpy.str_: opcua.ua.VariantType.String,
        numpy.str: opcua.ua.VariantType.String,
        str: opcua.ua.VariantType.String
    }
    
    # <class 'numpy.bool_'>
    
    class OPCUAConnection(CommClient):
        """
          Connects to OPC-UA in the foreground or background, and sends HELLO
          messages to keep a check on the connection. On connection failure, reconnects once.
        """
    
        def start(self):
            super().start()
    
        def __init__(self, address, namespace, timeout, fault_func, streams, try_interval=2):
            """
            Create the OPC ua client and connect() to it and get the object node
            """
            super().__init__(fault_func, streams, try_interval)
    
            self.client = Client(address, timeout)
    
            # Explicitly connect
            if not self.connect():
                # hardware or infra is down -- needs fixing first
                fault_func()
                return
    
            # determine namespace used
            try:
                if type(namespace) is str:
                    self.name_space_index = self.client.get_namespace_index(namespace)
                elif type(namespace) is int:
                    self.name_space_index = namespace
    
            except Exception as e:
                #TODO remove once SDP is fixed
                self.streams.warn_stream("Cannot determine the OPC-UA name space index.  Will try and use the default = 2.")
                self.name_space_index = 2
    
            self.obj = self.client.get_objects_node()
    
        def _servername(self):
            return self.client.server_url.geturl()
    
        def connect(self):
            """
            Try to connect to the client
            """
    
            try:
                self.streams.debug_stream("Connecting to server %s", self._servername())
                self.client.connect()
                self.connected = True
                self.streams.debug_stream("Connected to %s. Initialising.", self._servername())
                return True
            except socket.error as e:
                self.streams.error_stream("Could not connect to server %s: %s", self._servername(), e)
                raise Exception("Could not connect to server %s", self._servername()) from e
    
    
        def disconnect(self):
            """
            disconnect from the client
            """
            self.connected = False  # always force a reconnect, regardless of a successful disconnect
    
            try:
                self.client.disconnect()
            except Exception as e:
                self.streams.error_stream("Disconnect from OPC-UA server %s failed: %s", self._servername(), e)
    
        def ping(self):
            """
            ping the client to make sure the connection with the client is still functional.
            """
            try:
                self.client.send_hello()
            except Exception as e:
                raise Exception("Lost connection to server %s: %s", self._servername(), e)
    
        def _setup_annotation(self, annotation):
            """
            This class's Implementation of the get_mapping function. returns the read and write functions
            """
    
            if isinstance(annotation, dict):
                # check if required path inarg is present
                if annotation.get('path') is None:
                    raise Exception("OPC-ua mapping requires a path argument in the annotation, was given: %s", annotation)
    
                path = annotation.get("path")  # required
            elif isinstance(annotation, list):
                path = annotation
            else:
                raise Exception("OPC-ua mapping requires either a list of the path or dict with the path. Was given %s type containing: %s", type(annotation), annotation)
    
            try:
                node = self.obj.get_child(path)
            except Exception as e:
                self.streams.error_stream("Could not get node: %s on server %s: %s", path, self._servername(), e)
                raise Exception("Could not get node: %s on server %s", path, self._servername()) from e
    
            return node
    
        def setup_value_conversion(self, attribute):
            """
            gives the client access to the attribute_wrapper object in order to access all data it could potentially need.
            the OPC ua read/write functions require the dimensionality and the type to be known
            """
    
            dim_x = attribute.dim_x
            dim_y = attribute.dim_y
            ua_type = numpy_to_OPCua_dict[attribute.numpy_type]	 # convert the numpy type to a corresponding UA type
    
            return dim_x, dim_y, ua_type
    
        def setup_attribute(self, annotation, attribute):
            """
            MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions
            """
    
            # process the annotation
            node = self._setup_annotation(annotation)
    
            # get all the necessary data to set up the read/write functions from the attribute_wrapper
            dim_x, dim_y, ua_type = self.setup_value_conversion(attribute)
    
            # configure and return the read/write functions
            prot_attr = ProtocolAttribute(node, dim_x, dim_y, ua_type)
    
            try:
                # NOTE: debug statement tries to get the qualified name, this may not always work. in that case forgo the name and just print the path
                node_name = str(node.get_browse_name())[len("QualifiedName(2:"):]
                self.streams.debug_stream("connected OPC ua node {} of type {} to attribute with dimensions: {} x {} ".format(str(node_name)[:len(node_name)-1], str(ua_type)[len("VariantType."):], dim_x, dim_y))
            except:
                pass
    
            # return the read/write functions
            return prot_attr.read_function, prot_attr.write_function
    
    
    class ProtocolAttribute:
        """
        This class provides a small wrapper for the OPC ua read/write functions in order to better organise the code
        """
    
        def __init__(self, node, dim_x, dim_y, ua_type):
            self.node = node
            self.dim_y = dim_y
            self.dim_x = dim_x
            self.ua_type = ua_type
    
        def read_function(self):
            """
            Read_R function
            """
            value = numpy.array(self.node.get_value())
    
            if self.dim_y != 0:
                value = numpy.array(numpy.split(value, indices_or_sections=self.dim_y))
            else:
                value = numpy.array(value)
            return value
    
        def write_function(self, value):
            """
            write_RW function
            """
            # set_data_value(opcua.ua.uatypes.Variant(value = value.tolist(), varianttype=opcua.ua.VariantType.Int32))
    
            if self.dim_y != 0:
                v = numpy.concatenate(value)
                self.node.set_data_value(opcua.ua.uatypes.Variant(value=v.tolist(), varianttype=self.ua_type))
    
            elif self.dim_x != 1:
                self.node.set_data_value(opcua.ua.uatypes.Variant(value=value.tolist(), varianttype=self.ua_type))
            else:
                self.node.set_data_value(opcua.ua.uatypes.Variant(value=value, varianttype=self.ua_type))