From 70d1e9a069c2d633787e00097380d341fd83635c Mon Sep 17 00:00:00 2001 From: thijs snijder <snijder@astron.nl> Date: Fri, 16 Apr 2021 12:51:46 +0200 Subject: [PATCH] added early ini and APSCTL clients/devices --- devices/APSCTL.py | 189 ++++++++++++++++++++++++++++++++++ devices/clients/ini_client.py | 133 ++++++++++++++++++++++++ devices/example/example.ini | 18 ++++ devices/ini_device.py | 115 +++++++++++++++++++++ 4 files changed, 455 insertions(+) create mode 100644 devices/APSCTL.py create mode 100644 devices/clients/ini_client.py create mode 100644 devices/example/example.ini create mode 100644 devices/ini_device.py diff --git a/devices/APSCTL.py b/devices/APSCTL.py new file mode 100644 index 000000000..27fdaba4b --- /dev/null +++ b/devices/APSCTL.py @@ -0,0 +1,189 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the SDP project +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +""" SDP Device Server for LOFAR2.0 + +""" + +# PyTango imports +from tango.server import run +from tango.server import device_property + +#attribute extention and hardware device imports +from src.attribute_wrapper import * +from src.hardware_device import * + +# Additional import + +from clients.opcua_connection import OPCUAConnection + + +__all__ = ["APSCTL", "main"] + +class APSCTL(hardware_device): + """ + + **Properties:** + + - Device Property + OPC_Server_Name + - Type:'DevString' + OPC_Server_Port + - Type:'DevULong' + OPC_Time_Out + - Type:'DevDouble' + """ + + # ----------------- + # Device Properties + # ----------------- + + OPC_Server_Name = device_property( + dtype='DevString', + mandatory=True + ) + + OPC_Server_Port = device_property( + dtype='DevULong', + mandatory=True + ) + + OPC_Time_Out = device_property( + dtype='DevDouble', + mandatory=True + ) + + # ---------- + # Attributes + # ---------- + N_unb = 2 + N_fpga = 4 + N_ddr = 2 + N_qsfp = 6 + + + # Central CP per Uniboard + UNB2_Power_ON_OFF_RW = attribute_wrapper(comms_annotation=["2:UNB2_Power_ON_OFF_RW"], datatype=numpy.bool_, dims=(N_unb,), access=AttrWriteType.READ_WRITE) + UNB2_Front_Panel_LED_RW = attribute_wrapper(comms_annotation=["2:UNB2_Front_Panel_LED_RW"], datatype=numpy.uint8, dims=(N_unb,), access=AttrWriteType.READ_WRITE) + UNB2_Mask_RW = attribute_wrapper(comms_annotation=["2:UNB2_Mask_RW"], datatype=numpy.bool_, dims=(N_unb,), access=AttrWriteType.READ_WRITE) + # Central MP per Uniboard + UNB2_I2C_bus_OK_R = attribute_wrapper(comms_annotation=["2:UNB2_I2C_bus_OK_R"], datatype=numpy.bool_, dims=(N_unb,)) + UNB2_Front_Panel_LED_R = attribute_wrapper(comms_annotation=["2:UNB2_Front_Panel_LED_R"], datatype=numpy.uint8, dims=(N_unb,)) + UNB2_EEPROM_Serial_Number_R = attribute_wrapper(comms_annotation=["2:UNB2_EEPROM_Serial_Number_R"], datatype=numpy.str, dims=(N_unb,)) + UNB2_EEPROM_Unique_ID_R = attribute_wrapper(comms_annotation=["2:UNB2_EEPROM_Unique_ID_R"], datatype=numpy.uint32, dims=(N_unb,)) + UNB2_DC_DC_48V_12V_VIN_R = attribute_wrapper(comms_annotation=["2:UNB2_DC_DC_48V_12V_VIN_R"], datatype=numpy.double, dims=(N_unb,)) + UNB2_DC_DC_48V_12V_VOUT_R = attribute_wrapper(comms_annotation=["2:UNB2_DC_DC_48V_12V_VOUT_R"], datatype=numpy.double, dims=(N_unb,)) + UNB2_DC_DC_48V_12V_IOUT_R = attribute_wrapper(comms_annotation=["2:UNB2_DC_DC_48V_12V_IOUT_R"], datatype=numpy.double, dims=(N_unb,)) + UNB2_DC_DC_48V_12V_TEMP_R = attribute_wrapper(comms_annotation=["2:UNB2_DC_DC_48V_12V_TEMP_R"], datatype=numpy.double, dims=(N_unb,)) + UNB2_POL_QSFP_N01_VOUT_R = attribute_wrapper(comms_annotation=["2:UNB2_POL_QSFP_N01_VOUT_R"], datatype=numpy.double, dims=(N_unb,)) + UNB2_POL_QSFP_N01_IOUT_R = attribute_wrapper(comms_annotation=["2:UNB2_POL_QSFP_N01_IOUT_R"], datatype=numpy.double, dims=(N_unb,)) + UNB2_POL_QSFP_N01_TEMP_R = attribute_wrapper(comms_annotation=["2:UNB2_POL_QSFP_N01_TEMP_R"], datatype=numpy.double, dims=(N_unb,)) + UNB2_POL_QSFP_N23_VOUT_R = attribute_wrapper(comms_annotation=["2:UNB2_POL_QSFP_N23_VOUT_R"], datatype=numpy.double, dims=(N_unb,)) + UNB2_POL_QSFP_N23_IOUT_R = attribute_wrapper(comms_annotation=["2:UNB2_POL_QSFP_N23_IOUT_R"], datatype=numpy.double, dims=(N_unb,)) + UNB2_POL_QSFP_N23_TEMP_R = attribute_wrapper(comms_annotation=["2:UNB2_POL_QSFP_N23_TEMP_R"], datatype=numpy.double, dims=(N_unb,)) + UNB2_POL_SWITCH_1V2_VOUT_R = attribute_wrapper(comms_annotation=["2:UNB2_POL_SWITCH_1V2_VOUT_R"], datatype=numpy.double, dims=(N_unb,)) + UNB2_POL_SWITCH_1V2_IOUT_R = attribute_wrapper(comms_annotation=["2:UNB2_POL_SWITCH_1V2_IOUT_R"], datatype=numpy.double, dims=(N_unb,)) + UNB2_POL_SWITCH_1V2_TEMP_R = attribute_wrapper(comms_annotation=["2:UNB2_POL_SWITCH_1V2_TEMP_R"], datatype=numpy.double, dims=(N_unb,)) + UNB2_POL_SWITCH_PHY_VOUT_R = attribute_wrapper(comms_annotation=["2:UNB2_POL_SWITCH_PHY_VOUT_R"], datatype=numpy.double, dims=(N_unb,)) + UNB2_POL_SWITCH_PHY_IOUT_R = attribute_wrapper(comms_annotation=["2:UNB2_POL_SWITCH_PHY_IOUT_R"], datatype=numpy.double, dims=(N_unb,)) + UNB2_POL_SWITCH_PHY_TEMP_R = attribute_wrapper(comms_annotation=["2:UNB2_POL_SWITCH_PHY_TEMP_R"], datatype=numpy.double, dims=(N_unb,)) + UNB2_POL_CLOCK_VOUT_R = attribute_wrapper(comms_annotation=["2:UNB2_POL_CLOCK_VOUT_R"], datatype=numpy.double, dims=(N_unb,)) + UNB2_POL_CLOCK_IOUT_R = attribute_wrapper(comms_annotation=["2:UNB2_POL_CLOCK_IOUT_R"], datatype=numpy.double, dims=(N_unb,)) + UNB2_POL_CLOCK_TEMP_R = attribute_wrapper(comms_annotation=["2:UNB2_POL_CLOCK_TEMP_R"], datatype=numpy.double, dims=(N_unb,)) + + # monitor points per FPGA + UNB2_FPGA_DDR4_SLOT_TEMP_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_DDR4_SLOT_TEMP_R"], datatype=numpy.double, dims=(N_unb,N_fpga)) + UNB2_FPGA_DDR4_SLOT_PART_NUMBER_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_DDR4_SLOT_PART_NUMBER_R"], datatype=numpy.str, dims=(N_unb,N_fpga)) + UNB2_FPGA_QSFP_CAGE_0_TEMP_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_QSFP_CAGE_0_TEMP_R"], datatype=numpy.double, dims=(N_unb,N_fpga)) + UNB2_FPGA_QSFP_CAGE_1_TEMP_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_QSFP_CAGE_1_TEMP_R"], datatype=numpy.double, dims=(N_unb,N_fpga)) + UNB2_FPGA_QSFP_CAGE_2_TEMP_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_QSFP_CAGE_2_TEMP_R"], datatype=numpy.double, dims=(N_unb,N_fpga)) + UNB2_FPGA_QSFP_CAGE_3_TEMP_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_QSFP_CAGE_3_TEMP_R"], datatype=numpy.double, dims=(N_unb,N_fpga)) + UNB2_FPGA_QSFP_CAGE_4_TEMP_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_QSFP_CAGE_4_TEMP_R"], datatype=numpy.double, dims=(N_unb,N_fpga)) + UNB2_FPGA_QSFP_CAGE_5_TEMP_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_QSFP_CAGE_5_TEMP_R"], datatype=numpy.double, dims=(N_unb,N_fpga)) + UNB2_FPGA_QSFP_CAGE_0_LOS_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_QSFP_CAGE_0_LOS_R"], datatype=numpy.uint8, dims=(N_unb,N_fpga)) + UNB2_FPGA_QSFP_CAGE_1_LOS_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_QSFP_CAGE_1_LOS_R"], datatype=numpy.uint8, dims=(N_unb,N_fpga)) + UNB2_FPGA_QSFP_CAGE_2_LOS_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_QSFP_CAGE_2_LOS_R"], datatype=numpy.uint8, dims=(N_unb,N_fpga)) + UNB2_FPGA_QSFP_CAGE_3_LOS_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_QSFP_CAGE_3_LOS_R"], datatype=numpy.uint8, dims=(N_unb,N_fpga)) + UNB2_FPGA_QSFP_CAGE_4_LOS_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_QSFP_CAGE_4_LOS_R"], datatype=numpy.uint8, dims=(N_unb,N_fpga)) + UNB2_FPGA_QSFP_CAGE_5_LOS_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_QSFP_CAGE_5_LOS_R"], datatype=numpy.uint8, dims=(N_unb,N_fpga)) + UNB2_FPGA_POL_CORE_VOUT_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_POL_CORE_VOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga)) + UNB2_FPGA_POL_CORE_IOUT_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_POL_CORE_IOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga)) + UNB2_FPGA_POL_CORE_TEMP_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_POL_CORE_TEMP_R"], datatype=numpy.double, dims=(N_unb,N_fpga)) + UNB2_FPGA_POL_ERAM_VOUT_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_POL_ERAM_VOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga)) + UNB2_FPGA_POL_ERAM_IOUT_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_POL_ERAM_IOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga)) + UNB2_FPGA_POL_ERAM_TEMP_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_POL_ERAM_TEMP_R"], datatype=numpy.double, dims=(N_unb,N_fpga)) + UNB2_FPGA_POL_RXGXB_VOUT_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_POL_RXGXB_VOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga)) + UNB2_FPGA_POL_RXGXB_IOUT_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_POL_RXGXB_IOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga)) + UNB2_FPGA_POL_RXGXB_TEMP_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_POL_RXGXB_TEMP_R"], datatype=numpy.double, dims=(N_unb,N_fpga)) + UNB2_FPGA_POL_TXGXB_VOUT_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_POL_TXGXB_VOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga)) + UNB2_FPGA_POL_TXGXB_IOUT_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_POL_TXGXB_IOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga)) + # listed as 'POL_FPGA_TXGXB_TEMP_R' on https://support.astron.nl/confluence/pages/viewpage.action?spaceKey=L2M&title=L4+SDPHW+Decision%3A+UniBoard2+Monitor+and+Control+points + # probably a typo + POL_FPGA_TXGXB_TEMP_R = attribute_wrapper(comms_annotation=["2:POL_FPGA_TXGXB_TEMP_R"], datatype=numpy.double, dims=(N_unb,N_fpga)) + + UNB2_FPGA_POL_HGXB_VOUT_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_POL_HGXB_VOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga)) + UNB2_FPGA_POL_HGXB_IOUT_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_POL_HGXB_IOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga)) + UNB2_FPGA_POL_HGXB_TEMP_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_POL_HGXB_TEMP_R"], datatype=numpy.double, dims=(N_unb,N_fpga)) + UNB2_FPGA_POL_PGM_VOUT_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_POL_PGM_VOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga)) + UNB2_FPGA_POL_PGM_IOUT_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_POL_PGM_IOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga)) + UNB2_FPGA_POL_PGM_TEMP_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_POL_PGM_TEMP_R"], datatype=numpy.double, dims=(N_unb,N_fpga)) + + + def delete_device(self): + """Hook to delete resources allocated in init_device. + + This method allows for any memory or other resources allocated in the + init_device method to be released. This method is called by the device + destructor and by the device Init command (a Tango built-in). + """ + self.debug_stream("Shutting down...") + + self.Off() + self.debug_stream("Shut down. Good bye.") + + # -------- + # overloaded functions + # -------- + def off(self): + """ user code here. is called when the state is set to OFF """ + + # Stop keep-alive + self.opcua_connection.stop() + + def initialise(self): + """ user code here. is called when the sate is set to INIT """ + """Initialises the attributes and properties of the PCC.""" + + # set up the OPC ua client + self.OPCua_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu", self.OPC_Time_Out, self.Fault, self) + + # map an access helper class + for i in self.attr_list(): + try: + i.set_comm_client(self.OPCua_client) + except: + self.debug_stream("error in getting APSCTL attribute: {} from client".format(i)) + + self.OPCua_client.start() + + # -------- + # Commands + # -------- + +# ---------- +# Run server +# ---------- +def main(args=None, **kwargs): + """Main function of the SDP module.""" + return run((APSCTL,), args=args, **kwargs) + + +if __name__ == '__main__': + main() + diff --git a/devices/clients/ini_client.py b/devices/clients/ini_client.py new file mode 100644 index 000000000..9eecc7eac --- /dev/null +++ b/devices/clients/ini_client.py @@ -0,0 +1,133 @@ +from src.comms_client import * +import configparser +import numpy + + +numpy_to_ini_dict = { + numpy.int64: int, + numpy.double: float, + numpy.bool_: bool, + str: str +} +ini_to_numpy_dict = { + int: numpy.int64, + float: numpy.double, + bool: numpy.bool_, + str: str +} + +import os + + +class ini_client(CommClient): + """ + this class provides an example implementation of a comms_client. + Durirng initialisation it creates a correctly shaped zero filled value. on read that value is returned and on write its modified. + """ + + def start(self): + super().start() + + def __init__(self, filename, fault_func, streams, try_interval=2): + """ + initialises the class and tries to connect to the client. + """ + self.config = configparser.ConfigParser() + self.filename = filename + + if not filename.endswith(".ini"): + filename = filename + ".ini" + + + super().__init__(fault_func, streams, try_interval) + + # Explicitly connect + if not self.connect(): + # hardware or infra is down -- needs fixing first + fault_func() + return + + def connect(self): + files_path = [os.path.abspath(x) for x in os.listdir()] + self.streams.debug_stream(" %s", files_path) + self.config_file = open(self.filename, "rw") + + self.connected = True # set connected to true + return True # if succesfull, return true. otherwise return false + + def disconnect(self): + self.connected = False # always force a reconnect, regardless of a successful disconnect + self.streams.debug_stream("disconnected from the 'client' ") + + def _setup_annotation(self, annotation): + """ + this function gives the client access to the comm client annotation data given to the attribute wrapper. + The annotation data can be used to provide whatever extra data is necessary in order to find/access the monitor/control point. + + the annotation can be in whatever format may be required. it is up to the user to handle its content + example annotation may include: + - a file path and file line/location + - COM object path + """ + + # as this is an example, just print the annotation + self.streams.debug_stream("annotation: {}".format(annotation)) + name = annotation.get('name') + if name is None: + AssertionError("ini client requires a variable name to set/get") + section = annotation.get('section') + if section is None: + AssertionError("requires a section to open") + + return section, name + + + def _setup_value_conversion(self, attribute): + """ + gives the client access to the attribute_wrapper object in order to access all + necessary data such as dimensionality and data type + """ + + if attribute.dim_y > 1: + dims = (attribute.dim_y, attribute.dim_x) + else: + dims = (attribute.dim_x,) + + dtype = attribute.numpy_type + + return dims, dtype + + def _setup_mapping(self, name, section, dtype): + """ + takes all gathered data to configure and return the correct read and write functions + """ + + def read_function(): + value = self.config.get(section, name) + value = ini_to_numpy_dict[dtype](value) + return value + + def write_function(write_value): + self.config.set(section, name, write_value) + fp = open(self.filename, 'w') + self.config.write(fp) + + return read_function, write_function + + def setup_attribute(self, annotation=None, attribute=None): + """ + MANDATORY function: is used by the attribute wrapper to get read/write functions. + must return the read and write functions + """ + + # process the comms_annotation + section, name = self._setup_annotation(annotation) + + # get all the necessary data to set up the read/write functions from the attribute_wrapper + dims, dtype = self._setup_value_conversion(attribute) + + # configure and return the read/write functions + read_function, write_function = self._setup_mapping(name, section, dtype) + + # return the read/write functions + return read_function, write_function diff --git a/devices/example/example.ini b/devices/example/example.ini new file mode 100644 index 000000000..f12452bd3 --- /dev/null +++ b/devices/example/example.ini @@ -0,0 +1,18 @@ +[scalar] +double_scalar = 1.2 +bool_scalar = True +int_scalar = 3 +str_scalar = "hello" + +[spectrum] +double_spectrum = [1.2, 3.4, 5.6, 7.8] +bool_spectrum = [True, False, False, True] +int_spectrum = [1, 2, 3, 4] +str_spectrum = ["hi", "how", "are", "you?"] + +[image] +double_image = [[1.2, 3.4], [5.6, 7.8], [9.0, 1.2]] +bool_image = [[True, False], [False, False], [True, True]] +int_image = [[1, 2], [3, 4], [5,6]] +str_image = [["a", "b"], ["c", "d"], ["e", "f"]] + diff --git a/devices/ini_device.py b/devices/ini_device.py new file mode 100644 index 000000000..dbacb0693 --- /dev/null +++ b/devices/ini_device.py @@ -0,0 +1,115 @@ +# -*- coding: utf-8 -*- +# +# This file wraps around a tango device class and provides a number of abstractions useful for hardware devices. It works together +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +""" + +""" + +# PyTango imports +from tango.server import run +from tango.server import device_property +from tango import DevState +# Additional import +from src.attribute_wrapper import * +from src.hardware_device import * + + + +from clients.ini_client import * + + +__all__ = ["ini_device"] + + +class ini_device(hardware_device): + """ + This class is the minimal (read empty) implementation of a class using 'hardware_device' + """ + + # ---------- + # Attributes + # ---------- + """ + attribute wrapper objects can be declared here. All attribute wrapper objects will get automatically put in a list (attr_list) for easy access + + example = attribute_wrapper(comms_annotation="this is an example", datatype=numpy.double, dims=(8, 2), access=AttrWriteType.READ_WRITE) + ... + + """ + double_scalar_RW = attribute_wrapper(comms_annotation={"section": "scalar", "name": "double_scalar"}, datatype=numpy.double, access=AttrWriteType.READ_WRITE) + double_scalar_R = attribute_wrapper(comms_annotation={"section": "scalar", "name": "double_scalar"}, datatype=numpy.double) + bool_scalar_RW = attribute_wrapper(comms_annotation={"section": "scalar", "name": "bool_scalar"}, datatype=numpy.bool_, access=AttrWriteType.READ_WRITE) + bool_scalar_R = attribute_wrapper(comms_annotation={"section": "scalar", "name": "bool_scalar"}, datatype=numpy.bool_) + int_scalar_RW = attribute_wrapper(comms_annotation={"section": "scalar", "name": "int_scalar"}, datatype=numpy.int64, access=AttrWriteType.READ_WRITE) + int_scalar_R = attribute_wrapper(comms_annotation={"section": "scalar", "name": "int_scalar"}, datatype=numpy.int64) + str_scalar_RW = attribute_wrapper(comms_annotation={"section": "scalar", "name": "str_scalar"}, datatype=numpy.str, access=AttrWriteType.READ_WRITE) + str_scalar_R = attribute_wrapper(comms_annotation={"section": "scalar", "name": "str_scalar"}, datatype=numpy.str) + + double_spectrum_RW = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "double_spectrum"}, datatype=numpy.double, dims=(4,), access=AttrWriteType.READ_WRITE) + double_spectrum_R = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "double_spectrum"}, datatype=numpy.double, dims=(4,)) + bool_spectrum_RW = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "bool_spectrum"}, datatype=numpy.bool_, dims=(4,), access=AttrWriteType.READ_WRITE) + bool_spectrum_R = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "bool_spectrum"}, datatype=numpy.bool_, dims=(4,)) + int_spectrum_RW = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "int_spectrum"}, datatype=numpy.int64, dims=(4,), access=AttrWriteType.READ_WRITE) + int_spectrum_R = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "int_spectrum"}, datatype=numpy.int64, dims=(4,)) + str_spectrum_RW = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "str_spectrum"}, datatype=numpy.str, dims=(4,), access=AttrWriteType.READ_WRITE) + str_spectrum_R = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "str_spectrum"}, datatype=numpy.str, dims=(4,)) + + double_image_RW = attribute_wrapper(comms_annotation={"section": "image", "name": "double_image"}, datatype=numpy.double, dims=(3, 2), access=AttrWriteType.READ_WRITE) + double_image_R = attribute_wrapper(comms_annotation={"section": "image", "name": "double_image"}, datatype=numpy.double, dims=(3, 2)) + bool_image_RW = attribute_wrapper(comms_annotation={"section": "image", "name": "bool_image"}, datatype=numpy.bool_, dims=(3, 2), access=AttrWriteType.READ_WRITE) + bool_image_R = attribute_wrapper(comms_annotation={"section": "image", "name": "bool_image"}, datatype=numpy.bool_, dims=(3, 2)) + int_image_RW = attribute_wrapper(comms_annotation={"section": "image", "name": "int_image"}, datatype=numpy.int64, dims=(3, 2), access=AttrWriteType.READ_WRITE) + int_image_R = attribute_wrapper(comms_annotation={"section": "image", "name": "int_image"}, datatype=numpy.int64, dims=(3, 2)) + str_image_RW = attribute_wrapper(comms_annotation={"section": "image", "name": "str_image"}, datatype=numpy.str, dims=(3, 2), access=AttrWriteType.READ_WRITE) + str_image_R = attribute_wrapper(comms_annotation={"section": "image", "name": "str_image"}, datatype=numpy.str, dims=(3, 2)) + + + def always_executed_hook(self): + """Method always executed before any TANGO command is executed.""" + pass + + def delete_device(self): + """Hook to delete resources allocated in init_device. + + This method allows for any memory or other resources allocated in the + init_device method to be released. This method is called by the device + destructor and by the device Init command (a Tango built-in). + """ + self.debug_stream("Shutting down...") + + self.Off() + self.debug_stream("Shut down. Good bye.") + + # -------- + # overloaded functions + # -------- + def initialise(self): + """ user code here. is called when the sate is set to INIT """ + """Initialises the attributes and properties of the PCC.""" + + self.set_state(DevState.INIT) + + # set up the OPC ua client + self.ini_client = ini_client("example/example.ini", self.Fault, self) + + # map an access helper class + for i in self.attr_list(): + i.set_comm_client(self.ini_client) + + self.ini_client.start() + + +# ---------- +# Run server +# ---------- +def main(args=None, **kwargs): + """Main function of the hardware device module.""" + return run((ini_device,), args=args, **kwargs) + + +if __name__ == '__main__': + main() -- GitLab