Skip to content
Snippets Groups Projects
Commit 72520dc8 authored by Taya Snijder's avatar Taya Snijder
Browse files

Merge branch 'L2SS-247_2021-07-06-Branched_from_master-attribute_testing' into 'master'

Resolve L2SS-247 "2021 07 06 branched from master attribute testing"

Closes L2SS-247

See merge request !78
parents 678355ed 784891c7
No related branches found
No related tags found
1 merge request!78Resolve L2SS-247 "2021 07 06 branched from master attribute testing"
from tango.server import attribute from tango.server import attribute
from tango import AttrWriteType from tango import AttrWriteType
import numpy import numpy
from devices.device_decorators import only_when_on, fault_on_error from devices.device_decorators import only_when_on, fault_on_error
...@@ -34,16 +33,17 @@ class attribute_wrapper(attribute): ...@@ -34,16 +33,17 @@ class attribute_wrapper(attribute):
self.comms_id = comms_id # store data that can be used to identify the comms interface to use. not used by the wrapper itself self.comms_id = comms_id # store data that can be used to identify the comms interface to use. not used by the wrapper itself
self.comms_annotation = comms_annotation # store data that can be used by the comms interface. not used by the wrapper itself self.comms_annotation = comms_annotation # store data that can be used by the comms interface. not used by the wrapper itself
self.numpy_type = datatype # tango changes our attribute to their representation (E.g numpy.int64 becomes "DevLong64")
self.init_value = init_value self.init_value = init_value
is_scalar = dims == (1,) is_scalar = dims == (1,)
# tango doesn't recognise numpy.str_, for consistencies sake we convert it here and hide this from the top level # tango doesn't recognise numpy.str_, for consistencies sake we convert it here and hide this from the top level
# NOTE: discuss, idk if this is an important detail somewhere else # NOTE: discuss, idk if this is an important detail somewhere else
if datatype is numpy.str_: if datatype is numpy.str_ or datatype is numpy.str:
datatype = str datatype = str
self.numpy_type = datatype # tango changes our attribute to their representation (E.g numpy.int64 becomes "DevLong64")
# check if not scalar # check if not scalar
if is_scalar: if is_scalar:
# scalar, just set the single dimension. # scalar, just set the single dimension.
...@@ -131,7 +131,15 @@ class attribute_wrapper(attribute): ...@@ -131,7 +131,15 @@ class attribute_wrapper(attribute):
else: else:
numpy_dims = dims numpy_dims = dims
if self.dim_x == 1:
if self.numpy_type == str:
value = ''
else:
value = self.numpy_type(0)
else:
value = numpy.zeros(numpy_dims, dtype=self.numpy_type) value = numpy.zeros(numpy_dims, dtype=self.numpy_type)
return value return value
def set_comm_client(self, client): def set_comm_client(self, client):
......
...@@ -22,8 +22,6 @@ numpy_to_OPCua_dict = { ...@@ -22,8 +22,6 @@ numpy_to_OPCua_dict = {
numpy.float32: opcua.ua.VariantType.Float, numpy.float32: opcua.ua.VariantType.Float,
numpy.double: opcua.ua.VariantType.Double, numpy.double: opcua.ua.VariantType.Double,
numpy.float64: opcua.ua.VariantType.Double, numpy.float64: opcua.ua.VariantType.Double,
numpy.str_: opcua.ua.VariantType.String,
numpy.str: opcua.ua.VariantType.String,
str: opcua.ua.VariantType.String str: opcua.ua.VariantType.String
} }
...@@ -95,7 +93,7 @@ class OPCUAConnection(CommClient): ...@@ -95,7 +93,7 @@ class OPCUAConnection(CommClient):
print(i.get_browse_name()) print(i.get_browse_name())
for j in i.get_children(): for j in i.get_children():
try: try:
print(j.get_browse_name(), j.get_data_type_as_variant_type()) print(j.get_browse_name(), j.get_data_type_as_variant_type(), j.get_value())
except: except:
print(j.get_browse_name()) print(j.get_browse_name())
finally: finally:
......
...@@ -20,12 +20,13 @@ from tango import DevState, DebugIt ...@@ -20,12 +20,13 @@ from tango import DevState, DebugIt
from clients.attribute_wrapper import attribute_wrapper from clients.attribute_wrapper import attribute_wrapper
from common.lofar_logging import log_exceptions from common.lofar_logging import log_exceptions
import logging
__all__ = ["hardware_device"] __all__ = ["hardware_device"]
from devices.device_decorators import only_in_states, fault_on_error from devices.device_decorators import only_in_states, fault_on_error
import logging
logger = logging.getLogger() logger = logging.getLogger()
class AbstractDeviceMetas(DeviceMeta, ABCMeta): class AbstractDeviceMetas(DeviceMeta, ABCMeta):
......
...@@ -147,8 +147,7 @@ class PCC(hardware_device): ...@@ -147,8 +147,7 @@ class PCC(hardware_device):
self.function_mapping["CLK_off"] = {} self.function_mapping["CLK_off"] = {}
# set up the OPC ua client # set up the OPC ua client
self.OPCua_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu", self.OPCua_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu", self.OPC_Time_Out, self.Fault, self)
self.OPC_Time_Out, self.Fault, self)
# map an access helper class # map an access helper class
for i in self.attr_list(): for i in self.attr_list():
......
from clients.comms_client import CommClient from clients.comms_client import CommClient
import snmp import snmp
......
# -*- coding: utf-8 -*-
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
""" test Device Server
"""
# External imports
from tango import DevState
# Internal imports
from test.clients.test_client import test_client
from clients.attribute_wrapper import *
from devices.hardware_device import *
# Test imports
from tango.test_context import DeviceTestContext
from test import base
scalar_dims = (1,)
spectrum_dims = (4,)
image_dims = (3,2)
str_scalar_val = '1'
str_spectrum_val = ['1','1', '1','1']
str_image_val = [['1','1'],['1','1'],['1','1']]
def dev_init(device):
device.set_state(DevState.INIT)
device.test_client = test_client(device.Fault, device)
for i in device.attr_list():
i.set_comm_client(device.test_client)
device.test_client.start()
class TestAttributeTypes(base.TestCase):
class str_scalar_device(hardware_device):
scalar_R = attribute_wrapper(comms_annotation="str_scalar_R", datatype=str)
scalar_RW = attribute_wrapper(comms_annotation="str_scalar_RW", datatype=str, access=AttrWriteType.READ_WRITE)
def configure_for_initialise(self):
dev_init(self)
class bool_scalar_device(hardware_device):
scalar_R = attribute_wrapper(comms_annotation="bool_scalar_R", datatype=numpy.bool_)
scalar_RW = attribute_wrapper(comms_annotation="bool_scalar_RW", datatype=numpy.bool_, access=AttrWriteType.READ_WRITE)
def configure_for_initialise(self):
dev_init(self)
class float32_scalar_device(hardware_device):
scalar_R = attribute_wrapper(comms_annotation="float32_scalar_R", datatype=numpy.float32)
scalar_RW = attribute_wrapper(comms_annotation="float32_scalar_RW", datatype=numpy.float32, access=AttrWriteType.READ_WRITE)
def configure_for_initialise(self):
dev_init(self)
class float64_scalar_device(hardware_device):
scalar_R = attribute_wrapper(comms_annotation="float64_scalar_R", datatype=numpy.float64)
scalar_RW = attribute_wrapper(comms_annotation="float64_scalar_RW", datatype=numpy.float64, access=AttrWriteType.READ_WRITE)
def configure_for_initialise(self):
dev_init(self)
class double_scalar_device(hardware_device):
scalar_R = attribute_wrapper(comms_annotation="double_scalar_R", datatype=numpy.double)
scalar_RW = attribute_wrapper(comms_annotation="double_scalar_RW", datatype=numpy.double, access=AttrWriteType.READ_WRITE)
def configure_for_initialise(self):
dev_init(self)
class uint8_scalar_device(hardware_device):
scalar_R = attribute_wrapper(comms_annotation="uint8_scalar_R", datatype=numpy.uint8)
scalar_RW = attribute_wrapper(comms_annotation="uint8_scalar_RW", datatype=numpy.uint8, access=AttrWriteType.READ_WRITE)
def configure_for_initialise(self):
dev_init(self)
class uint16_scalar_device(hardware_device):
scalar_R = attribute_wrapper(comms_annotation="uint16_scalar_R", datatype=numpy.uint16)
scalar_RW = attribute_wrapper(comms_annotation="uint16_scalar_RW", datatype=numpy.uint16, access=AttrWriteType.READ_WRITE)
def configure_for_initialise(self):
dev_init(self)
class uint32_scalar_device(hardware_device):
scalar_R = attribute_wrapper(comms_annotation="uint32_scalar_R", datatype=numpy.uint32)
scalar_RW = attribute_wrapper(comms_annotation="uint32_scalar_RW", datatype=numpy.uint32, access=AttrWriteType.READ_WRITE)
def configure_for_initialise(self):
dev_init(self)
class uint64_scalar_device(hardware_device):
scalar_R = attribute_wrapper(comms_annotation="uint64_scalar_R", datatype=numpy.uint64)
scalar_RW = attribute_wrapper(comms_annotation="uint64_scalar_RW", datatype=numpy.uint64, access=AttrWriteType.READ_WRITE)
def configure_for_initialise(self):
dev_init(self)
class int16_scalar_device(hardware_device):
scalar_R = attribute_wrapper(comms_annotation="int16_scalar_R", datatype=numpy.int16)
scalar_RW = attribute_wrapper(comms_annotation="int16_scalar_RW", datatype=numpy.int16, access=AttrWriteType.READ_WRITE)
def configure_for_initialise(self):
dev_init(self)
class int32_scalar_device(hardware_device):
scalar_R = attribute_wrapper(comms_annotation="int32_scalar_R", datatype=numpy.int32)
scalar_RW = attribute_wrapper(comms_annotation="int32_scalar_RW", datatype=numpy.int32, access=AttrWriteType.READ_WRITE)
def configure_for_initialise(self):
dev_init(self)
class int64_scalar_device(hardware_device):
scalar_R = attribute_wrapper(comms_annotation="int64_scalar_R", datatype=numpy.int64)
scalar_RW = attribute_wrapper(comms_annotation="int64_scalar_RW", datatype=numpy.int64, access=AttrWriteType.READ_WRITE)
def configure_for_initialise(self):
dev_init(self)
class str_spectrum_device(hardware_device):
spectrum_R = attribute_wrapper(comms_annotation="str_spectrum_R", datatype=str, dims=spectrum_dims)
spectrum_RW = attribute_wrapper(comms_annotation="str_spectrum_RW", datatype=str, access=AttrWriteType.READ_WRITE, dims=spectrum_dims)
def configure_for_initialise(self):
dev_init(self)
class bool_spectrum_device(hardware_device):
spectrum_R = attribute_wrapper(comms_annotation="bool_spectrum_R", datatype=numpy.bool_, dims=spectrum_dims)
spectrum_RW = attribute_wrapper(comms_annotation="bool_spectrum_RW", datatype=numpy.bool_, access=AttrWriteType.READ_WRITE, dims=spectrum_dims)
def configure_for_initialise(self):
dev_init(self)
class float32_spectrum_device(hardware_device):
spectrum_R = attribute_wrapper(comms_annotation="float32_spectrum_R", datatype=numpy.float32, dims=spectrum_dims)
spectrum_RW = attribute_wrapper(comms_annotation="float32_spectrum_RW", datatype=numpy.float32, access=AttrWriteType.READ_WRITE, dims=spectrum_dims)
def configure_for_initialise(self):
dev_init(self)
class float64_spectrum_device(hardware_device):
spectrum_R = attribute_wrapper(comms_annotation="float64_spectrum_R", datatype=numpy.float64, dims=spectrum_dims)
spectrum_RW = attribute_wrapper(comms_annotation="float64_spectrum_RW", datatype=numpy.float64, access=AttrWriteType.READ_WRITE, dims=spectrum_dims)
def configure_for_initialise(self):
dev_init(self)
class double_spectrum_device(hardware_device):
spectrum_R = attribute_wrapper(comms_annotation="double_spectrum_R", datatype=numpy.double, dims=spectrum_dims)
spectrum_RW = attribute_wrapper(comms_annotation="double_spectrum_RW", datatype=numpy.double, access=AttrWriteType.READ_WRITE, dims=spectrum_dims)
def configure_for_initialise(self):
dev_init(self)
class uint8_spectrum_device(hardware_device):
spectrum_R = attribute_wrapper(comms_annotation="uint8_spectrum_R", datatype=numpy.uint8, dims=spectrum_dims)
spectrum_RW = attribute_wrapper(comms_annotation="uint8_spectrum_RW", datatype=numpy.uint8, access=AttrWriteType.READ_WRITE, dims=spectrum_dims)
def configure_for_initialise(self):
dev_init(self)
class uint16_spectrum_device(hardware_device):
spectrum_R = attribute_wrapper(comms_annotation="uint16_spectrum_R", datatype=numpy.uint16, dims=spectrum_dims)
spectrum_RW = attribute_wrapper(comms_annotation="uint16_spectrum_RW", datatype=numpy.uint16, access=AttrWriteType.READ_WRITE, dims=spectrum_dims)
def configure_for_initialise(self):
dev_init(self)
class uint32_spectrum_device(hardware_device):
spectrum_R = attribute_wrapper(comms_annotation="uint32_spectrum_R", datatype=numpy.uint32, dims=spectrum_dims)
spectrum_RW = attribute_wrapper(comms_annotation="uint32_spectrum_RW", datatype=numpy.uint32, access=AttrWriteType.READ_WRITE, dims=spectrum_dims)
def configure_for_initialise(self):
dev_init(self)
class uint64_spectrum_device(hardware_device):
spectrum_R = attribute_wrapper(comms_annotation="uint64_spectrum_R", datatype=numpy.uint64, dims=spectrum_dims)
spectrum_RW = attribute_wrapper(comms_annotation="uint64_spectrum_RW", datatype=numpy.uint64, access=AttrWriteType.READ_WRITE, dims=spectrum_dims)
def configure_for_initialise(self):
dev_init(self)
class int16_spectrum_device(hardware_device):
spectrum_R = attribute_wrapper(comms_annotation="int16_spectrum_R", datatype=numpy.int16, dims=spectrum_dims)
spectrum_RW = attribute_wrapper(comms_annotation="int16_spectrum_RW", datatype=numpy.int16, access=AttrWriteType.READ_WRITE, dims=spectrum_dims)
def configure_for_initialise(self):
dev_init(self)
class int32_spectrum_device(hardware_device):
spectrum_R = attribute_wrapper(comms_annotation="int32_spectrum_R", datatype=numpy.int32, dims=spectrum_dims)
spectrum_RW = attribute_wrapper(comms_annotation="int32_spectrum_RW", datatype=numpy.int32, access=AttrWriteType.READ_WRITE, dims=spectrum_dims)
def configure_for_initialise(self):
dev_init(self)
class int64_spectrum_device(hardware_device):
spectrum_R = attribute_wrapper(comms_annotation="int64_spectrum_R", datatype=numpy.int64, dims=spectrum_dims)
spectrum_RW = attribute_wrapper(comms_annotation="int64_spectrum_RW", datatype=numpy.int64, access=AttrWriteType.READ_WRITE, dims=spectrum_dims)
def configure_for_initialise(self):
dev_init(self)
class str_image_device(hardware_device):
image_R = attribute_wrapper(comms_annotation="str_image_R", datatype=str, dims=(2,3))
image_RW = attribute_wrapper(comms_annotation="str_image_RW", datatype=str, access=AttrWriteType.READ_WRITE, dims=(2,3))
def configure_for_initialise(self):
dev_init(self)
class bool_image_device(hardware_device):
image_R = attribute_wrapper(comms_annotation="bool_image_R", datatype=numpy.bool_, dims=(2,3))
image_RW = attribute_wrapper(comms_annotation="bool_image_RW", datatype=numpy.bool_, access=AttrWriteType.READ_WRITE, dims=(2,3))
def configure_for_initialise(self):
dev_init(self)
class float32_image_device(hardware_device):
image_R = attribute_wrapper(comms_annotation="float32_image_R", datatype=numpy.float32, dims=(2,3))
image_RW = attribute_wrapper(comms_annotation="float32_image_RW", datatype=numpy.float32, access=AttrWriteType.READ_WRITE, dims=(2,3))
def configure_for_initialise(self):
dev_init(self)
class float64_image_device(hardware_device):
image_R = attribute_wrapper(comms_annotation="float64_image_R", datatype=numpy.float64, dims=(2,3))
image_RW = attribute_wrapper(comms_annotation="float64_image_RW", datatype=numpy.float64, access=AttrWriteType.READ_WRITE, dims=(2,3))
def configure_for_initialise(self):
dev_init(self)
class double_image_device(hardware_device):
image_R = attribute_wrapper(comms_annotation="double_image_R", datatype=numpy.double, dims=(2,3))
image_RW = attribute_wrapper(comms_annotation="double_image_RW", datatype=numpy.double, access=AttrWriteType.READ_WRITE, dims=(2,3))
def configure_for_initialise(self):
dev_init(self)
class uint8_image_device(hardware_device):
image_R = attribute_wrapper(comms_annotation="uint8_image_R", datatype=numpy.uint8, dims=(2,3))
image_RW = attribute_wrapper(comms_annotation="uint8_image_RW", datatype=numpy.uint8, access=AttrWriteType.READ_WRITE, dims=(2,3))
def configure_for_initialise(self):
dev_init(self)
class uint16_image_device(hardware_device):
image_R = attribute_wrapper(comms_annotation="uint16_image_R", datatype=numpy.uint16, dims=(2,3))
image_RW = attribute_wrapper(comms_annotation="uint16_image_RW", datatype=numpy.uint16, access=AttrWriteType.READ_WRITE, dims=(2,3))
def configure_for_initialise(self):
dev_init(self)
class uint32_image_device(hardware_device):
image_R = attribute_wrapper(comms_annotation="uint32_image_R", datatype=numpy.uint32, dims=(2,3))
image_RW = attribute_wrapper(comms_annotation="uint32_image_RW", datatype=numpy.uint32, access=AttrWriteType.READ_WRITE, dims=(2,3))
def configure_for_initialise(self):
dev_init(self)
class uint64_image_device(hardware_device):
image_R = attribute_wrapper(comms_annotation="uint64_image_R", datatype=numpy.uint64, dims=(2,3))
image_RW = attribute_wrapper(comms_annotation="uint64_image_RW", datatype=numpy.uint64, access=AttrWriteType.READ_WRITE, dims=(2,3))
def configure_for_initialise(self):
dev_init(self)
class int16_image_device(hardware_device):
image_R = attribute_wrapper(comms_annotation="int16_image_R", datatype=numpy.int16, dims=(2,3))
image_RW = attribute_wrapper(comms_annotation="int16_image_RW", datatype=numpy.int16, access=AttrWriteType.READ_WRITE, dims=(2,3))
def configure_for_initialise(self):
dev_init(self)
class int32_image_device(hardware_device):
image_R = attribute_wrapper(comms_annotation="int32_image_R", datatype=numpy.int32, dims=(2,3))
image_RW = attribute_wrapper(comms_annotation="int32_image_RW", datatype=numpy.int32, access=AttrWriteType.READ_WRITE, dims=(2,3))
def configure_for_initialise(self):
dev_init(self)
class int64_image_device(hardware_device):
image_R = attribute_wrapper(comms_annotation="int64_image_R", datatype=numpy.int64, dims=(2,3))
image_RW = attribute_wrapper(comms_annotation="int64_image_RW", datatype=numpy.int64, access=AttrWriteType.READ_WRITE, dims=(2,3))
def configure_for_initialise(self):
dev_init(self)
def read_R_test(self, dev, dtype, test_type):
'''Test device'''
with DeviceTestContext(dev, process=True) as proxy:
#initialise
proxy.initialise()
proxy.on()
if test_type == "scalar":
expected = numpy.zeros((1,), dtype=dtype)
val = proxy.scalar_RW
elif test_type == "spectrum":
expected = numpy.zeros(spectrum_dims, dtype=dtype)
val = proxy.spectrum_R
elif test_type == "image":
expected = numpy.zeros(image_dims, dtype=dtype)
val = numpy.array(proxy.image_R) #is needed for STR since they act differently
# cant use all() for 2d arrays so instead compare the dimensions and then flatten to 2d
self.assertEqual(val.shape, expected.shape, " image R array dimensions got mangled. Expected {}, got {}".format(expected.shape, val.shape))
val.reshape(-1)
else:
self.assertEqual(1,2, " {} is not a valid test_type. please use either scalar, spectrum or image".format(test_type))
if test_type == "scalar":
comparison = expected == val
self.assertTrue(comparison, " Value could not be read or was not what was expected. Expected: {}, got {}".format(expected, val))
else:
comparison = expected == val
equal_arrays = comparison.all()
self.assertTrue(equal_arrays, " Value could not be read or was not what was expected. Expected: {}, got {}".format(expected, val))
print(" Test passed! Managed to read R attribute value. got: {}".format(val))
def write_RW_test(self, dev, dtype, test_type):
'''Test device'''
with DeviceTestContext(dev, process=True) as proxy:
#initialise
proxy.initialise()
proxy.on()
if test_type == "scalar":
if dtype is str or dtype is numpy.str_:
val = str_scalar_val
else:
val = dtype(1)
proxy.scalar_RW = val
elif test_type == "spectrum":
if dtype is str or dtype is numpy.str_:
val = str_spectrum_val
else:
val = numpy.full(spectrum_dims, dtype=dtype, fill_value=1)
print(val)
proxy.spectrum_RW = val
elif test_type == "image":
if dtype is str or dtype is numpy.str_:
val = str_image_val
else:
val = numpy.full(image_dims, dtype=dtype, fill_value=1)
proxy.image_RW = val
else:
self.assertEqual(1,2, " {} is not a valid test_type. please use either scalar, spectrum or image".format(test_type))
# can't really test anything here except that the writing didnt cause an error.
# reading back happens in readback_test
print(" Test passed! Managed to write: ".format(val))
def read_RW_test(self, dev, dtype, test_type):
'''Test device'''
try:
with DeviceTestContext(dev, process=True) as proxy:
#initialise
proxy.initialise()
proxy.on()
if test_type == "scalar":
expected = numpy.zeros((1,), dtype=dtype)
val = proxy.scalar_RW
elif test_type == "spectrum":
expected = numpy.zeros(spectrum_dims, dtype=dtype)
val = proxy.spectrum_RW
elif test_type == "image":
expected = numpy.zeros(image_dims, dtype=dtype)
val = numpy.array(proxy.image_RW) #is needed for STR since they act differently
# cant use all() for 2d arrays so instead compare the dimensions and then flatten to 2d
self.assertEqual(val.shape, expected.shape, " image R array dimensions got mangled. Expected {}, got {}".format(expected.shape, val.shape))
val.reshape(-1)
else:
self.assertEqual(1,2, " {} is not a valid test_type. please use either scalar, spectrum or image".format(test_type))
if test_type != "scalar":
# spectrums and the now flattened images can be compared with .all()
comparison = expected == val
equal_arrays = comparison.all()
self.assertTrue(equal_arrays, " Value could not be handled by the atrribute_wrappers internal RW storer")
else:
comparison = expected == val
self.assertTrue(comparison, " Value could not be handled by the atrribute_wrappers internal RW storer")
print(" Test passed! Managed to read internal RW value. got: {}".format(val))
except Exception as e:
info = "Test failure in {} {} read RW test. Expected: {}, got {}".format(test_type, dtype, expected, val)
raise Exception(info) from e
def readback_test(self, dev, dtype, test_type):
'''Test device'''
try:
with DeviceTestContext(dev, process=True) as proxy:
#initialise
proxy.initialise()
proxy.on()
if test_type == "scalar":
if dtype is str or dtype is numpy.str_:
val = str_scalar_val
else:
val = dtype(1)
proxy.scalar_RW = val
result_R = proxy.scalar_R
result_RW = proxy.scalar_RW
elif test_type == "spectrum":
if dtype is str or dtype is numpy.str_:
val = str_spectrum_val
else:
val = numpy.full(spectrum_dims, dtype=dtype, fill_value=1)
proxy.spectrum_RW = val
result_R = proxy.spectrum_R
result_RW = proxy.spectrum_RW
elif test_type == "image":
if dtype is str or dtype is numpy.str_:
val = str_image_val
else:
val = numpy.full(image_dims, dtype=dtype, fill_value=1)
# info += " write value: {}".format(val)
proxy.image_RW = val
result_R = proxy.image_R
result_RW = proxy.image_RW
if dtype != str:
self.assertEqual(result_R.shape, image_dims, "not the correct dimensions")
result_R = result_R.reshape(-1)
result_RW = result_RW.reshape(-1)
val = val.reshape(-1)
else:
# if the test isn't scalar/spectrum or image its wrong
self.assertEqual(1,2, " {} is not a valid test_type. please use either scalar, spectrum or image".format(test_type))
if test_type == "scalar":
comparison = result_RW == val
self.assertTrue(comparison, " Value could not be handled by the atrribute_wrappers internal RW storer. attempted to write: {}".format(val))
comparison = result_R == val
self.assertTrue(comparison, " value in the clients R attribute not equal to what was written. read: {}, wrote {}".format(result_R, val))
elif dtype != str:
comparison = result_RW == val
equal_arrays = comparison.all()
self.assertTrue(equal_arrays, " Value could not be handled by the atrribute_wrappers internal RW storer. attempted to write: {}".format(val))
comparison = result_R == val
equal_arrays = comparison.all()
self.assertTrue(equal_arrays, " value in the clients R attribute not equal to what was written. read: {}, wrote {}".format(result_R, val))
else:
if test_type == "image":
self.assertEqual(len(result_RW)*len(result_RW[0]), 6, "array dimensions do not match the expected dimensions. expected {}, got: {}".format(val, len(result_RW) * len(result_RW[0])))
self.assertEqual(len(result_RW) * len(result_RW[0]), 6,"array dimensions do not match the expected dimensions. expected {}, got: {}".format(val, len(result_R) * len([0])))
else:
self.assertEqual(len(result_RW), 4,"array dimensions do not match the expected dimensions. expected {}, got: {}".format(4, len(result_RW)))
self.assertEqual(len(result_R), 4, "array dimensions do not match the expected dimensions. expected {}, got: {}".format(4, len(result_R)))
print(" Test passed! Managed write and read back a value: {}".format(val))
except Exception as e:
info = "Test failure in {} {} readback test \n\tW: {} \n\tRW: {} \n\tR: {}".format(test_type, dtype, val, result_RW, result_R)
raise Exception(info) from e
"""
List of different types to be used with attributes testing, using any other
might have unexpected results. Each type is bound to a device scalar,
spectrum and image class
"""
attribute_type_tests = [
{
'type': str, 'scalar': str_scalar_device,
'spectrum': str_spectrum_device, "image": str_image_device
},
{
'type': numpy.bool_, 'scalar': bool_scalar_device,
'spectrum': bool_spectrum_device, "image": bool_image_device
},
{
'type': numpy.float32, 'scalar': float32_scalar_device,
'spectrum': float32_spectrum_device, "image": float32_image_device
},
{
'type': numpy.float64, 'scalar': float64_scalar_device,
'spectrum': float64_spectrum_device, "image": float64_image_device
},
{
'type': numpy.double, 'scalar': double_scalar_device,
'spectrum': double_spectrum_device, "image": double_image_device
},
{
'type': numpy.uint8, 'scalar': uint8_scalar_device,
'spectrum': uint8_spectrum_device, "image": uint8_image_device
},
{
'type': numpy.uint16, 'scalar': uint16_scalar_device,
'spectrum': uint16_spectrum_device, "image": uint16_image_device
},
{
'type': numpy.uint32, 'scalar': uint32_scalar_device,
'spectrum': uint32_spectrum_device, "image": uint32_image_device
},
{
'type': numpy.uint64, 'scalar': uint64_scalar_device,
'spectrum': uint64_spectrum_device, "image": uint64_image_device
},
{
'type': numpy.int16, 'scalar': int16_scalar_device,
'spectrum': int16_spectrum_device, "image": int16_image_device
},
{
'type': numpy.int32, 'scalar': int32_scalar_device,
'spectrum': int32_spectrum_device, "image": int32_image_device
},
{
'type': numpy.int64, 'scalar': int64_scalar_device,
'spectrum': int64_spectrum_device, "image": int64_image_device
}
]
def test_scalar_R(self):
for attribute_type_test in self.attribute_type_tests:
self.read_R_test(
attribute_type_test['scalar'], attribute_type_test['type'],
'scalar')
def test_scalar_RW(self):
for attribute_type_test in self.attribute_type_tests:
self.read_RW_test(
attribute_type_test['scalar'], attribute_type_test['type'],
'scalar')
def test_scalar_W(self):
for attribute_type_test in self.attribute_type_tests:
self.write_RW_test(
attribute_type_test['scalar'], attribute_type_test['type'],
'scalar')
def test_scalar_readback(self):
for attribute_type_test in self.attribute_type_tests:
self.readback_test(
attribute_type_test['scalar'], attribute_type_test['type'],
'scalar')
def test_spectrum_R(self):
for attribute_type_test in self.attribute_type_tests:
self.read_R_test(
attribute_type_test['spectrum'], attribute_type_test['type'],
'spectrum')
def test_spectrum_RW(self):
for attribute_type_test in self.attribute_type_tests:
self.read_RW_test(
attribute_type_test['spectrum'], attribute_type_test['type'],
'spectrum')
def test_spectrum_W(self):
for attribute_type_test in self.attribute_type_tests:
self.write_RW_test(
attribute_type_test['spectrum'], attribute_type_test['type'],
'spectrum')
def test_spectrum_readback(self):
for attribute_type_test in self.attribute_type_tests:
self.readback_test(
attribute_type_test['spectrum'], attribute_type_test['type'],
'spectrum')
def test_image_R(self):
for attribute_type_test in self.attribute_type_tests:
self.read_R_test(
attribute_type_test['image'], attribute_type_test['type'],
'image')
def test_image_RW(self):
for attribute_type_test in self.attribute_type_tests:
self.read_RW_test(
attribute_type_test['image'], attribute_type_test['type'],
'image')
def test_image_W(self):
for attribute_type_test in self.attribute_type_tests:
self.write_RW_test(attribute_type_test['image'], attribute_type_test['type'], 'image')
def test_image_readback(self):
for attribute_type_test in self.attribute_type_tests:
self.readback_test(
attribute_type_test['image'], attribute_type_test['type'],
'image')
from clients.comms_client import CommClient # External imports
import numpy import numpy
import os
# <class 'numpy.bool_'> # Test imports
from clients.comms_client import CommClient
class test_client(CommClient): class test_client(CommClient):
""" """
...@@ -75,15 +76,16 @@ class test_client(CommClient): ...@@ -75,15 +76,16 @@ class test_client(CommClient):
takes all gathered data to configure and return the correct read and write functions takes all gathered data to configure and return the correct read and write functions
""" """
value = numpy.zeros(dims, dtype) self.value = numpy.zeros(dims, dtype)
def read_function(): def read_function():
self.streams.debug_stream("from read_function, reading {} array of type {}".format(dims, dtype)) self.streams.debug_stream("from read_function, reading {} array of type {}".format(dims, dtype))
return value return self.value
def write_function(write_value): def write_function(write_value):
self.streams.debug_stream("from write_function, writing {} array of type {}".format(dims, dtype)) self.streams.debug_stream("from write_function, writing {} array of type {}".format(dims, dtype))
value = write_value self.value = write_value
return
self.streams.debug_stream("created and bound example_client read/write functions to attribute_wrapper object") self.streams.debug_stream("created and bound example_client read/write functions to attribute_wrapper object")
return read_function, write_function return read_function, write_function
......
# -*- coding: utf-8 -*-
#
# This file is part of the PCC project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
""" test Device Server
"""
# TODO(Corne): Remove sys.path.append hack once packaging is in place!
import os, sys
currentdir = os.path.dirname(os.path.realpath(__file__))
parentdir = os.path.dirname(currentdir)
parentdir = os.path.dirname(parentdir)
sys.path.append(parentdir)
# PyTango imports
from tango.server import run
from tango.server import device_property
from tango import DevState
# Additional import
from test.clients.test_client import test_client
from clients.attribute_wrapper import *
from devices.hardware_device import *
__all__ = ["test_device", "main"]
class test_device(hardware_device):
# -----------------
# Device Properties
# -----------------
OPC_Server_Name = device_property(
dtype='DevString',
)
OPC_Server_Port = device_property(
dtype='DevULong',
)
OPC_Time_Out = device_property(
dtype='DevDouble',
)
# ----------
# Attributes
# ----------
bool_scalar_R = attribute_wrapper(comms_annotation="numpy.bool_ type read scalar", datatype=numpy.bool_)
bool_scalar_RW = attribute_wrapper(comms_annotation="numpy.bool_ type read/write scalar", datatype=numpy.bool_, access=AttrWriteType.READ_WRITE)
int32_spectrum_R = attribute_wrapper(comms_annotation="numpy.int32 type read spectrum (len = 8)", datatype=numpy.int32, dims=(8,))
int32_spectrum_RW = attribute_wrapper(comms_annotation="numpy.int32 type read spectrum (len = 8)", datatype=numpy.int32, dims=(8,),
access=AttrWriteType.READ_WRITE)
double_image_R = attribute_wrapper(comms_annotation="numpy.double type read image (dims = 2x8)", datatype=numpy.double, dims=(2, 8))
double_image_RW = attribute_wrapper(comms_annotation="numpy.double type read/write image (dims = 8x2)", datatype=numpy.double, dims=(8, 2),
access=AttrWriteType.READ_WRITE)
int32_scalar_R = attribute_wrapper(comms_annotation="numpy.int32 type read scalar", datatype=numpy.int32)
uint16_spectrum_RW = attribute_wrapper(comms_annotation="numpy.uint16 type read/write spectrum (len = 8)", datatype=numpy.uint16, dims=(8,),
access=AttrWriteType.READ_WRITE)
float32_image_R = attribute_wrapper(comms_annotation="numpy.float32 type read image (dims = 8x2)", datatype=numpy.float32, dims=(8, 2))
uint8_image_RW = attribute_wrapper(comms_annotation="numpy.uint8 type read/write image (dims = 2x8)", datatype=numpy.uint8, dims=(2, 8),
access=AttrWriteType.READ_WRITE)
# --------
# overloaded functions
# --------
def configure_for_initialise(self):
""" user code here. is called when the sate is set to INIT """
"""Initialises the attributes and properties of the PCC."""
self.set_state(DevState.INIT)
# set up the test client
self.test_client = test_client(self.Fault, self)
# map an access helper class
for i in self.attr_list():
i.set_comm_client(self.test_client)
self.test_client.start()
# ----------
# Run server
# ----------
def main(args=None, **kwargs):
"""Main function of the example module."""
return run((test_device,), args=args, **kwargs)
if __name__ == '__main__':
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment