diff --git a/devices/clients/attribute_wrapper.py b/devices/clients/attribute_wrapper.py index 99312919c0631f85c64cd3aec097a00b316f12f4..12e5c83516e2c68c2216aca5ba9b39a1fa6f4f8c 100644 --- a/devices/clients/attribute_wrapper.py +++ b/devices/clients/attribute_wrapper.py @@ -1,6 +1,5 @@ from tango.server import attribute from tango import AttrWriteType - import numpy from devices.device_decorators import only_when_on, fault_on_error @@ -27,23 +26,24 @@ class attribute_wrapper(attribute): """ # ensure the type is a numpy array. - # see also https://pytango.readthedocs.io/en/stable/server_api/server.html?highlight=devlong#module-tango.server for + # see also https://pytango.readthedocs.io/en/stable/server_api/server.html?highlight=devlong#module-tango.server for # more details about type conversion Python/numpy -> PyTango if "numpy" not in str(datatype) and datatype != str: raise TypeError("Attribute needs to be a Tango-supported numpy or str type, but has type \"%s\"" % (datatype,)) self.comms_id = comms_id # store data that can be used to identify the comms interface to use. not used by the wrapper itself self.comms_annotation = comms_annotation # store data that can be used by the comms interface. not used by the wrapper itself - self.numpy_type = datatype # tango changes our attribute to their representation (E.g numpy.int64 becomes "DevLong64") self.init_value = init_value is_scalar = dims == (1,) # tango doesn't recognise numpy.str_, for consistencies sake we convert it here and hide this from the top level # NOTE: discuss, idk if this is an important detail somewhere else - if datatype is numpy.str_: + if datatype is numpy.str_ or datatype is numpy.str: datatype = str + self.numpy_type = datatype # tango changes our attribute to their representation (E.g numpy.int64 becomes "DevLong64") + # check if not scalar if is_scalar: # scalar, just set the single dimension. @@ -131,7 +131,15 @@ class attribute_wrapper(attribute): else: numpy_dims = dims - value = numpy.zeros(numpy_dims, dtype=self.numpy_type) + if self.dim_x == 1: + + if self.numpy_type == str: + value = '' + else: + value = self.numpy_type(0) + else: + value = numpy.zeros(numpy_dims, dtype=self.numpy_type) + return value def set_comm_client(self, client): diff --git a/devices/clients/opcua_client.py b/devices/clients/opcua_client.py index 0167ad95e113e0880ab607c24ef9f54b45b58d11..8a986a0c7f98819ecad9ea6a5710aaca19c1ac0c 100644 --- a/devices/clients/opcua_client.py +++ b/devices/clients/opcua_client.py @@ -22,8 +22,6 @@ numpy_to_OPCua_dict = { numpy.float32: opcua.ua.VariantType.Float, numpy.double: opcua.ua.VariantType.Double, numpy.float64: opcua.ua.VariantType.Double, - numpy.str_: opcua.ua.VariantType.String, - numpy.str: opcua.ua.VariantType.String, str: opcua.ua.VariantType.String } @@ -95,7 +93,7 @@ class OPCUAConnection(CommClient): print(i.get_browse_name()) for j in i.get_children(): try: - print(j.get_browse_name(), j.get_data_type_as_variant_type()) + print(j.get_browse_name(), j.get_data_type_as_variant_type(), j.get_value()) except: print(j.get_browse_name()) finally: diff --git a/devices/devices/hardware_device.py b/devices/devices/hardware_device.py index a4da09297a6696c4fb5a31e2359b63958cb4eb4d..524c378c1256eb5cc09fb9af12b21d5d0f781a09 100644 --- a/devices/devices/hardware_device.py +++ b/devices/devices/hardware_device.py @@ -20,12 +20,13 @@ from tango import DevState, DebugIt from clients.attribute_wrapper import attribute_wrapper from common.lofar_logging import log_exceptions +import logging + __all__ = ["hardware_device"] from devices.device_decorators import only_in_states, fault_on_error -import logging logger = logging.getLogger() class AbstractDeviceMetas(DeviceMeta, ABCMeta): diff --git a/devices/devices/pcc.py b/devices/devices/pcc.py index 0db21b41e7c609c934345e0b0dafdea9e9e08efb..73b105abc21f9cc8c7c15a564a67c9e0758e77cd 100644 --- a/devices/devices/pcc.py +++ b/devices/devices/pcc.py @@ -147,8 +147,7 @@ class PCC(hardware_device): self.function_mapping["CLK_off"] = {} # set up the OPC ua client - self.OPCua_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu", - self.OPC_Time_Out, self.Fault, self) + self.OPCua_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu", self.OPC_Time_Out, self.Fault, self) # map an access helper class for i in self.attr_list(): diff --git a/devices/examples/snmp/snmp_client.py b/devices/examples/snmp/snmp_client.py index 2c162abef0f924c3d67d9d248253c2a9df533a3f..635450172387148734e5e3b42ed0f82f067a0048 100644 --- a/devices/examples/snmp/snmp_client.py +++ b/devices/examples/snmp/snmp_client.py @@ -1,3 +1,4 @@ + from clients.comms_client import CommClient import snmp diff --git a/devices/test/clients/test_attr_wrapper.py b/devices/test/clients/test_attr_wrapper.py new file mode 100644 index 0000000000000000000000000000000000000000..a293923acbf21774e9f221b650353f3410104a88 --- /dev/null +++ b/devices/test/clients/test_attr_wrapper.py @@ -0,0 +1,599 @@ +# -*- coding: utf-8 -*- + +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +""" test Device Server +""" + +# External imports +from tango import DevState + +# Internal imports +from test.clients.test_client import test_client +from clients.attribute_wrapper import * +from devices.hardware_device import * + +# Test imports +from tango.test_context import DeviceTestContext +from test import base + +scalar_dims = (1,) +spectrum_dims = (4,) +image_dims = (3,2) + +str_scalar_val = '1' +str_spectrum_val = ['1','1', '1','1'] +str_image_val = [['1','1'],['1','1'],['1','1']] + + +def dev_init(device): + device.set_state(DevState.INIT) + device.test_client = test_client(device.Fault, device) + for i in device.attr_list(): + i.set_comm_client(device.test_client) + device.test_client.start() + + +class TestAttributeTypes(base.TestCase): + + class str_scalar_device(hardware_device): + scalar_R = attribute_wrapper(comms_annotation="str_scalar_R", datatype=str) + scalar_RW = attribute_wrapper(comms_annotation="str_scalar_RW", datatype=str, access=AttrWriteType.READ_WRITE) + + def configure_for_initialise(self): + dev_init(self) + + class bool_scalar_device(hardware_device): + scalar_R = attribute_wrapper(comms_annotation="bool_scalar_R", datatype=numpy.bool_) + scalar_RW = attribute_wrapper(comms_annotation="bool_scalar_RW", datatype=numpy.bool_, access=AttrWriteType.READ_WRITE) + + def configure_for_initialise(self): + dev_init(self) + + class float32_scalar_device(hardware_device): + scalar_R = attribute_wrapper(comms_annotation="float32_scalar_R", datatype=numpy.float32) + scalar_RW = attribute_wrapper(comms_annotation="float32_scalar_RW", datatype=numpy.float32, access=AttrWriteType.READ_WRITE) + + def configure_for_initialise(self): + dev_init(self) + + class float64_scalar_device(hardware_device): + scalar_R = attribute_wrapper(comms_annotation="float64_scalar_R", datatype=numpy.float64) + scalar_RW = attribute_wrapper(comms_annotation="float64_scalar_RW", datatype=numpy.float64, access=AttrWriteType.READ_WRITE) + + def configure_for_initialise(self): + dev_init(self) + + class double_scalar_device(hardware_device): + scalar_R = attribute_wrapper(comms_annotation="double_scalar_R", datatype=numpy.double) + scalar_RW = attribute_wrapper(comms_annotation="double_scalar_RW", datatype=numpy.double, access=AttrWriteType.READ_WRITE) + + def configure_for_initialise(self): + dev_init(self) + + class uint8_scalar_device(hardware_device): + scalar_R = attribute_wrapper(comms_annotation="uint8_scalar_R", datatype=numpy.uint8) + scalar_RW = attribute_wrapper(comms_annotation="uint8_scalar_RW", datatype=numpy.uint8, access=AttrWriteType.READ_WRITE) + + def configure_for_initialise(self): + dev_init(self) + + class uint16_scalar_device(hardware_device): + scalar_R = attribute_wrapper(comms_annotation="uint16_scalar_R", datatype=numpy.uint16) + scalar_RW = attribute_wrapper(comms_annotation="uint16_scalar_RW", datatype=numpy.uint16, access=AttrWriteType.READ_WRITE) + + def configure_for_initialise(self): + dev_init(self) + + class uint32_scalar_device(hardware_device): + scalar_R = attribute_wrapper(comms_annotation="uint32_scalar_R", datatype=numpy.uint32) + scalar_RW = attribute_wrapper(comms_annotation="uint32_scalar_RW", datatype=numpy.uint32, access=AttrWriteType.READ_WRITE) + + def configure_for_initialise(self): + dev_init(self) + + class uint64_scalar_device(hardware_device): + scalar_R = attribute_wrapper(comms_annotation="uint64_scalar_R", datatype=numpy.uint64) + scalar_RW = attribute_wrapper(comms_annotation="uint64_scalar_RW", datatype=numpy.uint64, access=AttrWriteType.READ_WRITE) + + def configure_for_initialise(self): + dev_init(self) + + class int16_scalar_device(hardware_device): + scalar_R = attribute_wrapper(comms_annotation="int16_scalar_R", datatype=numpy.int16) + scalar_RW = attribute_wrapper(comms_annotation="int16_scalar_RW", datatype=numpy.int16, access=AttrWriteType.READ_WRITE) + + def configure_for_initialise(self): + dev_init(self) + + class int32_scalar_device(hardware_device): + scalar_R = attribute_wrapper(comms_annotation="int32_scalar_R", datatype=numpy.int32) + scalar_RW = attribute_wrapper(comms_annotation="int32_scalar_RW", datatype=numpy.int32, access=AttrWriteType.READ_WRITE) + + def configure_for_initialise(self): + dev_init(self) + + class int64_scalar_device(hardware_device): + scalar_R = attribute_wrapper(comms_annotation="int64_scalar_R", datatype=numpy.int64) + scalar_RW = attribute_wrapper(comms_annotation="int64_scalar_RW", datatype=numpy.int64, access=AttrWriteType.READ_WRITE) + + def configure_for_initialise(self): + dev_init(self) + + class str_spectrum_device(hardware_device): + spectrum_R = attribute_wrapper(comms_annotation="str_spectrum_R", datatype=str, dims=spectrum_dims) + spectrum_RW = attribute_wrapper(comms_annotation="str_spectrum_RW", datatype=str, access=AttrWriteType.READ_WRITE, dims=spectrum_dims) + + def configure_for_initialise(self): + dev_init(self) + + class bool_spectrum_device(hardware_device): + spectrum_R = attribute_wrapper(comms_annotation="bool_spectrum_R", datatype=numpy.bool_, dims=spectrum_dims) + spectrum_RW = attribute_wrapper(comms_annotation="bool_spectrum_RW", datatype=numpy.bool_, access=AttrWriteType.READ_WRITE, dims=spectrum_dims) + + def configure_for_initialise(self): + dev_init(self) + + class float32_spectrum_device(hardware_device): + spectrum_R = attribute_wrapper(comms_annotation="float32_spectrum_R", datatype=numpy.float32, dims=spectrum_dims) + spectrum_RW = attribute_wrapper(comms_annotation="float32_spectrum_RW", datatype=numpy.float32, access=AttrWriteType.READ_WRITE, dims=spectrum_dims) + + def configure_for_initialise(self): + dev_init(self) + + class float64_spectrum_device(hardware_device): + spectrum_R = attribute_wrapper(comms_annotation="float64_spectrum_R", datatype=numpy.float64, dims=spectrum_dims) + spectrum_RW = attribute_wrapper(comms_annotation="float64_spectrum_RW", datatype=numpy.float64, access=AttrWriteType.READ_WRITE, dims=spectrum_dims) + + def configure_for_initialise(self): + dev_init(self) + + class double_spectrum_device(hardware_device): + spectrum_R = attribute_wrapper(comms_annotation="double_spectrum_R", datatype=numpy.double, dims=spectrum_dims) + spectrum_RW = attribute_wrapper(comms_annotation="double_spectrum_RW", datatype=numpy.double, access=AttrWriteType.READ_WRITE, dims=spectrum_dims) + + def configure_for_initialise(self): + dev_init(self) + + class uint8_spectrum_device(hardware_device): + spectrum_R = attribute_wrapper(comms_annotation="uint8_spectrum_R", datatype=numpy.uint8, dims=spectrum_dims) + spectrum_RW = attribute_wrapper(comms_annotation="uint8_spectrum_RW", datatype=numpy.uint8, access=AttrWriteType.READ_WRITE, dims=spectrum_dims) + + def configure_for_initialise(self): + dev_init(self) + + class uint16_spectrum_device(hardware_device): + spectrum_R = attribute_wrapper(comms_annotation="uint16_spectrum_R", datatype=numpy.uint16, dims=spectrum_dims) + spectrum_RW = attribute_wrapper(comms_annotation="uint16_spectrum_RW", datatype=numpy.uint16, access=AttrWriteType.READ_WRITE, dims=spectrum_dims) + + def configure_for_initialise(self): + dev_init(self) + + class uint32_spectrum_device(hardware_device): + spectrum_R = attribute_wrapper(comms_annotation="uint32_spectrum_R", datatype=numpy.uint32, dims=spectrum_dims) + spectrum_RW = attribute_wrapper(comms_annotation="uint32_spectrum_RW", datatype=numpy.uint32, access=AttrWriteType.READ_WRITE, dims=spectrum_dims) + + def configure_for_initialise(self): + dev_init(self) + + class uint64_spectrum_device(hardware_device): + spectrum_R = attribute_wrapper(comms_annotation="uint64_spectrum_R", datatype=numpy.uint64, dims=spectrum_dims) + spectrum_RW = attribute_wrapper(comms_annotation="uint64_spectrum_RW", datatype=numpy.uint64, access=AttrWriteType.READ_WRITE, dims=spectrum_dims) + + def configure_for_initialise(self): + dev_init(self) + + class int16_spectrum_device(hardware_device): + spectrum_R = attribute_wrapper(comms_annotation="int16_spectrum_R", datatype=numpy.int16, dims=spectrum_dims) + spectrum_RW = attribute_wrapper(comms_annotation="int16_spectrum_RW", datatype=numpy.int16, access=AttrWriteType.READ_WRITE, dims=spectrum_dims) + + def configure_for_initialise(self): + dev_init(self) + + class int32_spectrum_device(hardware_device): + spectrum_R = attribute_wrapper(comms_annotation="int32_spectrum_R", datatype=numpy.int32, dims=spectrum_dims) + spectrum_RW = attribute_wrapper(comms_annotation="int32_spectrum_RW", datatype=numpy.int32, access=AttrWriteType.READ_WRITE, dims=spectrum_dims) + + def configure_for_initialise(self): + dev_init(self) + + class int64_spectrum_device(hardware_device): + spectrum_R = attribute_wrapper(comms_annotation="int64_spectrum_R", datatype=numpy.int64, dims=spectrum_dims) + spectrum_RW = attribute_wrapper(comms_annotation="int64_spectrum_RW", datatype=numpy.int64, access=AttrWriteType.READ_WRITE, dims=spectrum_dims) + + def configure_for_initialise(self): + dev_init(self) + + class str_image_device(hardware_device): + image_R = attribute_wrapper(comms_annotation="str_image_R", datatype=str, dims=(2,3)) + image_RW = attribute_wrapper(comms_annotation="str_image_RW", datatype=str, access=AttrWriteType.READ_WRITE, dims=(2,3)) + + def configure_for_initialise(self): + dev_init(self) + + class bool_image_device(hardware_device): + image_R = attribute_wrapper(comms_annotation="bool_image_R", datatype=numpy.bool_, dims=(2,3)) + image_RW = attribute_wrapper(comms_annotation="bool_image_RW", datatype=numpy.bool_, access=AttrWriteType.READ_WRITE, dims=(2,3)) + + def configure_for_initialise(self): + dev_init(self) + + class float32_image_device(hardware_device): + image_R = attribute_wrapper(comms_annotation="float32_image_R", datatype=numpy.float32, dims=(2,3)) + image_RW = attribute_wrapper(comms_annotation="float32_image_RW", datatype=numpy.float32, access=AttrWriteType.READ_WRITE, dims=(2,3)) + + def configure_for_initialise(self): + dev_init(self) + + class float64_image_device(hardware_device): + image_R = attribute_wrapper(comms_annotation="float64_image_R", datatype=numpy.float64, dims=(2,3)) + image_RW = attribute_wrapper(comms_annotation="float64_image_RW", datatype=numpy.float64, access=AttrWriteType.READ_WRITE, dims=(2,3)) + + def configure_for_initialise(self): + dev_init(self) + + class double_image_device(hardware_device): + image_R = attribute_wrapper(comms_annotation="double_image_R", datatype=numpy.double, dims=(2,3)) + image_RW = attribute_wrapper(comms_annotation="double_image_RW", datatype=numpy.double, access=AttrWriteType.READ_WRITE, dims=(2,3)) + + def configure_for_initialise(self): + dev_init(self) + + class uint8_image_device(hardware_device): + image_R = attribute_wrapper(comms_annotation="uint8_image_R", datatype=numpy.uint8, dims=(2,3)) + image_RW = attribute_wrapper(comms_annotation="uint8_image_RW", datatype=numpy.uint8, access=AttrWriteType.READ_WRITE, dims=(2,3)) + + def configure_for_initialise(self): + dev_init(self) + + class uint16_image_device(hardware_device): + image_R = attribute_wrapper(comms_annotation="uint16_image_R", datatype=numpy.uint16, dims=(2,3)) + image_RW = attribute_wrapper(comms_annotation="uint16_image_RW", datatype=numpy.uint16, access=AttrWriteType.READ_WRITE, dims=(2,3)) + + def configure_for_initialise(self): + dev_init(self) + + class uint32_image_device(hardware_device): + image_R = attribute_wrapper(comms_annotation="uint32_image_R", datatype=numpy.uint32, dims=(2,3)) + image_RW = attribute_wrapper(comms_annotation="uint32_image_RW", datatype=numpy.uint32, access=AttrWriteType.READ_WRITE, dims=(2,3)) + + def configure_for_initialise(self): + dev_init(self) + + class uint64_image_device(hardware_device): + image_R = attribute_wrapper(comms_annotation="uint64_image_R", datatype=numpy.uint64, dims=(2,3)) + image_RW = attribute_wrapper(comms_annotation="uint64_image_RW", datatype=numpy.uint64, access=AttrWriteType.READ_WRITE, dims=(2,3)) + + def configure_for_initialise(self): + dev_init(self) + + class int16_image_device(hardware_device): + image_R = attribute_wrapper(comms_annotation="int16_image_R", datatype=numpy.int16, dims=(2,3)) + image_RW = attribute_wrapper(comms_annotation="int16_image_RW", datatype=numpy.int16, access=AttrWriteType.READ_WRITE, dims=(2,3)) + + def configure_for_initialise(self): + dev_init(self) + + class int32_image_device(hardware_device): + image_R = attribute_wrapper(comms_annotation="int32_image_R", datatype=numpy.int32, dims=(2,3)) + image_RW = attribute_wrapper(comms_annotation="int32_image_RW", datatype=numpy.int32, access=AttrWriteType.READ_WRITE, dims=(2,3)) + + def configure_for_initialise(self): + dev_init(self) + + class int64_image_device(hardware_device): + image_R = attribute_wrapper(comms_annotation="int64_image_R", datatype=numpy.int64, dims=(2,3)) + image_RW = attribute_wrapper(comms_annotation="int64_image_RW", datatype=numpy.int64, access=AttrWriteType.READ_WRITE, dims=(2,3)) + + def configure_for_initialise(self): + dev_init(self) + + def read_R_test(self, dev, dtype, test_type): + '''Test device''' + with DeviceTestContext(dev, process=True) as proxy: + + #initialise + proxy.initialise() + proxy.on() + + if test_type == "scalar": + expected = numpy.zeros((1,), dtype=dtype) + val = proxy.scalar_RW + elif test_type == "spectrum": + expected = numpy.zeros(spectrum_dims, dtype=dtype) + val = proxy.spectrum_R + elif test_type == "image": + expected = numpy.zeros(image_dims, dtype=dtype) + val = numpy.array(proxy.image_R) #is needed for STR since they act differently + + # cant use all() for 2d arrays so instead compare the dimensions and then flatten to 2d + self.assertEqual(val.shape, expected.shape, " image R array dimensions got mangled. Expected {}, got {}".format(expected.shape, val.shape)) + val.reshape(-1) + else: + self.assertEqual(1,2, " {} is not a valid test_type. please use either scalar, spectrum or image".format(test_type)) + + if test_type == "scalar": + comparison = expected == val + self.assertTrue(comparison, " Value could not be read or was not what was expected. Expected: {}, got {}".format(expected, val)) + else: + comparison = expected == val + equal_arrays = comparison.all() + self.assertTrue(equal_arrays, " Value could not be read or was not what was expected. Expected: {}, got {}".format(expected, val)) + + print(" Test passed! Managed to read R attribute value. got: {}".format(val)) + + def write_RW_test(self, dev, dtype, test_type): + '''Test device''' + with DeviceTestContext(dev, process=True) as proxy: + + #initialise + proxy.initialise() + proxy.on() + + if test_type == "scalar": + + if dtype is str or dtype is numpy.str_: + val = str_scalar_val + else: + val = dtype(1) + proxy.scalar_RW = val + elif test_type == "spectrum": + if dtype is str or dtype is numpy.str_: + val = str_spectrum_val + else: + val = numpy.full(spectrum_dims, dtype=dtype, fill_value=1) + print(val) + proxy.spectrum_RW = val + elif test_type == "image": + if dtype is str or dtype is numpy.str_: + val = str_image_val + else: + val = numpy.full(image_dims, dtype=dtype, fill_value=1) + proxy.image_RW = val + else: + self.assertEqual(1,2, " {} is not a valid test_type. please use either scalar, spectrum or image".format(test_type)) + + # can't really test anything here except that the writing didnt cause an error. + # reading back happens in readback_test + + print(" Test passed! Managed to write: ".format(val)) + + def read_RW_test(self, dev, dtype, test_type): + '''Test device''' + try: + with DeviceTestContext(dev, process=True) as proxy: + + #initialise + proxy.initialise() + proxy.on() + + if test_type == "scalar": + expected = numpy.zeros((1,), dtype=dtype) + val = proxy.scalar_RW + elif test_type == "spectrum": + expected = numpy.zeros(spectrum_dims, dtype=dtype) + val = proxy.spectrum_RW + elif test_type == "image": + expected = numpy.zeros(image_dims, dtype=dtype) + val = numpy.array(proxy.image_RW) #is needed for STR since they act differently + + # cant use all() for 2d arrays so instead compare the dimensions and then flatten to 2d + self.assertEqual(val.shape, expected.shape, " image R array dimensions got mangled. Expected {}, got {}".format(expected.shape, val.shape)) + val.reshape(-1) + else: + self.assertEqual(1,2, " {} is not a valid test_type. please use either scalar, spectrum or image".format(test_type)) + + if test_type != "scalar": + # spectrums and the now flattened images can be compared with .all() + comparison = expected == val + equal_arrays = comparison.all() + self.assertTrue(equal_arrays, " Value could not be handled by the atrribute_wrappers internal RW storer") + else: + comparison = expected == val + self.assertTrue(comparison, " Value could not be handled by the atrribute_wrappers internal RW storer") + + print(" Test passed! Managed to read internal RW value. got: {}".format(val)) + except Exception as e: + info = "Test failure in {} {} read RW test. Expected: {}, got {}".format(test_type, dtype, expected, val) + raise Exception(info) from e + + def readback_test(self, dev, dtype, test_type): + '''Test device''' + try: + with DeviceTestContext(dev, process=True) as proxy: + + #initialise + proxy.initialise() + proxy.on() + + if test_type == "scalar": + if dtype is str or dtype is numpy.str_: + val = str_scalar_val + else: + val = dtype(1) + proxy.scalar_RW = val + result_R = proxy.scalar_R + result_RW = proxy.scalar_RW + elif test_type == "spectrum": + if dtype is str or dtype is numpy.str_: + val = str_spectrum_val + else: + val = numpy.full(spectrum_dims, dtype=dtype, fill_value=1) + proxy.spectrum_RW = val + result_R = proxy.spectrum_R + result_RW = proxy.spectrum_RW + elif test_type == "image": + if dtype is str or dtype is numpy.str_: + val = str_image_val + else: + val = numpy.full(image_dims, dtype=dtype, fill_value=1) + + # info += " write value: {}".format(val) + proxy.image_RW = val + result_R = proxy.image_R + result_RW = proxy.image_RW + + if dtype != str: + self.assertEqual(result_R.shape, image_dims, "not the correct dimensions") + + result_R = result_R.reshape(-1) + result_RW = result_RW.reshape(-1) + val = val.reshape(-1) + + else: + # if the test isn't scalar/spectrum or image its wrong + self.assertEqual(1,2, " {} is not a valid test_type. please use either scalar, spectrum or image".format(test_type)) + + if test_type == "scalar": + comparison = result_RW == val + self.assertTrue(comparison, " Value could not be handled by the atrribute_wrappers internal RW storer. attempted to write: {}".format(val)) + comparison = result_R == val + self.assertTrue(comparison, " value in the clients R attribute not equal to what was written. read: {}, wrote {}".format(result_R, val)) + elif dtype != str: + comparison = result_RW == val + equal_arrays = comparison.all() + self.assertTrue(equal_arrays, " Value could not be handled by the atrribute_wrappers internal RW storer. attempted to write: {}".format(val)) + comparison = result_R == val + equal_arrays = comparison.all() + self.assertTrue(equal_arrays, " value in the clients R attribute not equal to what was written. read: {}, wrote {}".format(result_R, val)) + else: + if test_type == "image": + self.assertEqual(len(result_RW)*len(result_RW[0]), 6, "array dimensions do not match the expected dimensions. expected {}, got: {}".format(val, len(result_RW) * len(result_RW[0]))) + self.assertEqual(len(result_RW) * len(result_RW[0]), 6,"array dimensions do not match the expected dimensions. expected {}, got: {}".format(val, len(result_R) * len([0]))) + else: + self.assertEqual(len(result_RW), 4,"array dimensions do not match the expected dimensions. expected {}, got: {}".format(4, len(result_RW))) + self.assertEqual(len(result_R), 4, "array dimensions do not match the expected dimensions. expected {}, got: {}".format(4, len(result_R))) + + print(" Test passed! Managed write and read back a value: {}".format(val)) + + except Exception as e: + info = "Test failure in {} {} readback test \n\tW: {} \n\tRW: {} \n\tR: {}".format(test_type, dtype, val, result_RW, result_R) + raise Exception(info) from e + + + """ + List of different types to be used with attributes testing, using any other + might have unexpected results. Each type is bound to a device scalar, + spectrum and image class + """ + attribute_type_tests = [ + { + 'type': str, 'scalar': str_scalar_device, + 'spectrum': str_spectrum_device, "image": str_image_device + }, + { + 'type': numpy.bool_, 'scalar': bool_scalar_device, + 'spectrum': bool_spectrum_device, "image": bool_image_device + }, + { + 'type': numpy.float32, 'scalar': float32_scalar_device, + 'spectrum': float32_spectrum_device, "image": float32_image_device + }, + { + 'type': numpy.float64, 'scalar': float64_scalar_device, + 'spectrum': float64_spectrum_device, "image": float64_image_device + }, + { + 'type': numpy.double, 'scalar': double_scalar_device, + 'spectrum': double_spectrum_device, "image": double_image_device + }, + { + 'type': numpy.uint8, 'scalar': uint8_scalar_device, + 'spectrum': uint8_spectrum_device, "image": uint8_image_device + }, + { + 'type': numpy.uint16, 'scalar': uint16_scalar_device, + 'spectrum': uint16_spectrum_device, "image": uint16_image_device + }, + { + 'type': numpy.uint32, 'scalar': uint32_scalar_device, + 'spectrum': uint32_spectrum_device, "image": uint32_image_device + }, + { + 'type': numpy.uint64, 'scalar': uint64_scalar_device, + 'spectrum': uint64_spectrum_device, "image": uint64_image_device + }, + { + 'type': numpy.int16, 'scalar': int16_scalar_device, + 'spectrum': int16_spectrum_device, "image": int16_image_device + }, + { + 'type': numpy.int32, 'scalar': int32_scalar_device, + 'spectrum': int32_spectrum_device, "image": int32_image_device + }, + { + 'type': numpy.int64, 'scalar': int64_scalar_device, + 'spectrum': int64_spectrum_device, "image": int64_image_device + } + ] + + def test_scalar_R(self): + for attribute_type_test in self.attribute_type_tests: + self.read_R_test( + attribute_type_test['scalar'], attribute_type_test['type'], + 'scalar') + + def test_scalar_RW(self): + for attribute_type_test in self.attribute_type_tests: + self.read_RW_test( + attribute_type_test['scalar'], attribute_type_test['type'], + 'scalar') + + def test_scalar_W(self): + for attribute_type_test in self.attribute_type_tests: + self.write_RW_test( + attribute_type_test['scalar'], attribute_type_test['type'], + 'scalar') + + def test_scalar_readback(self): + for attribute_type_test in self.attribute_type_tests: + self.readback_test( + attribute_type_test['scalar'], attribute_type_test['type'], + 'scalar') + + def test_spectrum_R(self): + for attribute_type_test in self.attribute_type_tests: + self.read_R_test( + attribute_type_test['spectrum'], attribute_type_test['type'], + 'spectrum') + + def test_spectrum_RW(self): + for attribute_type_test in self.attribute_type_tests: + self.read_RW_test( + attribute_type_test['spectrum'], attribute_type_test['type'], + 'spectrum') + + def test_spectrum_W(self): + for attribute_type_test in self.attribute_type_tests: + self.write_RW_test( + attribute_type_test['spectrum'], attribute_type_test['type'], + 'spectrum') + + def test_spectrum_readback(self): + for attribute_type_test in self.attribute_type_tests: + self.readback_test( + attribute_type_test['spectrum'], attribute_type_test['type'], + 'spectrum') + + def test_image_R(self): + for attribute_type_test in self.attribute_type_tests: + self.read_R_test( + attribute_type_test['image'], attribute_type_test['type'], + 'image') + + def test_image_RW(self): + for attribute_type_test in self.attribute_type_tests: + self.read_RW_test( + attribute_type_test['image'], attribute_type_test['type'], + 'image') + + def test_image_W(self): + for attribute_type_test in self.attribute_type_tests: + self.write_RW_test(attribute_type_test['image'], attribute_type_test['type'], 'image') + + def test_image_readback(self): + for attribute_type_test in self.attribute_type_tests: + self.readback_test( + attribute_type_test['image'], attribute_type_test['type'], + 'image') diff --git a/devices/test/clients/test_client.py b/devices/test/clients/test_client.py index 355b4f72ad8d6c61c6655b45b75a9f597ac6b72c..1d8c85f5e597a31d00bc1af105e0465b9c8a8a11 100644 --- a/devices/test/clients/test_client.py +++ b/devices/test/clients/test_client.py @@ -1,9 +1,10 @@ -from clients.comms_client import CommClient +# External imports import numpy -import os -# <class 'numpy.bool_'> +# Test imports +from clients.comms_client import CommClient + class test_client(CommClient): """ @@ -75,15 +76,16 @@ class test_client(CommClient): takes all gathered data to configure and return the correct read and write functions """ - value = numpy.zeros(dims, dtype) + self.value = numpy.zeros(dims, dtype) def read_function(): self.streams.debug_stream("from read_function, reading {} array of type {}".format(dims, dtype)) - return value + return self.value def write_function(write_value): self.streams.debug_stream("from write_function, writing {} array of type {}".format(dims, dtype)) - value = write_value + self.value = write_value + return self.streams.debug_stream("created and bound example_client read/write functions to attribute_wrapper object") return read_function, write_function diff --git a/devices/test/devices/test_device.py b/devices/test/devices/test_device.py deleted file mode 100644 index f9a72ec88d006450403b6cfc6a0396b842bb36a1..0000000000000000000000000000000000000000 --- a/devices/test/devices/test_device.py +++ /dev/null @@ -1,99 +0,0 @@ -# -*- coding: utf-8 -*- -# -# This file is part of the PCC project -# -# -# -# Distributed under the terms of the APACHE license. -# See LICENSE.txt for more info. - -""" test Device Server -""" - -# TODO(Corne): Remove sys.path.append hack once packaging is in place! -import os, sys -currentdir = os.path.dirname(os.path.realpath(__file__)) -parentdir = os.path.dirname(currentdir) -parentdir = os.path.dirname(parentdir) -sys.path.append(parentdir) - -# PyTango imports -from tango.server import run -from tango.server import device_property -from tango import DevState -# Additional import - -from test.clients.test_client import test_client -from clients.attribute_wrapper import * -from devices.hardware_device import * - -__all__ = ["test_device", "main"] - - -class test_device(hardware_device): - # ----------------- - # Device Properties - # ----------------- - - OPC_Server_Name = device_property( - dtype='DevString', - ) - - OPC_Server_Port = device_property( - dtype='DevULong', - ) - - OPC_Time_Out = device_property( - dtype='DevDouble', - ) - - # ---------- - # Attributes - # ---------- - bool_scalar_R = attribute_wrapper(comms_annotation="numpy.bool_ type read scalar", datatype=numpy.bool_) - bool_scalar_RW = attribute_wrapper(comms_annotation="numpy.bool_ type read/write scalar", datatype=numpy.bool_, access=AttrWriteType.READ_WRITE) - - int32_spectrum_R = attribute_wrapper(comms_annotation="numpy.int32 type read spectrum (len = 8)", datatype=numpy.int32, dims=(8,)) - int32_spectrum_RW = attribute_wrapper(comms_annotation="numpy.int32 type read spectrum (len = 8)", datatype=numpy.int32, dims=(8,), - access=AttrWriteType.READ_WRITE) - - double_image_R = attribute_wrapper(comms_annotation="numpy.double type read image (dims = 2x8)", datatype=numpy.double, dims=(2, 8)) - double_image_RW = attribute_wrapper(comms_annotation="numpy.double type read/write image (dims = 8x2)", datatype=numpy.double, dims=(8, 2), - access=AttrWriteType.READ_WRITE) - - int32_scalar_R = attribute_wrapper(comms_annotation="numpy.int32 type read scalar", datatype=numpy.int32) - uint16_spectrum_RW = attribute_wrapper(comms_annotation="numpy.uint16 type read/write spectrum (len = 8)", datatype=numpy.uint16, dims=(8,), - access=AttrWriteType.READ_WRITE) - float32_image_R = attribute_wrapper(comms_annotation="numpy.float32 type read image (dims = 8x2)", datatype=numpy.float32, dims=(8, 2)) - uint8_image_RW = attribute_wrapper(comms_annotation="numpy.uint8 type read/write image (dims = 2x8)", datatype=numpy.uint8, dims=(2, 8), - access=AttrWriteType.READ_WRITE) - - # -------- - # overloaded functions - # -------- - def configure_for_initialise(self): - """ user code here. is called when the sate is set to INIT """ - """Initialises the attributes and properties of the PCC.""" - - self.set_state(DevState.INIT) - - # set up the test client - self.test_client = test_client(self.Fault, self) - - # map an access helper class - for i in self.attr_list(): - i.set_comm_client(self.test_client) - - self.test_client.start() - - -# ---------- -# Run server -# ---------- -def main(args=None, **kwargs): - """Main function of the example module.""" - return run((test_device,), args=args, **kwargs) - - -if __name__ == '__main__': - main() diff --git a/devices/util/__init__.py b/devices/util/__init__.py deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000