Skip to content
Snippets Groups Projects
StatsCrosslet.py 13.5 KiB
Newer Older
# -*- coding: utf-8 -*-
#
# This file is part of the StatsCrosslet project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.

""" OPC-UA client for LOFAR stations crosslet stats

"""

# PyTango imports
import tango
from tango import DebugIt
from tango.server import run
from tango.server import Device
from tango.server import attribute, command
from tango.server import device_property
from tango import AttrQuality, DispLevel, DevState
from tango import AttrWriteType, PipeWriteType
# Additional import
# PROTECTED REGION ID(StatsCrosslet.additionnal_import) ENABLED START #
import opcua
import numpy
import traceback
import threading
from datetime import datetime
# PROTECTED REGION END #    //  StatsCrosslet.additionnal_import

__all__ = ["StatsCrosslet", "main"]


class StatsCrosslet(Device):
    """

    **Properties:**

    - Device Property
        OPC_Server_Name
            - Type:'DevString'
        OPC_Server_Port
            - Type:'DevULong'
        OPC_Time_out
            - Type:'DevDouble'
        Default_pause_time
            - Type:'DevDouble'
        Default_subband
            - Type:'DevULong'
        Default_integration_time
            - Type:'DevDouble'
    """
    # PROTECTED REGION ID(StatsCrosslet.class_variable) ENABLED START #
    client = 0
    ns = 0
    obj = 0
    record_cross = 0
    stop_data_read_loop = False
    data_acquisition_is_active = False
    data_read_loop = threading.Thread()

    def read_data(self):
        # This is the the thread that continuously reads the crosslet
        # statistics from the station and feeds it into the Tango
        # property system.
        self.debug_stream("Entering read_data loop.")
        while self.stop_data_read_loop is False:
            self.debug_stream("read_data is running...")
            threading.Event().wait(self._pause_time)
            if self.data_acquisition_is_active is True:
                    self.debug_stream("fetching data:  subband = %d, integration_time = %d", self._subband, self._integration_time)
                    t, visibilities_list, rcu_modes = self.obj.call_method(self.record_cross, self._subband, self._integration_time)
                    self.debug_stream("fetching data done: t = %s, len(visibilities_list) = %d, len(rcu_modes) = %d", t, len(visibilities_list), len(rcu_modes))
                    visibilities = numpy.array(visibilities_list)[0] + 1j * numpy.array(visibilities_list[1])
                    self._visibilities_real = visibilities.real
                    self._visibilities_imag = visibilities.imag
                    self._rcu_modes = rcu_modes
                    self.debug_stream("timestamp = %s, visibilities_real [0][1] = %f", self._time_stamp, self._visibilities_real[0][1])
                except KeyboardInterrupt:
                    self.debug_stream("KBD interrupt caught, leaving data read loop.")
                    self.stop_data_read_loop = True
                except Exception as e:
                    self.error_stream("Exception: %s.  Cannot call the method %s in the OPC-UA server %s.  Trace: %s", e, self.record_cross, self.OPC_Server_Name, traceback.format_exc())

    # PROTECTED REGION END #    //  StatsCrosslet.class_variable

    # -----------------
    # Device Properties
    # -----------------

    OPC_Server_Name = device_property(
        dtype='DevString',
        default_value="okeanos"
    )

    OPC_Server_Port = device_property(
        dtype='DevULong',
        default_value=55556
    )

    OPC_Time_out = device_property(
        dtype='DevDouble',
        default_value=1.0
    Default_pause_time = device_property(
        dtype='DevDouble',
    )

    Default_subband = device_property(
        dtype='DevULong',
        default_value=150
    )

    Default_integration_time = device_property(
        dtype='DevDouble',
        default_value=1.0
    )

    # ----------
    # Attributes
    # ----------

    subband = attribute(
        dtype='DevUShort',
        access=AttrWriteType.READ_WRITE,
    )

    integration_time = attribute(
        dtype='DevDouble',
    )

    time_stamp = attribute(
    )

    pause_time = attribute(
        dtype='DevDouble',
        access=AttrWriteType.READ_WRITE,
    )

    rcu_modes = attribute(
        dtype=('DevString',),
        max_dim_x=96,
    )

    visibilities_imag = attribute(
        dtype=(('DevDouble',),),
        max_dim_x=96, max_dim_y=96,
    )

    visibilities_real = attribute(
        dtype=(('DevDouble',),),
        max_dim_x=96, max_dim_y=96,
    )

    # ---------------
    # General methods
    # ---------------

    def init_device(self):
        """Initialises the attributes and properties of the StatsCrosslet."""
        Device.init_device(self)
        # PROTECTED REGION ID(StatsCrosslet.init_device) ENABLED START #
        try:
            self.debug_stream("Connecting to OPC-UA server %s:%d...", self.OPC_Server_Name, self.OPC_Server_Port)
            self.client = opcua.Client("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), self.OPC_Time_out * 1000)
            self.client.connect()
            ns = self.client.get_namespace_index("http://lofar.eu")
            self.obj = self.client.get_root_node().get_child(["0:Objects", "{}:StationMetrics".format(ns),                                            "{}:RCU".format(ns)])
            self.record_cross = "{}:record_cross".format(ns)
            self.debug_stream("Connecting to OPC-UA server %s:%d done.", self.OPC_Server_Name, self.OPC_Server_Port)
            # Set default values in the read/write attributes.
            self._pause_time = self.Default_pause_time
            self._integration_time = self.Default_integration_time
            self._subband = self.Default_subband

            self.data_read_loop = threading.Thread(target = self.read_data)
            self.data_acquisition_is_active = False
            self.stop_data_read_loop = False
            self.data_read_loop.start()
        except Exception as e:
            self.error_stream("Cannot connect to the OPC-UA server %s.  Trace: %s" % (self.OPC_Server_Name, traceback.format_exc()))
            raise e
        # PROTECTED REGION END #    //  StatsCrosslet.init_device

    def always_executed_hook(self):
        """Method always executed before any TANGO command is executed."""
        # PROTECTED REGION ID(StatsCrosslet.always_executed_hook) ENABLED START #
        # PROTECTED REGION END #    //  StatsCrosslet.always_executed_hook

    def delete_device(self):
        """Hook to delete resources allocated in init_device.

        This method allows for any memory or other resources allocated in the
        init_device method to be released.  This method is called by the device
        destructor and by the device Init command.
        """
        # PROTECTED REGION ID(StatsCrosslet.delete_device) ENABLED START #
        self.debug_stream("Shutting down...")
        self.data_acquisition_is_active = False
        self.stop_data_read_loop = True
        self.debug_stream("Waiting for data acquisition thread to shut down...")
        self.data_read_loop.join()
        self.debug_stream("Waiting for data acquisition thread to shut down, done")
        if self.client is not None:
            self.client.close_session()
            self.client.close_secure_channel()
        self.debug_stream("Shutdown done.")
        # PROTECTED REGION END #    //  StatsCrosslet.delete_device
    # ------------------
    # Attributes methods
    # ------------------

    def read_subband(self):
        # PROTECTED REGION ID(StatsCrosslet.subband_read) ENABLED START #
        """Return the subband attribute."""
        return self._subband
        # PROTECTED REGION END #    //  StatsCrosslet.subband_read

    def write_subband(self, value):
        # PROTECTED REGION ID(StatsCrosslet.subband_write) ENABLED START #
        """Set the subband attribute."""
        self._subband = value
        # PROTECTED REGION END #    //  StatsCrosslet.subband_write

Thomas Juerges's avatar
Thomas Juerges committed
    def is_subband_allowed(self, attr):
        # PROTECTED REGION ID(StatsCrosslet.is_subband_allowed) ENABLED START #
        if attr==attr.READ_REQ:
            return self.get_state() not in [DevState.STANDBY]
        else:
            return self.get_state() not in [DevState.STANDBY]
        # PROTECTED REGION END #    //  StatsCrosslet.is_subband_allowed

    def read_integration_time(self):
        # PROTECTED REGION ID(StatsCrosslet.integration_time_read) ENABLED START #
        """Return the integration_time attribute."""
        return self._integration_time
        # PROTECTED REGION END #    //  StatsCrosslet.integration_time_read

Thomas Juerges's avatar
Thomas Juerges committed
    def is_integration_time_allowed(self, attr):
        # PROTECTED REGION ID(StatsCrosslet.is_integration_time_allowed) ENABLED START #
        return self.get_state() not in [DevState.ON,DevState.OFF,DevState.INIT]
        # PROTECTED REGION END #    //  StatsCrosslet.is_integration_time_allowed

    def read_time_stamp(self):
        # PROTECTED REGION ID(StatsCrosslet.time_stamp_read) ENABLED START #
        """Return the time_stamp attribute."""
        return self._time_stamp
        # PROTECTED REGION END #    //  StatsCrosslet.time_stamp_read

Thomas Juerges's avatar
Thomas Juerges committed
    def is_time_stamp_allowed(self, attr):
        # PROTECTED REGION ID(StatsCrosslet.is_time_stamp_allowed) ENABLED START #
        return self.get_state() not in [DevState.ON,DevState.OFF,DevState.INIT]
        # PROTECTED REGION END #    //  StatsCrosslet.is_time_stamp_allowed

    def read_pause_time(self):
        # PROTECTED REGION ID(StatsCrosslet.pause_time_read) ENABLED START #
        """Return the pause_time attribute."""
        return self._pause_time
        # PROTECTED REGION END #    //  StatsCrosslet.pause_time_read

    def write_pause_time(self, value):
        # PROTECTED REGION ID(StatsCrosslet.pause_time_write) ENABLED START #
        """Set the pause_time attribute."""
        self._pause_time = value
        # PROTECTED REGION END #    //  StatsCrosslet.pause_time_write

    def read_rcu_modes(self):
        # PROTECTED REGION ID(StatsCrosslet.rcu_modes_read) ENABLED START #
        """Return the rcu_modes attribute."""
        return self._rcu_modes
        # PROTECTED REGION END #    //  StatsCrosslet.rcu_modes_read

Thomas Juerges's avatar
Thomas Juerges committed
    def is_rcu_modes_allowed(self, attr):
        # PROTECTED REGION ID(StatsCrosslet.is_rcu_modes_allowed) ENABLED START #
        return self.get_state() not in [DevState.ON,DevState.OFF,DevState.INIT]
        # PROTECTED REGION END #    //  StatsCrosslet.is_rcu_modes_allowed

    def read_visibilities_imag(self):
        # PROTECTED REGION ID(StatsCrosslet.visibilities_imag_read) ENABLED START #
        """Return the visibilities_imag attribute."""
        return self._visibilities_imag
        # PROTECTED REGION END #    //  StatsCrosslet.visibilities_imag_read

Thomas Juerges's avatar
Thomas Juerges committed
    def is_visibilities_imag_allowed(self, attr):
        # PROTECTED REGION ID(StatsCrosslet.is_visibilities_imag_allowed) ENABLED START #
        return self.get_state() not in [DevState.ON,DevState.OFF,DevState.INIT]
        # PROTECTED REGION END #    //  StatsCrosslet.is_visibilities_imag_allowed

    def read_visibilities_real(self):
        # PROTECTED REGION ID(StatsCrosslet.visibilities_real_read) ENABLED START #
        """Return the visibilities_real attribute."""
        return self._visibilities_real
        # PROTECTED REGION END #    //  StatsCrosslet.visibilities_real_read
Thomas Juerges's avatar
Thomas Juerges committed
    def is_visibilities_real_allowed(self, attr):
        # PROTECTED REGION ID(StatsCrosslet.is_visibilities_real_allowed) ENABLED START #
        return self.get_state() not in [DevState.ON,DevState.OFF,DevState.INIT]
        # PROTECTED REGION END #    //  StatsCrosslet.is_visibilities_real_allowed

    # --------
    # Commands
    # --------

    @command(
    )
    @DebugIt()
    def start_acquisition(self):
        # PROTECTED REGION ID(StatsCrosslet.start_acquisition) ENABLED START #
        """
        Start the data acquisition of the station`s crosslet stats.

        :param argin: 'DevULong'

        :return:None
        """
Thomas Juerges's avatar
Thomas Juerges committed
        if is_start_acquisition_allowed() is True:
            self.data_acquisition_is_active = True
        # PROTECTED REGION END #    //  StatsCrosslet.start_acquisition

Thomas Juerges's avatar
Thomas Juerges committed
    def is_start_acquisition_allowed(self):
        # PROTECTED REGION ID(StatsCrosslet.is_start_acquisition_allowed) ENABLED START #
        return self.get_state() not in [DevState.ON,DevState.OFF,DevState.STANDBY,DevState.RUNNING]
        # PROTECTED REGION END #    //  StatsCrosslet.is_start_acquisition_allowed

    @command(
    )
    @DebugIt()
    def stop_acquisition(self):
        # PROTECTED REGION ID(StatsCrosslet.stop_acquisition) ENABLED START #
        """
        Stop the data acquisition.

        :return:None
        """
Thomas Juerges's avatar
Thomas Juerges committed
        if is_stop_acquisition_allowed() is True:
            self.data_acquisition_is_active = False
        # PROTECTED REGION END #    //  StatsCrosslet.stop_acquisition

Thomas Juerges's avatar
Thomas Juerges committed
    def is_stop_acquisition_allowed(self):
        # PROTECTED REGION ID(StatsCrosslet.is_stop_acquisition_allowed) ENABLED START #
        return self.get_state() not in [DevState.ON,DevState.INIT,DevState.RUNNING]
        # PROTECTED REGION END #    //  StatsCrosslet.is_stop_acquisition_allowed

# ----------
# Run server
# ----------


def main(args=None, **kwargs):
    """Main function of the StatsCrosslet module."""
    # PROTECTED REGION ID(StatsCrosslet.main) ENABLED START #
    return run((StatsCrosslet,), args=args, **kwargs)
    # PROTECTED REGION END #    //  StatsCrosslet.main


if __name__ == '__main__':
    main()