Skip to content
Snippets Groups Projects
Commit a54db2a7 authored by Taya Snijder's avatar Taya Snijder
Browse files

removed support for OIDS and lists

parent 515fa86a
No related branches found
No related tags found
1 merge request!288Resolve L2SS-446 "Extend snmp client to support mib files"
......@@ -52,11 +52,24 @@ class SNMP_client(CommClient):
def _setup_annotation(self, annotation):
"""
parses the annotation this attribute received for its initialisation.
The SNMP client uses a dict and takes the following keys:
mib: the mib name
name: name of the value to read
index (optional) the index if the value thats being read from is a table.
"""
wrapper = annotation_wrapper(annotation)
return wrapper
# check if the 'mib' and 'name' keys are present
if 'mib' in annotation and 'name' in annotation:
mib = annotation["mib"]
name = annotation["name"]
# SNMP has tables that require an index number to access them. regular non-table variable have an index of 0
idx = annotation.get('index', 0)
else:
raise ValueError(f"SNMP attribute annotation requires a dict argument with both a 'name' and 'mib' key. Instead got: {annotation}")
return mib, name, idx
def setup_value_conversion(self, attribute):
"""
......@@ -79,11 +92,11 @@ class SNMP_client(CommClient):
"""
# process the annotation
wrapper = self._setup_annotation(annotation)
mib, name, idx = self._setup_annotation(annotation)
# get all the necessary data to set up the read/write functions from the attribute_wrapper
dim_x, dim_y, dtype = self.setup_value_conversion(attribute)
snmp_attr = snmp_attribute(self, wrapper, dtype, dim_x, dim_y)
snmp_attr = snmp_attribute(self, mib, name, idx, dtype, dim_x, dim_y)
# return the read/write functions
def read_function():
......@@ -104,33 +117,19 @@ class annotation_wrapper:
def __init__(self, annotation):
"""
The SNMP client uses a dict and takes the following keys:
either
oids: Required. An oid string of the object
or
mib: the mib name
name: name of the value to read
index (optional) the index if the value thats being read from is a table.
"""
# values start as None because we have a way too complicated interface
self.oids = None
# values start as None
self.mib = None
self.name = None
self.idx = None
# check if the 'oids' key is used and not the 'mib' and 'name' keys
if 'oids' in annotation and 'mib' not in annotation and 'name' not in annotation:
self.oids = annotation["oids"]
# checks to make sure this isn't present
if 'index' in annotation:
raise ValueError(f"SNMP attribute annotation doesn't support oid type declarations with an index present.")
# check if the 'oids' key is NOT used but instead the 'mib' and 'name' keys
elif 'oids' not in annotation and 'mib' in annotation and 'name' in annotation:
# check if the 'mib' and 'name' keys are present
if 'mib' in annotation and 'name' in annotation:
self.mib = annotation["mib"]
self.name = annotation["name"]
......@@ -138,86 +137,23 @@ class annotation_wrapper:
self.idx = annotation.get('index', 0)
else:
raise ValueError(
f"SNMP attribute annotation requires a dict argument with either a 'oids' key or both a 'name' and 'mib' key. Not both. Instead got: {annotation}")
def create_objID(self, x, y):
is_scalar = (x + y) == 1
# if oids are used
if self.oids is not None:
# get a list of str of the oids
self.oids = self._get_oids(x, y, self.oids)
# turn the list of oids in to a tuple of pysnmp object identities. These are used for the
objID = tuple(hlapi.ObjectIdentity(self.oids[i]) for i in range(len(self.oids)))
# if mib + name is used
else:
# only scalars can be used at the present time.
if not is_scalar:
# tuple(hlapi.ObjectIdentity(mib, name, idx) for i in range(len(oids)))
raise ValueError(f"MIB + name type attributes can only be scalars, got dimensions of: ({x}, {y})")
else:
objID = hlapi.ObjectIdentity(self.mib, self.name, self.idx)
return objID
def _get_oids(self, x, y, in_oid):
"""
This function expands oids depending on dimensionality.
if its a scalar its left alone, but if its an array it creates a list of sequential oids if not already provided
scalar "1.1.1.1" -> stays the same
spectrum: "1.1.1.1" -> ["1.1.1.1.1", "1.1.1.1.2, ..."]
"""
if x == 0:
x = 1
if y == 0:
y = 1
is_scalar = (x * y) == 1
nof_oids = x * y
# if scalar
if is_scalar:
if type(in_oid) is str:
# for ease of handling put single oid in a 1 element list
in_oid = [in_oid]
return in_oid
else:
# if we got a single str oid, make a list of sequential oids
if type(in_oid) is str:
return ["{}.{}".format(in_oid, i + 1) for i in range(nof_oids)]
# if its an already expanded list of all oids
elif type(in_oid) is list and len(in_oid) == nof_oids:
return in_oid
# if its a list of attributes with the wrong length.
else:
raise ValueError(
"SNMP oids need to either be a single value or an array the size of the attribute dimensions. got: {} expected: {}x{}={}".format(
len(in_oid), x, y, x * y))
raise ValueError(f"SNMP attribute annotation requires a dict argument with both a 'name' and 'mib' key. Instead got: {annotation}")
class snmp_attribute:
def __init__(self, client : SNMP_client, wrapper, dtype, dim_x, dim_y):
def __init__(self, client : SNMP_client, mib, name, idx, dtype, dim_x, dim_y):
self.client = client
self.wrapper = wrapper
self.mib = mib
self.name = name
self.idx = idx
self.dtype = dtype
self.dim_x = dim_x
self.dim_y = dim_y
self.is_scalar = (self.dim_x + self.dim_y) == 1
self.objID = self.wrapper.create_objID(self.dim_x, self.dim_y)
self.objID = self.create_objID()
def next_wrap(self, cmd):
"""
......@@ -285,4 +221,16 @@ class snmp_attribute:
return vals
def create_objID(self):
# only scalars can be used at the present time.
if not self.is_scalar:
# tuple(hlapi.ObjectIdentity(mib, name, idx) for i in range(len(oids)))
raise ValueError(f"MIB + name type attributes can only be scalars, got dimensions of: ({x}, {y})")
else:
objID = hlapi.ObjectIdentity(self.mib, self.name, self.idx)
return objID
......@@ -22,7 +22,8 @@ class server_imitator:
# shortcut for testing dimensionality
dim_list = {
"scalar": (1, 0),
"spectrum": (4, 0),
#NOTE: spectrum not supported currently
#"spectrum": (4, 0),
}
def get_return_val(self, snmp_type : type, dims : tuple):
......@@ -89,39 +90,6 @@ class server_imitator:
class TestSNMP(base.TestCase):
def test_annotation_success(self):
"""
unit test for the processing of annotation. Has 2 lists. 1 with things that should succeed and 1 with things that should fail.
"""
client = SNMP_client(community='public', host='localhost', timeout=10, fault_func=None, try_interval=2)
test_list = [
# test name nad MIB type annotation
{"mib": "SNMPv2-MIB", "name": "sysDescr"},
# test name nad MIB type annotation with index
{"mib": "RFC1213-MIB", "name": "ipAdEntAddr", "index": (127, 0, 0, 1)},
{"mib": "random-MIB", "name": "aName", "index": 2},
#oid
{"oids": "1.3.6.1.2.1.2.2.1.2.31"}
]
for i in test_list:
wrapper = client._setup_annotation(annotation=i)
if wrapper.oids is not None:
self.assertEqual(wrapper.oids, i["oids"])
else:
self.assertEqual(wrapper.mib, i["mib"], f"expected mib with: {i['mib']}, got: {wrapper.idx} from: {i}")
self.assertEqual(wrapper.name, i["name"], f"expected name with: {i['name']}, got: {wrapper.idx} from: {i}")
self.assertEqual(wrapper.idx, i.get('index', 0), f"expected idx with: {i.get('index', 0)}, got: {wrapper.idx} from: {i}")
def test_annotation_fail(self):
"""
unit test for the processing of annotation. Has 2 lists. 1 with things that should succeed and 1 with things that should fail.
......@@ -130,53 +98,16 @@ class TestSNMP(base.TestCase):
client = SNMP_client(community='public', host='localhost', timeout=10, fault_func=None, try_interval=2)
fail_list = [
# OIDS cant use the index
{"oids": "1.3.6.1.2.1.2.2.1.2.31", "index": 2},
# mixed annotation is not allowed
{"oids": "1.3.6.1.2.1.2.2.1.2.31", "name": "thisShouldFail"},
# no 'name'
{"mib": "random-MIB", "index": 2},
# no MIB
{"name": "random-name", "index": 2},
]
for i in fail_list:
with self.assertRaises(ValueError):
client._setup_annotation(annotation=i)
def test_oids_scalar(self):
test_oid = "1.1.1.1"
server = server_imitator()
x, y = server.dim_list['scalar']
# we just need the object to call another function
wrapper = annotation_wrapper(annotation = {"oids": "Not None lol"})
# scalar
scalar_expected = [test_oid]
ret_oids = wrapper._get_oids(x, y, test_oid)
self.assertEqual(ret_oids, scalar_expected, f"Expected: {scalar_expected}, got: {ret_oids}")
def test_oids_spectrum(self):
"""
Tests the "get_oids" function, which is for getting lists of sequential oids.
Results should basically be an incrementing list of oids with the final number incremented by 1 each time.
So "1.1" with dims of 3x1 might become ["1.1.1", "1.1.2", "1.1.3"]
"""
server = server_imitator()
test_oid = "1.1.1.1"
x, y = server.dim_list['spectrum']
# we just need the object to call another function
wrapper = annotation_wrapper(annotation={"oids": "Not None lol"})
# spectrum
spectrum_expected = [test_oid + ".1", test_oid + ".2", test_oid + ".3", test_oid + ".4"]
ret_oids = wrapper._get_oids(x, y, test_oid)
self.assertListEqual(ret_oids, spectrum_expected, f"Expected: {spectrum_expected}, got: {ret_oids}")
@mock.patch('pysnmp.hlapi.ObjectIdentity')
@mock.patch('pysnmp.hlapi.ObjectType')
@mock.patch('tangostationcontrol.clients.snmp_client.snmp_attribute.next_wrap')
......@@ -193,9 +124,7 @@ class TestSNMP(base.TestCase):
m_client = mock.Mock()
wrapper = annotation_wrapper(annotation={"oids": "1.3.6.1.2.1.2.2.1.2.31"})
snmp_attr = snmp_attribute(client=m_client, wrapper=wrapper, dtype=server.snmp_to_numpy_dict[i], dim_x=server.dim_list[j][0], dim_y=server.dim_list[j][1])
snmp_attr = snmp_attribute(client=m_client, mib="test", name="test", idx=0, dtype=server.snmp_to_numpy_dict[i], dim_x=server.dim_list[j][0], dim_y=server.dim_list[j][1])
val = snmp_attr.read_function()
......@@ -219,8 +148,7 @@ class TestSNMP(base.TestCase):
m_client = mock.Mock()
set_val = server.val_check(i, server.dim_list[j])
wrapper = annotation_wrapper(annotation={"oids": "1.3.6.1.2.1.2.2.1.2.31"})
snmp_attr = snmp_attribute(client=m_client, wrapper=wrapper, dtype=server.snmp_to_numpy_dict[i], dim_x=server.dim_list[j][0], dim_y=server.dim_list[j][1])
snmp_attr = snmp_attribute(client=m_client, mib="test", name="test", idx=0, dtype=server.snmp_to_numpy_dict[i], dim_x=server.dim_list[j][0], dim_y=server.dim_list[j][1])
res_lst = []
def test(*value):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment