Skip to content
Snippets Groups Projects
StatsCrosslet.py 10.8 KiB
Newer Older
# -*- coding: utf-8 -*-
#
# This file is part of the StatsCrosslet project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.

""" OPC-UA client for LOFAR stations crosslet stats

"""

# PyTango imports
import tango
from tango import DebugIt
from tango.server import run
from tango.server import Device
from tango.server import attribute, command, pipe
from tango.server import device_property
from tango import AttrQuality, DispLevel, DevState
from tango import AttrWriteType, PipeWriteType
# Additional import
# PROTECTED REGION ID(StatsCrosslet.additionnal_import) ENABLED START #
import opcua
import numpy
import traceback
import threading
import time
# PROTECTED REGION END #    //  StatsCrosslet.additionnal_import

__all__ = ["StatsCrosslet", "main"]


class StatsCrosslet(Device):
    """

    **Properties:**

    - Device Property
        OPC_Server_Name
            - Type:'DevString'
        OPC_Server_Port
            - Type:'DevULong'
        OPC_time_out
            - Type:'DevULong'
        Default_pause_time
            - Type:'DevDouble'
        Default_subband
            - Type:'DevULong'
        Default_integration_time
            - Type:'DevDouble'
    """
    # PROTECTED REGION ID(StatsCrosslet.class_variable) ENABLED START #
    client = 0
    ns = 0
    obj = 0
    record_cross = 0
    data = []
    data_lock = threading.Lock()
    stop_data_read_loop = threading.Event()
    data_acquisition_is_active = threading.Event()
    data_read_loop = threading.Thread()

    def read_data(self):
        # This is the the thread that continuously reads the crosslet
        # statistics from the station and feeds it into the Tango
        # property system.
        while self.stop_data_read_loop.is_set() is False:
            time.sleep(self._pause_time)
            if self.data_acquisition_is_active.is_set() is True:
                data = []
                try:
                    timestamp, visibilities, rcu_modes = self.obj.call_method(self.record_cross, self._subband, self._integration_time)
                    self.data_lock.acquire()
                    self._time_stamp = timestamp
                    self._raw_visibilities = visibilities
                    self._visibilities = numpy.array(visibilities)[0] + 1j * numpy.array(visibilities[1])
                    self._rcu_modes = rcu_modes
                    self.data_lock.release()

                except Exception as e:
                    print("Cannot call the method %s in the OPC-UA server %s.  Trace: %s" % (self.record_cross, self.OPC_Server_Name, traceback.format_exc
()))
    # PROTECTED REGION END #    //  StatsCrosslet.class_variable

    # -----------------
    # Device Properties
    # -----------------

    OPC_Server_Name = device_property(
        dtype='DevString',
        default_value="okeanos"
    )

    OPC_Server_Port = device_property(
        dtype='DevULong',
        default_value=55556
    )

    OPC_time_out = device_property(
        dtype='DevULong',
        default_value=1000
    )

    Default_pause_time = device_property(
        dtype='DevDouble',
        default_value=60.0
    )

    Default_subband = device_property(
        dtype='DevULong',
        default_value=150
    )

    Default_integration_time = device_property(
        dtype='DevDouble',
        default_value=1.0
    )

    # ----------
    # Attributes
    # ----------

    subband = attribute(
        dtype='DevUShort',
        access=AttrWriteType.READ_WRITE,
    )

    integration_time = attribute(
        dtype='DevDouble',
    )

    time_stamp = attribute(
        dtype='DevDouble',
    )

    pause_time = attribute(
        dtype='DevDouble',
        access=AttrWriteType.READ_WRITE,
    )

    visibilities = attribute(
        dtype=('DevDouble',),
        max_dim_x=96,
    )

    rcu_modes = attribute(
        dtype=('DevString',),
        max_dim_x=96,
    )

    raw_visibilities = attribute(
        dtype=(('DevDouble',),),
        max_dim_x=96, max_dim_y=96,
    )

    # -----
    # Pipes
    # -----

    pipe_visibilities = pipe(
    )
    pipe_raw_visibilities = pipe(
    )

    # ---------------
    # General methods
    # ---------------

    def init_device(self):
        """Initialises the attributes and properties of the StatsCrosslet."""
        Device.init_device(self)
        # PROTECTED REGION ID(StatsCrosslet.init_device) ENABLED START #
        self._visibilities = (0.0,)
        try:
            self.client = opcua.Client("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), self.OPC_time_out)
            self.client.connect()
            ns = self.client.get_namespace_index("http://lofar.eu")
            self.obj = self.client.get_root_node().get_child(["0:Objects", "{}:StationMetrics".format(ns),                                            "{}:RCU".format(ns)])
            self.record_cross = "{}:record_cross".format(ns)

            # Set default values in the read/write attributes
            self._pause_time = self.Default_pause_time
            self._integration_time = self.Default_integration_time
            self._subband = self.Default_subband

            self.data_read_loop = threading.Thread(target = self.read_data)
            self.data_acquisition_is_active.set()
            self.stop_data_read_loop.clear()
            self.data_read_loop.start()
        except Exception as e:
            print("Cannot connect to the OPC-UA server %s.  Trace: %s" % (self.OPC_Server_Name, traceback.format_exc()))
        # PROTECTED REGION END #    //  StatsCrosslet.init_device

    def always_executed_hook(self):
        """Method always executed before any TANGO command is executed."""
        # PROTECTED REGION ID(StatsCrosslet.always_executed_hook) ENABLED START #
        # PROTECTED REGION END #    //  StatsCrosslet.always_executed_hook

    def delete_device(self):
        """Hook to delete resources allocated in init_device.

        This method allows for any memory or other resources allocated in the
        init_device method to be released.  This method is called by the device
        destructor and by the device Init command.
        """
        # PROTECTED REGION ID(StatsCrosslet.delete_device) ENABLED START #
        self.data_acquisition_is_active.clear()
        self.stop_data_read_loop.set()
        self.data_read_loop.join()
        if self.client is not Null:
            self.client.close_session()
            self.client_secure_channel()
        # PROTECTED REGION END #    //  StatsCrosslet.delete_device
    # ------------------
    # Attributes methods
    # ------------------

    def read_subband(self):
        # PROTECTED REGION ID(StatsCrosslet.subband_read) ENABLED START #
        """Return the subband attribute."""
        return self._subband
        # PROTECTED REGION END #    //  StatsCrosslet.subband_read

    def write_subband(self, value):
        # PROTECTED REGION ID(StatsCrosslet.subband_write) ENABLED START #
        """Set the subband attribute."""
        self._subband = value
        # PROTECTED REGION END #    //  StatsCrosslet.subband_write

    def read_integration_time(self):
        # PROTECTED REGION ID(StatsCrosslet.integration_time_read) ENABLED START #
        """Return the integration_time attribute."""
        return self._integration_time
        # PROTECTED REGION END #    //  StatsCrosslet.integration_time_read

    def read_time_stamp(self):
        # PROTECTED REGION ID(StatsCrosslet.time_stamp_read) ENABLED START #
        """Return the time_stamp attribute."""
        self.data_lock.acquire()
        time_stamp = self._time_stamp
        self.data_lock.release()
        return time_stamp
        # PROTECTED REGION END #    //  StatsCrosslet.time_stamp_read

    def read_pause_time(self):
        # PROTECTED REGION ID(StatsCrosslet.pause_time_read) ENABLED START #
        """Return the pause_time attribute."""
        return self._pause_time
        # PROTECTED REGION END #    //  StatsCrosslet.pause_time_read

    def write_pause_time(self, value):
        # PROTECTED REGION ID(StatsCrosslet.pause_time_write) ENABLED START #
        """Set the pause_time attribute."""
        self._pause_time = value
        # PROTECTED REGION END #    //  StatsCrosslet.pause_time_write

    def read_visibilities(self):
        # PROTECTED REGION ID(StatsCrosslet.visibilities_read) ENABLED START #
        """Return the visibilities attribute."""
        self.data_lock.acquire()
        visibilities = self._visibilities
        self.data_lock.release()
        return visibilities
        # PROTECTED REGION END #    //  StatsCrosslet.visibilities_read

    def read_rcu_modes(self):
        # PROTECTED REGION ID(StatsCrosslet.rcu_modes_read) ENABLED START #
        """Return the rcu_modes attribute."""
        self.data_lock.acquire()
        rcu_modes = self._rcu_modes
        self.data_lock.release()
        return rcu_modes
        # PROTECTED REGION END #    //  StatsCrosslet.rcu_modes_read

    def read_raw_visibilities(self):
        # PROTECTED REGION ID(StatsCrosslet.raw_visibilities_read) ENABLED START #
        """Return the raw_visibilities attribute."""
        self.data_lock.acquire()
        raw_visibilities = self._raw_visibilities
        self.data_lock.release()
        return raw_visibilities
        # PROTECTED REGION END #    //  StatsCrosslet.raw_visibilities_read

    # -------------
    # Pipes methods
    # -------------

    def read_pipe_visibilities(self):
        # PROTECTED REGION ID(StatsCrosslet.pipe_visibilities_read) ENABLED START #
        return dict(x=0, y=0)
        # PROTECTED REGION END #    //  StatsCrosslet.pipe_visibilities_read

    def read_pipe_raw_visibilities(self):
        # PROTECTED REGION ID(StatsCrosslet.pipe_raw_visibilities_read) ENABLED START #
        return dict(x=0, y=0)
        # PROTECTED REGION END #    //  StatsCrosslet.pipe_raw_visibilities_read

    # --------
    # Commands
    # --------

    @command(
    )
    @DebugIt()
    def start_acquisition(self):
        # PROTECTED REGION ID(StatsCrosslet.start_acquisition) ENABLED START #
        """
        Start the data acquisition of the station`s crosslet stats.

        :param argin: 'DevULong'

        :return:None
        """
        self.data_acquisition_is_active.set()
        # PROTECTED REGION END #    //  StatsCrosslet.start_acquisition

    @command(
    )
    @DebugIt()
    def stop_acquisition(self):
        # PROTECTED REGION ID(StatsCrosslet.stop_acquisition) ENABLED START #
        """
        Stop the data acquisition.

        :return:None
        """
        self.data_acquisition_is_active.clear()
        # PROTECTED REGION END #    //  StatsCrosslet.stop_acquisition

# ----------
# Run server
# ----------


def main(args=None, **kwargs):
    """Main function of the StatsCrosslet module."""
    # PROTECTED REGION ID(StatsCrosslet.main) ENABLED START #
    return run((StatsCrosslet,), args=args, **kwargs)
    # PROTECTED REGION END #    //  StatsCrosslet.main


if __name__ == '__main__':
    main()