Skip to content
Snippets Groups Projects
Commit 36d779bf authored by Auke Klazema's avatar Auke Klazema
Browse files

L2SS-873: Reduce code complexity according xenon

parent db36f10e
Branches
Tags
1 merge request!398Resolve L2SS-873
......@@ -179,56 +179,56 @@ class snmp_attribute:
errorIndication, errorStatus, errorIndex, *varBinds = self.comm.setter(write_obj)
def convert(self, varBinds):
def convert(self, var_binds):
"""
get all the values in a list, make sure to convert specific types that dont want to play nicely
"""
vals = []
varBinds = varBinds[0]
for varBind in varBinds:
varBinds = var_binds[0]
# Some MIB's used custom types, some dont. Custom types are merely wrapped base types.
varbind_types = varBind[1].__class__.__bases__ + (type(varBind[1]),)
for var_bind in varBinds:
value = self._convert_var_bind(var_bind)
vals.append(value)
snmp_type = None
if self.is_scalar:
vals = vals[0]
# find if one of the base types is present.
for i in varbind_types:
if i in snmp_to_numpy_dict.keys():
snmp_type = i
return vals
if snmp_type is None:
raise TypeError(f"Error: did not find a valid snmp type. Got: {varbind_types}, expected one of: '{snmp_to_numpy_dict.keys()}'")
def _convert_var_bind(self, var_bind):
# Some MIB's used custom types, some dont. Custom types are merely wrapped base types.
varbind_types = var_bind[1].__class__.__bases__ + (type(var_bind[1]),)
if snmp_type is hlapi.IpAddress:
# IpAddress values get printed as their raw value but in hex (7F 20 20 01 for 127.0.0.1 for example)
vals.append(varBind[1].prettyPrint())
snmp_type = None
elif (snmp_type is hlapi.Integer32 or snmp_type is hlapi.Integer) and self.dtype == str:
# Integers can have 'named values', Where a value can be translated to a specific name. A dict basically
# Example: {1: "other", 2: "invalid", 3: "dynamic", 4: "static",}
# find if one of the base types is present.
for i in varbind_types:
if i in snmp_to_numpy_dict.keys():
snmp_type = i
if varBind[1].namedValues == {}:
# An empty dict {} means no namedValue's are present.
vals.append(snmp_to_numpy_dict[snmp_type](varBind[1]))
else:
# append the named values string instead of the raw number.
vals.append(varBind[1].prettyPrint())
else:
# convert from the funky pysnmp types to numpy types and then append
value = snmp_to_numpy_dict[snmp_type](varBind[1])
if snmp_type is None:
raise TypeError(f"Error: did not find a valid snmp type. Got: {varbind_types}, expected one of: '{snmp_to_numpy_dict.keys()}'")
# scale the value correctly and append.
vals.append(value * self.scaling_factor)
if snmp_type is hlapi.IpAddress:
# IpAddress values get printed as their raw value but in hex (7F 20 20 01 for 127.0.0.1 for example)
value = var_bind[1].prettyPrint()
if self.is_scalar:
vals = vals[0]
elif (snmp_type is hlapi.Integer32 or snmp_type is hlapi.Integer) and self.dtype == str:
# Integers can have 'named values', Where a value can be translated to a specific name. A dict basically
# Example: {1: "other", 2: "invalid", 3: "dynamic", 4: "static",}
return vals
if var_bind[1].namedValues == {}:
# An empty dict {} means no namedValue's are present.
value = snmp_to_numpy_dict[snmp_type](var_bind[1])
else:
# append the named values string instead of the raw number.
value = var_bind[1].prettyPrint()
else:
# convert from the funky pysnmp types to numpy types and then append
value = snmp_to_numpy_dict[snmp_type](var_bind[1]) * self.scaling_factor
return value
class mib_loader:
......
......@@ -81,31 +81,33 @@ def _start_loop(receiver, writer, reconnect, filename, device):
"""Main loop"""
try:
while True:
try:
packet = receiver.get_packet()
writer.next_packet(packet, device)
except EOFError:
if reconnect and not filename:
logger.warning("Connection lost, attempting to reconnect")
while True:
try:
receiver.reconnect()
except Exception as e:
logger.warning(f"Could not reconnect: {e.__class__.__name__}: {e}")
time.sleep(10)
else:
break
logger.warning("Reconnected! Resuming operations")
else:
logger.info("End of input.")
raise SystemExit
_receive_packets(receiver, writer, reconnect, filename, device)
except KeyboardInterrupt:
# user abort, don't complain
logger.warning("Received keyboard interrupt. Stopping.")
finally:
writer.close_writer()
def _receive_packets(receiver, writer, reconnect, filename, device):
try:
packet = receiver.get_packet()
writer.next_packet(packet, device)
except EOFError:
if reconnect and not filename:
logger.warning("Connection lost, attempting to reconnect")
while True:
try:
receiver.reconnect()
except Exception as e:
logger.warning(f"Could not reconnect: {e.__class__.__name__}: {e}")
time.sleep(10)
else:
break
logger.warning("Reconnected! Resuming operations")
else:
logger.info("End of input.")
raise SystemExit
def main():
parser = _create_parser()
......
......@@ -31,39 +31,46 @@ class server_imitator:
"""
if dims == self.DIM_LIST["scalar"]:
if snmp_type is hlapi.ObjectIdentity:
read_val = [(snmp_type("1.3.6.1.2.1.1.1.0"),)]
elif snmp_type is hlapi.IpAddress:
read_val = [(None, snmp_type("1.1.1.1"),)]
elif snmp_type is hlapi.OctetString:
read_val = [(None, snmp_type("1"),)]
else:
read_val = [(None, snmp_type(1),)]
read_val = self._get_return_val_for_scalar(snmp_type)
elif dims == self.DIM_LIST["spectrum"]:
if snmp_type is hlapi.ObjectIdentity:
read_val = []
for _i in range(dims[0]):
read_val.append((None, snmp_type(f"1.3.6.1.2.1.1.1.0.1")))
elif snmp_type is hlapi.IpAddress:
read_val = []
for _i in range(dims[0]):
read_val.append((None, snmp_type(f"1.1.1.1")))
elif snmp_type is hlapi.OctetString:
read_val = []
for _i in range(dims[0]):
read_val.append((None, snmp_type("1")))
else:
read_val = []
for _i in range(dims[0]):
read_val.append((None, snmp_type(1)))
read_val = self._get_return_val_for_spectrum(snmp_type, dims)
else:
raise Exception("Image not supported :(")
return read_val
def _get_return_val_for_scalar(self, snmp_type : type):
if snmp_type is hlapi.ObjectIdentity:
read_val = [(snmp_type("1.3.6.1.2.1.1.1.0"),)]
elif snmp_type is hlapi.IpAddress:
read_val = [(None, snmp_type("1.1.1.1"),)]
elif snmp_type is hlapi.OctetString:
read_val = [(None, snmp_type("1"),)]
else:
read_val = [(None, snmp_type(1),)]
return read_val
def _get_return_val_for_spectrum(self, snmp_type : type, dims : tuple):
if snmp_type is hlapi.ObjectIdentity:
read_val = []
for _i in range(dims[0]):
read_val.append((None, snmp_type(f"1.3.6.1.2.1.1.1.0.1")))
elif snmp_type is hlapi.IpAddress:
read_val = []
for _i in range(dims[0]):
read_val.append((None, snmp_type(f"1.1.1.1")))
elif snmp_type is hlapi.OctetString:
read_val = []
for _i in range(dims[0]):
read_val.append((None, snmp_type("1")))
else:
read_val = []
for _i in range(dims[0]):
read_val.append((None, snmp_type(1)))
return read_val
def val_check(self, snmp_type : type, dims : tuple):
"""
provides the values we expect and would provide to the attribute after converting the
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment