Skip to content
Snippets Groups Projects
SDP.py 15.5 KiB
Newer Older
Thomas Juerges's avatar
Thomas Juerges committed
# -*- coding: utf-8 -*-
#
# This file is part of the SDP project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.

""" SDP Device Server for LOFAR2.0

"""

# PyTango imports
from tango import DebugIt
from tango.server import run
from tango.server import Device
from tango.server import attribute, command
from tango.server import device_property
from tango import AttrQuality, DispLevel, DevState
from tango import AttrWriteType, PipeWriteType
# Additional import
import sys
import opcua
import numpy

from wrappers import only_in_states, only_when_on, fault_on_error
from opcua_connection import OPCUAConnection

__all__ = ["SDP", "main"]

class SDP(Device):
    """

    **Properties:**

    - Device Property
        OPC_Server_Name
            - Type:numpy.str
Thomas Juerges's avatar
Thomas Juerges committed
        OPC_Server_Port
            - Type:'DevULong'
        OPC_Time_Out
Thomas Juerges's avatar
Thomas Juerges committed

    States are as follows:
        INIT    = Device is initialising.
        STANDBY = Device is initialised, but pends external configuration and an explicit turning on,
        ON      = Device is fully configured, functional, controls the hardware, and is possibly actively running,
        FAULT   = Device detected an unrecoverable error, and is thus malfunctional,
        OFF     = Device is turned off, drops connection to the hardware,

    The following state transitions are implemented:
        boot -> OFF:     Triggered by tango.  Device will be instantiated,
        OFF  -> INIT:    Triggered by device. Device will initialise (connect to hardware, other devices),
        INIT -> STANDBY: Triggered by device. Device is initialised, and is ready for additional configuration by the user,
        STANDBY -> ON:   Triggered by user.   Device reports to be functional,
        * -> FAULT:      Triggered by device. Device has degraded to malfunctional, for example because the connection to the hardware is lost,
        * -> FAULT:      Triggered by user.   Emulate a forced malfunction for integration testing purposes,
        * -> OFF:        Triggered by user.   Device is turned off. Triggered by the Off() command,
        FAULT -> INIT:   Triggered by user.   Device is reinitialised to recover from an error,

        The user triggers their transitions by the commands reflecting the target state (Initialise(), On(), Fault()).
    """
Thomas Juerges's avatar
Thomas Juerges committed
    client = 0
    name_space_index = 0
    obj = 0

    # -----------------
    # Device Properties
    # -----------------

    OPC_Server_Name = device_property(
        dtype=numpy.str,
Thomas Juerges's avatar
Thomas Juerges committed
        mandatory=True
    )

    OPC_Server_Port = device_property(
Thomas Juerges's avatar
Thomas Juerges committed
        mandatory=True
    )

    OPC_Time_Out = device_property(
Thomas Juerges's avatar
Thomas Juerges committed
        mandatory=True
    )

    # ----------
    # Attributes
    # ----------
    fpga_mask_RW = attribute(
        access=AttrWriteType.READ_WRITE,
    )

Thomas Juerges's avatar
Thomas Juerges committed
    fpga_scrap_R = attribute(
        max_dim_x = 2048,
    )

