Newer
Older
# This file is part of the Femto project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
""" LOFAR2.0 Station Software
Implementation of a Tango device on top of an existing OPC-UA server in Python3.
"""
# PyTango imports
import tango
from tango import DebugIt
from tango.server import run
from tango.server import Device
from tango.server import attribute, command, pipe
from tango.server import device_property
from tango import AttrQuality, DispLevel, DevState
from tango import AttrWriteType, PipeWriteType
# PROTECTED REGION ID(Femto.additionnal_import) ENABLED START #
from opcua import Client
# PROTECTED REGION END # // Femto.additionnal_import
"""
Implementation of a Tango device on top of an existing OPC-UA server in Python3.

**Properties:**

- Device Property
    OPC_Server_Name
        - Type:'DevString'
    OPC_Server_Port
        - Type:'DevString'
    OPC_time_out
        - Type:'DevULong'
"""
# PROTECTED REGION ID(Femto.class_variable) ENABLED START #
# PROTECTED REGION END # // Femto.class_variable
# -----------------
# Device Properties
# -----------------
# Host name of the OPC-UA server; used to build the "opc.tcp://host:port/"
# endpoint in init_device.
OPC_Server_Name = device_property(
    dtype='DevString',
    default_value="localhost"
)

# TCP port of the OPC-UA server, kept as a string because it is only ever
# formatted into the endpoint URL.
OPC_Server_Port = device_property(
    dtype='DevString',
    default_value="55555"
)

# Passed as the second positional argument of opcua.Client in init_device.
# NOTE(review): python-opcua documents that argument as a timeout in seconds;
# confirm that 1000 is really meant as seconds and not milliseconds.
OPC_time_out = device_property(
    dtype='DevULong',
    default_value=1000
)
# ----------
# Attributes
# ----------
# Attribute exposing the RCU modes; max_dim_x caps its length at 1024 elements.
# init_device seeds the backing value self._rcu_modes with ('',).
# NOTE(review): no dtype/access arguments are visible on this declaration —
# confirm none were lost.
RCU_modes = attribute(
max_dim_x=1024,
)
# Attribute exposing the crosslet statistics; max_dim_x caps its length at
# 10240 elements. init_device seeds self._crosslet_stat with (0.0,).
# NOTE(review): no dtype/access arguments are visible on this declaration —
# confirm none were lost.
crosslet_stat = attribute(
max_dim_x=10240,
)
# -----
# Pipes
# -----
# Pipe labelled "xlt"; read_xlt_stat supplies its content.
xlt_stat = pipe(
label="xlt",
)
# ---------------
# General methods
# ---------------
def init_device(self):
    """Initialises the attributes and properties of the Femto.

    Seeds the cached attribute values, then connects to the OPC-UA server at
    ``opc.tcp://<OPC_Server_Name>:<OPC_Server_Port>/`` and resolves the
    ``RCU_error_spectra`` node below ``FEMTO`` into ``self.opc_obj``.

    Any exception raised while connecting or browsing is reported on stdout
    together with its traceback, after which ``delete_device`` is called to
    release the client.
    """
    # Let the base class load the device properties before they are read.
    Device.init_device(self)
    # PROTECTED REGION ID(Femto.init_device) ENABLED START #
    # Local import: the failure handler formats the traceback and the module
    # has no file-level ``import traceback``.
    import traceback

    self._rcu_modes = ('',)
    self._crosslet_stat = (0.0,)
    # Defined before the try so delete_device can always inspect it, even
    # when the Client constructor itself raises.
    self.client = None
    try:
        # NOTE(review): the second Client argument is a timeout; confirm the
        # unit expected by the library matches OPC_time_out.
        self.client = Client("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), self.OPC_time_out)
        self.client.connect()
        self.client.load_type_definitions()
        idx = self.client.get_namespace_index("http://lofar.eu")
        # Now getting a variable node using its browse path
        self.opc_obj = self.client.get_root_node().get_child(["0:Objects",
                                                              "{}:FEMTO".format(idx),
                                                              "{}:RCU_error_spectra".format(idx)])
        print("Connected to the OPC-UA server %s" % (self.OPC_Server_Name))
    except Exception:
        print("Failed to connect to the OPC-UA server %s. Traceback: %s" % (self.OPC_Server_Name, traceback.format_exc()))
        self.delete_device()
    # PROTECTED REGION END # // Femto.init_device


def always_executed_hook(self):
    """Method always executed before any TANGO command is executed."""
    # PROTECTED REGION ID(Femto.always_executed_hook) ENABLED START #
    # PROTECTED REGION END # // Femto.always_executed_hook


def delete_device(self):
    """Hook to delete resources allocated in init_device.

    This method allows for any memory or other resources allocated in the
    init_device method to be released. This method is called by the device
    destructor and by the device Init command.
    """
    # PROTECTED REGION ID(Femto.delete_device) ENABLED START #
    # getattr tolerates a call made before init_device ever assigned client.
    client = getattr(self, "client", None)
    if client is not None:
        try:
            client.close_session()
            client.close_secure_channel()
        finally:
            # Forget the client so a second call (Init followed by the
            # destructor) does not try to close the same session again.
            self.client = None
    # PROTECTED REGION END # // Femto.delete_device
# ------------------
# Attributes methods
# ------------------
def read_RCU_modes(self):
    # PROTECTED REGION ID(Femto.RCU_modes_read) ENABLED START #
    """Read handler for RCU_modes: hand back the cached ``self._rcu_modes``."""
    cached_modes = self._rcu_modes
    return cached_modes
    # PROTECTED REGION END # // Femto.RCU_modes_read
def read_crosslet_stat(self):
    # PROTECTED REGION ID(Femto.crosslet_stat_read) ENABLED START #
    """Return the crosslet_stat attribute.

    The value is the cached ``self._crosslet_stat``; nothing is fetched from
    the OPC-UA server here.
    """
    return self._crosslet_stat
    # PROTECTED REGION END # // Femto.crosslet_stat_read
# -------------
# Pipes methods
# -------------
def read_xlt_stat(self):
    # PROTECTED REGION ID(Femto.xlt_stat_read) ENABLED START #
    """Read handler for the xlt_stat pipe.

    :return: a dict with the cached crosslet statistics under ``x`` and the
        cached RCU modes under ``y``.
    """
    return {"x": self._crosslet_stat, "y": self._rcu_modes}
    # PROTECTED REGION END # // Femto.xlt_stat_read
# --------
# Commands
# --------
@command()
@DebugIt()
def record_cross(self):
    # PROTECTED REGION ID(Femto.record_cross) ENABLED START #
    """
    Call ``record_cross`` on the OPC-UA object and cache what it returns.

    The call is made only when ``is_record_cross_allowed()`` is True. Its
    result is unpacked into a timestamp (discarded), the crosslet statistics
    (stored in ``self._crosslet_stat``) and the RCU modes (stored in
    ``self._rcu_modes``); both stored values are printed.

    :return:None
    """
    if self.is_record_cross_allowed() is True:
        # The namespace index is only a local variable in init_device, so it
        # is resolved here from the connected client.
        idx = self.client.get_namespace_index("http://lofar.eu")
        # NOTE(review): self.subBand and self.integrationTime are not assigned
        # anywhere in the visible part of this file — confirm where they are
        # set before relying on this command.
        _time_stamp, self._crosslet_stat, self._rcu_modes = self.opc_obj.call_method("{}:record_cross".format(idx), self.subBand, self.integrationTime)
        print("Crosscorrelations are ", self._crosslet_stat)
        print("RCU modes are", self._rcu_modes)
    # PROTECTED REGION END # // Femto.record_cross
def is_record_cross_allowed(self):
    """Tell whether record_cross may run in the current device state.

    :return: False while the device state is OFF, INIT or FAULT, True in
        every other state.
    """
    # PROTECTED REGION ID(Femto.is_record_cross_allowed) ENABLED START #
    return self.get_state() not in [DevState.OFF,DevState.INIT,DevState.FAULT]
    # PROTECTED REGION END # // Femto.is_record_cross_allowed
# ----------
# Run server
# ----------
def main(args=None, **kwargs):
    """Entry point of the Femto module: start the Tango device server.

    :param args: handed to ``run`` as its ``args`` keyword.
    :param kwargs: any further keyword arguments, forwarded to ``run`` as is.
    :return: whatever ``run`` returns.
    """
    # PROTECTED REGION ID(Femto.main) ENABLED START #
    device_classes = (Femto,)
    return run(device_classes, args=args, **kwargs)
    # PROTECTED REGION END # // Femto.main