from tango.server import attribute from tango import AttrWriteType import numpy from src.wrappers import only_when_on, fault_on_error import logging logger = logging.getLogger() class attribute_wrapper(attribute): """ Wraps all the attributes in a wrapper class to manage most of the redundant code behind the scenes """ def __init__(self, comms_annotation=None, datatype=None, dims=(1,), access=AttrWriteType.READ, init_value=None, **kwargs): """ wraps around the tango Attribute class. Provides an easier interface for 1d or 2d arrays. Also provides a way to abstract managing the communications interface. comms_annotation: data passed along to the attribute. can be given any form of data. handling is up to client implementation datatype: any numpy datatype dims: dimensions of the init_value: value """ # ensure the type is a numpy array if "numpy" not in str(datatype) and datatype != str: raise TypeError("Attribute needs to be a Tango-supported numpy or str type, but has type \"%s\"" % (datatype,)) self.comms_annotation = comms_annotation # store data that can be used by the comms interface. not used by the wrapper itself self.numpy_type = datatype # tango changes our attribute to their representation (E.g numpy.int64 becomes "DevLong64") self.init_value = init_value max_dim_y = 0 # tango doesn't recognise numpy.str_, for consistencies sake we convert it here and hide this from the top level # NOTE: discuss, idk if this is an important detail somewhere else if datatype is numpy.str_: datatype = str # check if not scalar if isinstance(dims, tuple): # get first dimension max_dim_x = dims[0] # single dimension/spectrum requires the datatype to be wrapped in a tuple datatype = (datatype,) if len(dims) == 2: # get second dimension max_dim_y = dims[1] # wrap the datatype tuple in another tuple for 2d arrays/images datatype = (datatype,) else: # scalar, just set the single dimension max_dim_x = 1 if access == AttrWriteType.READ_WRITE: """ if the attribute is of READ_WRITE type, assign the RW and write function to it""" @only_when_on @fault_on_error def read_RW(device): # print("read_RW {}, {}x{}, {}, {}".format(me.name, me.dim_x, me.dim_y, me.attr_type, me.value)) """ read_RW returns the value that was last written to the attribute """ try: return device.value_dict[self] except Exception as e: raise Exception("Attribute read_RW function error, attempted to read value_dict with key: `%s`, are you sure this exists?", self) from e @only_when_on @fault_on_error def write_RW(device, value): """ _write_RW writes a value to this attribute """ try: self.write_function(value) device.value_dict[self] = value except Exception as e: raise e self.fget = read_RW self.fset = write_RW else: """ if the attribute is of READ type, assign the read function to it""" @only_when_on @fault_on_error def read_R(device): """ _read_R reads the attribute value, stores it and returns it" """ device.value_dict[self] = self.read_function() return device.value_dict[self] self.fget = read_R super().__init__(dtype=datatype, max_dim_y=max_dim_y, max_dim_x=max_dim_x, access=access, **kwargs) return def initial_value(self): """ returns a numpy array filled with zeroes fit to the size of the attribute. Or if init_value is not the default None, return that value """ if self.init_value is not None: return self.init_value if self.dim_y > 1: dims = (self.dim_x, self.dim_y) else: dims = (self.dim_x,) # x and y are swapped for numpy and Tango. to maintain tango conventions, x and y are swapped for numpy if len(dims) == 2: numpy_dims = tuple((dims[1], dims[0])) else: numpy_dims = dims value = numpy.zeros(numpy_dims, dtype=self.numpy_type) return value def set_comm_client(self, client): """ takes a communications client as input arguments This client should be of a class containing a "get_mapping" function and return a read and write function that the wrapper will use to get/set data. """ try: self.read_function, self.write_function = client.setup_attribute(self.comms_annotation, self) except Exception as e: def pass_func(value=None): pass logger.error("setting comm_client failed. using pass function instead") self.read_function = pass_func self.write_function = pass_func raise Exception("Exception while setting comm_client read/write functions. using pass function instead. %s") from e