# (Stray non-Python text "Newer" / "Older" removed from the top of the file.)
# -*- coding: utf-8 -*-
#
# This file is part of the RCUSCC project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
""" RCU-SCC Device Server for LOFAR2.0
"""
# PyTango imports
import tango
from tango import DebugIt
from tango.server import run
from tango.server import Device
from tango.server import attribute, command
from tango.server import device_property
from tango import AttrQuality, DispLevel, DevState
from tango import AttrWriteType, PipeWriteType
# Additional import
# PROTECTED REGION ID(RCUSCC.additionnal_import) ENABLED START #
import opcua
import traceback
# PROTECTED REGION END # // RCUSCC.additionnal_import
# Public names of this module: the device class and the server entry point.
__all__ = ["RCUSCC", "main"]
class RCUSCC(Device):
    """RCU-SCC Tango device that talks to an OPC-UA server.

    **Properties:**

    - Device Property
        OPC_Server_Name
            - Type:'DevString'
        OPC_Server_Port
            - Type:'DevULong'
        OPC_Time_Out
            - Type:'DevDouble'
    """
    # PROTECTED REGION ID(RCUSCC.class_variable) ENABLED START #
    # OPC-UA client. It stays None until init_device has created one, so
    # that the "is not None" guard in delete_device can actually skip the
    # shutdown when no connection was ever set up.
    client = None
    # Index of the OPC-UA name space; overwritten in init_device.
    name_space_index = 0
    # NOTE(review): not referenced by any method visible in this file
    # (init_device stores the objects node in ``obj_node``) - confirm
    # whether this placeholder is still needed.
    obj = 0
    # PROTECTED REGION END # // RCUSCC.class_variable

    # -----------------
    # Device Properties
    # -----------------
    # All three properties are mandatory.
    OPC_Server_Name = device_property(
        dtype='DevString',
        mandatory=True
    )
    OPC_Server_Port = device_property(
        dtype='DevULong',
        mandatory=True
    )
    OPC_Time_Out = device_property(
        dtype='DevDouble',
        mandatory=True
    )

    # ----------
    # Attributes
    # ----------
    time_offset_rw = attribute(
        dtype='DevLong64',
        access=AttrWriteType.READ_WRITE,
    )
time_offset_r = attribute(
# ---------------
# General methods
# ---------------
def init_device(self):
    """Initialise the RCUSCC device and connect it to the OPC-UA server.

    Resets the cached attribute values and the attribute-to-node mapping,
    connects to ``opc.tcp://<OPC_Server_Name>:<OPC_Server_Port>/``, looks
    up the index of the "http://lofar.eu" name space (falling back to 2
    when the look-up raises) and resolves the ``time_offset_RW`` and
    ``time_offset_R`` nodes below the ``time_offset`` child of the objects
    node.

    The device state is INIT while this runs and ON once everything
    succeeded.

    Raises:
        Exception: whatever the connection or node look-up raised. The
            state is set to FAULT and the traceback is sent to the error
            stream before the exception is re-raised.
    """
    Device.init_device(self)
    # PROTECTED REGION ID(RCUSCC.init_device) ENABLED START #
    self.set_state(DevState.INIT)

    # Init the dict that contains attribute to OPC-UA MP/CP mappings.
    self.attribute_mapping = {}

    # Set default values in the RW/R attributes and add them to
    # the mapping.
    self._time_offset_rw = 0
    self.attribute_mapping["_time_offset_rw"] = {}
    self._time_offset_r = 0
    self.attribute_mapping["_time_offset_r"] = {}

    try:
        self.debug_stream(
            "Connecting to OPC-UA server %s:%d...",
            self.OPC_Server_Name, self.OPC_Server_Port)
        server_url = "opc.tcp://{}:{}/".format(
            self.OPC_Server_Name, self.OPC_Server_Port)
        # NOTE(review): python-opcua appears to document the Client timeout
        # in seconds - confirm the unit of OPC_Time_Out and whether the
        # factor 1000 is intended.
        self.client = opcua.Client(server_url, self.OPC_Time_Out * 1000)
        self.client.connect()
        self.debug_stream(
            "Connecting to OPC-UA server %s:%d done.",
            self.OPC_Server_Name, self.OPC_Server_Port)

        try:
            self.name_space_index = self.client.get_namespace_index("http://lofar.eu")
        except Exception:
            # Best effort: carry on with the default index instead of failing.
            self.warn_stream("Cannot determine the OPC-UA name space index.  Will try and use the default = 2.")
            self.name_space_index = 2

        self.obj_node = self.client.get_objects_node()

        self.debug_stream("Mapping OPC-UA MP/CP to attributes...")
        # Both nodes live below the same "time_offset" parent; only the
        # leaf name differs between the RW and the R attribute.
        for mapping_key, leaf_name in (
                ("_time_offset_rw", "time_offset_RW"),
                ("_time_offset_r", "time_offset_R")):
            self.attribute_mapping[mapping_key] = self.obj_node.get_child([
                "{}:time_offset".format(self.name_space_index),
                "{}:{}".format(self.name_space_index, leaf_name),
            ])
        self.debug_stream("Mapping OPC-UA MP/CP to attributes done.")
        self.set_state(DevState.ON)
    except Exception:
        self.set_state(DevState.FAULT)
        # Pass the values as stream arguments, like the debug_stream calls
        # above, so that '%' characters inside the traceback text are not
        # interpreted as format directives a second time.
        self.error_stream(
            "Connection init to the OPC-UA server %s:%d failed.  Trace: %s",
            self.OPC_Server_Name, self.OPC_Server_Port, traceback.format_exc())
        # Bare raise keeps the original exception and traceback untouched.
        raise
    # PROTECTED REGION END # // RCUSCC.init_device
def always_executed_hook(self):
    """Hook run before any TANGO command is executed; currently does nothing."""
    # PROTECTED REGION ID(RCUSCC.always_executed_hook) ENABLED START #
    # PROTECTED REGION END # // RCUSCC.always_executed_hook
    return None
def delete_device(self):
    """Hook to delete resources allocated in init_device.

    This method allows for any memory or other resources allocated in the
    init_device method to be released. This method is called by the device
    destructor and by the device Init command.

    Sets the state to OFF and, when an OPC-UA client exists, closes its
    session and its secure channel.
    """
    # PROTECTED REGION ID(RCUSCC.delete_device) ENABLED START #
    self.debug_stream("Shutting down...")
    self.set_state(DevState.OFF)

    # The class-level placeholder for ``client`` is 0, not None, so an
    # "is not None" test would try to close the placeholder when
    # init_device never got as far as creating a client. Truthiness covers
    # both 0 and None.
    if self.client:
        try:
            self.client.close_session()
        finally:
            # Close the channel even when closing the session failed.
            # NOTE(review): python-opcua also offers Client.disconnect(),
            # which presumably closes the socket as well - consider it.
            self.client.close_secure_channel()
    self.debug_stream("Shut down.  Good bye.")
    # PROTECTED REGION END # // RCUSCC.delete_device
# ------------------
# Attributes methods
# ------------------
def read_time_offset_rw(self):
    # PROTECTED REGION ID(RCUSCC.time_offset_rw_read) ENABLED START #
    """Read handler of time_offset_rw: hand back the locally cached value."""
    cached_offset = self._time_offset_rw
    return cached_offset
    # PROTECTED REGION END # // RCUSCC.time_offset_rw_read
def write_time_offset_rw(self, value):
    # PROTECTED REGION ID(RCUSCC.time_offset_rw_write) ENABLED START #
    """Write handler of time_offset_rw.

    Sends ``value`` to the OPC-UA node mapped to "_time_offset_rw" and,
    once that call has returned, stores it as the cached value.
    """
    rw_node = self.attribute_mapping["_time_offset_rw"]
    rw_node.set_value(value)
    # Only reached when set_value did not raise.
    self._time_offset_rw = value
    # PROTECTED REGION END # // RCUSCC.time_offset_rw_write
def read_time_offset_r(self):
    # PROTECTED REGION ID(RCUSCC.time_offset_r_read) ENABLED START #
    """Read handler of time_offset_r.

    Fetches the current value from the OPC-UA node mapped to
    "_time_offset_r", caches it on the device and returns it.
    """
    r_node = self.attribute_mapping["_time_offset_r"]
    self._time_offset_r = r_node.get_value()
    return self._time_offset_r
    # PROTECTED REGION END # // RCUSCC.time_offset_r_read
# --------
# Commands
# --------
# ----------
# Run server
# ----------
def main(args=None, **kwargs):
    """Entry point of the RCUSCC module.

    Hands the RCUSCC device class to ``run`` together with ``args`` and
    any extra keyword arguments, and returns whatever ``run`` returns.
    """
    # PROTECTED REGION ID(RCUSCC.main) ENABLED START #
    device_classes = (RCUSCC,)
    return run(device_classes, args=args, **kwargs)
    # PROTECTED REGION END # // RCUSCC.main
# Start the device server when this file is executed as a script.
if __name__ == '__main__':
    main()