Skip to content
Snippets Groups Projects
Commit 9592705a authored by Taya Snijder's avatar Taya Snijder
Browse files

reading and writing now works for SNMP with arbitrary types

parent e8f09f33
No related branches found
No related tags found
1 merge request!26Resolve #2021 "04 16 branched from master snmp device"
......@@ -15,11 +15,11 @@
from tango.server import run
from tango.server import device_property
from tango import AttrWriteType
# Additional import
# Additional import
from clients.SNMP_client import SNMP_client
from src.attribute_wrapper import attribute_wrapper
from src.hardware_device import hardware_device
from util.attribute_wrapper import attribute_wrapper
from util.hardware_device import hardware_device
import numpy
......@@ -64,13 +64,13 @@ class SNMP(hardware_device):
# ----------
sys_description_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.1.0"}, datatype=numpy.str_)
sys_objectID_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.2.0", "type": "OID"}, datatype=numpy.int64)
sys_uptime_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.3.0", "type": "TimeTicks"}, datatype=numpy.str_)
sys_objectID_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.2.0", "type": "OID"}, datatype=numpy.str_)
sys_uptime_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.3.0", "type": "TimeTicks"}, datatype=numpy.int64)
sys_name_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.5.0"}, datatype=numpy.str_)
ip_route_mask_127_0_0_1_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.4.21.1.11.127.0.0.1", "type": "IpAddress"}, datatype=numpy.str_)
TCP_active_open_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.6.5.0", "type": "Counter32"}, datatype=numpy.str_)
TCP_active_open_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.6.5.0", "type": "Counter32"}, datatype=numpy.int64)
sys_contact_RW = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.4.0"}, datatype=numpy.str_, access= AttrWriteType.READ_WRITE)
sys_contact_RW = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.4.0"}, datatype=numpy.str_, access=AttrWriteType.READ_WRITE)
sys_contact_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.4.0"}, datatype=numpy.str_)
TCP_Curr_estab_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.6.9.0", "type": "Gauge"}, datatype=numpy.int64)
......@@ -78,6 +78,8 @@ class SNMP(hardware_device):
# inferred spectrum
if_index_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.2.2.1.1"}, dims=(10,), datatype=numpy.int64)
def always_executed_hook(self):
"""Method always executed before any TANGO command is executed."""
pass
......@@ -97,7 +99,7 @@ class SNMP(hardware_device):
# --------
# overloaded functions
# --------
def initialise(self):
def configure_for_initialise(self):
""" user code here. is called when the state is set to STANDBY """
# set up the SNMP ua client
......
from util.comms_client import CommClient
import snmp
import numpy
import traceback
__all__ = ["SNMP_client"]
snmp_to_numpy_dict = {
snmp.types.INTEGER: numpy.int64,
snmp.types.TimeTicks: numpy.int64,
snmp.types.OCTET_STRING: numpy.str_,
snmp.types.OID: numpy.str_,
snmp.types.Counter32: numpy.int64,
snmp.types.Gauge32: numpy.int64,
snmp.types.IpAddress: numpy.str_,
}
snmp_types = {
"Integer": numpy.int64,
"Gauge": numpy.int64,
"TimeTick": numpy.int64,
"Counter32": numpy.int64,
"OctetString": numpy.str_,
"IpAddress": numpy.str_,
"OID": numpy.str_,
}
# numpy_to_snmp_dict = {
# numpy.int64,
# numpy.int64,
# str,
# }
class SNMP_client(CommClient):
"""
messages to keep a check on the connection. On connection failure, reconnects once.
"""
def start(self):
super().start()
def __init__(self, community, host, timeout, fault_func, streams, try_interval=2):
"""
Create the SNMP and connect() to it
"""
super().__init__(fault_func, streams, try_interval)
self.community = community
self.host = host
self.manager = snmp.Manager(community=bytes(community, "utf8"))
# Explicitly connect
if not self.connect():
# hardware or infra is down -- needs fixing first
fault_func()
return
def connect(self):
"""
Try to connect to the client
"""
self.streams.debug_stream("Connecting to community: %s, host: %s", self.community, self.host)
self.connected = True
return True
def ping(self):
"""
ping the client to make sure the connection with the client is still functional.
"""
pass
def _setup_annotation(self, annotation):
"""
This class's Implementation of the get_mapping function. returns the read and write functions
"""
if isinstance(annotation, dict):
# check if required path inarg is present
if annotation.get('oids') is None:
AssertionError("SNMP get attributes require an oid")
oids = annotation.get("oids") # required
else:
TypeError("SNMP attributes require a dict with oid(s)")
return
if annotation.get('type') is not None:
dtype = annotation.get("type") # required
# actual_type = snmp_types[dtype]
else:
dtype = None
return oids, dtype
def setup_value_conversion(self, attribute):
"""
gives the client access to the attribute_wrapper object in order to access all data it could potentially need.
"""
dim_x = attribute.dim_x
dim_y = attribute.dim_y
dtype = attribute.numpy_type
return dim_x, dim_y, dtype
def get_oids(self, x, y, in_oid):
if x == 0:
x = 1
if y == 0:
y = 1
nof_oids = x * y
if nof_oids == 1:
# is scalar
if type(in_oid) is str:
# for ease of handling put single oid in a 1 element list
in_oid = [in_oid]
return in_oid
elif type(in_oid) is list and len(in_oid) == nof_oids:
# already is an array and of the right length
return in_oid
elif type(in_oid) is list and len(in_oid) != nof_oids:
# already is an array but the wrong length. Unable to handle this
raise ValueError("SNMP oids need to either be a single value or an array the size of the attribute dimensions. got: {} expected: {}x{}={}".format(len(in_oid),x,y,x*y))
else:
out_oids = []
for i in range(nof_oids):
out_oids.append(in_oid + ".{}".format(i+1))
return out_oids
def setup_attribute(self, annotation, attribute):
"""
MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions
"""
# process the annotation
oids, dtype = self._setup_annotation(annotation)
# get all the necessary data to set up the read/write functions from the attribute_wrapper
dim_x, dim_y, numpy_type = self.setup_value_conversion(attribute)
oids = self.get_oids(dim_x, dim_y, oids)
if dtype is None:
dtype = numpy_type
def _read_function():
vars = self.manager.get(self.host, *oids)
value = []
for i in vars:
val = snmp_to_numpy_dict[type(i.value)](str(i.value))
value.append(val)
return value
if dtype is None:
def _write_function(value):
if len(oids) == 1 and type(value) != list:
value = [value]
else:
for i in range(len(oids)):
self.manager.set(self.host, oids[i], snmp_types[dtype](value[i]))
else:
def _write_function(value):
if len(oids) == 1 and type(value) != list:
value = [value]
else:
for i in range(len(oids)):
self.manager.set(self.host, oids[i], value[i])
# return the read/write functions
return _read_function, _write_function
from src.comms_client import CommClient
import configparser
import numpy
numpy_to_ini_dict = {
numpy.int64: int,
numpy.double: float,
numpy.bool_: bool,
str: str
}
ini_to_numpy_dict = {
int: numpy.int64,
float: numpy.double,
bool: numpy.bool_,
str: str
}
import os
class ini_client(CommClient):
"""
this class provides an example implementation of a comms_client.
Durirng initialisation it creates a correctly shaped zero filled value. on read that value is returned and on write its modified.
"""
def start(self):
super().start()
def __init__(self, filename, fault_func, streams, try_interval=2):
"""
initialises the class and tries to connect to the client.
"""
self.config = configparser.ConfigParser()
self.filename = filename
if not filename.endswith(".ini"):
filename = filename + ".ini"
super().__init__(fault_func, streams, try_interval)
# Explicitly connect
if not self.connect():
# hardware or infra is down -- needs fixing first
fault_func()
return
def connect(self):
files_path = [os.path.abspath(x) for x in os.listdir()]
self.streams.debug_stream(" %s", files_path)
self.config_file = open(self.filename, "rw")
self.connected = True # set connected to true
return True # if succesfull, return true. otherwise return false
def disconnect(self):
self.connected = False # always force a reconnect, regardless of a successful disconnect
self.streams.debug_stream("disconnected from the 'client' ")
def _setup_annotation(self, annotation):
"""
this function gives the client access to the comm client annotation data given to the attribute wrapper.
The annotation data can be used to provide whatever extra data is necessary in order to find/access the monitor/control point.
the annotation can be in whatever format may be required. it is up to the user to handle its content
example annotation may include:
- a file path and file line/location
- COM object path
"""
# as this is an example, just print the annotation
self.streams.debug_stream("annotation: {}".format(annotation))
name = annotation.get('name')
if name is None:
AssertionError("ini client requires a variable name to set/get")
section = annotation.get('section')
if section is None:
AssertionError("requires a section to open")
return section, name
def _setup_value_conversion(self, attribute):
"""
gives the client access to the attribute_wrapper object in order to access all
necessary data such as dimensionality and data type
"""
if attribute.dim_y > 1:
dims = (attribute.dim_y, attribute.dim_x)
else:
dims = (attribute.dim_x,)
dtype = attribute.numpy_type
return dims, dtype
def _setup_mapping(self, name, section, dtype):
"""
takes all gathered data to configure and return the correct read and write functions
"""
def read_function():
value = self.config.get(section, name)
value = ini_to_numpy_dict[dtype](value)
return value
def write_function(write_value):
self.config.set(section, name, write_value)
fp = open(self.filename, 'w')
self.config.write(fp)
return read_function, write_function
def setup_attribute(self, annotation=None, attribute=None):
"""
MANDATORY function: is used by the attribute wrapper to get read/write functions.
must return the read and write functions
"""
# process the comms_annotation
section, name = self._setup_annotation(annotation)
# get all the necessary data to set up the read/write functions from the attribute_wrapper
dims, dtype = self._setup_value_conversion(attribute)
# configure and return the read/write functions
read_function, write_function = self._setup_mapping(name, section, dtype)
# return the read/write functions
return read_function, write_function
def write_config():
config = configparser.ConfigParser()
config['scalar'] = {}
config['scalar']['double_scalar'] = '1.2'
config['scalar']['double_scalar'] = '3.4'
config['scalar']['bool_scalar'] = 'True'
config['scalar']['bool_scalar'] = 'False'
config['scalar']['int_scalar'] = '5'
config['scalar']['int_scalar'] = '6'
config['scalar']['str_scalar'] = 'this is'
config['scalar']['str_scalar'] = 'a test'
config['spectrum'] = {}
config['spectrum']['double_scalar'] = '[1.2, 2.3, 3.4]'
config['spectrum']['double_scalar'] = '[5.6, 6.7, 7.8]'
config['spectrum']['bool_scalar'] = '[True, True, False]'
config['spectrum']['bool_scalar'] = '[False, False, True]'
config['spectrum']['int_scalar'] = '[5'
config['spectrum']['int_scalar'] = '[6,7,8,9]'
config['spectrum']['str_scalar'] = '["a", "b", "c"]'
config['spectrum']['str_scalar'] = '["D", "E", "F"]'
with open('example.ini', 'w') as configfile:
config.write(configfile)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment