Newer
Older
# -*- coding: utf-8 -*-
#
# This file is part of the StatsCrosslet project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
""" OPC-UA client for LOFAR stations crosslet stats
"""
# PyTango imports
import tango
from tango import DebugIt
from tango.server import run
from tango.server import Device
from tango.server import attribute, command
from tango.server import device_property
from tango import AttrQuality, DispLevel, DevState
from tango import AttrWriteType, PipeWriteType
# Additional import
# PROTECTED REGION ID(StatsCrosslet.additionnal_import) ENABLED START #
import opcua
import numpy
import traceback
import threading
from datetime import datetime
# PROTECTED REGION END # // StatsCrosslet.additionnal_import
__all__ = ["StatsCrosslet", "main"]
class StatsCrosslet(Device):
"""
**Properties:**
- Device Property
OPC_Server_Name
- Type:'DevString'
OPC_Server_Port
- Type:'DevULong'
OPC_Time_out
- Type:'DevDouble'
Default_pause_time
- Type:'DevDouble'
Default_subband
- Type:'DevULong'
Default_integration_time
- Type:'DevDouble'
"""
# PROTECTED REGION ID(StatsCrosslet.class_variable) ENABLED START #
client = 0
ns = 0
obj = 0
record_cross = 0
stop_data_read_loop = False
data_acquisition_is_active = False
data_read_loop = threading.Thread()
def read_data(self):
# This is the the thread that continuously reads the crosslet
# statistics from the station and feeds it into the Tango
# property system.
self.debug_stream("Entering read_data loop.")
while self.stop_data_read_loop is False:
self.debug_stream("read_data is running...")
threading.Event().wait(self._pause_time)
if self.data_acquisition_is_active is True:
self.debug_stream("fetching data: subband = %d, integration_time = %d", self._subband, self._integration_time)
t, visibilities_list, rcu_modes = self.obj.call_method(self.record_cross, self._subband, self._integration_time)
self.debug_stream("fetching data done: t = %s, len(visibilities_list) = %d, len(rcu_modes) = %d", t, len(visibilities_list), len(rcu_modes))
self._time_stamp = t
visibilities = numpy.array(visibilities_list)[0] + 1j * numpy.array(visibilities_list[1])
self._visibilities_real = visibilities.real
self._visibilities_imag = visibilities.imag
self.debug_stream("timestamp = %s, visibilities_real [0][1] = %f", self._time_stamp, self._visibilities_real[0][1])
except KeyboardInterrupt:
self.debug_stream("KBD interrupt caught, leaving data read loop.")
self.stop_data_read_loop = True
self.error_stream("Exception: %s. Cannot call the method %s in the OPC-UA server %s. Trace: %s", e, self.record_cross, self.OPC_Server_Name, traceback.format_exc())
# PROTECTED REGION END # // StatsCrosslet.class_variable
# -----------------
# Device Properties
# -----------------
OPC_Server_Name = device_property(
dtype='DevString',
default_value="okeanos"
)
OPC_Server_Port = device_property(
dtype='DevULong',
default_value=55556
)
OPC_Time_out = device_property(
dtype='DevDouble',
default_value=1.0
Default_pause_time = device_property(
dtype='DevDouble',
)
Default_subband = device_property(
dtype='DevULong',
default_value=150
)
Default_integration_time = device_property(
dtype='DevDouble',
default_value=1.0
)
# ----------
# Attributes
# ----------
subband = attribute(
dtype='DevUShort',
access=AttrWriteType.READ_WRITE,
)
integration_time = attribute(
dtype='DevDouble',
)
time_stamp = attribute(
)
pause_time = attribute(
dtype='DevDouble',
access=AttrWriteType.READ_WRITE,
)
rcu_modes = attribute(
dtype=('DevString',),
max_dim_x=96,
)
visibilities_imag = attribute(
dtype=(('DevDouble',),),
max_dim_x=96, max_dim_y=96,
)
visibilities_real = attribute(
dtype=(('DevDouble',),),
max_dim_x=96, max_dim_y=96,
)
# ---------------
# General methods
# ---------------
def init_device(self):
"""Initialises the attributes and properties of the StatsCrosslet."""
Device.init_device(self)
# PROTECTED REGION ID(StatsCrosslet.init_device) ENABLED START #
try:
self.debug_stream("Connecting to OPC-UA server %s:%d...", self.OPC_Server_Name, self.OPC_Server_Port)
self.client = opcua.Client("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), self.OPC_Time_out * 1000)
self.client.connect()
ns = self.client.get_namespace_index("http://lofar.eu")
self.obj = self.client.get_root_node().get_child(["0:Objects", "{}:StationMetrics".format(ns), "{}:RCU".format(ns)])
self.record_cross = "{}:record_cross".format(ns)
self.debug_stream("Connecting to OPC-UA server %s:%d done.", self.OPC_Server_Name, self.OPC_Server_Port)
# Set default values in the read/write attributes.
self._pause_time = self.Default_pause_time
self._integration_time = self.Default_integration_time
self._subband = self.Default_subband
self.data_read_loop = threading.Thread(target = self.read_data)
self.data_acquisition_is_active = False
self.stop_data_read_loop = False
self.data_read_loop.start()
except Exception as e:
self.error_stream("Cannot connect to the OPC-UA server %s. Trace: %s" % (self.OPC_Server_Name, traceback.format_exc()))
raise e
# PROTECTED REGION END # // StatsCrosslet.init_device
def always_executed_hook(self):
"""Method always executed before any TANGO command is executed."""
# PROTECTED REGION ID(StatsCrosslet.always_executed_hook) ENABLED START #
# PROTECTED REGION END # // StatsCrosslet.always_executed_hook
def delete_device(self):
"""Hook to delete resources allocated in init_device.
This method allows for any memory or other resources allocated in the
init_device method to be released. This method is called by the device
destructor and by the device Init command.
"""
# PROTECTED REGION ID(StatsCrosslet.delete_device) ENABLED START #
self.debug_stream("Shutting down...")
self.data_acquisition_is_active = False
self.stop_data_read_loop = True
self.debug_stream("Waiting for data acquisition thread to shut down...")
self.debug_stream("Waiting for data acquisition thread to shut down, done")
if self.client is not None:
self.client.close_secure_channel()
self.debug_stream("Shutdown done.")
# PROTECTED REGION END # // StatsCrosslet.delete_device
# ------------------
# Attributes methods
# ------------------
def read_subband(self):
# PROTECTED REGION ID(StatsCrosslet.subband_read) ENABLED START #
"""Return the subband attribute."""
return self._subband
# PROTECTED REGION END # // StatsCrosslet.subband_read
def write_subband(self, value):
# PROTECTED REGION ID(StatsCrosslet.subband_write) ENABLED START #
"""Set the subband attribute."""
self._subband = value
# PROTECTED REGION END # // StatsCrosslet.subband_write
def is_subband_allowed(self, attr):
# PROTECTED REGION ID(StatsCrosslet.is_subband_allowed) ENABLED START #
if attr==attr.READ_REQ:
return self.get_state() not in [DevState.STANDBY]
else:
return self.get_state() not in [DevState.STANDBY]
# PROTECTED REGION END # // StatsCrosslet.is_subband_allowed
def read_integration_time(self):
# PROTECTED REGION ID(StatsCrosslet.integration_time_read) ENABLED START #
"""Return the integration_time attribute."""
return self._integration_time
# PROTECTED REGION END # // StatsCrosslet.integration_time_read
def is_integration_time_allowed(self, attr):
# PROTECTED REGION ID(StatsCrosslet.is_integration_time_allowed) ENABLED START #
return self.get_state() not in [DevState.ON,DevState.OFF,DevState.INIT]
# PROTECTED REGION END # // StatsCrosslet.is_integration_time_allowed
def read_time_stamp(self):
# PROTECTED REGION ID(StatsCrosslet.time_stamp_read) ENABLED START #
"""Return the time_stamp attribute."""
return self._time_stamp
# PROTECTED REGION END # // StatsCrosslet.time_stamp_read
def is_time_stamp_allowed(self, attr):
# PROTECTED REGION ID(StatsCrosslet.is_time_stamp_allowed) ENABLED START #
return self.get_state() not in [DevState.ON,DevState.OFF,DevState.INIT]
# PROTECTED REGION END # // StatsCrosslet.is_time_stamp_allowed
def read_pause_time(self):
# PROTECTED REGION ID(StatsCrosslet.pause_time_read) ENABLED START #
"""Return the pause_time attribute."""
return self._pause_time
# PROTECTED REGION END # // StatsCrosslet.pause_time_read
def write_pause_time(self, value):
# PROTECTED REGION ID(StatsCrosslet.pause_time_write) ENABLED START #
"""Set the pause_time attribute."""
self._pause_time = value
# PROTECTED REGION END # // StatsCrosslet.pause_time_write
def read_rcu_modes(self):
# PROTECTED REGION ID(StatsCrosslet.rcu_modes_read) ENABLED START #
"""Return the rcu_modes attribute."""
return self._rcu_modes
# PROTECTED REGION END # // StatsCrosslet.rcu_modes_read
def is_rcu_modes_allowed(self, attr):
# PROTECTED REGION ID(StatsCrosslet.is_rcu_modes_allowed) ENABLED START #
return self.get_state() not in [DevState.ON,DevState.OFF,DevState.INIT]
# PROTECTED REGION END # // StatsCrosslet.is_rcu_modes_allowed
def read_visibilities_imag(self):
# PROTECTED REGION ID(StatsCrosslet.visibilities_imag_read) ENABLED START #
"""Return the visibilities_imag attribute."""
return self._visibilities_imag
# PROTECTED REGION END # // StatsCrosslet.visibilities_imag_read
def is_visibilities_imag_allowed(self, attr):
# PROTECTED REGION ID(StatsCrosslet.is_visibilities_imag_allowed) ENABLED START #
return self.get_state() not in [DevState.ON,DevState.OFF,DevState.INIT]
# PROTECTED REGION END # // StatsCrosslet.is_visibilities_imag_allowed
def read_visibilities_real(self):
# PROTECTED REGION ID(StatsCrosslet.visibilities_real_read) ENABLED START #
"""Return the visibilities_real attribute."""
return self._visibilities_real
# PROTECTED REGION END # // StatsCrosslet.visibilities_real_read
def is_visibilities_real_allowed(self, attr):
# PROTECTED REGION ID(StatsCrosslet.is_visibilities_real_allowed) ENABLED START #
return self.get_state() not in [DevState.ON,DevState.OFF,DevState.INIT]
# PROTECTED REGION END # // StatsCrosslet.is_visibilities_real_allowed
# --------
# Commands
# --------
@command(
)
@DebugIt()
def start_acquisition(self):
# PROTECTED REGION ID(StatsCrosslet.start_acquisition) ENABLED START #
"""
Start the data acquisition of the station`s crosslet stats.
:param argin: 'DevULong'
:return:None
"""
if is_start_acquisition_allowed() is True:
self.data_acquisition_is_active = True
# PROTECTED REGION END # // StatsCrosslet.start_acquisition
def is_start_acquisition_allowed(self):
# PROTECTED REGION ID(StatsCrosslet.is_start_acquisition_allowed) ENABLED START #
return self.get_state() not in [DevState.ON,DevState.OFF,DevState.STANDBY,DevState.RUNNING]
# PROTECTED REGION END # // StatsCrosslet.is_start_acquisition_allowed
@command(
)
@DebugIt()
def stop_acquisition(self):
# PROTECTED REGION ID(StatsCrosslet.stop_acquisition) ENABLED START #
"""
Stop the data acquisition.
:return:None
"""
# PROTECTED REGION END # // StatsCrosslet.stop_acquisition
def is_stop_acquisition_allowed(self):
# PROTECTED REGION ID(StatsCrosslet.is_stop_acquisition_allowed) ENABLED START #
return self.get_state() not in [DevState.ON,DevState.INIT,DevState.RUNNING]
# PROTECTED REGION END # // StatsCrosslet.is_stop_acquisition_allowed
# ----------
# Run server
# ----------
def main(args=None, **kwargs):
"""Main function of the StatsCrosslet module."""
# PROTECTED REGION ID(StatsCrosslet.main) ENABLED START #
return run((StatsCrosslet,), args=args, **kwargs)
# PROTECTED REGION END # // StatsCrosslet.main
if __name__ == '__main__':
main()