Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
SNMP_client.py 4.37 KiB
from src.comms_client import *
import snmp
import numpy
import traceback

__all__ = ["SNMP_client"]


snmp_to_numpy_dict = {
    snmp.types.INTEGER: numpy.int64,
    snmp.types.TimeTicks: numpy.int64,
    snmp.types.OCTET_STRING: str,
    snmp.types.OID: str
}

snmp_types = {
    "Integer": numpy.int64,
    "Gauge": numpy.int64,
    "TimeTick": numpy.int64,
    "Counter32": numpy.int64,
    "OctetString": str,
    "IpAddress": str,
    "OID": str,
}


# numpy_to_snmp_dict = {
#     numpy.int64,
#     numpy.int64,
#     str,
# }


class SNMP_client(CommClient):
    """
        messages to keep a check on the connection. On connection failure, reconnects once.
    """

    def start(self):
        super().start()

    def __init__(self, community, host, timeout, fault_func, streams, try_interval=2):
        """
        Create the SNMP and connect() to it
        """
        super().__init__(fault_func, streams, try_interval)

        self.community = community
        self.host = host
        self.manager = snmp.Manager(community=bytes(community, "utf8"))

        # Explicitly connect
        if not self.connect():
            # hardware or infra is down -- needs fixing first
            fault_func()
            return

    def connect(self):
        """
        Try to connect to the client
        """
        self.streams.debug_stream("Connecting to community: %s, host: %s", self.community, self.host)

        self.connected = True
        return True

    def ping(self):
        """
        ping the client to make sure the connection with the client is still functional.
        """
        pass

    def _setup_annotation(self, annotation):
        """
        This class's Implementation of the get_mapping function. returns the read and write functions
        """

        if isinstance(annotation, dict):
            # check if required path inarg is present
            if annotation.get('oids') is None:
                AssertionError("SNMP get attributes require an oid")
            oids = annotation.get("oids")  # required
        else:
            TypeError("SNMP attributes require a dict with oid(s)")
            return

        return oids

    def setup_value_conversion(self, attribute):
        """
        gives the client access to the attribute_wrapper object in order to access all data it could potentially need.
        """

        dim_x = attribute.dim_x
        dim_y = attribute.dim_y

        return dim_x, dim_y

    def get_oids(self, x, y, in_oid):

        if x == 0:
            x = 1
        if y == 0:
            y = 1

        nof_oids = x * y

        if nof_oids == 1:
            # is scalar
            if type(in_oid) is str:
                # for ease of handling put single oid in a 1 element list
                in_oid = [in_oid]
            return in_oid

        elif type(in_oid) is list and len(in_oid) == nof_oids:
            # already is an array and of the right length
            return in_oid
        elif type(in_oid) is list and len(in_oid) != nof_oids:
            # already is an array but the wrong length. Unable to handle this
            raise ValueError("SNMP oids need to either be a single value or an array the size of the attribute dimensions. got: {} expected: {}x{}={}".format(len(in_oid),x,y,x*y))
        else:
            out_oids = []

            for i in range(nof_oids):
                out_oids.append(in_oid + ".{}".format(i+1))

            return out_oids

    def setup_attribute(self, annotation, attribute):
        """
        MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions
        """

        # process the annotation
        oids = self._setup_annotation(annotation)

        # get all the necessary data to set up the read/write functions from the attribute_wrapper
        dim_x, dim_y = self.setup_value_conversion(attribute)
        oids = self.get_oids(dim_x, dim_y, oids)

        def _read_function():
            vars = self.manager.get(self.host, *oids)

            value = []
            for i in vars:
                val = snmp_to_numpy_dict[type(i.value)](str(i.value))
                value.append(val)
            return value

        def _write_function(value):
            for i in range(len(oids)):
                    self.manager.set(self.host, oids[i], value[i])


        # return the read/write functions
        return _read_function, _write_function