diff --git a/tangostationcontrol/tangostationcontrol/examples/snmp/__init__.py b/tangostationcontrol/__init__.py similarity index 100% rename from tangostationcontrol/tangostationcontrol/examples/snmp/__init__.py rename to tangostationcontrol/__init__.py diff --git a/tangostationcontrol/requirements.txt b/tangostationcontrol/requirements.txt index 0cb186f10ceb7b41693b948a4abd81b17c61c053..62cf05034ea7b40c479f6b75eb4c8d2641f2dcf9 100644 --- a/tangostationcontrol/requirements.txt +++ b/tangostationcontrol/requirements.txt @@ -6,7 +6,7 @@ asyncua >= 0.9.90 # LGPLv3 PyMySQL[rsa] >= 1.0.2 # MIT psycopg2-binary >= 2.9.2 # LGPL sqlalchemy >= 1.4.26 # MIT -snmp >= 0.1.7 # GPL3 +pysnmp >= 0.1.7 # BSD h5py >= 3.1.0 # BSD psutil >= 5.8.0 # BSD docker >= 5.0.3 # Apache 2 diff --git a/tangostationcontrol/tangostationcontrol/clients/snmp_client.py b/tangostationcontrol/tangostationcontrol/clients/snmp_client.py new file mode 100644 index 0000000000000000000000000000000000000000..7a7f45808cdc2d160cb9db3356d3a0e9beda4be0 --- /dev/null +++ b/tangostationcontrol/tangostationcontrol/clients/snmp_client.py @@ -0,0 +1,282 @@ + +from tangostationcontrol.clients.comms_client import CommClient + +from pysnmp import hlapi + +import numpy +import logging + +logger = logging.getLogger() + +__all__ = ["SNMP_client"] + +snmp_to_numpy_dict = { + hlapi.Integer32: numpy.int64, + hlapi.TimeTicks: numpy.int64, + str: str, + hlapi.ObjectIdentity: str, + hlapi.Counter32: numpy.int64, + hlapi.Gauge32: numpy.int64, + hlapi.IpAddress: str, +} + + +class SNMP_client(CommClient): + """ + messages to keep a check on the connection. On connection failure, reconnects once. + """ + + def start(self): + super().start() + + def __init__(self, community, host, timeout, fault_func, try_interval=2, port=161): + """ + Create the SNMP engine + """ + super().__init__(fault_func, try_interval) + + logger.debug(f"setting up SNMP engine with host: {host} and community: {community}") + self.port = port + + self.engine = hlapi.SnmpEngine() + self.community = hlapi.CommunityData(community) + self.transport = hlapi.UdpTransportTarget((host, port)) + + # context data sets the version used. Default SNMPv2 + self.ctx_data = hlapi.ContextData() + + # only sets up the engine, doesn't connect + self.connected = True + + + def _setup_annotation(self, annotation): + """ + parses the annotation this attribute received for its initialisation. + """ + + wrapper = annotation_wrapper(annotation) + return wrapper + + def setup_value_conversion(self, attribute): + """ + gives the client access to the attribute_wrapper object in order to access all data it could potentially need. + """ + + dim_x = attribute.dim_x + dim_y = attribute.dim_y + dtype = attribute.numpy_type + + return dim_x, dim_y, dtype + + def setup_attribute(self, annotation, attribute): + """ + MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions + + Gets called from inside the attribute wrapper. It is provided with the attribute_warpper itself + and the annotation provided when the attribute_wrapper was declared. + These parameters can be used to configure a valid read and write function as return values. + """ + + # process the annotation + wrapper = self._setup_annotation(annotation) + + # get all the necessary data to set up the read/write functions from the attribute_wrapper + dim_x, dim_y, dtype = self.setup_value_conversion(attribute) + snmp_attr = snmp_attribute(self, wrapper, dtype, dim_x, dim_y) + + # return the read/write functions + def read_function(): + return snmp_attr.read_function() + + def write_function(value): + snmp_attr.write_function(value) + + return read_function, write_function + + +class annotation_wrapper: + def __init__(self, annotation): + """ + The SNMP client uses a dict and takes the following keys: + + either + oids: Required. An oid string of the object + or + mib: the mib name + name: name of the value to read + index (optional) the index if the value thats being read from is a table. + """ + + # values start as None because we have a way too complicated interface + self.oids = None + self.mib = None + self.name = None + self.idx = None + + # check if the 'oids' key is used and not the 'mib' and 'name' keys + + if 'oids' in annotation and 'mib' not in annotation and 'name' not in annotation: + self.oids = annotation["oids"] + + # checks to make sure this isn't present + if 'index' in annotation: + raise ValueError(f"SNMP attribute annotation doesn't support oid type declarations with an index present.") + + + # check if the 'oids' key is NOT used but instead the 'mib' and 'name' keys + elif 'oids' not in annotation and 'mib' in annotation and 'name' in annotation: + self.mib = annotation["mib"] + self.name = annotation["name"] + + # SNMP has tables that require an index number to access them. regular non-table variable have an index of 0 + self.idx = annotation.get('index', 0) + + else: + raise ValueError( + f"SNMP attribute annotation requires a dict argument with either a 'oids' key or both a 'name' and 'mib' key. Not both. Instead got: {annotation}") + + def create_objID(self, x, y): + is_scalar = (x + y) == 1 + + # if oids are used + if self.oids is not None: + # get a list of str of the oids + self.oids = self._get_oids(x, y, self.oids) + + # turn the list of oids in to a tuple of pysnmp object identities. These are used for the + objID = tuple(hlapi.ObjectIdentity(self.oids[i]) for i in range(len(self.oids))) + + # if mib + name is used + else: + + # only scalars can be used at the present time. + if not is_scalar: + # tuple(hlapi.ObjectIdentity(mib, name, idx) for i in range(len(oids))) + + raise ValueError(f"MIB + name type attributes can only be scalars, got dimensions of: ({x}, {y})") + else: + objID = hlapi.ObjectIdentity(self.mib, self.name, self.idx) + + return objID + + def _get_oids(self, x, y, in_oid): + """ + This function expands oids depending on dimensionality. + if its a scalar its left alone, but if its an array it creates a list of sequential oids if not already provided + + scalar "1.1.1.1" -> stays the same + spectrum: "1.1.1.1" -> ["1.1.1.1.1", "1.1.1.1.2, ..."] + """ + + if x == 0: + x = 1 + if y == 0: + y = 1 + + is_scalar = (x * y) == 1 + nof_oids = x * y + + # if scalar + if is_scalar: + if type(in_oid) is str: + # for ease of handling put single oid in a 1 element list + in_oid = [in_oid] + + return in_oid + + else: + # if we got a single str oid, make a list of sequential oids + if type(in_oid) is str: + return ["{}.{}".format(in_oid, i + 1) for i in range(nof_oids)] + + # if its an already expanded list of all oids + elif type(in_oid) is list and len(in_oid) == nof_oids: + return in_oid + + # if its a list of attributes with the wrong length. + else: + raise ValueError( + "SNMP oids need to either be a single value or an array the size of the attribute dimensions. got: {} expected: {}x{}={}".format( + len(in_oid), x, y, x * y)) + + +class snmp_attribute: + + def __init__(self, client : SNMP_client, wrapper, dtype, dim_x, dim_y): + + self.client = client + self.wrapper = wrapper + self.dtype = dtype + self.dim_x = dim_x + self.dim_y = dim_y + self.is_scalar = (self.dim_x + self.dim_y) == 1 + + self.objID = self.wrapper.create_objID(self.dim_x, self.dim_y) + + def next_wrap(self, cmd): + """ + This function exists to allow the next(cmd) call to be mocked for unit testing. As the + """ + return next(cmd) + + def read_function(self): + """ + Read function we give to the attribute wrapper + """ + + # must be recreated for each read it seems + self.objs = tuple(hlapi.ObjectType(i) for i in self.objID) + + # get the thingy to get the values + get_cmd = hlapi.getCmd(self.client.engine, self.client.community, self.client.trasport, self.client.ctx_data, *self.objs) + + # dont ask me why 'next' is used to get all of the values + errorIndication, errorStatus, errorIndex, *varBinds = self.next_wrap(get_cmd) + + # get all the values in a list converted to the correct type + val_lst = self.convert(varBinds) + + # return the list of values + return val_lst + + def write_function(self, value): + """ + Write function we give to the attribute wrapper + """ + + if self.is_scalar: + write_obj = tuple(hlapi.ObjectType(self.objID[0], value), ) + + else: + write_obj = tuple(hlapi.ObjectType(self.objID[i], value[i]) for i in range(len(self.objID))) + + set_cmd = hlapi.setCmd(self.client.engine, self.client.community, self.client.trasport, self.client.ctx_data, *write_obj) + errorIndication, errorStatus, errorIndex, *varBinds = self.next_wrap(set_cmd) + + def convert(self, varBinds): + """ + get all the values in a list, make sure to convert specific types that dont want to play nicely + """ + + vals = [] + if not self.is_scalar: + #just the first element of this single element list + varBinds = varBinds[0] + + for varBind in varBinds: + # class 'DisplayString' doesnt want to play along for whatever reason + if "DisplayString" in str(type(varBind[1])): + vals.append(varBind[1].prettyPrint()) + elif type(varBind[1]) == hlapi.IpAddress: + # IpAddress values get printed as their raw value but in hex (7F 20 20 01 for 127.0.0.1 for example) + vals.append(varBind[1].prettyPrint()) + else: + # convert from the funky pysnmp types to numpy types and then append + vals.append(snmp_to_numpy_dict[type(varBind[1])](varBind[1])) + + if self.is_scalar: + vals = vals[0] + + return vals + + diff --git a/tangostationcontrol/tangostationcontrol/devices/snmp_device.py b/tangostationcontrol/tangostationcontrol/devices/snmp_device.py new file mode 100644 index 0000000000000000000000000000000000000000..04d5a1425e19b0c5fbcb076f206bcd4ed122618a --- /dev/null +++ b/tangostationcontrol/tangostationcontrol/devices/snmp_device.py @@ -0,0 +1,143 @@ +# -*- coding: utf-8 -*- +# +# This file is part of theRECV project +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +""" SNMP Device for LOFAR2.0 + +""" + +# PyTango imports +from tango.server import run +from tango.server import device_property +from tango import AttrWriteType + +# Additional import +from tangostationcontrol.clients.snmp_client import SNMP_client +from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper +from tangostationcontrol.devices.lofar_device import lofar_device + +import numpy + +import logging +logger = logging.getLogger() + +__all__ = ["SNMP", "main"] + + +class SNMP(lofar_device): + """ + + **Properties:** + + - Device Property + SNMP_community + - Type:'DevString' + SNMP_host + - Type:'DevULong' + SNMP_timeout + - Type:'DevDouble' + """ + + # ----------------- + # Device Properties + # ----------------- + + SNMP_community = device_property( + dtype='DevString', + mandatory=True + ) + + SNMP_host = device_property( + dtype='DevString', + mandatory=True + ) + + SNMP_timeout = device_property( + dtype='DevDouble', + mandatory=True + ) + + # ---------- + # Attributes + # ---------- + + + # octetstring + sysDescr_R = attribute_wrapper(comms_annotation={"mib": "SNMPv2-MIB", "name": "sysDescr"}, datatype=numpy.str) + sysName_R = attribute_wrapper(comms_annotation={"mib": "SNMPv2-MIB", "name": "sysName"}, datatype=numpy.str) + + # get a table element with the oid + ifDescr31_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.2.2.1.2.31"}, datatype=numpy.str) + + # get 10 table elements with the oid and dimension + ifDescr_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.2.2.1.2"}, dims=(10,), datatype=numpy.str) + + #timeticks + sysUpTime_R = attribute_wrapper(comms_annotation={"mib": "SNMPv2-MIB", "name": "sysUpTime"}, datatype=numpy.int64) + + # OID + sysObjectID_R = attribute_wrapper(comms_annotation={"mib": "SNMPv2-MIB", "name": "sysObjectID"}, datatype=numpy.int64) + + # integer + sysServices_R = attribute_wrapper(comms_annotation={"mib": "SNMPv2-MIB", "name": "sysServices"}, datatype=numpy.int64) + tcpRtoAlgorithm_R = attribute_wrapper(comms_annotation={"mib": "SNMPv2-MIB", "name": "tcpRtoAlgorithm"}, datatype=numpy.int64) + snmpEnableAuthenTraps_R = attribute_wrapper(comms_annotation={"mib": "SNMPv2-MIB", "name": "snmpEnableAuthenTraps"}, datatype=numpy.int64) + + #gauge + tcpCurrEstab_R = attribute_wrapper(comms_annotation={"mib": "RFC1213-MIB", "name": "tcpCurrEstab"}, datatype=numpy.int64) + + #counter32 + tcpActiveOpens_R = attribute_wrapper(comms_annotation={"mib": "RFC1213-MIB", "name": "tcpActiveOpens"}, datatype=numpy.int64) + snmpInPkts_R = attribute_wrapper(comms_annotation={"mib": "SNMPv2-MIB", "name": "snmpInPkts"}, datatype=numpy.int64) + + #IP address + ipAdEntAddr_R = attribute_wrapper(comms_annotation={"mib": "RFC1213-MIB", "name": "ipAdEntAddr", "index": (127,0,0,1)}, datatype=numpy.str) + ipAdEntIfIndex_R = attribute_wrapper(comms_annotation={"mib": "RFC1213-MIB", "name": "ipAdEntIfIndex", "index": (10, 87, 6, 14)}, datatype=numpy.str) + + #str RW attribute + sysContact_obj_R = attribute_wrapper(comms_annotation={"mib": "SNMPv2-MIB", "name": "sysContact"}, datatype=numpy.str) + sysContact_obj_RW = attribute_wrapper(comms_annotation={"mib": "SNMPv2-MIB", "name": "sysContact"}, datatype=numpy.str, access=AttrWriteType.READ_WRITE) + + + + # -------- + # overloaded functions + # -------- + def configure_for_initialise(self): + """ user code here. is called when the state is set to STANDBY """ + + # set up the SNMP ua client + self.snmp_manager = SNMP_client(self.SNMP_community, self.SNMP_host, self.SNMP_timeout, self.Fault, self) + + # map an access helper class + for i in self.attr_list(): + try: + i.set_comm_client(self.snmp_manager) + except Exception as e: + # use the pass function instead of setting read/write fails + i.set_pass_func() + logger.warning("error while setting the SNMP attribute {} read/write function. {}".format(i, e)) + + self.snmp_manager.start() + + +# -------- +# Commands +# -------- + + +# ---------- +# Run server +# ---------- +def main(args=None, **kwargs): + """Main function of the module.""" + + from tangostationcontrol.common.lofar_logging import configure_logger + configure_logger() + + return run((SNMP,), args=args, **kwargs) diff --git a/tangostationcontrol/tangostationcontrol/examples/snmp/snmp.py b/tangostationcontrol/tangostationcontrol/examples/snmp/snmp.py deleted file mode 100644 index 3c962da9911abf9a0b9cbb4d7ecd4ff19c6e95d5..0000000000000000000000000000000000000000 --- a/tangostationcontrol/tangostationcontrol/examples/snmp/snmp.py +++ /dev/null @@ -1,120 +0,0 @@ -# -*- coding: utf-8 -*- -# -# This file is part of theRECV project -# -# -# -# Distributed under the terms of the APACHE license. -# See LICENSE.txt for more info. - -""" SNMP Device for LOFAR2.0 - -""" - -# PyTango imports -from tango.server import run -from tango.server import device_property -from tango import AttrWriteType - -# Additional import -from tangostationcontrol.examples.snmp.snmp_client import SNMP_client -from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper -from tangostationcontrol.devices.lofar_device import lofar_device - -import numpy - -import logging -logger = logging.getLogger() - -__all__ = ["SNMP", "main"] - - -class SNMP(lofar_device): - """ - - **Properties:** - - - Device Property - SNMP_community - - Type:'DevString' - SNMP_host - - Type:'DevULong' - SNMP_timeout - - Type:'DevDouble' - """ - - # ----------------- - # Device Properties - # ----------------- - - SNMP_community = device_property( - dtype='DevString', - mandatory=True - ) - - SNMP_host = device_property( - dtype='DevString', - mandatory=True - ) - - SNMP_timeout = device_property( - dtype='DevDouble', - mandatory=True - ) - - # ---------- - # Attributes - # ---------- - - sys_description_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.1.0"}, datatype=numpy.str) - sys_objectID_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.2.0", "type": "OID"}, datatype=numpy.str) - sys_uptime_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.3.0", "type": "TimeTicks"}, datatype=numpy.int64) - sys_name_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.5.0"}, datatype=numpy.str) - ip_route_mask_127_0_0_1_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.4.21.1.11.127.0.0.1", "type": "IpAddress"}, datatype=numpy.str) - TCP_active_open_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.6.5.0", "type": "Counter32"}, datatype=numpy.int64) - - sys_contact_RW = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.4.0"}, datatype=numpy.str, access=AttrWriteType.READ_WRITE) - sys_contact_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.4.0"}, datatype=numpy.str) - - TCP_Curr_estab_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.6.9.0", "type": "Gauge"}, datatype=numpy.int64) - - # inferred spectrum - if_index_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.2.2.1.1"}, dims=(10,), datatype=numpy.int64) - - - # -------- - # overloaded functions - # -------- - def configure_for_initialise(self): - """ user code here. is called when the state is set to STANDBY """ - - # set up the SNMP ua client - self.snmp_manager = SNMP_client(self.SNMP_community, self.SNMP_host, self.SNMP_timeout, self.Fault, self) - - # map an access helper class - for i in self.attr_list(): - try: - i.set_comm_client(self.snmp_manager) - except Exception as e: - # use the pass function instead of setting read/write fails - i.set_pass_func() - logger.warning("error while setting the SNMP attribute {} read/write function. {}".format(i, e)) - - self.snmp_manager.start() - - -# -------- -# Commands -# -------- - - -# ---------- -# Run server -# ---------- -def main(args=None, **kwargs): - """Main function of the module.""" - - from tangostationcontrol.common.lofar_logging import configure_logger - configure_logger() - - return run((SNMP,), args=args, **kwargs) diff --git a/tangostationcontrol/tangostationcontrol/examples/snmp/snmp_client.py b/tangostationcontrol/tangostationcontrol/examples/snmp/snmp_client.py deleted file mode 100644 index 9a0919457bb692c614a5edc6f425664202435d9b..0000000000000000000000000000000000000000 --- a/tangostationcontrol/tangostationcontrol/examples/snmp/snmp_client.py +++ /dev/null @@ -1,163 +0,0 @@ - -from tangostationcontrol.clients.comms_client import CommClient - -import snmp - -import numpy -import logging - -logger = logging.getLogger() - -__all__ = ["SNMP_client"] - - -snmp_to_numpy_dict = { - snmp.types.INTEGER: numpy.int64, - snmp.types.TimeTicks: numpy.int64, - snmp.types.OCTET_STRING: numpy.str, - snmp.types.OID: numpy.str, - snmp.types.Counter32: numpy.int64, - snmp.types.Gauge32: numpy.int64, - snmp.types.IpAddress: numpy.str, -} - -snmp_types = { - "Integer": numpy.int64, - "Gauge": numpy.int64, - "TimeTick": numpy.int64, - "Counter32": numpy.int64, - "OctetString": numpy.str, - "IpAddress": numpy.str, - "OID": numpy.str, -} - - -class SNMP_client(CommClient): - """ - messages to keep a check on the connection. On connection failure, reconnects once. - """ - - def start(self): - super().start() - - def __init__(self, community, host, timeout, fault_func, try_interval=2): - """ - Create the SNMP and connect() to it - """ - super().__init__(fault_func, try_interval) - - self.community = community - self.host = host - self.manager = snmp.Manager(community=bytes(community, "utf8")) - - # Explicitly connect - if not self.connect(): - # hardware or infra is down -- needs fixing first - fault_func() - return - - def connect(self): - """ - Try to connect to the client - """ - logger.debug(f"Connecting to community: {self.community}, host: {self.host}") - - self.connected = True - return True - - def ping(self): - """ - ping the client to make sure the connection with the client is still functional. - """ - pass - - def _setup_annotation(self, annotation): - """ - This class's Implementation of the get_mapping function. returns the read and write functions - """ - - if isinstance(annotation, dict): - # check if required path inarg is present - if annotation.get('oids') is None: - ValueError("SNMP get attributes require an oid") - oids = annotation.get("oids") # required - else: - TypeError("SNMP attributes require a dict with oid(s)") - return - - dtype = annotation.get('type', None) - - return oids, dtype - - def setup_value_conversion(self, attribute): - """ - gives the client access to the attribute_wrapper object in order to access all data it could potentially need. - """ - - dim_x = attribute.dim_x - dim_y = attribute.dim_y - dtype = attribute.numpy_type - - return dim_x, dim_y, dtype - - def get_oids(self, x, y, in_oid): - - if x == 0: - x = 1 - if y == 0: - y = 1 - - nof_oids = x * y - - if nof_oids == 1: - # is scalar - if type(in_oid) is str: - # for ease of handling put single oid in a 1 element list - in_oid = [in_oid] - return in_oid - - elif type(in_oid) is list and len(in_oid) == nof_oids: - # already is an array and of the right length - return in_oid - elif type(in_oid) is list and len(in_oid) != nof_oids: - # already is an array but the wrong length. Unable to handle this - raise ValueError("SNMP oids need to either be a single value or an array the size of the attribute dimensions. got: {} expected: {}x{}={}".format(len(in_oid),x,y,x*y)) - else: - - return ["{}.{}".format(in_oid, i + 1) for i in range(nof_oids)] - - - def setup_attribute(self, annotation, attribute): - """ - MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions - """ - - # process the annotation - oids, dtype = self._setup_annotation(annotation) - - # get all the necessary data to set up the read/write functions from the attribute_wrapper - dim_x, dim_y, numpy_type = self.setup_value_conversion(attribute) - oids = self.get_oids(dim_x, dim_y, oids) - - def _read_function(): - vars = self.manager.get(self.host, *oids) - return [snmp_to_numpy_dict[type(i.value)](str(i.value)) for i in vars] - - if dtype is not None: - def _write_function(value): - if len(oids) == 1 and type(value) != list: - value = [value] - - for i in range(len(oids)): - self.manager.set(self.host, oids[i], snmp_types[dtype](value[i])) - else: - def _write_function(value): - if len(oids) == 1 and type(value) != list: - value = [value] - - for i in range(len(oids)): - self.manager.set(self.host, oids[i], value[i]) - - - # return the read/write functions - return _read_function, _write_function diff --git a/tangostationcontrol/tangostationcontrol/test/clients/test_snmp_client.py b/tangostationcontrol/tangostationcontrol/test/clients/test_snmp_client.py new file mode 100644 index 0000000000000000000000000000000000000000..f061e38cedc7cefefeb72976454edecd7b647259 --- /dev/null +++ b/tangostationcontrol/tangostationcontrol/test/clients/test_snmp_client.py @@ -0,0 +1,242 @@ +from pysnmp import hlapi +import numpy + +from unittest import mock + +from tangostationcontrol.test import base + +from tangostationcontrol.clients.snmp_client import SNMP_client, snmp_attribute, annotation_wrapper + + +class server_imitator: + # conversion dict + snmp_to_numpy_dict = { + hlapi.Integer32: numpy.int64, + hlapi.TimeTicks: numpy.int64, + str: str, + hlapi.Counter32: numpy.int64, + hlapi.Gauge32: numpy.int64, + hlapi.IpAddress: str, + } + + # shortcut for testing dimensionality + dim_list = { + "scalar": (1, 0), + "spectrum": (4, 0), + } + + def get_return_val(self, snmp_type : type, dims : tuple): + """ + provides the return value for the set/get functions that an actual server would return. + """ + + if dims == self.dim_list["scalar"]: + if snmp_type is hlapi.ObjectIdentity: + read_val = (None, snmp_type("1.3.6.1.2.1.1.1.0")) + elif snmp_type is hlapi.IpAddress: + read_val = (None, snmp_type("1.1.1.1")) + else: + read_val = (None, snmp_type(1)) + + + elif dims == self.dim_list["spectrum"]: + if snmp_type is hlapi.ObjectIdentity: + read_val = [] + for _i in range(dims[0]): + read_val.append((None, snmp_type(f"1.3.6.1.2.1.1.1.0.1"))) + elif snmp_type is hlapi.IpAddress: + read_val = [] + for _i in range(dims[0]): + read_val.append((None, snmp_type(f"1.1.1.1"))) + else: + read_val = [] + for _i in range(dims[0]): + read_val.append((None, snmp_type(1))) + else: + raise Exception("Image not yet supported :(") + + return read_val + + + def val_check(self, snmp_type : type, dims : tuple): + """ + provides the values we expect and would provide to the attribute after converting the + """ + + if dims == self.dim_list["scalar"]: + if snmp_type is hlapi.ObjectIdentity: + check_val = "1.3.6.1.2.1.1.1.0.1" + elif snmp_type is hlapi.IpAddress: + check_val = "1.1.1.1" + elif snmp_type is str: + check_val = "1" + else: + check_val = 1 + elif dims == self.dim_list["spectrum"]: + if snmp_type is hlapi.ObjectIdentity: + check_val = ["1.3.6.1.2.1.1.1.0.1"] * dims[0] + + elif snmp_type is hlapi.IpAddress: + check_val = ["1.1.1.1"] * dims[0] + elif snmp_type is str: + check_val = ["1"] * dims[0] + else: + check_val = [1] * dims[0] + else: + raise Exception("Image not yet supported :(") + + return check_val + +class TestSNMP(base.TestCase): + + + def test_annotation_success(self): + """ + unit test for the processing of annotation. Has 2 lists. 1 with things that should succeed and 1 with things that should fail. + """ + + client = SNMP_client(community='public', host='localhost', timeout=10, fault_func=None, try_interval=2) + + test_list = [ + # test name nad MIB type annotation + {"mib": "SNMPv2-MIB", "name": "sysDescr"}, + + # test name nad MIB type annotation with index + {"mib": "RFC1213-MIB", "name": "ipAdEntAddr", "index": (127, 0, 0, 1)}, + {"mib": "random-MIB", "name": "aName", "index": 2}, + + #oid + {"oids": "1.3.6.1.2.1.2.2.1.2.31"} + ] + + + for i in test_list: + wrapper = client._setup_annotation(annotation=i) + + if wrapper.oids is not None: + self.assertEqual(wrapper.oids, i["oids"]) + + else: + self.assertEqual(wrapper.mib, i["mib"], f"expected mib with: {i['mib']}, got: {wrapper.idx} from: {i}") + self.assertEqual(wrapper.name, i["name"], f"expected name with: {i['name']}, got: {wrapper.idx} from: {i}") + self.assertEqual(wrapper.idx, i.get('index', 0), f"expected idx with: {i.get('index', 0)}, got: {wrapper.idx} from: {i}") + + + def test_annotation_fail(self): + """ + unit test for the processing of annotation. Has 2 lists. 1 with things that should succeed and 1 with things that should fail. + """ + + client = SNMP_client(community='public', host='localhost', timeout=10, fault_func=None, try_interval=2) + + fail_list = [ + # OIDS cant use the index + {"oids": "1.3.6.1.2.1.2.2.1.2.31", "index": 2}, + # mixed annotation is not allowed + {"oids": "1.3.6.1.2.1.2.2.1.2.31", "name": "thisShouldFail"}, + # no 'name' + {"mib": "random-MIB", "index": 2}, + ] + + for i in fail_list: + with self.assertRaises(ValueError): + client._setup_annotation(annotation=i) + + def test_oids_scalar(self): + + test_oid = "1.1.1.1" + + server = server_imitator() + + x, y = server.dim_list['scalar'] + + # we just need the object to call another function + wrapper = annotation_wrapper(annotation = {"oids": "Not None lol"}) + # scalar + scalar_expected = [test_oid] + ret_oids = wrapper._get_oids(x, y, test_oid) + self.assertEqual(ret_oids, scalar_expected, f"Expected: {scalar_expected}, got: {ret_oids}") + + def test_oids_spectrum(self): + """ + Tests the "get_oids" function, which is for getting lists of sequential oids. + + Results should basically be an incrementing list of oids with the final number incremented by 1 each time. + So "1.1" with dims of 3x1 might become ["1.1.1", "1.1.2", "1.1.3"] + """ + server = server_imitator() + + test_oid = "1.1.1.1" + x, y = server.dim_list['spectrum'] + + # we just need the object to call another function + wrapper = annotation_wrapper(annotation={"oids": "Not None lol"}) + + # spectrum + spectrum_expected = [test_oid + ".1", test_oid + ".2", test_oid + ".3", test_oid + ".4"] + ret_oids = wrapper._get_oids(x, y, test_oid) + self.assertListEqual(ret_oids, spectrum_expected, f"Expected: {spectrum_expected}, got: {ret_oids}") + + @mock.patch('pysnmp.hlapi.ObjectIdentity') + @mock.patch('pysnmp.hlapi.ObjectType') + @mock.patch('tangostationcontrol.clients.snmp_client.snmp_attribute.next_wrap') + def test_snmp_obj_get(self, m_next, m_obj_T, m_obj_i): + """ + Attempts to read a fake SNMP variable and checks whether it got what it expected + """ + + server = server_imitator() + + for j in server.dim_list: + for i in server.snmp_to_numpy_dict: + m_next.return_value = (None, None, None, server.get_return_val(i, server.dim_list[j])) + + m_client = mock.Mock() + + + wrapper = annotation_wrapper(annotation={"oids": "1.3.6.1.2.1.2.2.1.2.31"}) + snmp_attr = snmp_attribute(client=m_client, wrapper=wrapper, dtype=server.snmp_to_numpy_dict[i], dim_x=server.dim_list[j][0], dim_y=server.dim_list[j][1]) + + val = snmp_attr.read_function() + + checkval = server.val_check(i, server.dim_list[j]) + self.assertEqual(checkval, val, f"Expected: {checkval}, got: {val}") + + @mock.patch('pysnmp.hlapi.ObjectIdentity') + @mock.patch('pysnmp.hlapi.setCmd') + @mock.patch('tangostationcontrol.clients.snmp_client.snmp_attribute.next_wrap') + def test_snmp_obj_set(self, m_next, m_nextCmd, m_obj_i): + """ + Attempts to write a value to an SNMP server, but instead intercepts it and compared whether the values is as expected. + """ + server = server_imitator() + + + for j in server.dim_list: + for i in server.snmp_to_numpy_dict: + m_next.return_value = (None, None, None, server.get_return_val(i, server.dim_list[j])) + + m_client = mock.Mock() + set_val = server.val_check(i, server.dim_list[j]) + + wrapper = annotation_wrapper(annotation={"oids": "1.3.6.1.2.1.2.2.1.2.31"}) + snmp_attr = snmp_attribute(client=m_client, wrapper=wrapper, dtype=server.snmp_to_numpy_dict[i], dim_x=server.dim_list[j][0], dim_y=server.dim_list[j][1]) + + res_lst = [] + def test(*value): + res_lst.append(value[1]) + return None, None, None, server.get_return_val(i, server.dim_list[j]) + + hlapi.ObjectType = test + + snmp_attr.write_function(set_val) + + if len(res_lst) == 1: + res_lst = res_lst[0] + + checkval = server.val_check(i, server.dim_list[j]) + self.assertEqual(checkval, res_lst, f"Expected: {checkval}, got: {res_lst}") + + + +