Newer
Older
# -*- coding: utf-8 -*-
#
# This file is part of the SDP project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
""" SDP Device Server for LOFAR2.0
"""
# PyTango imports
from tango.server import run
from tango.server import device_property, attribute
# Additional import
from clients.opcua_connection import OPCUAConnection
from util.attribute_wrapper import attribute_wrapper
from util.hardware_device import hardware_device
from util.lofar_logging import device_logging_to_python, log_exceptions
from util.lofar_git import get_version
import numpy
@device_logging_to_python({"device": "APSCTL"})
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
class APSCTL(hardware_device):
"""
**Properties:**
- Device Property
OPC_Server_Name
- Type:'DevString'
OPC_Server_Port
- Type:'DevULong'
OPC_Time_Out
- Type:'DevDouble'
"""
# -----------------
# Device Properties
# -----------------
OPC_Server_Name = device_property(
dtype='DevString',
mandatory=True
)
OPC_Server_Port = device_property(
dtype='DevULong',
mandatory=True
)
OPC_Time_Out = device_property(
dtype='DevDouble',
mandatory=True
)
# ----------
# Attributes
# ----------
version_R = attribute(dtype=str, access=AttrWriteType.READ, fget=lambda self: get_version())
N_unb = 2
N_fpga = 4
N_ddr = 2
N_qsfp = 6
# Central CP per Uniboard
UNB2_FPGA_DDR4_SLOT_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_DDR4_SLOT_TEMP_R"], datatype=numpy.double, dims=((N_unb * N_ddr), N_fpga))
UNB2_I2C_bus_QSFP_STATUS_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_I2C_bus_QSFP_STATUS_R"], datatype=numpy.int64, dims=((N_unb * N_fpga), N_qsfp))
UNB2_I2C_bus_DDR4_STATUS_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_I2C_bus_DDR4_STATUS_R"], datatype=numpy.int64, dims=(N_ddr, N_fpga))
UNB2_I2C_bus_FPGA_PS_STATUS_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_I2C_bus_FPGA_PS_STATUS_R"], datatype=numpy.int64, dims=(N_unb * N_fpga,))
UNB2_translator_busy_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_translator_busy_R"], datatype=numpy.bool_)
UNB2_Front_Panel_LED_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_Front_Panel_LED_RW"], datatype=numpy.uint8, dims=(N_unb,), access=AttrWriteType.READ_WRITE)
UNB2_Front_Panel_LED_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_Front_Panel_LED_R"], datatype=numpy.uint8, dims=(N_unb,))
UNB2_EEPROM_Serial_Number_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_EEPROM_Serial_Number_R"], datatype=numpy.str, dims=(N_unb,))
UNB2_EEPROM_Unique_ID_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_EEPROM_Unique_ID_R"], datatype=numpy.uint32, dims=(N_unb,))
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
UNB2_FPGA_DDR4_SLOT_PART_NUMBER_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_DDR4_SLOT_PART_NUMBER_R"], datatype=numpy.str, dims=(N_unb * N_qsfp, N_fpga))
UNB2_monitor_rate_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_monitor_rate_RW"], datatype=numpy.double, dims=(N_unb,), access=AttrWriteType.READ_WRITE)
UNB2_I2C_bus_STATUS_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_I2C_bus_STATUS_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_I2C_bus_PS_STATUS_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_I2C_bus_PS_STATUS_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_mask_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_mask_RW"], datatype=numpy.double, dims=(N_unb,), access=AttrWriteType.READ_WRITE)
UNB2_Power_ON_OFF_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_Power_ON_OFF_R"], datatype=numpy.double, dims=(N_unb,), access=AttrWriteType.READ_WRITE)
UNB2_FPGA_QSFP_CAGE_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_QSFP_CAGE_TEMP_R"], datatype=numpy.double, dims=(N_unb * N_qsfp,N_fpga))
UNB2_FPGA_QSFP_CAGE_LOS_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_QSFP_CAGE_LOS_R"], datatype=numpy.uint8, dims=(N_unb * N_qsfp,N_fpga))
UNB2_FPGA_POL_HGXB_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_HGXB_VOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_HGXB_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_HGXB_IOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_HGXB_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_HGXB_TEMP_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_PGM_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_PGM_VOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_PGM_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_PGM_IOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_PGM_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_PGM_TEMP_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_RXGXB_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_RXGXB_VOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_RXGXB_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_RXGXB_IOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_RXGXB_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_RXGXB_TEMP_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_TXGXB_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_TXGXB_VOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_TXGXB_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_TXGXB_IOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_POL_FPGA_TXGXB_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_FPGA_TXGXB_TEMP_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_POL_FPGA_CORE_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_FPGA_CORE_VOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_CORE_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_CORE_IOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_CORE_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_CORE_TEMP_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_ERAM_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_ERAM_VOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_ERAM_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_ERAM_IOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_ERAM_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_ERAM_TEMP_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_POL_CLOCK_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_CLOCK_VOUT_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_CLOCK_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_CLOCK_IOUT_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_CLOCK_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_CLOCK_TEMP_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_SWITCH_1V2_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_SWITCH_1V2_VOUT_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_SWITCH_1V2_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_SWITCH_1V2_IOUT_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_SWITCH_1V2_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_SWITCH_1V2_TEMP_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_SWITCH_PHY_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_SWITCH_PHY_VOUT_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_SWITCH_PHY_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_SWITCH_PHY_IOUT_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_SWITCH_PHY_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_SWITCH_PHY_TEMP_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_QSFP_N01_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_QSFP_N01_VOUT_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_QSFP_N01_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_QSFP_N01_IOUT_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_QSFP_N01_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_QSFP_N01_TEMP_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_QSFP_N23_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_QSFP_N23_VOUT_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_QSFP_N23_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_QSFP_N23_IOUT_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_QSFP_N23_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_QSFP_N23_TEMP_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_DC_DC_48V_12V_VIN_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_DC_DC_48V_12V_VIN_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_DC_DC_48V_12V_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_DC_DC_48V_12V_VOUT_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_DC_DC_48V_12V_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_DC_DC_48V_12V_IOUT_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_DC_DC_48V_12V_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_DC_DC_48V_12V_TEMP_R"], datatype=numpy.double, dims=(N_unb,))
# QualifiedName(2: UNB2_on)
# QualifiedName(2: UNB2_off)
@log_exceptions()
def delete_device(self):
"""Hook to delete resources allocated in init_device.
This method allows for any memory or other resources allocated in the
init_device method to be released. This method is called by the device
destructor and by the device Init command (a Tango built-in).
"""
self.debug_stream("Shutting down...")
self.Off()
self.debug_stream("Shut down. Good bye.")
# --------
# overloaded functions
# --------
@log_exceptions()
def configure_for_off(self):
""" user code here. is called when the state is set to OFF """
# Stop keep-alive
try:
self.opcua_connection.stop()
except Exception as e:
self.warn_stream("Exception while stopping OPC ua connection in configure_for_off function: {}. Exception ignored".format(e))
@log_exceptions()
def configure_for_initialise(self):
""" user code here. is called when the sate is set to INIT """
"""Initialises the attributes and properties of the PCC."""
# set up the OPC ua client
self.OPCua_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu", self.OPC_Time_Out, self.Fault, self)
# map an access helper class
for i in self.attr_list():
try:
i.set_comm_client(self.OPCua_client)
except Exception as e:

Taya Snijder
committed
# use the pass function instead of setting read/write fails
self.warn_stream("error while setting the APSCTL attribute {} read/write function. {}".format(i, e))
self.OPCua_client.start()
# --------
# Commands
# --------
# ----------
# Run server
# ----------
def main(args=None, **kwargs):
"""Main function of the SDP module."""

Jan David Mol
committed
from util.lofar_logging import configure_logger
import logging
configure_logger(logging.getLogger())
return run((APSCTL,), args=args, **kwargs)
if __name__ == '__main__':
main()