diff --git a/devices/clients/attribute_wrapper.py b/devices/clients/attribute_wrapper.py index 397f11e0bcc437e82699a472c5f2a297f0e5c2e4..12e5c83516e2c68c2216aca5ba9b39a1fa6f4f8c 100644 --- a/devices/clients/attribute_wrapper.py +++ b/devices/clients/attribute_wrapper.py @@ -9,156 +9,156 @@ logger = logging.getLogger() class attribute_wrapper(attribute): - """ - Wraps all the attributes in a wrapper class to manage most of the redundant code behind the scenes - """ - - def __init__(self, comms_id=None, comms_annotation=None, datatype=None, dims=(1,), access=AttrWriteType.READ, init_value=None, **kwargs): - """ - wraps around the tango Attribute class. Provides an easier interface for 1d or 2d arrays. Also provides a way to abstract - managing the communications interface. - - comms_id: user-supplied identifier that is attached to this object, to identify which communication class will need to be attached - comms_annotation: data passed along to the attribute. can be given any form of data. handling is up to client implementation - datatype: any numpy datatype - dims: dimensions of the attribute as a tuple, or (1,) for a scalar. - init_value: value - """ - - # ensure the type is a numpy array. - # see also https://pytango.readthedocs.io/en/stable/server_api/server.html?highlight=devlong#module-tango.server for - # more details about type conversion Python/numpy -> PyTango - if "numpy" not in str(datatype) and datatype != str: - raise TypeError("Attribute needs to be a Tango-supported numpy or str type, but has type \"%s\"" % (datatype,)) - - self.comms_id = comms_id # store data that can be used to identify the comms interface to use. not used by the wrapper itself - self.comms_annotation = comms_annotation # store data that can be used by the comms interface. not used by the wrapper itself - - self.init_value = init_value - is_scalar = dims == (1,) - - # tango doesn't recognise numpy.str_, for consistencies sake we convert it here and hide this from the top level - # NOTE: discuss, idk if this is an important detail somewhere else - if datatype is numpy.str_ or datatype is numpy.str: - datatype = str - - self.numpy_type = datatype # tango changes our attribute to their representation (E.g numpy.int64 becomes "DevLong64") - - # check if not scalar - if is_scalar: - # scalar, just set the single dimension. - # Tango defines a scalar as having dimensions (1,0), see https://pytango.readthedocs.io/en/stable/server_api/attribute.html - max_dim_x = 1 - max_dim_y = 0 - else: - # get first dimension - max_dim_x = dims[0] - - # single dimension/spectrum requires the datatype to be wrapped in a tuple - datatype = (datatype,) - - if len(dims) == 2: - # get second dimension - max_dim_y = dims[1] - # wrap the datatype tuple in another tuple for 2d arrays/images - datatype = (datatype,) - else: - max_dim_y = 0 - - if access == AttrWriteType.READ_WRITE: - """ if the attribute is of READ_WRITE type, assign the RW and write function to it""" - - @only_when_on() - @fault_on_error() - def read_RW(device): - # print("read_RW {}, {}x{}, {}, {}".format(me.name, me.dim_x, me.dim_y, me.attr_type, me.value)) - """ - read_RW returns the value that was last written to the attribute - """ - try: - return device.value_dict[self] - except Exception as e: - raise Exception("Attribute read_RW function error, attempted to read value_dict with key: `%s`, are you sure this exists?", - self) from e - - @only_when_on() - @fault_on_error() - def write_RW(device, value): - """ - _write_RW writes a value to this attribute - """ - - self.write_function(value) - device.value_dict[self] = value - - self.fget = read_RW - self.fset = write_RW - - - else: - """ if the attribute is of READ type, assign the read function to it""" - - @only_when_on() - @fault_on_error() - def read_R(device): - """ - _read_R reads the attribute value, stores it and returns it" - """ - device.value_dict[self] = self.read_function() - return device.value_dict[self] - - self.fget = read_R - - super().__init__(dtype=datatype, max_dim_y=max_dim_y, max_dim_x=max_dim_x, access=access, **kwargs) - - return - - def initial_value(self): - """ - returns a numpy array filled with zeroes fit to the size of the attribute. Or if init_value is not the default None, return that value - """ - if self.init_value is not None: - return self.init_value - - if self.dim_y > 1: - dims = (self.dim_x, self.dim_y) - else: - dims = (self.dim_x,) - - # x and y are swapped for numpy and Tango. to maintain tango conventions, x and y are swapped for numpy - if len(dims) == 2: - numpy_dims = tuple((dims[1], dims[0])) - else: - numpy_dims = dims - - if self.dim_x == 1: - - if self.numpy_type == str: - value = '' - else: - value = self.numpy_type(0) - else: - value = numpy.zeros(numpy_dims, dtype=self.numpy_type) - - return value - - def set_comm_client(self, client): - """ - takes a communications client as input arguments This client should be of a class containing a "get_mapping" function - and return a read and write function that the wrapper will use to get/set data. - """ - try: - self.read_function, self.write_function = client.setup_attribute(self.comms_annotation, self) - except Exception as e: - - logger.error("Exception while setting {} attribute with annotation: '{}' {}".format(client.__class__.__name__, self.comms_annotation, e)) - raise Exception("Exception while setting %s attribute with annotation: '%s'", client.__class__.__name__, self.comms_annotation) from e - - def set_pass_func(self): - def pass_func(value=None): - pass - - logger.debug("using pass function for attribute with annotation: {}".format(self.comms_annotation)) - - self.read_function = pass_func - self.write_function = pass_func + """ + Wraps all the attributes in a wrapper class to manage most of the redundant code behind the scenes + """ + + def __init__(self, comms_id=None, comms_annotation=None, datatype=None, dims=(1,), access=AttrWriteType.READ, init_value=None, **kwargs): + """ + wraps around the tango Attribute class. Provides an easier interface for 1d or 2d arrays. Also provides a way to abstract + managing the communications interface. + + comms_id: user-supplied identifier that is attached to this object, to identify which communication class will need to be attached + comms_annotation: data passed along to the attribute. can be given any form of data. handling is up to client implementation + datatype: any numpy datatype + dims: dimensions of the attribute as a tuple, or (1,) for a scalar. + init_value: value + """ + + # ensure the type is a numpy array. + # see also https://pytango.readthedocs.io/en/stable/server_api/server.html?highlight=devlong#module-tango.server for + # more details about type conversion Python/numpy -> PyTango + if "numpy" not in str(datatype) and datatype != str: + raise TypeError("Attribute needs to be a Tango-supported numpy or str type, but has type \"%s\"" % (datatype,)) + + self.comms_id = comms_id # store data that can be used to identify the comms interface to use. not used by the wrapper itself + self.comms_annotation = comms_annotation # store data that can be used by the comms interface. not used by the wrapper itself + + self.init_value = init_value + is_scalar = dims == (1,) + + # tango doesn't recognise numpy.str_, for consistencies sake we convert it here and hide this from the top level + # NOTE: discuss, idk if this is an important detail somewhere else + if datatype is numpy.str_ or datatype is numpy.str: + datatype = str + + self.numpy_type = datatype # tango changes our attribute to their representation (E.g numpy.int64 becomes "DevLong64") + + # check if not scalar + if is_scalar: + # scalar, just set the single dimension. + # Tango defines a scalar as having dimensions (1,0), see https://pytango.readthedocs.io/en/stable/server_api/attribute.html + max_dim_x = 1 + max_dim_y = 0 + else: + # get first dimension + max_dim_x = dims[0] + + # single dimension/spectrum requires the datatype to be wrapped in a tuple + datatype = (datatype,) + + if len(dims) == 2: + # get second dimension + max_dim_y = dims[1] + # wrap the datatype tuple in another tuple for 2d arrays/images + datatype = (datatype,) + else: + max_dim_y = 0 + + if access == AttrWriteType.READ_WRITE: + """ if the attribute is of READ_WRITE type, assign the RW and write function to it""" + + @only_when_on() + @fault_on_error() + def read_RW(device): + # print("read_RW {}, {}x{}, {}, {}".format(me.name, me.dim_x, me.dim_y, me.attr_type, me.value)) + """ + read_RW returns the value that was last written to the attribute + """ + try: + return device.value_dict[self] + except Exception as e: + raise Exception("Attribute read_RW function error, attempted to read value_dict with key: `%s`, are you sure this exists?", + self) from e + + @only_when_on() + @fault_on_error() + def write_RW(device, value): + """ + _write_RW writes a value to this attribute + """ + + self.write_function(value) + device.value_dict[self] = value + + self.fget = read_RW + self.fset = write_RW + + + else: + """ if the attribute is of READ type, assign the read function to it""" + + @only_when_on() + @fault_on_error() + def read_R(device): + """ + _read_R reads the attribute value, stores it and returns it" + """ + device.value_dict[self] = self.read_function() + return device.value_dict[self] + + self.fget = read_R + + super().__init__(dtype=datatype, max_dim_y=max_dim_y, max_dim_x=max_dim_x, access=access, **kwargs) + + return + + def initial_value(self): + """ + returns a numpy array filled with zeroes fit to the size of the attribute. Or if init_value is not the default None, return that value + """ + if self.init_value is not None: + return self.init_value + + if self.dim_y > 1: + dims = (self.dim_x, self.dim_y) + else: + dims = (self.dim_x,) + + # x and y are swapped for numpy and Tango. to maintain tango conventions, x and y are swapped for numpy + if len(dims) == 2: + numpy_dims = tuple((dims[1], dims[0])) + else: + numpy_dims = dims + + if self.dim_x == 1: + + if self.numpy_type == str: + value = '' + else: + value = self.numpy_type(0) + else: + value = numpy.zeros(numpy_dims, dtype=self.numpy_type) + + return value + + def set_comm_client(self, client): + """ + takes a communications client as input arguments This client should be of a class containing a "get_mapping" function + and return a read and write function that the wrapper will use to get/set data. + """ + try: + self.read_function, self.write_function = client.setup_attribute(self.comms_annotation, self) + except Exception as e: + + logger.error("Exception while setting {} attribute with annotation: '{}' {}".format(client.__class__.__name__, self.comms_annotation, e)) + raise Exception("Exception while setting %s attribute with annotation: '%s'", client.__class__.__name__, self.comms_annotation) from e + + def set_pass_func(self): + def pass_func(value=None): + pass + + logger.debug("using pass function for attribute with annotation: {}".format(self.comms_annotation)) + + self.read_function = pass_func + self.write_function = pass_func