Skip to content
Snippets Groups Projects
Commit b1f43167 authored by Taya Snijder's avatar Taya Snijder
Browse files

further process reviews

parent a70ecba4
No related branches found
No related tags found
1 merge request!243Resolve L2SS-464 "Replace snmp python library with pysnmp"
...@@ -62,8 +62,11 @@ class SNMP_client(CommClient): ...@@ -62,8 +62,11 @@ class SNMP_client(CommClient):
index (optional) the index if the value thats being read from is a table. index (optional) the index if the value thats being read from is a table.
""" """
# flag used in the return argument to indicate oid or mib+name # return values start as None because we have a way too complicated interface
uses_oid = False oids = None
mib = None
name = None
idx = None
# check if the 'oids' key is used and not the 'mib' and 'name' keys # check if the 'oids' key is used and not the 'mib' and 'name' keys
...@@ -75,22 +78,19 @@ class SNMP_client(CommClient): ...@@ -75,22 +78,19 @@ class SNMP_client(CommClient):
if 'index' in annotation: if 'index' in annotation:
raise ValueError(f"SNMP attribute annotation doesn't support oid type declarations with an index present.") raise ValueError(f"SNMP attribute annotation doesn't support oid type declarations with an index present.")
return uses_oid, oids
# check if the 'oids' key is NOT used but instead the 'mib' and 'name' keys # check if the 'oids' key is NOT used but instead the 'mib' and 'name' keys
elif 'oids' not in annotation and 'mib' in annotation and 'name' in annotation: elif 'oids' not in annotation and 'mib' in annotation and 'name' in annotation:
uses_mib_and_name = True
mib = annotation["mib"] mib = annotation["mib"]
name = annotation["name"] name = annotation["name"]
# index is already part of the 'oids' ... id, but for name types it can be specified. Default is 0 # SNMP has tables that require an index number to access them. regular non-table variable have an index of 0
idx = annotation.get('index', 0) idx = annotation.get('index', 0)
return uses_oid, (mib, name, idx)
else: else:
raise ValueError(f"SNMP attribute annotation requires a dict argument with either a 'oids' key or both a 'name' and 'mib' key. Not both. Instead got: {annotation}") raise ValueError(f"SNMP attribute annotation requires a dict argument with either a 'oids' key or both a 'name' and 'mib' key. Not both. Instead got: {annotation}")
return oids, mib, name, idx
def setup_value_conversion(self, attribute): def setup_value_conversion(self, attribute):
""" """
...@@ -109,11 +109,11 @@ class SNMP_client(CommClient): ...@@ -109,11 +109,11 @@ class SNMP_client(CommClient):
""" """
# process the annotation # process the annotation
uses_oids, output = self._setup_annotation(annotation) oids, mib, name, idx = self._setup_annotation(annotation)
# get all the necessary data to set up the read/write functions from the attribute_wrapper # get all the necessary data to set up the read/write functions from the attribute_wrapper
dim_x, dim_y, dtype = self.setup_value_conversion(attribute) dim_x, dim_y, dtype = self.setup_value_conversion(attribute)
snmp_attr = snmp_attribute(self, uses_oids, dtype, dim_x, dim_y, output) snmp_attr = snmp_attribute(self, oids, mib, name, idx, dtype, dim_x, dim_y)
# return the read/write functions # return the read/write functions
def read_function(): def read_function():
...@@ -127,7 +127,7 @@ class SNMP_client(CommClient): ...@@ -127,7 +127,7 @@ class SNMP_client(CommClient):
class snmp_attribute: class snmp_attribute:
def __init__(self, client : SNMP_client, uses_oids, dtype, dim_x, dim_y, args): def __init__(self, client : SNMP_client, oids, mib, name, idx, dtype, dim_x, dim_y):
self.client = client self.client = client
self.dtype = dtype self.dtype = dtype
...@@ -135,31 +135,27 @@ class snmp_attribute: ...@@ -135,31 +135,27 @@ class snmp_attribute:
self.dim_y = dim_y self.dim_y = dim_y
self.is_scalar = (self.dim_x + self.dim_y) == 1 self.is_scalar = (self.dim_x + self.dim_y) == 1
# if oids are used
if uses_oids: if oids is not None:
# get a list of str of the oids # get a list of str of the oids
oids = self.get_oids(dim_x, dim_y, args) oids = self.get_oids(dim_x, dim_y, oids)
# turn the list of oids in to a tuple of pysnmp object identities. These are used for the # turn the list of oids in to a tuple of pysnmp object identities. These are used for the
objID = tuple(hlapi.ObjectIdentity(oids[i]) for i in range(len(oids))) objID = tuple(hlapi.ObjectIdentity(oids[i]) for i in range(len(oids)))
# if mib + name is used
else: else:
# if its an mib and name type
mib = args[0]
name = args[1]
idx = args[2]
# only scalars can be used at the present time.
if not self.is_scalar: if not self.is_scalar:
raise ValueError(f"MIB + name type attributes can only be scalars, got dimensions of: ({self.dim_y}, {self.dim_x})") #tuple(hlapi.ObjectIdentity(mib, name, idx) for i in range(len(oids)))
raise ValueError(f"MIB + name type attributes can only be scalars, got dimensions of: ({self.dim_y}, {self.dim_x})")
else:
objID = hlapi.ObjectIdentity(mib, name, idx) objID = hlapi.ObjectIdentity(mib, name, idx)
self.objID = objID self.objID = objID
def next_wrap(self, cmd): def next_wrap(self, cmd):
""" """
This function exists to allow the next(cmd) call to be mocked for unit testing. As the This function exists to allow the next(cmd) call to be mocked for unit testing. As the
...@@ -184,7 +180,6 @@ class snmp_attribute: ...@@ -184,7 +180,6 @@ class snmp_attribute:
def write_function(self, value): def write_function(self, value):
if self.is_scalar: if self.is_scalar:
write_obj = tuple(hlapi.ObjectType(self.objID[0], value), ) write_obj = tuple(hlapi.ObjectType(self.objID[0], value), )
......
...@@ -95,7 +95,7 @@ class TestSNMP(base.TestCase): ...@@ -95,7 +95,7 @@ class TestSNMP(base.TestCase):
m_client = mock.Mock() m_client = mock.Mock()
a = snmp_attribute(client=m_client, uses_oids=True, dtype=snmp_to_numpy_dict[i], dim_x=dim_list[j][0], dim_y=dim_list[j][1], args=("1.3.6.1.2.1.2.2.1.2.31")) a = snmp_attribute(client=m_client, oids="1.3.6.1.2.1.2.2.1.2.31", mib=None, name=None, idx=None, dtype=snmp_to_numpy_dict[i], dim_x=dim_list[j][0], dim_y=dim_list[j][1])
val = a.read_function() val = a.read_function()
checkval = val_check(i, dim_list[j]) checkval = val_check(i, dim_list[j])
...@@ -117,8 +117,8 @@ class TestSNMP(base.TestCase): ...@@ -117,8 +117,8 @@ class TestSNMP(base.TestCase):
set_val = val_check(i, dim_list[j]) set_val = val_check(i, dim_list[j])
a = snmp_attribute(client=m_client, uses_oids=True, dtype=snmp_to_numpy_dict[i], dim_x=dim_list[j][0], a = snmp_attribute(client=m_client, oids="1.3.6.1.2.1.2.2.1.2.31", mib=None, name=None, idx=None, dtype=snmp_to_numpy_dict[i], dim_x=dim_list[j][0],
dim_y=dim_list[j][1], args=("1.3.6.1.2.1.2.2.1.2.31")) dim_y=dim_list[j][1])
res_lst = [] res_lst = []
def test(*value): def test(*value):
...@@ -145,8 +145,7 @@ class TestSNMP(base.TestCase): ...@@ -145,8 +145,7 @@ class TestSNMP(base.TestCase):
m_client = mock.Mock() m_client = mock.Mock()
# we just need the object to call another function # we just need the object to call another function
a = snmp_attribute(client=m_client, uses_oids=False, dtype=str, dim_x=x, a = snmp_attribute(client=m_client, oids="Not None", mib=None, name=None, idx=None, dtype=str, dim_x=x, dim_y=y)
dim_y=y, args=("SNMPv2-MIB", "sysDescr", 0))
# scalar # scalar
scalar_expected = [test_oid] scalar_expected = [test_oid]
...@@ -168,8 +167,7 @@ class TestSNMP(base.TestCase): ...@@ -168,8 +167,7 @@ class TestSNMP(base.TestCase):
m_client = mock.Mock() m_client = mock.Mock()
# we just need the object to call another function # we just need the object to call another function
a = snmp_attribute(client=m_client, uses_oids=True, dtype=str, dim_x=x, a = snmp_attribute(client=m_client, oids="Not None", mib=None, name=None, idx=None, dtype=str, dim_x=x, dim_y=y)
dim_y=y, args=test_oid)
# spectrum # spectrum
spectrum_expected = [test_oid + ".1", test_oid + ".2", test_oid + ".3", test_oid + ".4"] spectrum_expected = [test_oid + ".1", test_oid + ".2", test_oid + ".3", test_oid + ".4"]
...@@ -202,18 +200,12 @@ class TestSNMP(base.TestCase): ...@@ -202,18 +200,12 @@ class TestSNMP(base.TestCase):
for i in test_list: for i in test_list:
uses_oids, res = client._setup_annotation(annotation=i) oids, mib, name, idx = client._setup_annotation(annotation=i)
if uses_oids: if oids is not None:
oids = res
self.assertEqual(oids, i["oids"]) self.assertEqual(oids, i["oids"])
else: else:
# if its an mib and name type
mib = res[0]
name = res[1]
idx = res[2]
self.assertEqual(mib, i["mib"], f"expected mib with: {i['mib']}, got: {idx} from: {i}") self.assertEqual(mib, i["mib"], f"expected mib with: {i['mib']}, got: {idx} from: {i}")
self.assertEqual(name, i["name"], f"expected name with: {i['name']}, got: {idx} from: {i}") self.assertEqual(name, i["name"], f"expected name with: {i['name']}, got: {idx} from: {i}")
self.assertEqual(idx, i.get('index', 0), f"expected idx with: {i.get('index', 0)}, got: {idx} from: {i}") self.assertEqual(idx, i.get('index', 0), f"expected idx with: {i.get('index', 0)}, got: {idx} from: {i}")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment