Newer
Older
# -*- coding: utf-8 -*-
#
# This file is part of the SDP project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
""" SDP Device Server for LOFAR2.0
"""
# PyTango imports
from tango.server import run
from tango.server import device_property
# Additional import
from clients.opcua_connection import OPCUAConnection
from src.attribute_wrapper import attribute_wrapper
from src.hardware_device import hardware_device
from src.lofar_logging import device_logging_to_python
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
"""
**Properties:**
- Device Property
OPC_Server_Name
- Type:'DevString'
OPC_Server_Port
- Type:'DevULong'
OPC_Time_Out
- Type:'DevDouble'
"""
# -----------------
# Device Properties
# -----------------
OPC_Server_Name = device_property(
dtype='DevString',
mandatory=True
)
OPC_Server_Port = device_property(
dtype='DevULong',
mandatory=True
)
OPC_Time_Out = device_property(
dtype='DevDouble',
mandatory=True
)
# ----------
# Attributes
# ----------
fpga_mask_RW = attribute_wrapper(comms_annotation=["1:fpga_mask_RW"], datatype=numpy.bool_, dims=(16,), access=AttrWriteType.READ_WRITE)
fpga_scrap_R = attribute_wrapper(comms_annotation=["1:fpga_scrap_R"], datatype=numpy.int32, dims=(2048,))
fpga_scrap_RW = attribute_wrapper(comms_annotation=["1:fpga_scrap_RW"], datatype=numpy.int32, dims=(2048,), access=AttrWriteType.READ_WRITE)
fpga_status_R = attribute_wrapper(comms_annotation=["1:fpga_status_R"], datatype=numpy.bool_, dims=(16,))
fpga_temp_R = attribute_wrapper(comms_annotation=["1:fpga_temp_R"], datatype=numpy.float_, dims=(16,))
fpga_version_R = attribute_wrapper(comms_annotation=["1:fpga_version_R"], datatype=numpy.str_, dims=(16,))
fpga_weights_R = attribute_wrapper(comms_annotation=["1:fpga_weights_R"], datatype=numpy.int16, dims=(16, 12 * 488 * 2))
fpga_weights_RW = attribute_wrapper(comms_annotation=["1:fpga_weights_RW"], datatype=numpy.int16, dims=(16, 12 * 488 * 2), access=AttrWriteType.READ_WRITE)
tr_busy_R = attribute_wrapper(comms_annotation=["1:tr_busy_R"], datatype=numpy.bool_)
# NOTE: typo in node name is 'tr_reload_W' should be 'tr_reload_RW'
tr_reload_RW = attribute_wrapper(comms_annotation=["1:tr_reload_W"], datatype=numpy.bool_, access=AttrWriteType.READ_WRITE)
tr_tod_R = attribute_wrapper(comms_annotation=["1:tr_tod_R"], datatype=numpy.uint64)
tr_uptime_R = attribute_wrapper(comms_annotation=["1:tr_uptime_R"], datatype=numpy.uint64)
def always_executed_hook(self):
"""Method always executed before any TANGO command is executed."""
pass
def delete_device(self):
"""Hook to delete resources allocated in init_device.
This method allows for any memory or other resources allocated in the
init_device method to be released. This method is called by the device
destructor and by the device Init command (a Tango built-in).
"""
self.debug_stream("Shutting down...")
self.Off()
self.debug_stream("Shut down. Good bye.")
# --------
# overloaded functions
# --------
def off(self):
""" user code here. is called when the state is set to OFF """
# Stop keep-alive
self.opcua_connection.stop()
def initialise(self):
""" user code here. is called when the sate is set to INIT """
"""Initialises the attributes and properties of the PCC."""
# set up the OPC ua client
self.OPCua_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu", self.OPC_Time_Out, self.Fault, self)
# map an access helper class
for i in self.attr_list():
i.set_comm_client(self.OPCua_client)
self.OPCua_client.start()
# --------
# Commands
# --------
# ----------
# Run server
# ----------
def main(args=None, **kwargs):
"""Main function of the SDP module."""
return run((SDP,), args=args, **kwargs)