Skip to content
Snippets Groups Projects
Commit edc22241 authored by Taya Snijder's avatar Taya Snijder
Browse files

small improvements

parent ce0de6c0
No related branches found
No related tags found
1 merge request!243Resolve L2SS-464 "Replace snmp python library with pysnmp"
...@@ -51,16 +51,7 @@ class SNMP_client(CommClient): ...@@ -51,16 +51,7 @@ class SNMP_client(CommClient):
def _setup_annotation(self, annotation): def _setup_annotation(self, annotation):
""" """
This class's Implementation of the get_mapping function. returns the read and write functions parses the annotation this attribute received for its initialisation.
The SNMP client uses a dict and takes the following keys:
either
oids: Required. An oid string of the object
or
mib: the mib name
name: name of the value to read
index (optional) the index if the value thats being read from is a table.
""" """
wrapper = annotation_wrapper(annotation) wrapper = annotation_wrapper(annotation)
...@@ -81,7 +72,7 @@ class SNMP_client(CommClient): ...@@ -81,7 +72,7 @@ class SNMP_client(CommClient):
""" """
MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions
Gets called from inside the attributet wrapper. It is provided with the attribute_warpper itself Gets called from inside the attribute wrapper. It is provided with the attribute_warpper itself
and the annotation provided when the attribute_wrapper was declared. and the annotation provided when the attribute_wrapper was declared.
These parameters can be used to configure a valid read and write function as return values. These parameters can be used to configure a valid read and write function as return values.
""" """
...@@ -169,6 +160,13 @@ class annotation_wrapper: ...@@ -169,6 +160,13 @@ class annotation_wrapper:
return objID return objID
def _get_oids(self, x, y, in_oid): def _get_oids(self, x, y, in_oid):
"""
This function expands oids depending on dimensionality.
if its a scalar its left alone, but if its an array it creates a list of sequential oids if not already provided
scalar "1.1.1.1" -> stays the same
spectrum: "1.1.1.1" -> ["1.1.1.1.1", "1.1.1.1.2, ..."]
"""
if x == 0: if x == 0:
x = 1 x = 1
...@@ -190,10 +188,11 @@ class annotation_wrapper: ...@@ -190,10 +188,11 @@ class annotation_wrapper:
# if we got a single str oid, make a list of sequential oids # if we got a single str oid, make a list of sequential oids
if type(in_oid) is str: if type(in_oid) is str:
return ["{}.{}".format(in_oid, i + 1) for i in range(nof_oids)] return ["{}.{}".format(in_oid, i + 1) for i in range(nof_oids)]
# if its an already expanded list of all oids # if its an already expanded list of all oids
elif type(in_oid) is list and len(in_oid) == nof_oids: elif type(in_oid) is list and len(in_oid) == nof_oids:
# already is an list and of the right length
return in_oid return in_oid
# if its a list of attributes with the wrong length. # if its a list of attributes with the wrong length.
else: else:
raise ValueError( raise ValueError(
...@@ -221,6 +220,10 @@ class snmp_attribute: ...@@ -221,6 +220,10 @@ class snmp_attribute:
return next(cmd) return next(cmd)
def read_function(self): def read_function(self):
"""
Read function we give to the attribute wrapper
"""
# must be recreated for each read it seems # must be recreated for each read it seems
self.objs = tuple(hlapi.ObjectType(i) for i in self.objID) self.objs = tuple(hlapi.ObjectType(i) for i in self.objID)
...@@ -237,6 +240,9 @@ class snmp_attribute: ...@@ -237,6 +240,9 @@ class snmp_attribute:
return val_lst return val_lst
def write_function(self, value): def write_function(self, value):
"""
Write function we give to the attribute wrapper
"""
if self.is_scalar: if self.is_scalar:
write_obj = tuple(hlapi.ObjectType(self.objID[0], value), ) write_obj = tuple(hlapi.ObjectType(self.objID[0], value), )
...@@ -254,6 +260,7 @@ class snmp_attribute: ...@@ -254,6 +260,7 @@ class snmp_attribute:
vals = [] vals = []
if not self.is_scalar: if not self.is_scalar:
#just the first element of this single element list
varBinds = varBinds[0] varBinds = varBinds[0]
for varBind in varBinds: for varBind in varBinds:
......
...@@ -7,6 +7,7 @@ from tangostationcontrol.test import base ...@@ -7,6 +7,7 @@ from tangostationcontrol.test import base
from tangostationcontrol.clients.snmp_client import SNMP_client, snmp_attribute, annotation_wrapper from tangostationcontrol.clients.snmp_client import SNMP_client, snmp_attribute, annotation_wrapper
# conversion dict
snmp_to_numpy_dict = { snmp_to_numpy_dict = {
hlapi.Integer32: numpy.int64, hlapi.Integer32: numpy.int64,
hlapi.TimeTicks: numpy.int64, hlapi.TimeTicks: numpy.int64,
...@@ -16,15 +17,16 @@ snmp_to_numpy_dict = { ...@@ -16,15 +17,16 @@ snmp_to_numpy_dict = {
hlapi.IpAddress: str, hlapi.IpAddress: str,
} }
# shortcut for testing dimensionality
dim_list = { dim_list = {
"scalar": (1, 0), "scalar": (1, 0),
"spectrum": (4, 0), "spectrum": (4, 0),
#"image": (3, 2)
} }
def get_return_val(snmp_type : type, dims : tuple): def get_return_val(snmp_type : type, dims : tuple):
#TODO support image type dims """
provides the return value for the set/get functions.
"""
if dims == dim_list["scalar"]: if dims == dim_list["scalar"]:
if snmp_type is hlapi.ObjectIdentity: if snmp_type is hlapi.ObjectIdentity:
...@@ -55,6 +57,11 @@ def get_return_val(snmp_type : type, dims : tuple): ...@@ -55,6 +57,11 @@ def get_return_val(snmp_type : type, dims : tuple):
def val_check(snmp_type : type, dims : tuple): def val_check(snmp_type : type, dims : tuple):
"""
provides the expected value for the get function to compare against after it has been converted to buildin/numpy types again
also provides the values for the set function to set
"""
if dims == dim_list["scalar"]: if dims == dim_list["scalar"]:
if snmp_type is hlapi.ObjectIdentity: if snmp_type is hlapi.ObjectIdentity:
check_val = "1.3.6.1.2.1.1.1.0.1" check_val = "1.3.6.1.2.1.1.1.0.1"
...@@ -87,9 +94,6 @@ class TestSNMP(base.TestCase): ...@@ -87,9 +94,6 @@ class TestSNMP(base.TestCase):
unit test for the processing of annotation. Has 2 lists. 1 with things that should succeed and 1 with things that should fail. unit test for the processing of annotation. Has 2 lists. 1 with things that should succeed and 1 with things that should fail.
""" """
test_name = "test_name"
test_mib = "test_mib"
client = SNMP_client(community='public', host='localhost', timeout=10, fault_func=None, try_interval=2) client = SNMP_client(community='public', host='localhost', timeout=10, fault_func=None, try_interval=2)
test_list = [ test_list = [
...@@ -143,8 +147,6 @@ class TestSNMP(base.TestCase): ...@@ -143,8 +147,6 @@ class TestSNMP(base.TestCase):
x, y = dim_list['scalar'] x, y = dim_list['scalar']
m_client = mock.Mock()
# we just need the object to call another function # we just need the object to call another function
wrapper = annotation_wrapper(annotation = {"oids": "Not None lol"}) wrapper = annotation_wrapper(annotation = {"oids": "Not None lol"})
# scalar # scalar
...@@ -163,8 +165,6 @@ class TestSNMP(base.TestCase): ...@@ -163,8 +165,6 @@ class TestSNMP(base.TestCase):
test_oid = "1.1.1.1" test_oid = "1.1.1.1"
x, y = dim_list['spectrum'] x, y = dim_list['spectrum']
m_client = mock.Mock()
# we just need the object to call another function # we just need the object to call another function
wrapper = annotation_wrapper(annotation={"oids": "Not None lol"}) wrapper = annotation_wrapper(annotation={"oids": "Not None lol"})
...@@ -188,9 +188,9 @@ class TestSNMP(base.TestCase): ...@@ -188,9 +188,9 @@ class TestSNMP(base.TestCase):
m_client = mock.Mock() m_client = mock.Mock()
wrapper = annotation_wrapper(annotation={"oids": "1.3.6.1.2.1.2.2.1.2.31"}) wrapper = annotation_wrapper(annotation={"oids": "1.3.6.1.2.1.2.2.1.2.31"})
a = snmp_attribute(client=m_client, wrapper=wrapper, dtype=snmp_to_numpy_dict[i], dim_x=dim_list[j][0], dim_y=dim_list[j][1]) snmp_attr = snmp_attribute(client=m_client, wrapper=wrapper, dtype=snmp_to_numpy_dict[i], dim_x=dim_list[j][0], dim_y=dim_list[j][1])
val = a.read_function() val = snmp_attr.read_function()
checkval = val_check(i, dim_list[j]) checkval = val_check(i, dim_list[j])
self.assertEqual(checkval, val, f"Expected: {checkval}, got: {val}") self.assertEqual(checkval, val, f"Expected: {checkval}, got: {val}")
...@@ -211,7 +211,7 @@ class TestSNMP(base.TestCase): ...@@ -211,7 +211,7 @@ class TestSNMP(base.TestCase):
set_val = val_check(i, dim_list[j]) set_val = val_check(i, dim_list[j])
wrapper = annotation_wrapper(annotation={"oids": "1.3.6.1.2.1.2.2.1.2.31"}) wrapper = annotation_wrapper(annotation={"oids": "1.3.6.1.2.1.2.2.1.2.31"})
a = snmp_attribute(client=m_client, wrapper=wrapper, dtype=snmp_to_numpy_dict[i], dim_x=dim_list[j][0], dim_y=dim_list[j][1]) snmp_attr = snmp_attribute(client=m_client, wrapper=wrapper, dtype=snmp_to_numpy_dict[i], dim_x=dim_list[j][0], dim_y=dim_list[j][1])
res_lst = [] res_lst = []
def test(*value): def test(*value):
...@@ -220,7 +220,7 @@ class TestSNMP(base.TestCase): ...@@ -220,7 +220,7 @@ class TestSNMP(base.TestCase):
hlapi.ObjectType = test hlapi.ObjectType = test
a.write_function(set_val) snmp_attr.write_function(set_val)
if len(res_lst) == 1: if len(res_lst) == 1:
res_lst = res_lst[0] res_lst = res_lst[0]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment