Skip to content
Snippets Groups Projects
Commit bbc253bf authored by Jan David Mol's avatar Jan David Mol
Browse files

Merge branch '2021-04-16-Branched_from_master-SNMP_device' into 'master'

Resolve #2021 "04 16 branched from master snmp device"

See merge request !26
parents e0d14a0b a6389778
Branches
Tags
1 merge request!26Resolve #2021 "04 16 branched from master snmp device"
# -*- coding: utf-8 -*-
#
# This file is part of the PCC project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
""" SNMP Device for LOFAR2.0
"""
# PyTango imports
from tango.server import run
from tango.server import device_property
from tango import AttrWriteType
# Additional import
from clients.SNMP_client import SNMP_client
from util.attribute_wrapper import attribute_wrapper
from util.hardware_device import hardware_device
import numpy
__all__ = ["SNMP", "main"]
class SNMP(hardware_device):
"""
**Properties:**
- Device Property
SNMP_community
- Type:'DevString'
SNMP_host
- Type:'DevULong'
SNMP_timeout
- Type:'DevDouble'
"""
# -----------------
# Device Properties
# -----------------
SNMP_community = device_property(
dtype='DevString',
mandatory=True
)
SNMP_host = device_property(
dtype='DevString',
mandatory=True
)
SNMP_timeout = device_property(
dtype='DevDouble',
mandatory=True
)
# ----------
# Attributes
# ----------
sys_description_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.1.0"}, datatype=numpy.str_)
sys_objectID_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.2.0", "type": "OID"}, datatype=numpy.str_)
sys_uptime_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.3.0", "type": "TimeTicks"}, datatype=numpy.int64)
sys_name_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.5.0"}, datatype=numpy.str_)
ip_route_mask_127_0_0_1_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.4.21.1.11.127.0.0.1", "type": "IpAddress"}, datatype=numpy.str_)
TCP_active_open_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.6.5.0", "type": "Counter32"}, datatype=numpy.int64)
sys_contact_RW = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.4.0"}, datatype=numpy.str_, access=AttrWriteType.READ_WRITE)
sys_contact_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.4.0"}, datatype=numpy.str_)
TCP_Curr_estab_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.6.9.0", "type": "Gauge"}, datatype=numpy.int64)
# inferred spectrum
if_index_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.2.2.1.1"}, dims=(10,), datatype=numpy.int64)
# --------
# overloaded functions
# --------
def configure_for_initialise(self):
""" user code here. is called when the state is set to STANDBY """
# set up the SNMP ua client
self.snmp_manager = SNMP_client(self.SNMP_community, self.SNMP_host, self.SNMP_timeout, self.Fault, self)
# map the attributes to the OPC ua comm client
for i in self.attr_list():
i.set_comm_client(self.snmp_manager)
self.snmp_manager.start()
# --------
# Commands
# --------
# ----------
# Run server
# ----------
def main(args=None, **kwargs):
"""Main function of the PCC module."""
return run((SNMP,), args=args, **kwargs)
if __name__ == '__main__':
main()
from util.comms_client import CommClient
import snmp
import numpy
import traceback
__all__ = ["SNMP_client"]
snmp_to_numpy_dict = {
snmp.types.INTEGER: numpy.int64,
snmp.types.TimeTicks: numpy.int64,
snmp.types.OCTET_STRING: numpy.str_,
snmp.types.OID: numpy.str_,
snmp.types.Counter32: numpy.int64,
snmp.types.Gauge32: numpy.int64,
snmp.types.IpAddress: numpy.str_,
}
snmp_types = {
"Integer": numpy.int64,
"Gauge": numpy.int64,
"TimeTick": numpy.int64,
"Counter32": numpy.int64,
"OctetString": numpy.str_,
"IpAddress": numpy.str_,
"OID": numpy.str_,
}
class SNMP_client(CommClient):
"""
messages to keep a check on the connection. On connection failure, reconnects once.
"""
def start(self):
super().start()
def __init__(self, community, host, timeout, fault_func, streams, try_interval=2):
"""
Create the SNMP and connect() to it
"""
super().__init__(fault_func, streams, try_interval)
self.community = community
self.host = host
self.manager = snmp.Manager(community=bytes(community, "utf8"))
# Explicitly connect
if not self.connect():
# hardware or infra is down -- needs fixing first
fault_func()
return
def connect(self):
"""
Try to connect to the client
"""
self.streams.debug_stream("Connecting to community: %s, host: %s", self.community, self.host)
self.connected = True
return True
def ping(self):
"""
ping the client to make sure the connection with the client is still functional.
"""
pass
def _setup_annotation(self, annotation):
"""
This class's Implementation of the get_mapping function. returns the read and write functions
"""
if isinstance(annotation, dict):
# check if required path inarg is present
if annotation.get('oids') is None:
ValueError("SNMP get attributes require an oid")
oids = annotation.get("oids") # required
else:
TypeError("SNMP attributes require a dict with oid(s)")
return
dtype = annotation.get('type', None)
return oids, dtype
def setup_value_conversion(self, attribute):
"""
gives the client access to the attribute_wrapper object in order to access all data it could potentially need.
"""
dim_x = attribute.dim_x
dim_y = attribute.dim_y
dtype = attribute.numpy_type
return dim_x, dim_y, dtype
def get_oids(self, x, y, in_oid):
if x == 0:
x = 1
if y == 0:
y = 1
nof_oids = x * y
if nof_oids == 1:
# is scalar
if type(in_oid) is str:
# for ease of handling put single oid in a 1 element list
in_oid = [in_oid]
return in_oid
elif type(in_oid) is list and len(in_oid) == nof_oids:
# already is an array and of the right length
return in_oid
elif type(in_oid) is list and len(in_oid) != nof_oids:
# already is an array but the wrong length. Unable to handle this
raise ValueError("SNMP oids need to either be a single value or an array the size of the attribute dimensions. got: {} expected: {}x{}={}".format(len(in_oid),x,y,x*y))
else:
return ["{}.{}".format(in_oid, i + 1) for i in range(nof_oids)]
def setup_attribute(self, annotation, attribute):
"""
MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions
"""
# process the annotation
oids, dtype = self._setup_annotation(annotation)
# get all the necessary data to set up the read/write functions from the attribute_wrapper
dim_x, dim_y, numpy_type = self.setup_value_conversion(attribute)
oids = self.get_oids(dim_x, dim_y, oids)
def _read_function():
vars = self.manager.get(self.host, *oids)
return [snmp_to_numpy_dict[type(i.value)](str(i.value)) for i in vars]
if dtype is not None:
def _write_function(value):
if len(oids) == 1 and type(value) != list:
value = [value]
for i in range(len(oids)):
self.manager.set(self.host, oids[i], snmp_types[dtype](value[i]))
else:
def _write_function(value):
if len(oids) == 1 and type(value) != list:
value = [value]
for i in range(len(oids)):
self.manager.set(self.host, oids[i], value[i])
# return the read/write functions
return _read_function, _write_function
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment