Newer
Older
# -*- coding: utf-8 -*-
#
# This file is part of the PCC project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
""" PCC Device Server for LOFAR2.0
"""
# PyTango imports
from tango import DebugIt
from tango.server import run, command
from tango.server import device_property
# Additional import
from clients.opcua_connection import OPCUAConnection
from util.attribute_wrapper import *
from util.hardware_device import *
from util.lofar_logging import device_logging_to_python, log_exceptions
__all__ = ["PCC", "main"]
@device_logging_to_python({"device": "PCC"})
class PCC(hardware_device):
# -----------------
# Device Properties
# -----------------
OPC_Server_Name = device_property(
dtype='DevString',
mandatory=True
)
OPC_Server_Port = device_property(
dtype='DevULong',
mandatory=True
)
OPC_Time_Out = device_property(
dtype='DevDouble',
mandatory=True
)
OPC_Namespace = device_property(
dtype='DevString',
mandatory=False
)
# ----------
# Attributes
# ----------
RCU_state_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_state_R"], datatype=numpy.str_, access=AttrWriteType.READ_WRITE)
RCU_mask_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_mask_RW"], datatype=numpy.bool_, dims=(32,), access=AttrWriteType.READ_WRITE)
Ant_mask_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:Ant_mask_RW"], datatype=numpy.bool_, dims=(3, 32), access=AttrWriteType.READ_WRITE)
RCU_attenuator_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_attenuator_R"], datatype=numpy.int64, dims=(3, 32))
RCU_attenuator_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_attenuator_RW"], datatype=numpy.int64, dims=(3, 32), access=AttrWriteType.READ_WRITE)
RCU_band_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_band_R"], datatype=numpy.int64, dims=(3, 32))
RCU_band_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_band_RW"], datatype=numpy.int64, dims=(3, 32), access=AttrWriteType.READ_WRITE)
RCU_temperature_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_temperature_R"], datatype=numpy.float64, dims=(32,))
RCU_Pwr_dig_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_Pwr_dig_R"], datatype=numpy.int64, dims=(32,))
RCU_LED0_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_LED0_R"], datatype=numpy.int64, dims=(32,))
RCU_LED0_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_LED0_RW"], datatype=numpy.int64, dims=(32,), access=AttrWriteType.READ_WRITE)
RCU_ADC_lock_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_ADC_lock_R"], datatype=numpy.int64, dims=(3, 32))
RCU_ADC_SYNC_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_ADC_SYNC_R"], datatype=numpy.int64, dims=(3, 32))
RCU_ADC_JESD_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_ADC_JESD_R"], datatype=numpy.int64, dims=(3, 32))
RCU_ADC_CML_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_ADC_CML_R"], datatype=numpy.int64, dims=(3, 32))
RCU_OUT1_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_OUT1_R"], datatype=numpy.int64, dims=(3, 32))
RCU_OUT2_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_OUT2_R"], datatype=numpy.int64, dims=(3, 32))
RCU_ID_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_ID_R"], datatype=numpy.int64, dims=(32,))
RCU_version_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_version_R"], datatype=numpy.str_, dims=(32,))
HBA_element_beamformer_delays_R = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_beamformer_delays_R"], datatype=numpy.int64, dims=(32, 96))
HBA_element_beamformer_delays_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_beamformer_delays_RW"], datatype=numpy.int64, dims=(32, 96), access=AttrWriteType.READ_WRITE)
HBA_element_pwr_R = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_pwr_R"], datatype=numpy.int64, dims=(32, 96))
HBA_element_pwr_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_pwr_RW"], datatype=numpy.int64, dims=(32, 96), access=AttrWriteType.READ_WRITE)
uC_ID_R = attribute_wrapper(comms_annotation=["2:PCC", "2:uC_ID_R"], datatype=numpy.int64, dims=(32,))
RCU_monitor_rate_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_monitor_rate_RW"], datatype=numpy.float64, access=AttrWriteType.READ_WRITE)
def delete_device(self):
"""Hook to delete resources allocated in init_device.
This method allows for any memory or other resources allocated in the
init_device method to be released. This method is called by the device
destructor and by the device Init command (a Tango built-in).
"""
self.debug_stream("Shutting down...")
self.Off()
self.debug_stream("Shut down. Good bye.")
# --------
# overloaded functions
# --------
def off(self):
""" user code here. is called when the state is set to OFF """
# Stop keep-alive
self.OPCua_client.disconnect()
def initialise(self):
""" user code here. is called when the state is set to INIT """
#set up the OPC ua client
namespace = "http://lofar.eu"
if self.OPC_Namespace is not None:
namespace = self.OPC_Namespace
self.OPCua_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), namespace, self.OPC_Time_Out, self.Standby, self.Fault, self)
# map the attributes to the OPC ua comm client
for i in self.attr_list():
try:
i.set_comm_client(self.OPCua_client)
except:
pass
# Init the dict that contains function to OPC-UA function mappings.
self.function_mapping = {}
self.function_mapping["RCU_off"] = self.OPCua_client._setup_annotation(["2:PCC", "2:RCU_off"])
self.function_mapping["RCU_on"] = self.OPCua_client._setup_annotation(["2:PCC", "2:RCU_on"])
self.function_mapping["ADC_on"] = self.OPCua_client._setup_annotation(["2:PCC", "2:ADC_on"])
self.function_mapping["RCU_update"] = self.OPCua_client._setup_annotation(["2:PCC", "2:RCU_update"])
self.function_mapping["CLK_off"] = self.OPCua_client._setup_annotation(["2:PCC", "2:CLK_off"])
self.function_mapping["CLK_on"] = self.OPCua_client._setup_annotation(["2:PCC", "2:CLK_on"])
self.function_mapping["CLK_PLL_setup"] = self.OPCua_client._setup_annotation(["2:PCC", "2:CLK_PLL_setup"])
# --------
# Commands
# --------
@command()
@DebugIt()
@only_when_on
@fault_on_error
def RCU_off(self):
"""
:return:None
"""
self.function_mapping["RCU_off"]()
@command()
@DebugIt()
@only_when_on
@fault_on_error
def RCU_on(self):
"""
:return:None
"""
self.function_mapping["RCU_on"]()
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
@command()
@DebugIt()
@only_when_on
@fault_on_error
def ADC_on(self):
"""
:return:None
"""
self.function_mapping["ADC_on"]()
@command()
@DebugIt()
@only_when_on
@fault_on_error
def RCU_update(self):
"""
:return:None
"""
self.function_mapping["RCU_update"]()
@command()
@DebugIt()
@only_when_on
@fault_on_error
def CLK_off(self):
"""
:return:None
"""
self.function_mapping["CLK_off"]()
@command()
@DebugIt()
@only_when_on
@fault_on_error
def CLK_on(self):
"""
:return:None
"""
self.function_mapping["CLK_on"]()
@command()
@DebugIt()
@only_when_on
@fault_on_error
def CLK_PLL_setup(self):
"""
:return:None
"""
self.function_mapping["CLK_PLL_setup"]()
# ----------
# Run server
# ----------
def main(args=None, **kwargs):
"""Main function of the PCC module."""
return run((PCC,), args=args, **kwargs)