# -*- coding: utf-8 -*- # # This file is part of the PCC project # # # # Distributed under the terms of the APACHE license. # See LICENSE.txt for more info. """ PCC Device Server for LOFAR2.0 """ # PyTango imports from tango import DebugIt from tango.server import run, command from tango.server import device_property, attribute from tango import AttrWriteType import numpy # Additional import from util.wrappers import * from clients.opcua_connection import OPCUAConnection from util.attribute_wrapper import attribute_wrapper from util.hardware_device import hardware_device from util.lofar_logging import device_logging_to_python, log_exceptions from util.lofar_git import get_version __all__ = ["PCC", "main"] @device_logging_to_python({"device": "PCC"}) class PCC(hardware_device): """ **Properties:** - Device Property OPC_Server_Name - Type:'DevString' OPC_Server_Port - Type:'DevULong' OPC_Time_Out - Type:'DevDouble' """ # ----------------- # Device Properties # ----------------- OPC_Server_Name = device_property( dtype='DevString', mandatory=True ) OPC_Server_Port = device_property( dtype='DevULong', mandatory=True ) OPC_Time_Out = device_property( dtype='DevDouble', mandatory=True ) OPC_namespace = device_property( dtype='DevString', mandatory=False ) # ---------- # Attributes # ---------- version_R = attribute(dtype=str, access=AttrWriteType.READ, fget=lambda self: get_version()) Ant_mask_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:Ant_mask_RW"], datatype=numpy.bool_, dims=(3, 32), access=AttrWriteType.READ_WRITE) CLK_Enable_PWR_R = attribute_wrapper(comms_annotation=["2:PCC", "2:CLK_Enable_PWR_R"], datatype=numpy.bool_) CLK_I2C_STATUS_R = attribute_wrapper(comms_annotation=["2:PCC", "2:CLK_I2C_STATUS_R"], datatype=numpy.int64) CLK_PLL_error_R = attribute_wrapper(comms_annotation=["2:PCC", "2:CLK_PLL_error_R"], datatype=numpy.bool_) CLK_PLL_locked_R = attribute_wrapper(comms_annotation=["2:PCC", "2:CLK_PLL_locked_R"], datatype=numpy.bool_) CLK_monitor_rate_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:CLK_monitor_rate_RW"], datatype=numpy.int64, access=AttrWriteType.READ_WRITE) CLK_translator_busy_R = attribute_wrapper(comms_annotation=["2:PCC", "2:CLK_translator_busy_R"], datatype=numpy.bool_) HBA_element_beamformer_delays_R = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_beamformer_delays_R"], datatype=numpy.int64, dims=(32, 96)) HBA_element_beamformer_delays_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_beamformer_delays_RW"], datatype=numpy.int64, dims=(32, 96), access=AttrWriteType.READ_WRITE) HBA_element_led_R = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_led_R"], datatype=numpy.int64, dims=(32, 96)) HBA_element_led_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_led_RW"], datatype=numpy.int64, dims=(32, 96), access=AttrWriteType.READ_WRITE) HBA_element_LNA_pwr_R = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_LNA_pwr_R"], datatype=numpy.int64, dims=(32, 96)) HBA_element_LNA_pwr_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_LNA_pwr_RW"], datatype=numpy.int64, dims=(32, 96), access=AttrWriteType.READ_WRITE) HBA_element_pwr_R = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_pwr_R"], datatype=numpy.int64, dims=(32, 96)) HBA_element_pwr_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_pwr_RW"], datatype=numpy.int64, dims=(32, 96), access=AttrWriteType.READ_WRITE) RCU_ADC_lock_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_ADC_lock_R"], datatype=numpy.int64, dims=(3, 32)) RCU_attenuator_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_attenuator_R"], datatype=numpy.int64, dims=(3, 32)) RCU_attenuator_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_attenuator_RW"], datatype=numpy.int64, dims=(3, 32), access=AttrWriteType.READ_WRITE) RCU_band_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_band_R"], datatype=numpy.int64, dims=(3, 32)) RCU_band_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_band_RW"], datatype=numpy.int64, dims=(3, 32), access=AttrWriteType.READ_WRITE) RCU_I2C_STATUS_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_I2C_STATUS_R"], datatype=numpy.int64, dims=(32,)) RCU_ID_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_ID_R"], datatype=numpy.int64, dims=(32,)) RCU_LED0_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_LED0_R"], datatype=numpy.bool_, dims=(32,)) RCU_LED0_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_LED0_RW"], datatype=numpy.bool_, dims=(32,), access=AttrWriteType.READ_WRITE) RCU_LED1_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_LED1_R"], datatype=numpy.bool_, dims=(32,)) RCU_LED1_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_LED1_RW"], datatype=numpy.bool_, dims=(32,), access=AttrWriteType.READ_WRITE) RCU_mask_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_mask_RW"], datatype=numpy.bool_, dims=(32,), access=AttrWriteType.READ_WRITE) RCU_monitor_rate_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_monitor_rate_RW"], datatype=numpy.int64, access=AttrWriteType.READ_WRITE) RCU_Pwr_dig_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_Pwr_dig_R"], datatype=numpy.bool_, dims=(32,)) RCU_temperature_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_temperature_R"], datatype=numpy.float64, dims=(32,)) RCU_translator_busy_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_translator_busy_R"], datatype=numpy.bool_) RCU_version_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_version_R"], datatype=numpy.str_, dims=(32,)) @log_exceptions() def delete_device(self): """Hook to delete resources allocated in init_device. This method allows for any memory or other resources allocated in the init_device method to be released. This method is called by the device destructor and by the device Init command (a Tango built-in). """ self.debug_stream("Shutting down...") self.Off() self.debug_stream("Shut down. Good bye.") # -------- # overloaded functions # -------- @log_exceptions() def configure_for_off(self): """ user code here. is called when the state is set to OFF """ # Stop keep-alive try: self.opcua_connection.stop() except Exception as e: self.warn_stream("Exception while stopping OPC ua connection in configure_for_off function: {}. Exception ignored".format(e)) @log_exceptions() def configure_for_initialise(self): """ user code here. is called when the state is set to INIT """ # Init the dict that contains function to OPC-UA function mappings. self.function_mapping = {} self.function_mapping["RCU_on"] = {} self.function_mapping["RCU_off"] = {} self.function_mapping["CLK_on"] = {} self.function_mapping["CLK_off"] = {} # set up the OPC ua client self.OPCua_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu", self.OPC_Time_Out, self.Fault, self) # map an access helper class for i in self.attr_list(): try: i.set_comm_client(self.OPCua_client) except Exception as e: # use the pass function instead of setting read/write fails i.set_pass_func() self.warn_stream("error while setting the PCC attribute {} read/write function. {}".format(i, e)) self.OPCua_client.start() # -------- # Commands # -------- @command() @DebugIt() @only_when_on() @fault_on_error() def RCU_off(self): """ :return:None """ self.function_mapping["RCU_off"]() @command() @DebugIt() @only_when_on() @fault_on_error() def RCU_on(self): """ :return:None """ self.function_mapping["RCU_on"]() @command() @DebugIt() @only_when_on() @fault_on_error() def ADC_on(self): """ :return:None """ self.function_mapping["ADC_on"]() @command() @DebugIt() @only_when_on() @fault_on_error() def RCU_update(self): """ :return:None """ self.function_mapping["RCU_update"]() @command() @DebugIt() @only_when_on() @fault_on_error() def CLK_off(self): """ :return:None """ self.function_mapping["CLK_off"]() @command() @DebugIt() @only_when_on() @fault_on_error() def CLK_on(self): """ :return:None """ self.function_mapping["CLK_on"]() @command() @DebugIt() @only_when_on() @fault_on_error() def CLK_PLL_setup(self): """ :return:None """ self.function_mapping["CLK_PLL_setup"]() # ---------- # Run server # ---------- def main(args=None, **kwargs): """Main function of the PCC module.""" from util.lofar_logging import configure_logger import logging configure_logger(logging.getLogger()) return run((PCC,), args=args, **kwargs) if __name__ == '__main__': main()