Newer
Older
# -*- coding: utf-8 -*-
#
# This file is part of the PCC project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
""" PCC Device Server for LOFAR2.0
"""
# PyTango imports
from tango import DebugIt
from tango.server import run, command
from tango.server import device_property, attribute
from clients.opcua_connection import OPCUAConnection
from util.attribute_wrapper import attribute_wrapper
from util.hardware_device import hardware_device
from util.lofar_logging import device_logging_to_python, log_exceptions
from util.lofar_git import get_version
__all__ = ["PCC", "main"]
@device_logging_to_python({"device": "PCC"})
class PCC(hardware_device):
- Device Property
OPC_Server_Name
- Type:'DevString'
OPC_Server_Port
- Type:'DevULong'
OPC_Time_Out
- Type:'DevDouble'
"""
# -----------------
# Device Properties
# -----------------
OPC_Server_Name = device_property(
dtype='DevString',
mandatory=True
)
OPC_Server_Port = device_property(
dtype='DevULong',
mandatory=True
)
OPC_Time_Out = device_property(
dtype='DevDouble',
mandatory=True
)
OPC_namespace = device_property(
version_R = attribute(dtype=str, access=AttrWriteType.READ, fget=lambda self: get_version())
Ant_mask_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:Ant_mask_RW"], datatype=numpy.bool_, dims=(3, 32), access=AttrWriteType.READ_WRITE)
CLK_Enable_PWR_R = attribute_wrapper(comms_annotation=["2:PCC", "2:CLK_Enable_PWR_R"], datatype=numpy.bool_)
CLK_I2C_STATUS_R = attribute_wrapper(comms_annotation=["2:PCC", "2:CLK_I2C_STATUS_R"], datatype=numpy.int64)
CLK_PLL_error_R = attribute_wrapper(comms_annotation=["2:PCC", "2:CLK_PLL_error_R"], datatype=numpy.bool_)
CLK_PLL_locked_R = attribute_wrapper(comms_annotation=["2:PCC", "2:CLK_PLL_locked_R"], datatype=numpy.bool_)
CLK_monitor_rate_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:CLK_monitor_rate_RW"], datatype=numpy.int64, access=AttrWriteType.READ_WRITE)
CLK_translator_busy_R = attribute_wrapper(comms_annotation=["2:PCC", "2:CLK_translator_busy_R"], datatype=numpy.bool_)
HBA_element_beamformer_delays_R = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_beamformer_delays_R"], datatype=numpy.int64, dims=(32, 96))
HBA_element_beamformer_delays_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_beamformer_delays_RW"], datatype=numpy.int64, dims=(32, 96), access=AttrWriteType.READ_WRITE)
HBA_element_led_R = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_led_R"], datatype=numpy.int64, dims=(32, 96))
HBA_element_led_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_led_RW"], datatype=numpy.int64, dims=(32, 96), access=AttrWriteType.READ_WRITE)
HBA_element_LNA_pwr_R = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_LNA_pwr_R"], datatype=numpy.int64, dims=(32, 96))
HBA_element_LNA_pwr_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_LNA_pwr_RW"], datatype=numpy.int64, dims=(32, 96), access=AttrWriteType.READ_WRITE)
HBA_element_pwr_R = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_pwr_R"], datatype=numpy.int64, dims=(32, 96))
HBA_element_pwr_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_pwr_RW"], datatype=numpy.int64, dims=(32, 96), access=AttrWriteType.READ_WRITE)
RCU_ADC_lock_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_ADC_lock_R"], datatype=numpy.int64, dims=(3, 32))
RCU_attenuator_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_attenuator_R"], datatype=numpy.int64, dims=(3, 32))
RCU_attenuator_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_attenuator_RW"], datatype=numpy.int64, dims=(3, 32), access=AttrWriteType.READ_WRITE)
RCU_band_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_band_R"], datatype=numpy.int64, dims=(3, 32))
RCU_band_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_band_RW"], datatype=numpy.int64, dims=(3, 32), access=AttrWriteType.READ_WRITE)
RCU_I2C_STATUS_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_I2C_STATUS_R"], datatype=numpy.int64, dims=(32,))
RCU_ID_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_ID_R"], datatype=numpy.int64, dims=(32,))
RCU_LED0_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_LED0_R"], datatype=numpy.bool_, dims=(32,))
RCU_LED0_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_LED0_RW"], datatype=numpy.bool_, dims=(32,), access=AttrWriteType.READ_WRITE)
RCU_LED1_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_LED1_R"], datatype=numpy.bool_, dims=(32,))
RCU_LED1_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_LED1_RW"], datatype=numpy.bool_, dims=(32,), access=AttrWriteType.READ_WRITE)
RCU_mask_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_mask_RW"], datatype=numpy.bool_, dims=(32,), access=AttrWriteType.READ_WRITE)
RCU_monitor_rate_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_monitor_rate_RW"], datatype=numpy.int64, access=AttrWriteType.READ_WRITE)
RCU_Pwr_dig_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_Pwr_dig_R"], datatype=numpy.bool_, dims=(32,))
RCU_temperature_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_temperature_R"], datatype=numpy.float64, dims=(32,))
RCU_translator_busy_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_translator_busy_R"], datatype=numpy.bool_)
RCU_version_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_version_R"], datatype=numpy.str_, dims=(32,))
def delete_device(self):
"""Hook to delete resources allocated in init_device.
This method allows for any memory or other resources allocated in the
init_device method to be released. This method is called by the device
destructor and by the device Init command (a Tango built-in).
"""
self.debug_stream("Shutting down...")
self.Off()
self.debug_stream("Shut down. Good bye.")
# --------
# overloaded functions
# --------
def configure_for_off(self):
""" user code here. is called when the state is set to OFF """
# Stop keep-alive
try:
self.opcua_connection.stop()
except Exception as e:
self.warn_stream("Exception while stopping OPC ua connection in configure_for_off function: {}. Exception ignored".format(e))
def configure_for_initialise(self):
""" user code here. is called when the state is set to INIT """
# Init the dict that contains function to OPC-UA function mappings.
self.function_mapping = {}
self.function_mapping["RCU_on"] = {}
self.function_mapping["RCU_off"] = {}
self.function_mapping["CLK_on"] = {}
self.function_mapping["CLK_off"] = {}
# set up the OPC ua client
self.OPCua_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu",
self.OPC_Time_Out, self.Fault, self)

Taya Snijder
committed
# map an access helper class
for i in self.attr_list():
try:
i.set_comm_client(self.OPCua_client)
except Exception as e:

Taya Snijder
committed
# use the pass function instead of setting read/write fails
self.warn_stream("error while setting the PCC attribute {} read/write function. {}".format(i, e))
self.OPCua_client.start()
# --------
# Commands
# --------
@command()
@DebugIt()
@only_when_on()
@fault_on_error()
:return:None
"""
self.function_mapping["RCU_off"]()
@only_when_on()
@fault_on_error()
:return:None
"""
self.function_mapping["RCU_on"]()
@only_when_on()
@fault_on_error()
def ADC_on(self):
"""
:return:None
"""
self.function_mapping["ADC_on"]()
@command()
@DebugIt()
@only_when_on()
@fault_on_error()
def RCU_update(self):
"""
:return:None
"""
self.function_mapping["RCU_update"]()
@command()
@DebugIt()
@only_when_on()
@fault_on_error()
def CLK_off(self):
"""
:return:None
"""
self.function_mapping["CLK_off"]()
@command()
@DebugIt()
@only_when_on()
@fault_on_error()
def CLK_on(self):
"""
:return:None
"""
self.function_mapping["CLK_on"]()
@command()
@DebugIt()
@only_when_on()
@fault_on_error()
def CLK_PLL_setup(self):
"""
:return:None
"""
self.function_mapping["CLK_PLL_setup"]()
# ----------
# Run server
# ----------
def main(args=None, **kwargs):

Jan David Mol
committed
from util.lofar_logging import configure_logger
import logging
configure_logger(logging.getLogger())