Skip to content
Snippets Groups Projects
Commit 4bd070ed authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

L2SS-820: capitalise class variables

parent ad6bc42e
Branches
Tags
1 merge request!367Resolve L2SS-820 "Capitalise class variables"
Showing
with 121 additions and 121 deletions
......@@ -8,7 +8,7 @@ logger = logging.getLogger()
__all__ = ["ini_client"]
numpy_to_ini_dict = {
NUMPY_TO_INI_DICT = {
numpy.int64: int,
numpy.double: float,
numpy.float64: float,
......@@ -16,7 +16,7 @@ numpy_to_ini_dict = {
str: str,
}
numpy_to_ini_get_dict = {
NUMPY_TO_INI_GET_DICT = {
numpy.int64: configparser.ConfigParser.getint,
numpy.double: configparser.ConfigParser.getfloat,
numpy.float64: configparser.ConfigParser.getfloat,
......@@ -24,7 +24,7 @@ numpy_to_ini_get_dict = {
str: str,
}
ini_to_numpy_dict = {
INI_TO_NUMPY_DICT = {
int: numpy.int64,
float: numpy.float64,
bool: bool,
......
......@@ -28,7 +28,7 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase):
# The AntennaField is setup with self.NR_TILES tiles in the test configuration
NR_TILES = 48
pointing_direction = numpy.array([["J2000","0deg","0deg"]] * NR_TILES).flatten()
POINTING_DIRECTION = numpy.array([["J2000","0deg","0deg"]] * NR_TILES).flatten()
def setUp(self):
super().setUp("STAT/TileBeam/1")
......@@ -64,7 +64,7 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase):
self.proxy.warm_boot()
# verify delays method returns the correct dimensions
delays = self.proxy.delays(self.pointing_direction)
delays = self.proxy.delays(self.POINTING_DIRECTION)
self.assertEqual(self.NR_TILES*16, len(delays))
def test_set_pointing(self):
......@@ -82,7 +82,7 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase):
time.sleep(3)
# Verify writing operation does not lead to errors
self.proxy.set_pointing(self.pointing_direction) # write values to RECV
self.proxy.set_pointing(self.POINTING_DIRECTION) # write values to RECV
delays_r2 = numpy.array(antennafield_proxy.read_attribute('HBAT_BF_delay_steps_RW').value)
self.assertIsNotNone(delays_r2)
......
......@@ -18,25 +18,25 @@ class TestProxyAttributeAccess(base.IntegrationTestCase):
""" Test whether DeviceProxy's can always access attributes immediately after turning them on. """
# We use RECV as our victim. Any device would do.
device_name = "STAT/RECV/1"
DEVICE_NAME = "STAT/RECV/1"
# an attribute to access
attribute_name = "ANT_mask_RW"
ATTRIBUTE_NAME = "ANT_mask_RW"
def setUp(self):
self.proxy = TestDeviceProxy(self.device_name)
self.proxy = TestDeviceProxy(self.DEVICE_NAME)
self.proxy.off()
def poll_attribute(self):
# make sure the attribute is polled, so we force proxies to access the poll first
if self.proxy.is_attribute_polled(self.attribute_name):
self.proxy.stop_poll_attribute(self.attribute_name)
self.proxy.poll_attribute(self.attribute_name, 1000)
if self.proxy.is_attribute_polled(self.ATTRIBUTE_NAME):
self.proxy.stop_poll_attribute(self.ATTRIBUTE_NAME)
self.proxy.poll_attribute(self.ATTRIBUTE_NAME, 1000)
def dont_poll_attribute(self):
# make sure the attribute is NOT polled, so we force proxies to access the device
if self.proxy.is_attribute_polled(self.attribute_name):
self.proxy.stop_poll_attribute(self.attribute_name)
if self.proxy.is_attribute_polled(self.ATTRIBUTE_NAME):
self.proxy.stop_poll_attribute(self.ATTRIBUTE_NAME)
def read_attribute(self):
# turn on the device
......@@ -44,7 +44,7 @@ class TestProxyAttributeAccess(base.IntegrationTestCase):
self.assertEqual(DevState.STANDBY, self.proxy.state())
# read the attribute -- shouldn't throw
_ = self.proxy.read_attribute(self.attribute_name)
_ = self.proxy.read_attribute(self.ATTRIBUTE_NAME)
def test_fast_setup_polled_attribute(self):
""" Setup a device as fast as possible and access its attributes immediately. """
......
......@@ -23,7 +23,7 @@ logger = logging.getLogger()
class TestRecvCluster(base.IntegrationTestCase):
pointing_direction = numpy.array([["J2000", "0deg", "0deg"]] * 96).flatten()
POINTING_DIRECTION = numpy.array([["J2000", "0deg", "0deg"]] * 96).flatten()
def setUp(self):
......@@ -72,7 +72,7 @@ class TestRecvCluster(base.IntegrationTestCase):
for _i in range(25):
start_time = time.monotonic_ns()
for proxy in beam_proxies:
proxy.set_pointing(self.pointing_direction)
proxy.set_pointing(self.POINTING_DIRECTION)
stop_time = time.monotonic_ns()
results.append(stop_time - start_time)
......
......@@ -23,13 +23,13 @@ import asyncio
import mock
import numpy
scalar_dims = (1,)
spectrum_dims = (4,)
image_dims = (3,2)
SCALAR_DIMS = (1,)
SPECTRUM_DIMS = (4,)
IMAGE_DIMS = (3,2)
str_scalar_val = '1'
str_spectrum_val = ['1','1', '1','1']
str_image_val = [['1','1'],['1','1'],['1','1']]
STR_SCALAR_VAL = '1'
STR_SPECTRUM_VAL = ['1','1', '1','1']
STR_IMAGE_VAL = [['1','1'],['1','1'],['1','1']]
def dev_init(device):
......@@ -132,85 +132,85 @@ class TestAttributeTypes(base.TestCase):
dev_init(self)
class str_spectrum_device(lofar_device):
spectrum_R = attribute_wrapper(comms_annotation="str_spectrum_R", datatype=str, dims=spectrum_dims)
spectrum_RW = attribute_wrapper(comms_annotation="str_spectrum_RW", datatype=str, access=AttrWriteType.READ_WRITE, dims=spectrum_dims)
spectrum_R = attribute_wrapper(comms_annotation="str_spectrum_R", datatype=str, dims=SPECTRUM_DIMS)
spectrum_RW = attribute_wrapper(comms_annotation="str_spectrum_RW", datatype=str, access=AttrWriteType.READ_WRITE, dims=SPECTRUM_DIMS)
def configure_for_initialise(self):
dev_init(self)
class bool_spectrum_device(lofar_device):
spectrum_R = attribute_wrapper(comms_annotation="bool_spectrum_R", datatype=bool, dims=spectrum_dims)
spectrum_RW = attribute_wrapper(comms_annotation="bool_spectrum_RW", datatype=bool, access=AttrWriteType.READ_WRITE, dims=spectrum_dims)
spectrum_R = attribute_wrapper(comms_annotation="bool_spectrum_R", datatype=bool, dims=SPECTRUM_DIMS)
spectrum_RW = attribute_wrapper(comms_annotation="bool_spectrum_RW", datatype=bool, access=AttrWriteType.READ_WRITE, dims=SPECTRUM_DIMS)
def configure_for_initialise(self):
dev_init(self)
class float32_spectrum_device(lofar_device):
spectrum_R = attribute_wrapper(comms_annotation="float32_spectrum_R", datatype=numpy.float32, dims=spectrum_dims)
spectrum_RW = attribute_wrapper(comms_annotation="float32_spectrum_RW", datatype=numpy.float32, access=AttrWriteType.READ_WRITE, dims=spectrum_dims)
spectrum_R = attribute_wrapper(comms_annotation="float32_spectrum_R", datatype=numpy.float32, dims=SPECTRUM_DIMS)
spectrum_RW = attribute_wrapper(comms_annotation="float32_spectrum_RW", datatype=numpy.float32, access=AttrWriteType.READ_WRITE, dims=SPECTRUM_DIMS)
def configure_for_initialise(self):
dev_init(self)
class float64_spectrum_device(lofar_device):
spectrum_R = attribute_wrapper(comms_annotation="float64_spectrum_R", datatype=numpy.float64, dims=spectrum_dims)
spectrum_RW = attribute_wrapper(comms_annotation="float64_spectrum_RW", datatype=numpy.float64, access=AttrWriteType.READ_WRITE, dims=spectrum_dims)
spectrum_R = attribute_wrapper(comms_annotation="float64_spectrum_R", datatype=numpy.float64, dims=SPECTRUM_DIMS)
spectrum_RW = attribute_wrapper(comms_annotation="float64_spectrum_RW", datatype=numpy.float64, access=AttrWriteType.READ_WRITE, dims=SPECTRUM_DIMS)
def configure_for_initialise(self):
dev_init(self)
class double_spectrum_device(lofar_device):
spectrum_R = attribute_wrapper(comms_annotation="double_spectrum_R", datatype=numpy.double, dims=spectrum_dims)
spectrum_RW = attribute_wrapper(comms_annotation="double_spectrum_RW", datatype=numpy.double, access=AttrWriteType.READ_WRITE, dims=spectrum_dims)
spectrum_R = attribute_wrapper(comms_annotation="double_spectrum_R", datatype=numpy.double, dims=SPECTRUM_DIMS)
spectrum_RW = attribute_wrapper(comms_annotation="double_spectrum_RW", datatype=numpy.double, access=AttrWriteType.READ_WRITE, dims=SPECTRUM_DIMS)
def configure_for_initialise(self):
dev_init(self)
class uint8_spectrum_device(lofar_device):
spectrum_R = attribute_wrapper(comms_annotation="uint8_spectrum_R", datatype=numpy.uint8, dims=spectrum_dims)
spectrum_RW = attribute_wrapper(comms_annotation="uint8_spectrum_RW", datatype=numpy.uint8, access=AttrWriteType.READ_WRITE, dims=spectrum_dims)
spectrum_R = attribute_wrapper(comms_annotation="uint8_spectrum_R", datatype=numpy.uint8, dims=SPECTRUM_DIMS)
spectrum_RW = attribute_wrapper(comms_annotation="uint8_spectrum_RW", datatype=numpy.uint8, access=AttrWriteType.READ_WRITE, dims=SPECTRUM_DIMS)
def configure_for_initialise(self):
dev_init(self)
class uint16_spectrum_device(lofar_device):
spectrum_R = attribute_wrapper(comms_annotation="uint16_spectrum_R", datatype=numpy.uint16, dims=spectrum_dims)
spectrum_RW = attribute_wrapper(comms_annotation="uint16_spectrum_RW", datatype=numpy.uint16, access=AttrWriteType.READ_WRITE, dims=spectrum_dims)
spectrum_R = attribute_wrapper(comms_annotation="uint16_spectrum_R", datatype=numpy.uint16, dims=SPECTRUM_DIMS)
spectrum_RW = attribute_wrapper(comms_annotation="uint16_spectrum_RW", datatype=numpy.uint16, access=AttrWriteType.READ_WRITE, dims=SPECTRUM_DIMS)
def configure_for_initialise(self):
dev_init(self)
class uint32_spectrum_device(lofar_device):
spectrum_R = attribute_wrapper(comms_annotation="uint32_spectrum_R", datatype=numpy.uint32, dims=spectrum_dims)
spectrum_RW = attribute_wrapper(comms_annotation="uint32_spectrum_RW", datatype=numpy.uint32, access=AttrWriteType.READ_WRITE, dims=spectrum_dims)
spectrum_R = attribute_wrapper(comms_annotation="uint32_spectrum_R", datatype=numpy.uint32, dims=SPECTRUM_DIMS)
spectrum_RW = attribute_wrapper(comms_annotation="uint32_spectrum_RW", datatype=numpy.uint32, access=AttrWriteType.READ_WRITE, dims=SPECTRUM_DIMS)
def configure_for_initialise(self):
dev_init(self)
class uint64_spectrum_device(lofar_device):
spectrum_R = attribute_wrapper(comms_annotation="uint64_spectrum_R", datatype=numpy.uint64, dims=spectrum_dims)
spectrum_RW = attribute_wrapper(comms_annotation="uint64_spectrum_RW", datatype=numpy.uint64, access=AttrWriteType.READ_WRITE, dims=spectrum_dims)
spectrum_R = attribute_wrapper(comms_annotation="uint64_spectrum_R", datatype=numpy.uint64, dims=SPECTRUM_DIMS)
spectrum_RW = attribute_wrapper(comms_annotation="uint64_spectrum_RW", datatype=numpy.uint64, access=AttrWriteType.READ_WRITE, dims=SPECTRUM_DIMS)
def configure_for_initialise(self):
dev_init(self)
class int16_spectrum_device(lofar_device):
spectrum_R = attribute_wrapper(comms_annotation="int16_spectrum_R", datatype=numpy.int16, dims=spectrum_dims)
spectrum_RW = attribute_wrapper(comms_annotation="int16_spectrum_RW", datatype=numpy.int16, access=AttrWriteType.READ_WRITE, dims=spectrum_dims)
spectrum_R = attribute_wrapper(comms_annotation="int16_spectrum_R", datatype=numpy.int16, dims=SPECTRUM_DIMS)
spectrum_RW = attribute_wrapper(comms_annotation="int16_spectrum_RW", datatype=numpy.int16, access=AttrWriteType.READ_WRITE, dims=SPECTRUM_DIMS)
def configure_for_initialise(self):
dev_init(self)
class int32_spectrum_device(lofar_device):
spectrum_R = attribute_wrapper(comms_annotation="int32_spectrum_R", datatype=numpy.int32, dims=spectrum_dims)
spectrum_RW = attribute_wrapper(comms_annotation="int32_spectrum_RW", datatype=numpy.int32, access=AttrWriteType.READ_WRITE, dims=spectrum_dims)
spectrum_R = attribute_wrapper(comms_annotation="int32_spectrum_R", datatype=numpy.int32, dims=SPECTRUM_DIMS)
spectrum_RW = attribute_wrapper(comms_annotation="int32_spectrum_RW", datatype=numpy.int32, access=AttrWriteType.READ_WRITE, dims=SPECTRUM_DIMS)
def configure_for_initialise(self):
dev_init(self)
class int64_spectrum_device(lofar_device):
spectrum_R = attribute_wrapper(comms_annotation="int64_spectrum_R", datatype=numpy.int64, dims=spectrum_dims)
spectrum_RW = attribute_wrapper(comms_annotation="int64_spectrum_RW", datatype=numpy.int64, access=AttrWriteType.READ_WRITE, dims=spectrum_dims)
spectrum_R = attribute_wrapper(comms_annotation="int64_spectrum_R", datatype=numpy.int64, dims=SPECTRUM_DIMS)
spectrum_RW = attribute_wrapper(comms_annotation="int64_spectrum_RW", datatype=numpy.int64, access=AttrWriteType.READ_WRITE, dims=SPECTRUM_DIMS)
def configure_for_initialise(self):
dev_init(self)
......@@ -311,10 +311,10 @@ class TestAttributeTypes(base.TestCase):
expected = numpy.zeros((1,), dtype=dtype)
val = proxy.scalar_R
elif test_type == "spectrum":
expected = numpy.zeros(spectrum_dims, dtype=dtype)
expected = numpy.zeros(SPECTRUM_DIMS, dtype=dtype)
val = proxy.spectrum_R
elif test_type == "image":
expected = numpy.zeros(image_dims, dtype=dtype)
expected = numpy.zeros(IMAGE_DIMS, dtype=dtype)
val = numpy.array(proxy.image_R) #is needed for STR since they act differently
# cant use all() for 2d arrays so instead compare the dimensions and then flatten to 2d
......@@ -344,22 +344,22 @@ class TestAttributeTypes(base.TestCase):
if test_type == "scalar":
if dtype is str:
val = str_scalar_val
val = STR_SCALAR_VAL
else:
val = dtype(1)
proxy.scalar_RW = val
elif test_type == "spectrum":
if dtype is str:
val = str_spectrum_val
val = STR_SPECTRUM_VAL
else:
val = numpy.full(spectrum_dims, dtype=dtype, fill_value=1)
val = numpy.full(SPECTRUM_DIMS, dtype=dtype, fill_value=1)
print(val)
proxy.spectrum_RW = val
elif test_type == "image":
if dtype is str:
val = str_image_val
val = STR_IMAGE_VAL
else:
val = numpy.full(image_dims, dtype=dtype, fill_value=1)
val = numpy.full(IMAGE_DIMS, dtype=dtype, fill_value=1)
proxy.image_RW = val
else:
self.assertEqual(1,2, " {} is not a valid test_type. please use either scalar, spectrum or image".format(test_type))
......@@ -385,10 +385,10 @@ class TestAttributeTypes(base.TestCase):
expected = numpy.zeros((1,), dtype=dtype)
val = proxy.scalar_RW
elif test_type == "spectrum":
expected = numpy.zeros(spectrum_dims, dtype=dtype)
expected = numpy.zeros(SPECTRUM_DIMS, dtype=dtype)
val = proxy.spectrum_RW
elif test_type == "image":
expected = numpy.zeros(image_dims, dtype=dtype)
expected = numpy.zeros(IMAGE_DIMS, dtype=dtype)
val = numpy.array(proxy.image_RW) #is needed for STR since they act differently
# cant use all() for 2d arrays so instead compare the dimensions and then flatten to 2d
......@@ -414,7 +414,7 @@ class TestAttributeTypes(base.TestCase):
def _get_result_type(self, dtype, test_type, proxy):
if test_type == "scalar":
if dtype is str:
val = str_scalar_val
val = STR_SCALAR_VAL
else:
val = dtype(1)
proxy.scalar_RW = val
......@@ -422,17 +422,17 @@ class TestAttributeTypes(base.TestCase):
result_RW = proxy.scalar_RW
elif test_type == "spectrum":
if dtype is str:
val = str_spectrum_val
val = STR_SPECTRUM_VAL
else:
val = numpy.full(spectrum_dims, dtype=dtype, fill_value=1)
val = numpy.full(SPECTRUM_DIMS, dtype=dtype, fill_value=1)
proxy.spectrum_RW = val
result_R = proxy.spectrum_R
result_RW = proxy.spectrum_RW
elif test_type == "image":
if dtype is str:
val = str_image_val
val = STR_IMAGE_VAL
else:
val = numpy.full(image_dims, dtype=dtype, fill_value=1)
val = numpy.full(IMAGE_DIMS, dtype=dtype, fill_value=1)
# info += " write value: {}".format(val)
proxy.image_RW = val
......@@ -440,7 +440,7 @@ class TestAttributeTypes(base.TestCase):
result_RW = proxy.image_RW
if dtype != str:
self.assertEqual(result_R.shape, image_dims, "not the correct dimensions")
self.assertEqual(result_R.shape, IMAGE_DIMS, "not the correct dimensions")
result_R = result_R.reshape(-1)
result_RW = result_RW.reshape(-1)
......@@ -497,7 +497,7 @@ class TestAttributeTypes(base.TestCase):
might have unexpected results. Each type is bound to a device scalar,
spectrum and image class
"""
attribute_type_tests = [
ATTRIBUTE_TYPE_TESTS = [
{
'type': str, 'scalar': str_scalar_device,
'spectrum': str_spectrum_device, "image": str_image_device
......@@ -549,71 +549,71 @@ class TestAttributeTypes(base.TestCase):
]
def test_scalar_R(self):
for attribute_type_test in self.attribute_type_tests:
for attribute_type_test in self.ATTRIBUTE_TYPE_TESTS:
self.read_R_test(
attribute_type_test['scalar'], attribute_type_test['type'],
'scalar')
def test_scalar_RW(self):
for attribute_type_test in self.attribute_type_tests:
for attribute_type_test in self.ATTRIBUTE_TYPE_TESTS:
self.read_RW_test(
attribute_type_test['scalar'], attribute_type_test['type'],
'scalar')
def test_scalar_W(self):
for attribute_type_test in self.attribute_type_tests:
for attribute_type_test in self.ATTRIBUTE_TYPE_TESTS:
self.write_RW_test(
attribute_type_test['scalar'], attribute_type_test['type'],
'scalar')
def test_scalar_readback(self):
for attribute_type_test in self.attribute_type_tests:
for attribute_type_test in self.ATTRIBUTE_TYPE_TESTS:
self.readback_test(
attribute_type_test['scalar'], attribute_type_test['type'],
'scalar')
def test_spectrum_R(self):
for attribute_type_test in self.attribute_type_tests:
for attribute_type_test in self.ATTRIBUTE_TYPE_TESTS:
self.read_R_test(
attribute_type_test['spectrum'], attribute_type_test['type'],
'spectrum')
def test_spectrum_RW(self):
for attribute_type_test in self.attribute_type_tests:
for attribute_type_test in self.ATTRIBUTE_TYPE_TESTS:
self.read_RW_test(
attribute_type_test['spectrum'], attribute_type_test['type'],
'spectrum')
def test_spectrum_W(self):
for attribute_type_test in self.attribute_type_tests:
for attribute_type_test in self.ATTRIBUTE_TYPE_TESTS:
self.write_RW_test(
attribute_type_test['spectrum'], attribute_type_test['type'],
'spectrum')
def test_spectrum_readback(self):
for attribute_type_test in self.attribute_type_tests:
for attribute_type_test in self.ATTRIBUTE_TYPE_TESTS:
self.readback_test(
attribute_type_test['spectrum'], attribute_type_test['type'],
'spectrum')
def test_image_R(self):
for attribute_type_test in self.attribute_type_tests:
for attribute_type_test in self.ATTRIBUTE_TYPE_TESTS:
self.read_R_test(
attribute_type_test['image'], attribute_type_test['type'],
'image')
def test_image_RW(self):
for attribute_type_test in self.attribute_type_tests:
for attribute_type_test in self.ATTRIBUTE_TYPE_TESTS:
self.read_RW_test(
attribute_type_test['image'], attribute_type_test['type'],
'image')
def test_image_W(self):
for attribute_type_test in self.attribute_type_tests:
for attribute_type_test in self.ATTRIBUTE_TYPE_TESTS:
self.write_RW_test(attribute_type_test['image'], attribute_type_test['type'], 'image')
def test_image_readback(self):
for attribute_type_test in self.attribute_type_tests:
for attribute_type_test in self.ATTRIBUTE_TYPE_TESTS:
self.readback_test(
attribute_type_test['image'], attribute_type_test['type'],
'image')
......
......@@ -11,10 +11,10 @@ from os import path
class TestMibLoading(base.TestCase):
#name and directory of the pymib file
mib = "TEST-MIB"
MIB = "TEST-MIB"
# mib file is in a folder that is in the same folder as this test
rel_dir = "SNMP_mib_loading"
REL_DIR = "SNMP_mib_loading"
def test_content(self):
"""
......@@ -23,9 +23,9 @@ class TestMibLoading(base.TestCase):
"""
abs_dir = path.dirname(__file__) + "/" + self.rel_dir
abs_dir = path.dirname(__file__) + "/" + self.REL_DIR
loader = mib_loader(abs_dir)
loader.load_pymib(self.mib)
loader.load_pymib(self.MIB)
# used to view mibs client side
mibView = view.MibViewController(loader.mibBuilder)
......@@ -37,14 +37,14 @@ class TestMibLoading(base.TestCase):
testNamedValue_value = 1
# get testValue and set a value of 1
obj_T = pysnmp.ObjectType(ObjectIdentity(self.mib, testNamedValue), pysnmp.Integer32(1))
obj_T = pysnmp.ObjectType(ObjectIdentity(self.MIB, testNamedValue), pysnmp.Integer32(1))
obj_T.resolveWithMib(mibView)
# get the oid
self.assertEqual(str(obj_T[0]), testNamedValue_oid)
# get the name format: mib::name
self.assertEqual(obj_T[0].prettyPrint(), f"{self.mib}::{testNamedValue}")
self.assertEqual(obj_T[0].prettyPrint(), f"{self.MIB}::{testNamedValue}")
# get the namedValue
self.assertEqual(str(obj_T[1]), testNamedValue_named)
......
......@@ -17,7 +17,7 @@ class attr_props:
self.numpy_type = numpy_type
attr_test_types = [
ATTR_TEST_TYPES = [
attr_props(numpy_type=bool),
attr_props(numpy_type=str),
attr_props(numpy_type=numpy.float32),
......@@ -32,10 +32,10 @@ attr_test_types = [
attr_props(numpy_type=numpy.int64)
]
scalar_shape = (1,)
spectrum_shape = (4,)
image_shape = (2, 3)
dimension_tests = [scalar_shape, spectrum_shape, image_shape]
SCALAR_SHAPE = (1,)
SPECTRUM_SHAPE = (4,)
IMAGE_SHAPE = (2, 3)
DIMENSION_TESTS = [SCALAR_SHAPE, SPECTRUM_SHAPE, IMAGE_SHAPE]
class TestOPCua(base.AsyncTestCase):
......@@ -90,14 +90,14 @@ class TestOPCua(base.AsyncTestCase):
m_opc_client_members.get_objects_node = asynctest.Mock(return_value=m_objects_node)
m_opc_client.return_value = m_opc_client_members
for i in attr_test_types:
for i in ATTR_TEST_TYPES:
class mock_attr:
def __init__(self, dtype, x, y):
self.numpy_type = dtype
self.dim_x = x
self.dim_y = y
for j in dimension_tests:
for j in DIMENSION_TESTS:
if len(j) == 1:
dim_x = j[0]
dim_y = 0
......@@ -127,9 +127,9 @@ class TestOPCua(base.AsyncTestCase):
"""
# for all datatypes
for i in attr_test_types:
for i in ATTR_TEST_TYPES:
# for all dimensions
for j in dimension_tests:
for j in DIMENSION_TESTS:
node = mock.Mock()
......@@ -158,8 +158,8 @@ class TestOPCua(base.AsyncTestCase):
This tests the read functions.
"""
for j in dimension_tests:
for i in attr_test_types:
for j in DIMENSION_TESTS:
for i in ATTR_TEST_TYPES:
def get_test_value():
return numpy.zeros(j, i.numpy_type)
......@@ -250,10 +250,10 @@ class TestOPCua(base.AsyncTestCase):
"""
# for all dimensionalities
for j in dimension_tests:
for j in DIMENSION_TESTS:
#for all datatypes
for i in attr_test_types:
for i in ATTR_TEST_TYPES:
# get numpy array of the test value
def get_test_value():
......@@ -275,7 +275,7 @@ class TestOPCua(base.AsyncTestCase):
async def compare_values(val):
# test valuest
val = val.tolist() if type(val) == numpy.ndarray else val
if j != dimension_tests[0]:
if j != DIMENSION_TESTS[0]:
comp = val.Value == get_mock_value(get_test_value().flatten()).Value
self.assertTrue(comp.all(),
"Array attempting to write unequal to expected array: \n\t got: {} \n\texpected: {}".format(val,get_mock_value(get_test_value())))
......
......@@ -10,7 +10,7 @@ from tangostationcontrol.clients.snmp_client import SNMP_client, snmp_attribute,
class server_imitator:
# conversion dict
snmp_to_numpy_dict = {
SNMP_TO_NUMPY_DICT = {
hlapi.Integer32: numpy.int64,
hlapi.TimeTicks: numpy.int64,
hlapi.OctetString: str,
......@@ -20,7 +20,7 @@ class server_imitator:
}
# shortcut for testing dimensionality
dim_list = {
DIM_LIST = {
"scalar": (1, 0),
"spectrum": (4, 0),
}
......@@ -30,7 +30,7 @@ class server_imitator:
provides the return value for the set/get functions that an actual server would return.
"""
if dims == self.dim_list["scalar"]:
if dims == self.DIM_LIST["scalar"]:
if snmp_type is hlapi.ObjectIdentity:
read_val = (None, snmp_type("1.3.6.1.2.1.1.1.0"))
elif snmp_type is hlapi.IpAddress:
......@@ -41,7 +41,7 @@ class server_imitator:
read_val = (None, snmp_type(1))
elif dims == self.dim_list["spectrum"]:
elif dims == self.DIM_LIST["spectrum"]:
if snmp_type is hlapi.ObjectIdentity:
read_val = []
for _i in range(dims[0]):
......@@ -69,14 +69,14 @@ class server_imitator:
provides the values we expect and would provide to the attribute after converting the
"""
if dims == self.dim_list["scalar"]:
if dims == self.DIM_LIST["scalar"]:
snmp_type_dict = {hlapi.ObjectIdentity:"1.3.6.1.2.1.1.1.0.1",
hlapi.IpAddress: "1.1.1.1",
hlapi.OctetString: "1"}
check_val = 1
for k,v in snmp_type_dict.items():
if snmp_type is k: check_val = v
elif dims == self.dim_list["spectrum"]:
elif dims == self.DIM_LIST["spectrum"]:
snmp_type_dict = {hlapi.ObjectIdentity:["1.3.6.1.2.1.1.1.0.1"] * dims[0],
hlapi.IpAddress: ["1.1.1.1"] * dims[0],
hlapi.OctetString: ["1"] * dims[0]}
......@@ -118,9 +118,9 @@ class TestSNMP(base.TestCase):
server = server_imitator()
for j in server.dim_list:
for i in server.snmp_to_numpy_dict:
m_next.return_value = (None, None, None, server.get_return_val(i, server.dim_list[j]))
for j in server.DIM_LIST:
for i in server.SNMP_TO_NUMPY_DICT:
m_next.return_value = (None, None, None, server.get_return_val(i, server.DIM_LIST[j]))
def __fakeInit__(self):
pass
......@@ -128,11 +128,11 @@ class TestSNMP(base.TestCase):
with mock.patch.object(SNMP_comm, '__init__', __fakeInit__):
m_comms = SNMP_comm()
snmp_attr = snmp_attribute(comm=m_comms, mib="test", name="test", idx=0, dtype=server.snmp_to_numpy_dict[i], dim_x=server.dim_list[j][0], dim_y=server.dim_list[j][1])
snmp_attr = snmp_attribute(comm=m_comms, mib="test", name="test", idx=0, dtype=server.SNMP_TO_NUMPY_DICT[i], dim_x=server.DIM_LIST[j][0], dim_y=server.DIM_LIST[j][1])
val = snmp_attr.read_function()
checkval = server.val_check(i, server.dim_list[j])
checkval = server.val_check(i, server.DIM_LIST[j])
self.assertEqual(checkval, val, f"During test {j} {i}; Expected: {checkval} of type {i}, got: {val} of type {type(val)}")
@mock.patch('pysnmp.hlapi.ObjectIdentity')
......@@ -146,9 +146,9 @@ class TestSNMP(base.TestCase):
server = server_imitator()
for j in server.dim_list:
for i in server.snmp_to_numpy_dict:
m_next.return_value = (None, None, None, server.get_return_val(i, server.dim_list[j]))
for j in server.DIM_LIST:
for i in server.SNMP_TO_NUMPY_DICT:
m_next.return_value = (None, None, None, server.get_return_val(i, server.DIM_LIST[j]))
def __fakeInit__(self):
pass
......@@ -156,14 +156,14 @@ class TestSNMP(base.TestCase):
with mock.patch.object(SNMP_comm, '__init__', __fakeInit__):
m_comms = SNMP_comm()
set_val = server.val_check(i, server.dim_list[j])
set_val = server.val_check(i, server.DIM_LIST[j])
snmp_attr = snmp_attribute(comm=m_comms, mib="test", name="test", idx=0, dtype=server.snmp_to_numpy_dict[i], dim_x=server.dim_list[j][0], dim_y=server.dim_list[j][1])
snmp_attr = snmp_attribute(comm=m_comms, mib="test", name="test", idx=0, dtype=server.SNMP_TO_NUMPY_DICT[i], dim_x=server.DIM_LIST[j][0], dim_y=server.DIM_LIST[j][1])
res_lst = []
def test(*value):
res_lst.append(value[1])
return None, None, None, server.get_return_val(i, server.dim_list[j])
return None, None, None, server.get_return_val(i, server.DIM_LIST[j])
hlapi.ObjectType = test
......@@ -172,7 +172,7 @@ class TestSNMP(base.TestCase):
if len(res_lst) == 1:
res_lst = res_lst[0]
checkval = server.val_check(i, server.dim_list[j])
checkval = server.val_check(i, server.DIM_LIST[j])
self.assertEqual(checkval, res_lst, f"During test {j} {i}; Expected: {checkval}, got: {res_lst}")
@mock.patch('tangostationcontrol.clients.snmp_client.SNMP_comm.getter')
......
......@@ -19,8 +19,8 @@ from tangostationcontrol.test import base
# where our WSRT_Measures.ztar surrogate is located
# two versions with different timestamps are provided
fake_measures = os.path.dirname(__file__) + "/fake_measures.ztar"
fake_measures_newer = os.path.dirname(__file__) + "/fake_measures_newer.ztar"
FAKE_MEASURES = os.path.dirname(__file__) + "/fake_measures.ztar"
FAKE_MEASURES_NEWER = os.path.dirname(__file__) + "/fake_measures_newer.ztar"
class TestMeasures(base.TestCase):
@mock.patch.object(urllib.request, 'urlretrieve')
......@@ -32,7 +32,7 @@ class TestMeasures(base.TestCase):
mock.patch('tangostationcontrol.common.measures.DOWNLOAD_DIR', tmpdirname) as downloaddir:
# emulate the download
m_urlretrieve.side_effect = lambda *args, **kw: shutil.copyfile(fake_measures, tmpdirname + "/WSRT_Measures.ztar")
m_urlretrieve.side_effect = lambda *args, **kw: shutil.copyfile(FAKE_MEASURES, tmpdirname + "/WSRT_Measures.ztar")
# 'download' and process our fake measures
newdir = measures.download_measures()
......@@ -53,9 +53,9 @@ class TestMeasures(base.TestCase):
mock.patch('tangostationcontrol.common.measures.DOWNLOAD_DIR', tmpdirname) as downloaddir:
# 'download' two measures with different timestamps
m_urlretrieve.side_effect = lambda *args, **kw: shutil.copyfile(fake_measures, tmpdirname + "/WSRT_Measures.ztar")
m_urlretrieve.side_effect = lambda *args, **kw: shutil.copyfile(FAKE_MEASURES, tmpdirname + "/WSRT_Measures.ztar")
newdir1 = measures.download_measures()
m_urlretrieve.side_effect = lambda *args, **kw: shutil.copyfile(fake_measures_newer, tmpdirname + "/WSRT_Measures.ztar")
m_urlretrieve.side_effect = lambda *args, **kw: shutil.copyfile(FAKE_MEASURES_NEWER, tmpdirname + "/WSRT_Measures.ztar")
newdir2 = measures.download_measures()
# check if both are available
......
......@@ -19,13 +19,13 @@ class Random_Data(Device):
Random data monitor point device
"""
dim_array = 1024 # x-axis dimension of a random values array
DIM_ARRAY = 1024 # x-axis dimension of a random values array
def read(self):
return random.random()
def read_array(self):
return random.rand(self.dim_array).astype(double)
return random.rand(self.DIM_ARRAY).astype(double)
# Attributes
rnd1 = attribute(
......@@ -390,7 +390,7 @@ class Random_Data(Device):
rnd21 = attribute(
dtype = ('DevDouble',),
max_dim_x = dim_array,
max_dim_x = DIM_ARRAY,
max_dim_y = 1,
polling_period = 1000,
period = 1000,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment