Skip to content
Snippets Groups Projects
attribute_wrapper.py 4.11 KiB
Newer Older
from tango.server import attribute
from tango import AttrWriteType

import numpy

from src.wrappers import only_when_on, fault_on_error



class attribute_wrapper(attribute):
	"""
		Wraps all the attributes in a wrapper class to manage most of the redundant code behind the scenes
	"""

	def __init__(self, comms_annotation=None, datatype=None, dims=(1,), access=AttrWriteType.READ, init_value=None, **kwargs):
		"""
		wraps around the tango Attribute class. Provides an easier interface for 1d or 2d arrays. Also provides a way to abstract
		managing the communications interface.
		"""

		# ensure the type is a numpy array
		if "numpy" not in str(datatype) and type(datatype) != str:
			raise TypeError("Attribute needs to be a Tango-supported numpy or str type, but has type \"%s\"" % (datatype,))



		self.comms_annotation = comms_annotation # store data that can be used by the comms interface. not used by the wrapper itself
		self.numpy_type = datatype # tango changes our attribute to their representation (E.g numpy.int64 becomes "DevLong64")

		self.init_value = init_value
		max_dim_y = 0

		# tango doesn't recognise numpy.str_, for consistencies sake we convert it here and hide this from the top level
		# NOTE: discuss, idk if this is an important detail somewhere else
		if datatype is numpy.str_:
			datatype = str

		# check if not scalar
		if isinstance(dims, tuple):

			# get first dimension
			max_dim_x = dims[0]

			# single dimension/spectrum requires the datatype to be wrapped in a tuple
			datatype = (datatype,)

			if len(dims) == 2:
				# get second dimension
				max_dim_y = dims[1]
				# wrap the datatype tuple in another tuple for 2d arrays/images
				datatype = (datatype,)
		else:
			# scalar, just set the single dimension
			max_dim_x = 1


		if access == AttrWriteType.READ_WRITE:
			""" if the attribute is of READ_WRITE type, assign the RW and write function to it"""

			@only_when_on
			@fault_on_error
			def read_RW(device):
				# print("read_RW {}, {}x{}, {}, {}".format(me.name, me.dim_x, me.dim_y, me.attr_type, me.value))
				"""
				read_RW returns the value that was last written to the attribute
				"""
				try:
					return device.value_dict[self]
				except:
					print()

			@only_when_on
			@fault_on_error
			def write_RW(device, value):
				"""
				_write_RW writes a value to this attribute
				"""
				self.write_function(value)
				device.value_dict[self] = value

			self.fget = read_RW
			self.fset = write_RW


		else:
			""" if the attribute is of READ type, assign the read function to it"""

			@only_when_on
			@fault_on_error
			def read_R(device):
				"""
				_read_R reads the attribute value, stores it and returns it"
				"""
				device.value_dict[self] = self.read_function()
				return device.value_dict[self]
		super().__init__(dtype=datatype, max_dim_y=max_dim_y, max_dim_x=max_dim_x, access=access, **kwargs)

		return

	def initial_value(self):
		"""
		returns a numpy array filled with zeroes fit to the size of the attribute. Or if init_value is not the default None, return that value
		"""
		if self.init_value is not None:
			return self.init_value

		if self.dim_y > 1:
			dims = (self.dim_x, self.dim_y)
		else:
			dims = (self.dim_x,)

		# x and y are swapped for numpy and Tango. to maintain tango conventions, x and y are swapped for numpy
		if len(dims) == 2:
			numpy_dims = tuple((dims[1], dims[0]))
		else:
			numpy_dims = dims

		value = numpy.zeros(numpy_dims, dtype=self.numpy_type)
		return value

	def set_comm_client(self, client):
		"""
		takes a communications client as input arguments This client should be of a class containing a "get_mapping" function
		and return a read and write function that the wrapper will use to get/set data.
		"""
		try:
			self.read_function, self.write_function = client.setup_attribute(self.comms_annotation, self)
		except:
			def pass_func(value=None):
				pass
			print("setting comm_client failed. using pass function instead")

			self.read_function = pass_func
			self.write_function = pass_func