diff --git a/devices/OPC_Sim.py b/devices/OPC_Sim.py index 663175812f7d22bd89a521cfe388750ad562a431..376c83a406703b2205bae245a5c7182ad8b3d0f5 100644 --- a/devices/OPC_Sim.py +++ b/devices/OPC_Sim.py @@ -21,12 +21,12 @@ import time import numpy -from clients.opcua_connection import OPCUAConnection -from util.test_opcua_server import test_server, server_attribute +from devices.clients.opcua_connection import OPCUAConnection +from devices.util.test_opcua_server import test_server, server_attribute from opcua import ua, Server -from util.attribute_wrapper import * -from util.hardware_device import * +from devices.util.attribute_wrapper import * +from devices.util.hardware_device import * from tango.test_context import DeviceTestContext diff --git a/devices/attr_wrapper_test.py b/devices/attr_wrapper_test.py new file mode 100644 index 0000000000000000000000000000000000000000..702cab6bb4ebf3bc31fce52e1cf7b30e08cba51c --- /dev/null +++ b/devices/attr_wrapper_test.py @@ -0,0 +1,202 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the PCC project +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +""" test Device Server +""" + +# PyTango imports +from tango import DebugIt +from tango.server import run, command +from tango.server import device_property +from tango import AttrWriteType +from tango import DevState +# Additional import + +import numpy + +from devices.clients.test_client import test_client +from devices.util.attribute_wrapper import * +from devices.util.hardware_device import * + +from tango.test_context import DeviceTestContext +import unittest + +__all__ = ["test_device"] + + +class TestCase(unittest.TestCase): + + class str_scalar_device(hardware_device): + scalar_R = attribute_wrapper(comms_annotation="scalar_R", datatype=numpy.int64) + + def configure_for_initialise(self): + self.set_state(DevState.INIT) + self.test_client = test_client(self.Fault, self) + self.scalar_R.set_comm_client(self.test_client) + self.test_client.start() + + class bool_scalar_device(hardware_device): + scalar_R = attribute_wrapper(comms_annotation="scalar_R", datatype=numpy.int64) + + def configure_for_initialise(self): + self.set_state(DevState.INIT) + self.test_client = test_client(self.Fault, self) + self.scalar_R.set_comm_client(self.test_client) + self.test_client.start() + + class float32_scalar_device(hardware_device): + scalar_R = attribute_wrapper(comms_annotation="scalar_R", datatype=numpy.int64) + + def configure_for_initialise(self): + self.set_state(DevState.INIT) + self.test_client = test_client(self.Fault, self) + self.scalar_R.set_comm_client(self.test_client) + self.test_client.start() + + class float64_scalar_device(hardware_device): + scalar_R = attribute_wrapper(comms_annotation="scalar_R", datatype=numpy.int64) + + def configure_for_initialise(self): + self.set_state(DevState.INIT) + self.test_client = test_client(self.Fault, self) + self.scalar_R.set_comm_client(self.test_client) + self.test_client.start() + + class double_scalar_device(hardware_device): + scalar_R = attribute_wrapper(comms_annotation="scalar_R", datatype=numpy.int64) + + def configure_for_initialise(self): + self.set_state(DevState.INIT) + self.test_client = test_client(self.Fault, self) + self.scalar_R.set_comm_client(self.test_client) + self.test_client.start() + + class uint8_scalar_device(hardware_device): + scalar_R = attribute_wrapper(comms_annotation="scalar_R", datatype=numpy.int64) + + def configure_for_initialise(self): + self.set_state(DevState.INIT) + self.test_client = test_client(self.Fault, self) + self.scalar_R.set_comm_client(self.test_client) + self.test_client.start() + + class uint16_scalar_device(hardware_device): + scalar_R = attribute_wrapper(comms_annotation="scalar_R", datatype=numpy.int64) + + def configure_for_initialise(self): + self.set_state(DevState.INIT) + self.test_client = test_client(self.Fault, self) + self.scalar_R.set_comm_client(self.test_client) + self.test_client.start() + + class uint32_scalar_device(hardware_device): + scalar_R = attribute_wrapper(comms_annotation="scalar_R", datatype=numpy.int64) + + def configure_for_initialise(self): + self.set_state(DevState.INIT) + self.test_client = test_client(self.Fault, self) + self.scalar_R.set_comm_client(self.test_client) + self.test_client.start() + + class uint64_scalar_device(hardware_device): + scalar_R = attribute_wrapper(comms_annotation="scalar_R", datatype=numpy.int64) + + def configure_for_initialise(self): + self.set_state(DevState.INIT) + self.test_client = test_client(self.Fault, self) + self.scalar_R.set_comm_client(self.test_client) + self.test_client.start() + + class int16_scalar_device(hardware_device): + scalar_R = attribute_wrapper(comms_annotation="scalar_R", datatype=numpy.int64) + + def configure_for_initialise(self): + self.set_state(DevState.INIT) + self.test_client = test_client(self.Fault, self) + self.scalar_R.set_comm_client(self.test_client) + self.test_client.start() + + class int32_scalar_device(hardware_device): + scalar_R = attribute_wrapper(comms_annotation="scalar_R", datatype=numpy.int64) + + def configure_for_initialise(self): + self.set_state(DevState.INIT) + self.test_client = test_client(self.Fault, self) + self.scalar_R.set_comm_client(self.test_client) + self.test_client.start() + + class int64_scalar_device(hardware_device): + scalar_R = attribute_wrapper(comms_annotation="scalar_R", datatype=numpy.int64) + + def configure_for_initialise(self): + self.set_state(DevState.INIT) + self.test_client = test_client(self.Fault, self) + self.scalar_R.set_comm_client(self.test_client) + self.test_client.start() + + + def read_R(self, proxy): + val = proxy.scalar_R + return val + + def write_RW(self, proxy, val=1): + proxy.scalar_RW = val + + def read_RW(self, proxy): + val = proxy.scalar_RW + return val + + + def test_read_R(self, dev): + '''Test device''' + with DeviceTestContext(dev, process=True) as proxy: + + #initialise + proxy.initialise() + proxy.on() + + self.read_R(proxy) + + def test_write_RW(self, dev): + '''Test device''' + with DeviceTestContext(dev, process=True) as proxy: + + #initialise + proxy.initialise() + proxy.on() + + self.write_RW(proxy, val=1) + + def test_read_RW(self, dev): + '''Test device''' + with DeviceTestContext(dev, process=True) as proxy: + + #initialise + proxy.initialise() + proxy.on() + + self.read_RW(proxy) + + def test_write_readback(self, dev): + '''Test device''' + with DeviceTestContext(dev, process=True) as proxy: + + #initialise + proxy.initialise() + proxy.on() + + val = 1 + self.write_RW(proxy) + + result_R = self.read_R(proxy) + result_RW = self.read_RW(proxy) + + self.assertEqual(val, result_RW, "Value could not be handled by the atrribute_wrappers internal RW storer") + self.assertEqual(result_R, result_RW, "written value not present in clients R attribute") + diff --git a/devices/clients/attribute_wrapper.py b/devices/clients/attribute_wrapper.py index 99312919c0631f85c64cd3aec097a00b316f12f4..9f4fd7e002493480070d69f1c146f15abd262922 100644 --- a/devices/clients/attribute_wrapper.py +++ b/devices/clients/attribute_wrapper.py @@ -1,10 +1,9 @@ from tango.server import attribute from tango import AttrWriteType - +import logging import numpy from devices.device_decorators import only_when_on, fault_on_error -import logging logger = logging.getLogger() diff --git a/devices/devices/hardware_device.py b/devices/devices/hardware_device.py index 8c348868d691111eedb700906b9c1a4242111591..85c546616bea25b384206b6d7952209e44e0320d 100644 --- a/devices/devices/hardware_device.py +++ b/devices/devices/hardware_device.py @@ -18,11 +18,15 @@ from tango import DevState, DebugIt from clients.attribute_wrapper import attribute_wrapper from common.lofar_logging import log_exceptions +import logging + __all__ = ["hardware_device"] from devices.device_decorators import only_in_states, fault_on_error +logger = logging.getLogger() + #@log_exceptions() class hardware_device(Device): """ diff --git a/devices/examples/snmp/snmp_client.py b/devices/examples/snmp/snmp_client.py index 2c162abef0f924c3d67d9d248253c2a9df533a3f..635450172387148734e5e3b42ed0f82f067a0048 100644 --- a/devices/examples/snmp/snmp_client.py +++ b/devices/examples/snmp/snmp_client.py @@ -1,3 +1,4 @@ + from clients.comms_client import CommClient import snmp diff --git a/devices/minimal_test.py b/devices/minimal_test.py new file mode 100644 index 0000000000000000000000000000000000000000..d330702828498503f38bee8bb5b78376c8651b10 --- /dev/null +++ b/devices/minimal_test.py @@ -0,0 +1,105 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the PCC project +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +""" test Device Server +""" + +# PyTango imports +from tango import DebugIt +from tango.server import run, command +from tango.server import device_property +from tango import AttrWriteType +from tango import DevState +# Additional import + +import numpy + +from devices.clients.test_client import test_client +from devices.util.attribute_wrapper import * +from devices.util.hardware_device import * + +from tango.test_context import DeviceTestContext +import unittest + +__all__ = ["test_device"] + + +class TestCase(unittest.TestCase): + + + class test_device(hardware_device): + scalar_R = attribute_wrapper(comms_annotation="str_scalar_R", datatype=numpy.int64) + + def configure_for_initialise(self): + self.set_state(DevState.INIT) + self.test_client = test_client(self.Fault, self) + self.scalar_R.set_comm_client(self.test_client) + self.test_client.start() + + + def read_R(self, proxy): + val = proxy.scalar_R + return val + + def write_RW(self, proxy, val=1): + proxy.scalar_RW = val + + def read_RW(self, proxy): + val = proxy.scalar_RW + return val + + + def test_read_R(self): + '''Test device''' + with DeviceTestContext(self.test_device, process=True) as proxy: + + #initialise + proxy.initialise() + proxy.on() + + self.read_R(proxy) + + def test_write_RW(self): + '''Test device''' + with DeviceTestContext(self.test_device, process=True) as proxy: + + #initialise + proxy.initialise() + proxy.on() + + self.write_RW(proxy, val=1) + + def test_read_RW(self): + '''Test device''' + with DeviceTestContext(self.test_device, process=True) as proxy: + + #initialise + proxy.initialise() + proxy.on() + + self.read_RW(proxy) + + def test_write_readback(self): + '''Test device''' + with DeviceTestContext(self.test_device, process=True) as proxy: + + #initialise + proxy.initialise() + proxy.on() + + val = 1 + self.write_RW(proxy) + + result_R = self.read_R(proxy) + result_RW = self.read_RW(proxy) + + self.assertEqual(val, result_RW, "Value could not be handled by the atrribute_wrappers internal RW storer") + self.assertEqual(result_R, result_RW, "written value not present in clients R attribute") + + diff --git a/devices/simple_test.py b/devices/simple_test.py index afe7e409883defb1e3152d5059539f85f4bc4cab..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 100644 --- a/devices/simple_test.py +++ b/devices/simple_test.py @@ -1,12 +0,0 @@ -import unittest - - - def setUp(self): - - pass - - def test_full(self): - pass - - # def tearDown(self): - # pass \ No newline at end of file