Skip to content
Snippets Groups Projects
Commit d6cd74ad authored by Taya Snijder's avatar Taya Snijder
Browse files

Merge branch 'L2SS-464-Replace-snmp-python-library-with-pysnmp' into 'master'

Resolve L2SS-464 "Replace snmp python library with pysnmp"

Closes L2SS-464

See merge request !243
parents e27b962a d05d0057
No related branches found
No related tags found
1 merge request!243Resolve L2SS-464 "Replace snmp python library with pysnmp"
...@@ -6,7 +6,7 @@ asyncua >= 0.9.90 # LGPLv3 ...@@ -6,7 +6,7 @@ asyncua >= 0.9.90 # LGPLv3
PyMySQL[rsa] >= 1.0.2 # MIT PyMySQL[rsa] >= 1.0.2 # MIT
psycopg2-binary >= 2.9.2 # LGPL psycopg2-binary >= 2.9.2 # LGPL
sqlalchemy >= 1.4.26 # MIT sqlalchemy >= 1.4.26 # MIT
snmp >= 0.1.7 # GPL3 pysnmp >= 0.1.7 # BSD
h5py >= 3.1.0 # BSD h5py >= 3.1.0 # BSD
psutil >= 5.8.0 # BSD psutil >= 5.8.0 # BSD
docker >= 5.0.3 # Apache 2 docker >= 5.0.3 # Apache 2
......
from tangostationcontrol.clients.comms_client import CommClient
from pysnmp import hlapi
import numpy
import logging
logger = logging.getLogger()
__all__ = ["SNMP_client"]
snmp_to_numpy_dict = {
hlapi.Integer32: numpy.int64,
hlapi.TimeTicks: numpy.int64,
str: str,
hlapi.ObjectIdentity: str,
hlapi.Counter32: numpy.int64,
hlapi.Gauge32: numpy.int64,
hlapi.IpAddress: str,
}
class SNMP_client(CommClient):
"""
messages to keep a check on the connection. On connection failure, reconnects once.
"""
def start(self):
super().start()
def __init__(self, community, host, timeout, fault_func, try_interval=2, port=161):
"""
Create the SNMP engine
"""
super().__init__(fault_func, try_interval)
logger.debug(f"setting up SNMP engine with host: {host} and community: {community}")
self.port = port
self.engine = hlapi.SnmpEngine()
self.community = hlapi.CommunityData(community)
self.transport = hlapi.UdpTransportTarget((host, port))
# context data sets the version used. Default SNMPv2
self.ctx_data = hlapi.ContextData()
# only sets up the engine, doesn't connect
self.connected = True
def _setup_annotation(self, annotation):
"""
parses the annotation this attribute received for its initialisation.
"""
wrapper = annotation_wrapper(annotation)
return wrapper
def setup_value_conversion(self, attribute):
"""
gives the client access to the attribute_wrapper object in order to access all data it could potentially need.
"""
dim_x = attribute.dim_x
dim_y = attribute.dim_y
dtype = attribute.numpy_type
return dim_x, dim_y, dtype
def setup_attribute(self, annotation, attribute):
"""
MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions
Gets called from inside the attribute wrapper. It is provided with the attribute_warpper itself
and the annotation provided when the attribute_wrapper was declared.
These parameters can be used to configure a valid read and write function as return values.
"""
# process the annotation
wrapper = self._setup_annotation(annotation)
# get all the necessary data to set up the read/write functions from the attribute_wrapper
dim_x, dim_y, dtype = self.setup_value_conversion(attribute)
snmp_attr = snmp_attribute(self, wrapper, dtype, dim_x, dim_y)
# return the read/write functions
def read_function():
return snmp_attr.read_function()
def write_function(value):
snmp_attr.write_function(value)
return read_function, write_function
class annotation_wrapper:
def __init__(self, annotation):
"""
The SNMP client uses a dict and takes the following keys:
either
oids: Required. An oid string of the object
or
mib: the mib name
name: name of the value to read
index (optional) the index if the value thats being read from is a table.
"""
# values start as None because we have a way too complicated interface
self.oids = None
self.mib = None
self.name = None
self.idx = None
# check if the 'oids' key is used and not the 'mib' and 'name' keys
if 'oids' in annotation and 'mib' not in annotation and 'name' not in annotation:
self.oids = annotation["oids"]
# checks to make sure this isn't present
if 'index' in annotation:
raise ValueError(f"SNMP attribute annotation doesn't support oid type declarations with an index present.")
# check if the 'oids' key is NOT used but instead the 'mib' and 'name' keys
elif 'oids' not in annotation and 'mib' in annotation and 'name' in annotation:
self.mib = annotation["mib"]
self.name = annotation["name"]
# SNMP has tables that require an index number to access them. regular non-table variable have an index of 0
self.idx = annotation.get('index', 0)
else:
raise ValueError(
f"SNMP attribute annotation requires a dict argument with either a 'oids' key or both a 'name' and 'mib' key. Not both. Instead got: {annotation}")
def create_objID(self, x, y):
is_scalar = (x + y) == 1
# if oids are used
if self.oids is not None:
# get a list of str of the oids
self.oids = self._get_oids(x, y, self.oids)
# turn the list of oids in to a tuple of pysnmp object identities. These are used for the
objID = tuple(hlapi.ObjectIdentity(self.oids[i]) for i in range(len(self.oids)))
# if mib + name is used
else:
# only scalars can be used at the present time.
if not is_scalar:
# tuple(hlapi.ObjectIdentity(mib, name, idx) for i in range(len(oids)))
raise ValueError(f"MIB + name type attributes can only be scalars, got dimensions of: ({x}, {y})")
else:
objID = hlapi.ObjectIdentity(self.mib, self.name, self.idx)
return objID
def _get_oids(self, x, y, in_oid):
"""
This function expands oids depending on dimensionality.
if its a scalar its left alone, but if its an array it creates a list of sequential oids if not already provided
scalar "1.1.1.1" -> stays the same
spectrum: "1.1.1.1" -> ["1.1.1.1.1", "1.1.1.1.2, ..."]
"""
if x == 0:
x = 1
if y == 0:
y = 1
is_scalar = (x * y) == 1
nof_oids = x * y
# if scalar
if is_scalar:
if type(in_oid) is str:
# for ease of handling put single oid in a 1 element list
in_oid = [in_oid]
return in_oid
else:
# if we got a single str oid, make a list of sequential oids
if type(in_oid) is str:
return ["{}.{}".format(in_oid, i + 1) for i in range(nof_oids)]
# if its an already expanded list of all oids
elif type(in_oid) is list and len(in_oid) == nof_oids:
return in_oid
# if its a list of attributes with the wrong length.
else:
raise ValueError(
"SNMP oids need to either be a single value or an array the size of the attribute dimensions. got: {} expected: {}x{}={}".format(
len(in_oid), x, y, x * y))
class snmp_attribute:
def __init__(self, client : SNMP_client, wrapper, dtype, dim_x, dim_y):
self.client = client
self.wrapper = wrapper
self.dtype = dtype
self.dim_x = dim_x
self.dim_y = dim_y
self.is_scalar = (self.dim_x + self.dim_y) == 1
self.objID = self.wrapper.create_objID(self.dim_x, self.dim_y)
def next_wrap(self, cmd):
"""
This function exists to allow the next(cmd) call to be mocked for unit testing. As the
"""
return next(cmd)
def read_function(self):
"""
Read function we give to the attribute wrapper
"""
# must be recreated for each read it seems
self.objs = tuple(hlapi.ObjectType(i) for i in self.objID)
# get the thingy to get the values
get_cmd = hlapi.getCmd(self.client.engine, self.client.community, self.client.trasport, self.client.ctx_data, *self.objs)
# dont ask me why 'next' is used to get all of the values
errorIndication, errorStatus, errorIndex, *varBinds = self.next_wrap(get_cmd)
# get all the values in a list converted to the correct type
val_lst = self.convert(varBinds)
# return the list of values
return val_lst
def write_function(self, value):
"""
Write function we give to the attribute wrapper
"""
if self.is_scalar:
write_obj = tuple(hlapi.ObjectType(self.objID[0], value), )
else:
write_obj = tuple(hlapi.ObjectType(self.objID[i], value[i]) for i in range(len(self.objID)))
set_cmd = hlapi.setCmd(self.client.engine, self.client.community, self.client.trasport, self.client.ctx_data, *write_obj)
errorIndication, errorStatus, errorIndex, *varBinds = self.next_wrap(set_cmd)
def convert(self, varBinds):
"""
get all the values in a list, make sure to convert specific types that dont want to play nicely
"""
vals = []
if not self.is_scalar:
#just the first element of this single element list
varBinds = varBinds[0]
for varBind in varBinds:
# class 'DisplayString' doesnt want to play along for whatever reason
if "DisplayString" in str(type(varBind[1])):
vals.append(varBind[1].prettyPrint())
elif type(varBind[1]) == hlapi.IpAddress:
# IpAddress values get printed as their raw value but in hex (7F 20 20 01 for 127.0.0.1 for example)
vals.append(varBind[1].prettyPrint())
else:
# convert from the funky pysnmp types to numpy types and then append
vals.append(snmp_to_numpy_dict[type(varBind[1])](varBind[1]))
if self.is_scalar:
vals = vals[0]
return vals
...@@ -17,7 +17,7 @@ from tango.server import device_property ...@@ -17,7 +17,7 @@ from tango.server import device_property
from tango import AttrWriteType from tango import AttrWriteType
# Additional import # Additional import
from tangostationcontrol.examples.snmp.snmp_client import SNMP_client from tangostationcontrol.clients.snmp_client import SNMP_client
from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper
from tangostationcontrol.devices.lofar_device import lofar_device from tangostationcontrol.devices.lofar_device import lofar_device
...@@ -66,20 +66,43 @@ class SNMP(lofar_device): ...@@ -66,20 +66,43 @@ class SNMP(lofar_device):
# Attributes # Attributes
# ---------- # ----------
sys_description_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.1.0"}, datatype=numpy.str)
sys_objectID_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.2.0", "type": "OID"}, datatype=numpy.str)
sys_uptime_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.3.0", "type": "TimeTicks"}, datatype=numpy.int64)
sys_name_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.5.0"}, datatype=numpy.str)
ip_route_mask_127_0_0_1_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.4.21.1.11.127.0.0.1", "type": "IpAddress"}, datatype=numpy.str)
TCP_active_open_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.6.5.0", "type": "Counter32"}, datatype=numpy.int64)
sys_contact_RW = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.4.0"}, datatype=numpy.str, access=AttrWriteType.READ_WRITE) # octetstring
sys_contact_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.4.0"}, datatype=numpy.str) sysDescr_R = attribute_wrapper(comms_annotation={"mib": "SNMPv2-MIB", "name": "sysDescr"}, datatype=numpy.str)
sysName_R = attribute_wrapper(comms_annotation={"mib": "SNMPv2-MIB", "name": "sysName"}, datatype=numpy.str)
TCP_Curr_estab_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.6.9.0", "type": "Gauge"}, datatype=numpy.int64) # get a table element with the oid
ifDescr31_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.2.2.1.2.31"}, datatype=numpy.str)
# get 10 table elements with the oid and dimension
ifDescr_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.2.2.1.2"}, dims=(10,), datatype=numpy.str)
#timeticks
sysUpTime_R = attribute_wrapper(comms_annotation={"mib": "SNMPv2-MIB", "name": "sysUpTime"}, datatype=numpy.int64)
# OID
sysObjectID_R = attribute_wrapper(comms_annotation={"mib": "SNMPv2-MIB", "name": "sysObjectID"}, datatype=numpy.int64)
# integer
sysServices_R = attribute_wrapper(comms_annotation={"mib": "SNMPv2-MIB", "name": "sysServices"}, datatype=numpy.int64)
tcpRtoAlgorithm_R = attribute_wrapper(comms_annotation={"mib": "SNMPv2-MIB", "name": "tcpRtoAlgorithm"}, datatype=numpy.int64)
snmpEnableAuthenTraps_R = attribute_wrapper(comms_annotation={"mib": "SNMPv2-MIB", "name": "snmpEnableAuthenTraps"}, datatype=numpy.int64)
#gauge
tcpCurrEstab_R = attribute_wrapper(comms_annotation={"mib": "RFC1213-MIB", "name": "tcpCurrEstab"}, datatype=numpy.int64)
#counter32
tcpActiveOpens_R = attribute_wrapper(comms_annotation={"mib": "RFC1213-MIB", "name": "tcpActiveOpens"}, datatype=numpy.int64)
snmpInPkts_R = attribute_wrapper(comms_annotation={"mib": "SNMPv2-MIB", "name": "snmpInPkts"}, datatype=numpy.int64)
#IP address
ipAdEntAddr_R = attribute_wrapper(comms_annotation={"mib": "RFC1213-MIB", "name": "ipAdEntAddr", "index": (127,0,0,1)}, datatype=numpy.str)
ipAdEntIfIndex_R = attribute_wrapper(comms_annotation={"mib": "RFC1213-MIB", "name": "ipAdEntIfIndex", "index": (10, 87, 6, 14)}, datatype=numpy.str)
#str RW attribute
sysContact_obj_R = attribute_wrapper(comms_annotation={"mib": "SNMPv2-MIB", "name": "sysContact"}, datatype=numpy.str)
sysContact_obj_RW = attribute_wrapper(comms_annotation={"mib": "SNMPv2-MIB", "name": "sysContact"}, datatype=numpy.str, access=AttrWriteType.READ_WRITE)
# inferred spectrum
if_index_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.2.2.1.1"}, dims=(10,), datatype=numpy.int64)
# -------- # --------
......
from tangostationcontrol.clients.comms_client import CommClient
import snmp
import numpy
import logging
logger = logging.getLogger()
__all__ = ["SNMP_client"]
snmp_to_numpy_dict = {
snmp.types.INTEGER: numpy.int64,
snmp.types.TimeTicks: numpy.int64,
snmp.types.OCTET_STRING: numpy.str,
snmp.types.OID: numpy.str,
snmp.types.Counter32: numpy.int64,
snmp.types.Gauge32: numpy.int64,
snmp.types.IpAddress: numpy.str,
}
snmp_types = {
"Integer": numpy.int64,
"Gauge": numpy.int64,
"TimeTick": numpy.int64,
"Counter32": numpy.int64,
"OctetString": numpy.str,
"IpAddress": numpy.str,
"OID": numpy.str,
}
class SNMP_client(CommClient):
"""
messages to keep a check on the connection. On connection failure, reconnects once.
"""
def start(self):
super().start()
def __init__(self, community, host, timeout, fault_func, try_interval=2):
"""
Create the SNMP and connect() to it
"""
super().__init__(fault_func, try_interval)
self.community = community
self.host = host
self.manager = snmp.Manager(community=bytes(community, "utf8"))
# Explicitly connect
if not self.connect():
# hardware or infra is down -- needs fixing first
fault_func()
return
def connect(self):
"""
Try to connect to the client
"""
logger.debug(f"Connecting to community: {self.community}, host: {self.host}")
self.connected = True
return True
def ping(self):
"""
ping the client to make sure the connection with the client is still functional.
"""
pass
def _setup_annotation(self, annotation):
"""
This class's Implementation of the get_mapping function. returns the read and write functions
"""
if isinstance(annotation, dict):
# check if required path inarg is present
if annotation.get('oids') is None:
ValueError("SNMP get attributes require an oid")
oids = annotation.get("oids") # required
else:
TypeError("SNMP attributes require a dict with oid(s)")
return
dtype = annotation.get('type', None)
return oids, dtype
def setup_value_conversion(self, attribute):
"""
gives the client access to the attribute_wrapper object in order to access all data it could potentially need.
"""
dim_x = attribute.dim_x
dim_y = attribute.dim_y
dtype = attribute.numpy_type
return dim_x, dim_y, dtype
def get_oids(self, x, y, in_oid):
if x == 0:
x = 1
if y == 0:
y = 1
nof_oids = x * y
if nof_oids == 1:
# is scalar
if type(in_oid) is str:
# for ease of handling put single oid in a 1 element list
in_oid = [in_oid]
return in_oid
elif type(in_oid) is list and len(in_oid) == nof_oids:
# already is an array and of the right length
return in_oid
elif type(in_oid) is list and len(in_oid) != nof_oids:
# already is an array but the wrong length. Unable to handle this
raise ValueError("SNMP oids need to either be a single value or an array the size of the attribute dimensions. got: {} expected: {}x{}={}".format(len(in_oid),x,y,x*y))
else:
return ["{}.{}".format(in_oid, i + 1) for i in range(nof_oids)]
def setup_attribute(self, annotation, attribute):
"""
MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions
"""
# process the annotation
oids, dtype = self._setup_annotation(annotation)
# get all the necessary data to set up the read/write functions from the attribute_wrapper
dim_x, dim_y, numpy_type = self.setup_value_conversion(attribute)
oids = self.get_oids(dim_x, dim_y, oids)
def _read_function():
vars = self.manager.get(self.host, *oids)
return [snmp_to_numpy_dict[type(i.value)](str(i.value)) for i in vars]
if dtype is not None:
def _write_function(value):
if len(oids) == 1 and type(value) != list:
value = [value]
for i in range(len(oids)):
self.manager.set(self.host, oids[i], snmp_types[dtype](value[i]))
else:
def _write_function(value):
if len(oids) == 1 and type(value) != list:
value = [value]
for i in range(len(oids)):
self.manager.set(self.host, oids[i], value[i])
# return the read/write functions
return _read_function, _write_function
from pysnmp import hlapi
import numpy
from unittest import mock
from tangostationcontrol.test import base
from tangostationcontrol.clients.snmp_client import SNMP_client, snmp_attribute, annotation_wrapper
class server_imitator:
# conversion dict
snmp_to_numpy_dict = {
hlapi.Integer32: numpy.int64,
hlapi.TimeTicks: numpy.int64,
str: str,
hlapi.Counter32: numpy.int64,
hlapi.Gauge32: numpy.int64,
hlapi.IpAddress: str,
}
# shortcut for testing dimensionality
dim_list = {
"scalar": (1, 0),
"spectrum": (4, 0),
}
def get_return_val(self, snmp_type : type, dims : tuple):
"""
provides the return value for the set/get functions that an actual server would return.
"""
if dims == self.dim_list["scalar"]:
if snmp_type is hlapi.ObjectIdentity:
read_val = (None, snmp_type("1.3.6.1.2.1.1.1.0"))
elif snmp_type is hlapi.IpAddress:
read_val = (None, snmp_type("1.1.1.1"))
else:
read_val = (None, snmp_type(1))
elif dims == self.dim_list["spectrum"]:
if snmp_type is hlapi.ObjectIdentity:
read_val = []
for _i in range(dims[0]):
read_val.append((None, snmp_type(f"1.3.6.1.2.1.1.1.0.1")))
elif snmp_type is hlapi.IpAddress:
read_val = []
for _i in range(dims[0]):
read_val.append((None, snmp_type(f"1.1.1.1")))
else:
read_val = []
for _i in range(dims[0]):
read_val.append((None, snmp_type(1)))
else:
raise Exception("Image not yet supported :(")
return read_val
def val_check(self, snmp_type : type, dims : tuple):
"""
provides the values we expect and would provide to the attribute after converting the
"""
if dims == self.dim_list["scalar"]:
if snmp_type is hlapi.ObjectIdentity:
check_val = "1.3.6.1.2.1.1.1.0.1"
elif snmp_type is hlapi.IpAddress:
check_val = "1.1.1.1"
elif snmp_type is str:
check_val = "1"
else:
check_val = 1
elif dims == self.dim_list["spectrum"]:
if snmp_type is hlapi.ObjectIdentity:
check_val = ["1.3.6.1.2.1.1.1.0.1"] * dims[0]
elif snmp_type is hlapi.IpAddress:
check_val = ["1.1.1.1"] * dims[0]
elif snmp_type is str:
check_val = ["1"] * dims[0]
else:
check_val = [1] * dims[0]
else:
raise Exception("Image not yet supported :(")
return check_val
class TestSNMP(base.TestCase):
def test_annotation_success(self):
"""
unit test for the processing of annotation. Has 2 lists. 1 with things that should succeed and 1 with things that should fail.
"""
client = SNMP_client(community='public', host='localhost', timeout=10, fault_func=None, try_interval=2)
test_list = [
# test name nad MIB type annotation
{"mib": "SNMPv2-MIB", "name": "sysDescr"},
# test name nad MIB type annotation with index
{"mib": "RFC1213-MIB", "name": "ipAdEntAddr", "index": (127, 0, 0, 1)},
{"mib": "random-MIB", "name": "aName", "index": 2},
#oid
{"oids": "1.3.6.1.2.1.2.2.1.2.31"}
]
for i in test_list:
wrapper = client._setup_annotation(annotation=i)
if wrapper.oids is not None:
self.assertEqual(wrapper.oids, i["oids"])
else:
self.assertEqual(wrapper.mib, i["mib"], f"expected mib with: {i['mib']}, got: {wrapper.idx} from: {i}")
self.assertEqual(wrapper.name, i["name"], f"expected name with: {i['name']}, got: {wrapper.idx} from: {i}")
self.assertEqual(wrapper.idx, i.get('index', 0), f"expected idx with: {i.get('index', 0)}, got: {wrapper.idx} from: {i}")
def test_annotation_fail(self):
"""
unit test for the processing of annotation. Has 2 lists. 1 with things that should succeed and 1 with things that should fail.
"""
client = SNMP_client(community='public', host='localhost', timeout=10, fault_func=None, try_interval=2)
fail_list = [
# OIDS cant use the index
{"oids": "1.3.6.1.2.1.2.2.1.2.31", "index": 2},
# mixed annotation is not allowed
{"oids": "1.3.6.1.2.1.2.2.1.2.31", "name": "thisShouldFail"},
# no 'name'
{"mib": "random-MIB", "index": 2},
]
for i in fail_list:
with self.assertRaises(ValueError):
client._setup_annotation(annotation=i)
def test_oids_scalar(self):
test_oid = "1.1.1.1"
server = server_imitator()
x, y = server.dim_list['scalar']
# we just need the object to call another function
wrapper = annotation_wrapper(annotation = {"oids": "Not None lol"})
# scalar
scalar_expected = [test_oid]
ret_oids = wrapper._get_oids(x, y, test_oid)
self.assertEqual(ret_oids, scalar_expected, f"Expected: {scalar_expected}, got: {ret_oids}")
def test_oids_spectrum(self):
"""
Tests the "get_oids" function, which is for getting lists of sequential oids.
Results should basically be an incrementing list of oids with the final number incremented by 1 each time.
So "1.1" with dims of 3x1 might become ["1.1.1", "1.1.2", "1.1.3"]
"""
server = server_imitator()
test_oid = "1.1.1.1"
x, y = server.dim_list['spectrum']
# we just need the object to call another function
wrapper = annotation_wrapper(annotation={"oids": "Not None lol"})
# spectrum
spectrum_expected = [test_oid + ".1", test_oid + ".2", test_oid + ".3", test_oid + ".4"]
ret_oids = wrapper._get_oids(x, y, test_oid)
self.assertListEqual(ret_oids, spectrum_expected, f"Expected: {spectrum_expected}, got: {ret_oids}")
@mock.patch('pysnmp.hlapi.ObjectIdentity')
@mock.patch('pysnmp.hlapi.ObjectType')
@mock.patch('tangostationcontrol.clients.snmp_client.snmp_attribute.next_wrap')
def test_snmp_obj_get(self, m_next, m_obj_T, m_obj_i):
"""
Attempts to read a fake SNMP variable and checks whether it got what it expected
"""
server = server_imitator()
for j in server.dim_list:
for i in server.snmp_to_numpy_dict:
m_next.return_value = (None, None, None, server.get_return_val(i, server.dim_list[j]))
m_client = mock.Mock()
wrapper = annotation_wrapper(annotation={"oids": "1.3.6.1.2.1.2.2.1.2.31"})
snmp_attr = snmp_attribute(client=m_client, wrapper=wrapper, dtype=server.snmp_to_numpy_dict[i], dim_x=server.dim_list[j][0], dim_y=server.dim_list[j][1])
val = snmp_attr.read_function()
checkval = server.val_check(i, server.dim_list[j])
self.assertEqual(checkval, val, f"Expected: {checkval}, got: {val}")
@mock.patch('pysnmp.hlapi.ObjectIdentity')
@mock.patch('pysnmp.hlapi.setCmd')
@mock.patch('tangostationcontrol.clients.snmp_client.snmp_attribute.next_wrap')
def test_snmp_obj_set(self, m_next, m_nextCmd, m_obj_i):
"""
Attempts to write a value to an SNMP server, but instead intercepts it and compared whether the values is as expected.
"""
server = server_imitator()
for j in server.dim_list:
for i in server.snmp_to_numpy_dict:
m_next.return_value = (None, None, None, server.get_return_val(i, server.dim_list[j]))
m_client = mock.Mock()
set_val = server.val_check(i, server.dim_list[j])
wrapper = annotation_wrapper(annotation={"oids": "1.3.6.1.2.1.2.2.1.2.31"})
snmp_attr = snmp_attribute(client=m_client, wrapper=wrapper, dtype=server.snmp_to_numpy_dict[i], dim_x=server.dim_list[j][0], dim_y=server.dim_list[j][1])
res_lst = []
def test(*value):
res_lst.append(value[1])
return None, None, None, server.get_return_val(i, server.dim_list[j])
hlapi.ObjectType = test
snmp_attr.write_function(set_val)
if len(res_lst) == 1:
res_lst = res_lst[0]
checkval = server.val_check(i, server.dim_list[j])
self.assertEqual(checkval, res_lst, f"Expected: {checkval}, got: {res_lst}")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment