Newer
Older
from tango.server import attribute
from tango import AttrWriteType
import numpy
from src.wrappers import only_when_on, fault_on_error
class attribute_wrapper(attribute):
"""
Wraps all the attributes in a wrapper class to manage most of the redundant code behind the scenes
"""
def __init__(self, comms_annotation=None, datatype=None, dims=(1,), access=AttrWriteType.READ, init_value=None, **kwargs):
"""
wraps around the tango Attribute class. Provides an easier interface for 1d or 2d arrays. Also provides a way to abstract
managing the communications interface.
"""
# ensure the type is a numpy array
if "numpy" not in str(datatype) and type(datatype) != str:
raise TypeError("Attribute needs to be a Tango-supported numpy or str type, but has type \"%s\"" % (datatype,))
self.comms_annotation = comms_annotation # store data that can be used by the comms interface. not used by the wrapper itself
self.numpy_type = datatype # tango changes our attribute to their representation (E.g numpy.int64 becomes "DevLong64")
max_dim_y = 0
# tango doesn't recognise numpy.str_, for consistencies sake we convert it here and hide this from the top level
# NOTE: discuss, idk if this is an important detail somewhere else
if datatype is numpy.str_:
datatype = str
# check if not scalar
if isinstance(dims, tuple):
# get first dimension
max_dim_x = dims[0]
# single dimension/spectrum requires the datatype to be wrapped in a tuple
datatype = (datatype,)
if len(dims) == 2:
# get second dimension
max_dim_y = dims[1]
# wrap the datatype tuple in another tuple for 2d arrays/images
datatype = (datatype,)
else:
# scalar, just set the single dimension
max_dim_x = 1
""" if the attribute is of READ_WRITE type, assign the RW and write function to it"""
@only_when_on
@fault_on_error
def read_RW(device):
# print("read_RW {}, {}x{}, {}, {}".format(me.name, me.dim_x, me.dim_y, me.attr_type, me.value))
"""
read_RW returns the value that was last written to the attribute
"""
try:
except:
print()
@only_when_on
@fault_on_error
def write_RW(device, value):
"""
_write_RW writes a value to this attribute
"""
self.write_function(value)
self.fget = read_RW
self.fset = write_RW
else:
""" if the attribute is of READ type, assign the read function to it"""
@only_when_on
@fault_on_error
def read_R(device):
"""
_read_R reads the attribute value, stores it and returns it"
"""
device.value_dict[self] = self.read_function()
return device.value_dict[self]
self.fget = read_R
super().__init__(dtype=datatype, max_dim_y=max_dim_y, max_dim_x=max_dim_x, access=access, **kwargs)
return
def initial_value(self):
"""
returns a numpy array filled with zeroes fit to the size of the attribute. Or if init_value is not the default None, return that value
"""
if self.init_value is not None:
return self.init_value
if self.dim_y > 1:
dims = (self.dim_x, self.dim_y)
else:
dims = (self.dim_x,)
# x and y are swapped for numpy and Tango. to maintain tango conventions, x and y are swapped for numpy
if len(dims) == 2:
numpy_dims = tuple((dims[1], dims[0]))
else:
numpy_dims = dims
value = numpy.zeros(numpy_dims, dtype=self.numpy_type)
return value
def set_comm_client(self, client):
"""
takes a communications client as input arguments This client should be of a class containing a "get_mapping" function
and return a read and write function that the wrapper will use to get/set data.
"""
try:
self.read_function, self.write_function = client.setup_attribute(self.comms_annotation, self)
except:
def pass_func(value=None):
pass
print("setting comm_client failed. using pass function instead")
self.read_function = pass_func
self.write_function = pass_func