Thomas Juerges's avatar
Thomas Juerges committed
    fpga_scrap_RW = attribute(
        max_dim_x = 2048,
Thomas Juerges's avatar
Thomas Juerges committed
        access=AttrWriteType.READ_WRITE,
    )

    fpga_status_R = attribute(
Thomas Juerges's avatar
Thomas Juerges committed
    )

    fpga_temp_R = attribute(
    fpga_version_R = attribute(
        dtype = (numpy.str,),
Thomas Juerges's avatar
Thomas Juerges committed

Thomas Juerges's avatar
Thomas Juerges committed
    fpga_weights_R = attribute(
Thomas Juerges's avatar
Thomas Juerges committed
        max_dim_x = 12 * 488 * 2, max_dim_y = 16,
    )

    fpga_weights_RW = attribute(
Thomas Juerges's avatar
Thomas Juerges committed
        max_dim_x = 12 * 488 * 2, max_dim_y = 16,
        access=AttrWriteType.READ_WRITE,
    )

Thomas Juerges's avatar
Thomas Juerges committed
    tr_busy_R = attribute(
        dtype = (numpy.bool_),
Thomas Juerges's avatar
Thomas Juerges committed
    tr_reload_RW = attribute(
        dtype = (numpy.bool_),
        access=AttrWriteType.READ_WRITE,
Thomas Juerges's avatar
Thomas Juerges committed
    tr_tod_R = attribute(
        dtype = (numpy.uint64),
Thomas Juerges's avatar
Thomas Juerges committed
    tr_uptime_R = attribute(
        dtype = (numpy.uint64,),
Thomas Juerges's avatar
Thomas Juerges committed

    # ---------------
    # General methods
    # ---------------
    def get_node(self, node):
Thomas Juerges's avatar
Thomas Juerges committed
        try:
            return self.lofar_device_node.get_child(["{}:{}".format(self.name_space_index, node)])
Thomas Juerges's avatar
Thomas Juerges committed
        except opcua.ua.uaerrors._auto.BadNoMatch:
            self.error_stream("Could not find LOFAR device %s node %s", self.device, node)
Thomas Juerges's avatar
Thomas Juerges committed

            # Contract with hardware is broken --- cannot recover
            raise

    def _map_attributes(self):
        try:
            self.name_space_index = self.client.get_namespace_index("http://lofar.eu")
        except Exception as e:
            self.name_space_index = 1
            self.warn_stream("Cannot determine the OPC-UA name space index.  Will try and use the default = %d." % (self.name_space_index))
Thomas Juerges's avatar
Thomas Juerges committed

        self.obj_node = self.client.get_objects_node()
        # TODO
        # The server does not implement the correct namespace yet.
        # Instead it is directly using the Objects node.
        #self.lofar_device_node = self.obj_node.get_child(["{}:SDP".format(self.name_space_index)])
        self.lofar_device_node = self.obj_node
Thomas Juerges's avatar
Thomas Juerges committed

        self.info_stream("Mapping OPC-UA MP/CP to attributes...")
Thomas Juerges's avatar
Thomas Juerges committed

        self.attribute_mapping["fpga_mask_RW"] = self.get_node("fpga_mask_RW")
        self.attribute_mapping["fpga_scrap_R"] = self.get_node("fpga_scrap_R")
Thomas Juerges's avatar
Thomas Juerges committed
        self.attribute_mapping["fpga_scrap_RW"] = self.get_node("fpga_scrap_RW")
        self.attribute_mapping["fpga_status_R"] = self.get_node("fpga_status_R")
        self.attribute_mapping["fpga_temp_R"] = self.get_node("fpga_temp_R")
        self.attribute_mapping["fpga_version_R"] = self.get_node("fpga_version_R")
Thomas Juerges's avatar
Thomas Juerges committed
        self.attribute_mapping["fpga_weights_R"] = self.get_node("fpga_weights_R")
        self.attribute_mapping["fpga_weights_RW"] = self.get_node("fpga_weights_RW")
Thomas Juerges's avatar
Thomas Juerges committed
        self.attribute_mapping["tr_busy_R"] = self.get_node("tr_busy_R")
        self.attribute_mapping["tr_reload_RW"] = self.get_node("tr_reload_W")
        self.attribute_mapping["tr_tod_R"] = self.get_node("tr_tod_R")
        self.attribute_mapping["tr_uptime_R"] = self.get_node("tr_uptime_R")
Thomas Juerges's avatar
Thomas Juerges committed

        self.info_stream("Mapping OPC-UA MP/CP to attributes done.")
Thomas Juerges's avatar
Thomas Juerges committed

    def init_device(self):
        """ Instantiates the device in the OFF state. """

        # NOTE: Will delete_device first, if necessary
        Device.init_device(self)

        self.set_state(DevState.OFF)

    def initialise(self):
        """Initialises the attributes and properties of the SDP."""

        self.set_state(DevState.INIT)

        # Init the dict that contains attribute to OPC-UA MP/CP mappings.
        self.attribute_mapping = {}

        # Set default values in the RW/R attributes and add them to
        # the mapping.
        self._fpga_mask_RW = numpy.full(16, False)
        self.attribute_mapping["fpga_mask_RW"] = {}
        self._fpga_scrap_R = numpy.full(2048, False)
        self.attribute_mapping["fpga_scrap_R"] = {}
Thomas Juerges's avatar
Thomas Juerges committed
        self._fpga_scrap_RW = numpy.full(2048, False)
        self.attribute_mapping["fpga_scrap_RW"] = {}
        self._fpga_status_R = numpy.full(16, False)
        self.attribute_mapping["fpga_status_R"] = {}
        self._fpga_temp_R = numpy.full(16, 0.0)
Thomas Juerges's avatar
Thomas Juerges committed
        self.attribute_mapping["fpga_temp_R"] = {}
        self._fpga_version_R = numpy.full(16, "NO_VERSION_INFO_YET")
        self.attribute_mapping["fpga_version_R"] = {}
Thomas Juerges's avatar
Thomas Juerges committed
        self._fpga_weights_R = numpy.full((16, 2 * 488 * 12), 0)
        self.attribute_mapping["fpga_weights_R"] = {}
        self._fpga_weights_RW = numpy.full((16, 2 * 488 * 12), 0)
        self.attribute_mapping["fpga_weights_RW"] = {}
Thomas Juerges's avatar
Thomas Juerges committed
        self._tr_busy_R = False
        self.attribute_mapping["tr_busy_R"] = {}
        self._tr_reload_RW = False
        self.attribute_mapping["tr_reload_RW"] = {}
        self._tr_tod_R = 0
        self.attribute_mapping["tr_tod_R"] = {}
        self._tr_uptime_R = 0
        self.attribute_mapping["tr_uptime_R"] = {}
Thomas Juerges's avatar
Thomas Juerges committed

        # Init the dict that contains function to OPC-UA function mappings.
        self.function_mapping = {}

        self.client = opcua.Client("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), self.OPC_Time_Out) # timeout in seconds

        # Connect to OPC-UA -- will set ON state on success in case of a reconnect
        self.opcua_connection = OPCUAConnection(self.client, self.Standby, self.Fault, self)

        # Explicitly connect
        if not self.opcua_connection.connect():
            # hardware or infra is down -- needs fixing first
            self.Fault()
            return

        # Retrieve and map server attributes
        try:
            self._map_attributes()
        except Exception as e:
            self.error_stream("Could not map server interface: %s", e)
            self.Fault()
            return

        # Start keep-alive
        self.opcua_connection.start()

        # Set the masks.
        #
        # Attention!
        # Set the masks only after the OPCUA connection has been
        # established!  The setting of the masks needs to go through
        # to the server.
        #
        # TODO
        # Read default masks from config DB
        #self.write_fpga_mask_RW(self._fpga_mask_R)
Thomas Juerges's avatar
Thomas Juerges committed

        # Everything went ok -- go standby.
        self.set_state(DevState.STANDBY)


    def always_executed_hook(self):
        """Method always executed before any TANGO command is executed."""
        pass

    @DebugIt()
    def delete_device(self):
        """Hook to delete resources allocated in init_device.

        This method allows for any memory or other resources allocated in the
        init_device method to be released.  This method is called by the device
        destructor and by the device Init command (a Tango built-in).
        """
        self.Off()


    # ------------------
    # Attributes methods
    # ------------------
    @only_when_on
    @fault_on_error
    def read_fpga_mask_RW(self):
        """Return the fpga_mask_RW attribute."""
        return self._fpga_mask_RW

    @only_when_on
    @fault_on_error
    def write_fpga_mask_RW(self, value):
        """Return the fpga_mask_RW attribute."""
        self.attribute_mapping["fpga_mask_RW"].set_value(value.tolist())
Thomas Juerges's avatar
Thomas Juerges committed
        self._fpga_mask_RW = value
        return

    @only_when_on
    @fault_on_error
    def read_fpga_scrap_R(self):
        """Return the fpga_scrap_R attribute."""
        self._fpga_scrap_R = numpy.array(self.attribute_mapping["fpga_scrap_R"].get_value(), dtype = numpy.int32)
        return self._fpga_scrap_R

    @only_when_on
    @fault_on_error
    def read_fpga_scrap_RW(self):
        """Return the fpga_scrap_RW attribute."""
        return self._fpga_scrap_RW

    @only_when_on
    @fault_on_error
    def write_fpga_scrap_RW(self, value):
        """Return the fpga_scrap_RW attribute."""
        self.attribute_mapping["fpga_scrap_RW"].set_data_value(opcua.ua.uatypes.Variant(value = value.tolist(), varianttype=opcua.ua.VariantType.Int32))
        _fpga_scrap_RW = value

Thomas Juerges's avatar
Thomas Juerges committed
    @only_when_on
    @fault_on_error
    def read_fpga_status_R(self):
        """Return the fpga_status_R attribute."""
        self._fpga_status_R = numpy.array(self.attribute_mapping["fpga_status_R"].get_value())
        return self._fpga_status_R
Thomas Juerges's avatar
Thomas Juerges committed

    @only_when_on
    @fault_on_error
    def read_fpga_temp_R(self):
        """Return the fpga_temp_R attribute."""
        self._fpga_temp_R = numpy.array(self.attribute_mapping["fpga_temp_R"].get_value())
Thomas Juerges's avatar
Thomas Juerges committed
        return self._fpga_temp_R

    @only_when_on
    @fault_on_error
    def read_fpga_version_R(self):
        """Return the fpga_version_R attribute."""
        self._fpga_version_R = numpy.array(self.attribute_mapping["fpga_version_R"].get_value())
        return self._fpga_version_R

Thomas Juerges's avatar
Thomas Juerges committed
    @only_when_on
    @fault_on_error
    def read_fpga_weights_R(self):
        """Return the fpga_weights_R attribute."""
        value = numpy.array(numpy.split(numpy.array(self.attribute_mapping["fpga_weights_R"].get_value(), dtype = numpy.int16), indices_or_sections = 16))
Thomas Juerges's avatar
Thomas Juerges committed
        self._fpga_weights_R = value
        return self._fpga_weights_R

    @only_when_on
    @fault_on_error
    def read_fpga_weights_RW(self):
        """Return the fpga_weights_RW attribute."""
        return self._fpga_weights_RW

    @only_when_on
    @fault_on_error
    def write_fpga_weights_RW(self, value):
        """Return the fpga_weights_RW attribute."""
        self.attribute_mapping["fpga_weights_RW"].set_data_value(opcua.ua.uatypes.Variant(value = value.flatten().tolist(), varianttype=opcua.ua.VariantType.Int16))
Thomas Juerges's avatar
Thomas Juerges committed
        self._fpga_weights_RW = value
Thomas Juerges's avatar
Thomas Juerges committed

    @only_when_on
    @fault_on_error
Thomas Juerges's avatar
Thomas Juerges committed
    def read_tr_busy_R(self):
        """Return the tr_busy_R attribute."""
        self._tr_busy_R = self.attribute_mapping["tr_busy_R"].get_value()
        return self._tr_busy_R

    @only_when_on
    @fault_on_error
    def read_tr_reload_RW(self):
        """Return the tr_reload_RW attribute."""
        self._tr_reload_RW = self.attribute_mapping["tr_reload_RW"].get_value()
        return self._tr_reload_RW
Thomas Juerges's avatar
Thomas Juerges committed

    @only_when_on
    @fault_on_error
Thomas Juerges's avatar
Thomas Juerges committed
    def write_tr_reload_RW(self, value):
        """Return the tr_reload_RW attribute."""
        self.attribute_mapping["tr_reload_RW"].set_value(value)
        self._tr_reload_RW = value
Thomas Juerges's avatar
Thomas Juerges committed

    @only_when_on
    @fault_on_error
Thomas Juerges's avatar
Thomas Juerges committed
    def read_tr_tod_R(self):
        """Return the _tr_tod_R attribute."""
        self._tr_tod_R = self.attribute_mapping["tr_tod_R"].get_value()
        return self._tr_tod_R
Thomas Juerges's avatar
Thomas Juerges committed

    @only_when_on
    @fault_on_error
Thomas Juerges's avatar
Thomas Juerges committed
    def read_tr_uptime_R(self):
        """Return the _tr_uptime_R attribute."""
        self._tr_uptime_R = self.attribute_mapping["tr_uptime_R"].get_value()
        return self._tr_uptime_R
Thomas Juerges's avatar
Thomas Juerges committed

Thomas Juerges's avatar
Thomas Juerges committed

    # --------
    # Commands
    # --------
    @command()
    @only_in_states([DevState.FAULT, DevState.OFF])
    @DebugIt()
    def Initialise(self):
        """
        Command to ask for initialisation of this device. Can only be called in FAULT or OFF state.

        :return:None
        """
        self.initialise()

    @only_in_states([DevState.INIT])
    def Standby(self):
        """
        Command to ask for initialisation of this device. Can only be called in FAULT or OFF state.

        :return:None
        """
        self.set_state(DevState.STANDBY)

    @command()
    @only_in_states([DevState.STANDBY])
    @DebugIt()
    def On(self):
        """
        Command to ask for initialisation of this device. Can only be called in FAULT or OFF state.

        :return:None
        """
        self.set_state(DevState.ON)

    @command()
    @DebugIt()
    def Off(self):
        """
        Command to ask for shutdown of this device.

        :return:None
        """
        if self.get_state() == DevState.OFF:
          # Already off. Don't complain.
          return

        # Turn off
        self.set_state(DevState.OFF)

        # Stop keep-alive
        self.opcua_connection.stop()

        # Turn off again, in case of race conditions through reconnecting
        self.set_state(DevState.OFF)

    @command()
    @only_in_states([DevState.ON, DevState.INIT, DevState.STANDBY])
    @DebugIt()
    def Fault(self):
        """
        FAULT state is used to indicate our connection with the OPC-UA server is down.

        This device will try to reconnect once, and transition to the ON state on success.

        If reconnecting fails, the user needs to call Initialise() to retry to restart this device.

        :return:None
        """
        self.set_state(DevState.FAULT)


# ----------
# Run server
# ----------
def main(args=None, **kwargs):
    """Main function of the SDP module."""
    return run((SDP,), args=args, **kwargs)


if __name__ == '__main__':
    main()