Skip to content
Snippets Groups Projects
Select Git revision
  • d594420b5c65c6b3fb08db886b51cfee6e930b47
  • master default protected
  • revert-cs032-ccd-ip
  • deploy-components-parallel
  • fix-chrony-exporter
  • L2SS-2407-swap-iers-caltable-monitoring-port
  • L2SS-2357-fix-ruff
  • sync-up-with-meta-pypcc
  • stabilise-landing-page
  • all-stations-lofar2
  • v0.39.7-backports
  • Move-sdptr-to-v1.5.0
  • fix-build-ubuntu
  • tokens-in-env-files
  • fix-build
  • L2SS-2214-deploy-cdb
  • fix-missing-init
  • add-power-hardware-apply
  • L2SS-2129-Add-Subrack-Routine
  • Also-listen-internal-to-rpc
  • fix-build-dind
  • v0.55.5-r2 protected
  • v0.52.8-rc1 protected
  • v0.55.5 protected
  • v0.55.4 protected
  • 0.55.2.dev0
  • 0.55.1.dev0
  • 0.55.0.dev0
  • v0.54.0 protected
  • 0.53.2.dev0
  • 0.53.1.dev0
  • v0.52.3-r2 protected
  • remove-snmp-client
  • v0.52.3 protected
  • v0.52.3dev0 protected
  • 0.53.1dev0
  • v0.52.2-rc3 protected
  • v0.52.2-rc2 protected
  • v0.52.2-rc1 protected
  • v0.52.1.1 protected
  • v0.52.1 protected
41 results

test_tcp_replicator.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    attribute_wrapper.py 5.48 KiB
    from tango.server import attribute
    from tango import AttrWriteType
    
    import numpy
    
    from util.wrappers import only_when_on, fault_on_error
    import logging
    
    # Logger used by this module; getLogger() without a name returns the root logger.
    logger = logging.getLogger()
    
    
    class attribute_wrapper(attribute):
        """
        Wraps all the attributes in a wrapper class to manage most of the redundant code behind the scenes.

        The wrapper installs its own read (and, for READ_WRITE attributes, write) accessor on the attribute.
        These accessors cache values in ``device.value_dict``, keyed by the wrapper instance, and delegate the
        actual transfer to ``self.read_function`` / ``self.write_function``, which are assigned by ``set_comm_client``.
        """

        def __init__(self, comms_annotation=None, datatype=None, dims=(1,), access=AttrWriteType.READ, init_value=None, **kwargs):
            """
            wraps around the tango Attribute class. Provides an easier interface for 1d or 2d arrays. Also provides a way to abstract
            managing the communications interface.

            comms_annotation: data passed along to the attribute. can be given any form of data. handling is up to client implementation
            datatype: any numpy datatype, or str
            dims: dimensions of the attribute as a tuple: (x,) for a 1d array/spectrum, (x, y) for a 2d array/image.
                  Any value that is not a tuple makes the attribute a scalar.
            access: AttrWriteType.READ_WRITE installs both a read and a write accessor; any other value installs only a read accessor
            init_value: value returned by initial_value(); when None, initial_value() returns zeroes instead
            **kwargs: passed on unchanged to the tango attribute constructor

            raises TypeError if datatype is neither a numpy type nor str
            """

            # ensure the type is a numpy type (or str)
            if "numpy" not in str(datatype) and datatype != str:
                raise TypeError("Attribute needs to be a Tango-supported numpy or str type, but has type \"%s\"" % (datatype,))

            self.comms_annotation = comms_annotation  # store data that can be used by the comms interface. not used by the wrapper itself
            self.numpy_type = datatype  # tango changes our attribute to their representation (E.g numpy.int64 becomes "DevLong64")

            self.init_value = init_value
            max_dim_y = 0

            # tango doesn't recognise numpy.str_, for consistencies sake we convert it here and hide this from the top level
            # NOTE: discuss, idk if this is an important detail somewhere else
            if datatype is numpy.str_:
                datatype = str

            # check if not scalar
            if isinstance(dims, tuple):
                # get first dimension
                max_dim_x = dims[0]

                # single dimension/spectrum requires the datatype to be wrapped in a tuple
                datatype = (datatype,)

                if len(dims) == 2:
                    # get second dimension
                    max_dim_y = dims[1]
                    # wrap the datatype tuple in another tuple for 2d arrays/images
                    datatype = (datatype,)
            else:
                # scalar, just set the single dimension
                max_dim_x = 1

            if access == AttrWriteType.READ_WRITE:
                # if the attribute is of READ_WRITE type, assign the RW and write function to it

                @only_when_on()
                @fault_on_error()
                def read_RW(device):
                    """
                    read_RW returns the value that was last written to the attribute
                    """
                    try:
                        return device.value_dict[self]
                    except Exception as e:
                        # interpolate the key into the message; passing it as a second argument would leave "%s" unformatted
                        raise Exception("Attribute read_RW function error, attempted to read value_dict with key: `%s`, are you sure this exists?"
                                        % (self,)) from e

                @only_when_on()
                @fault_on_error()
                def write_RW(device, value):
                    """
                    write_RW writes a value to this attribute
                    """
                    # write through the comms client first, so the cache is only updated when the write did not raise
                    self.write_function(value)
                    device.value_dict[self] = value

                self.fget = read_RW
                self.fset = write_RW

            else:
                # if the attribute is of READ type, assign the read function to it

                @only_when_on()
                @fault_on_error()
                def read_R(device):
                    """
                    read_R reads the attribute value, stores it and returns it
                    """
                    device.value_dict[self] = self.read_function()
                    return device.value_dict[self]

                self.fget = read_R

            # the accessors are assigned before the base class is initialised, as in the original implementation
            super().__init__(dtype=datatype, max_dim_y=max_dim_y, max_dim_x=max_dim_x, access=access, **kwargs)

        def initial_value(self):
            """
            returns a numpy array filled with zeroes fit to the size of the attribute. Or if init_value is not the default None, return that value
            """
            if self.init_value is not None:
                return self.init_value

            # x and y are swapped for numpy and Tango. to maintain tango conventions, x and y are swapped for numpy
            if self.dim_y > 1:
                numpy_dims = (self.dim_y, self.dim_x)
            else:
                numpy_dims = (self.dim_x,)

            return numpy.zeros(numpy_dims, dtype=self.numpy_type)

        def set_comm_client(self, client):
            """
            takes a communications client as input argument. This client should be of a class providing a
            "setup_attribute(comms_annotation, attribute)" function that returns a read and a write function,
            which the wrapper will use to get/set data.

            If setup_attribute raises, no-op functions are installed as read and write function, the failure
            is logged, and an Exception chained to the original error is raised.
            """
            try:
                self.read_function, self.write_function = client.setup_attribute(self.comms_annotation, self)
            except Exception as e:
                def pass_func(value=None):
                    pass

                logger.error("Exception while setting %s attribute with annotation: '%s' read/write functions. using pass function instead to to keep running", client.__class__.__name__, self.comms_annotation)

                self.read_function = pass_func
                self.write_function = pass_func

                # fill in the trailing "%s" with the underlying error; it was previously left as a literal placeholder
                raise Exception("Exception while setting comm_client read/write functions. using pass function instead. %s" % (e,)) from e