From c1851723c075f49e95fee5f6e53d090f6948fa89 Mon Sep 17 00:00:00 2001 From: thijs snijder <snijder@astron.nl> Date: Fri, 7 May 2021 11:53:43 +0200 Subject: [PATCH] removed snmp library included in files --- devices/snmp/__init__.py | 14 - devices/snmp/exceptions.py | 10 - devices/snmp/mutex.py | 61 ---- devices/snmp/types.py | 491 ------------------------- devices/snmp/v1/__init__.py | 655 ---------------------------------- devices/snmp/v1/exceptions.py | 17 - devices/snmp/v1/notes.py | 68 ---- 7 files changed, 1316 deletions(-) delete mode 100644 devices/snmp/__init__.py delete mode 100644 devices/snmp/exceptions.py delete mode 100644 devices/snmp/mutex.py delete mode 100644 devices/snmp/types.py delete mode 100644 devices/snmp/v1/__init__.py delete mode 100644 devices/snmp/v1/exceptions.py delete mode 100644 devices/snmp/v1/notes.py diff --git a/devices/snmp/__init__.py b/devices/snmp/__init__.py deleted file mode 100644 index a6b8b3750..000000000 --- a/devices/snmp/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -from .v1 import SNMPv1 - -versions = { - 1: SNMPv1, -} - -def Manager(*args, version=1, **kwargs): - try: - cls = versions[version] - except KeyError as e: - msg = "'version' must be one of {}".format(list(versions.keys())) - raise ValueError(msg) from e - - return cls(*args, **kwargs) diff --git a/devices/snmp/exceptions.py b/devices/snmp/exceptions.py deleted file mode 100644 index 47933a8bc..000000000 --- a/devices/snmp/exceptions.py +++ /dev/null @@ -1,10 +0,0 @@ -# used to indicate that a string cannot be decoded because it violates encoding rules -class EncodingError(Exception): - pass - -# used to indicate that a response violates the protocol in some way -class ProtocolError(Exception): - pass - -class Timeout(Exception): - pass diff --git a/devices/snmp/mutex.py b/devices/snmp/mutex.py deleted file mode 100644 index 62de191f3..000000000 --- a/devices/snmp/mutex.py +++ /dev/null @@ -1,61 +0,0 @@ -__all__ = ['RWLock'] - -from threading import Lock - -# returns a pair of objects, (r, w), which constitute a -# writer-preferred reader/writer lock -def RWLock(): - r = Lock() - w = Lock() - return RLock(r, w), WLock(r, w) - - -class ContextLock: - def __enter__(self): - self.acquire() - - def __exit__(self, *args, **kwargs): - self.release() - -class RLock(ContextLock): - def __init__(self, r, w): - self.r = r - self.w = w - self.mutex = Lock() - self.queue = Lock() - self.count = 0 - - def acquire(self): - with self.queue: - with self.r: - with self.mutex: - if not self.count: - self.w.acquire() - self.count += 1 - - def release(self): - with self.mutex: - self.count -= 1 - if not self.count: - self.w.release() - -class WLock(ContextLock): - def __init__(self, r, w): - self.r = r - self.w = w - self.mutex = Lock() - self.count = 0 - - def acquire(self): - with self.mutex: - if not self.count: - self.r.acquire() - self.count += 1 - self.w.acquire() - - def release(self): - self.w.release() - with self.mutex: - self.count -= 1 - if not self.count: - self.r.release() diff --git a/devices/snmp/types.py b/devices/snmp/types.py deleted file mode 100644 index 3eb9b95be..000000000 --- a/devices/snmp/types.py +++ /dev/null @@ -1,491 +0,0 @@ -__all__ = [ - 'ASN1', 'INTEGER', 'OCTET_STRING', 'NULL', 'OID', 'SEQUENCE', 'UNSIGNED', - 'Counter32', 'Gauge32', 'TimeTicks', 'Integer32', 'Counter64', 'IpAddress', - 'VarBind', 'VarBindList', 'PDU', 'GetRequestPDU', 'GetNextRequestPDU', - 'GetResponsePDU', 'SetRequestPDU', 'Message', -] - -from copy import copy -import socket - -from .exceptions import EncodingError, ProtocolError - -def unpack(obj): - if len(obj) < 2: - raise EncodingError("object encoding is too short") - - dtype = obj[0] - l = obj[1] - - index = 2 - if l & 0x80: - index += l & 0x7f - - if len(obj) < index: - raise EncodingError("Long form length field is incomplete") - - l = 0 - for num in obj[2:index]: - l <<= 8 - l += num - - if len(obj) < index + l: - raise EncodingError("Invalid length field: object encoding too short") - - return dtype, obj[index:index+l], obj[index+l:] - -def length(l): - if l < 0x80: - return bytes([l]) - - bytearr = bytearray() - while l: - bytearr.append(l & 0xff) - l >>= 8 - - # this works as long as (l < 2^1008), which is super big - bytearr.append(len(bytearr) | 0x80) - - return bytes(reversed(bytearr)) - -class ASN1: - def __init__(self, value=None, encoding=None): - self._encoding = encoding - self._value = value - - def __repr__(self): - return "{}({})".format(self.__class__.__name__, self) - - @classmethod - def copy(cls, obj): - return cls(encoding=obj.encoding) - - @staticmethod - def deserialize(obj, cls=None, leftovers=False): - dtype, encoding, tail = unpack(obj) - if tail and not leftovers: - raise EncodingError("Unexpected trailing bytes") - - if cls is None: - try: - cls = types[dtype] - except KeyError as e: - message = "Unknown type: '0x{:02x}'".format(dtype) - raise ProtocolError(message) from e - - elif dtype != cls.TYPE: - message = "Expected type '0x{:02x}'; got '0x{:02x}'" - message = message.format(cls.TYPE, dtype) - raise ProtocolError(message) - - obj = cls(encoding=encoding) - - return (obj, tail) if leftovers else obj - - def serialize(self): - return bytes([self.TYPE]) + length(len(self.encoding)) + self.encoding - - # The following methods must be overwritten for sequence types - def __bool__(self): - return bool(self.value) - - def __eq__(self, other): - return self.value == other - - def __ge__(self, other): - return self.value >= other - - def __gt__(self, other): - return self.value > other - - def __le__(self, other): - return self.value <= other - - def __lt__(self, other): - return self.value < other - - def __ne__(self, other): - return self.value != other - - def __str__(self): - return repr(self.value) - - def poke(self): - self.value - -### Primitive types ### - -class INTEGER(ASN1): - SIGNED = True - - @property - def encoding(self): - if self._encoding is None: - encoding = bytearray() - x = self._value - - # do - while - while True: - encoding.append(x & 0xff) - x >>= 8 - if x in (0, -1): - break - - self._encoding = bytes(reversed(encoding)) - return self._encoding - - @property - def value(self): - if self._value is None: - negative = self.SIGNED and bool(self._encoding[0] & 0x80) - - x = 0 - for byte in self._encoding: - x <<= 8 - x |= byte - - if negative: - bits = 8 * len(self._encoding) - self._value = -(~x + (1 << bits) + 1) - else: - self._value = x - - return self._value - -class OCTET_STRING(ASN1): - @property - def encoding(self): - if self._encoding is None: - self._encoding = self._value - return self._encoding - - @property - def value(self): - if self._value is None: - self._value = self._encoding - return self._value - -class NULL(ASN1): - def __init__(self, value=None, encoding=None): - if encoding: - raise EncodingError("Non-null encoding for NULL type") - elif value is not None: - raise ValueError("NULL cannot have non-null value") - - def __str__(self): - return "" - - @property - def encoding(self): - return b'' - - @property - def value(self): - return None - -class OID(ASN1): - @property - def encoding(self): - if self._encoding is None: - if self._value[0] == '.': - self._value = self.value[1:] - - segments = [int(segment) for segment in self._value.split('.')] - - if len(segments) > 1: - segments[1] += segments[0] * 40 - segments = segments[1:] - - encoding = bytearray() - for num in segments: - bytearr = bytearray() - while num > 0x7f: - bytearr.append(num & 0x7f) - num >>= 7 - bytearr.append(num) - - for i in range(1, len(bytearr)): - bytearr[i] |= 0x80 - - bytearr.reverse() - encoding += bytearr - - self._encoding = bytes(encoding) - - return self._encoding - - @property - def value(self): - if self._value is None: - encoding = self._encoding - - first = encoding[0] - oid = [str(num) for num in divmod(first, 40)] - - val = 0 - for byte in encoding[1:]: - val |= byte & 0x7f - if byte & 0x80: - val <<= 7 - else: - oid.append(str(val)) - val = 0 - - if val: - raise EncodingError("OID ended in a byte with bit 7 set") - - self._value = '.'.join(oid) - - return self._value - -class SEQUENCE(ASN1): - EXPECTED = None - - def __init__(self, *values, encoding=None): - self.expected = copy(self.EXPECTED) - - self._encoding = encoding - self._values = values or None - - def __bool__(self): - return bool(self.values) - - def __eq__(self, other): - return self.values == other - - def __ge__(self, other): - return self.values >= other - - def __gt__(self, other): - return self.values > other - - def __le__(self, other): - return self.values <= other - - def __lt__(self, other): - return self.values < other - - def __ne__(self, other): - return self.values != other - - def __str__(self): - return repr(self) - - def __repr__(self, depth=0): - string = "{}{}:\n".format('\t'*depth, self.__class__.__name__) - depth += 1 - for entry in self.values: - if isinstance(entry, SEQUENCE): - string += entry.__repr__(depth=depth) - else: - string += "{}{}: {}\n".format( - '\t'*depth, - entry.__class__.__name__, - entry - ) - - return string - - def poke(self): - for val in self.values: - val.poke() - - @property - def encoding(self): - if self._encoding is None: - encodings = [None] * len(self.values) - for i in range(len(self.values)): - encodings[i] = self.values[i].serialize() - - self._encoding = b''.join(encodings) - - return self._encoding - - @property - def values(self): - if self._values is None: - definite = isinstance(self.expected, list) - - sequence = [] - encoding = self._encoding - while encoding: - if definite: - try: - cls = self.expected[len(sequence)] - except IndexError as e: - message = "{} has too many elements" - message = message.format(self.__class__.__name__) - raise ProtocolError(message) from e - else: - cls = self.expected - - obj, encoding = ASN1.deserialize(encoding, cls=cls, leftovers=True) - sequence.append(obj) - - if definite and len(sequence) < len(self.expected): - message = "{} has too few elements" - message = message.format(self.__class__.__name__) - raise ProtocolError(message) - - self._values = tuple(sequence) - - return self._values - -### Composed types ### - -class UNSIGNED(INTEGER): - SIGNED = False - -class Counter32(UNSIGNED): - pass - -class Gauge32(UNSIGNED): - pass - -class TimeTicks(UNSIGNED): - pass - -class Integer32(INTEGER): - pass - -class Counter64(UNSIGNED): - pass - -class IpAddress(OCTET_STRING): - @property - def encoding(self): - if self._encoding is None: - self._encoding = socket.inet_aton(self._value) - return self._encoding - - @property - def value(self): - if self._value is None: - if len(self._encoding) == 4: - self._value = socket.inet_ntoa(self._encoding) - else: - raise ProtocolError("IP Address must be 4 bytes long") - return self._value - -class VarBind(SEQUENCE): - EXPECTED = [ - OID, - None, - ] - - def __init__(self, *args, **kwargs): - super(VarBind, self).__init__(*args, **kwargs) - self.error = None - - @property - def name(self): - return self.values[0] - - @property - def value(self): - return self.values[1] - -class VarBindList(SEQUENCE): - EXPECTED = VarBind - - def __getitem__(self, index): - return self.values[index] - - def __iter__(self): - return iter(self.values) - - def __len__(self): - return len(self.values) - -class PDU(SEQUENCE): - EXPECTED = [ - INTEGER, - INTEGER, - INTEGER, - VarBindList, - ] - - def __init__(self, request_id=0, error_status=0, error_index=0, vars=None, encoding=None): - values = ( - INTEGER.copy(UNSIGNED(request_id)), - INTEGER(error_status), - INTEGER(error_index), - vars, - ) if encoding is None else () - super(PDU, self).__init__(*values, encoding=encoding) - - @property - def request_id(self): - return self.values[0] - - @property - def error_status(self): - return self.values[1] - - @property - def error_index(self): - return self.values[2] - - @property - def vars(self): - return self.values[3] - -class GetRequestPDU(PDU): - pass - -class GetNextRequestPDU(PDU): - pass - -class GetResponsePDU(PDU): - pass - -class SetRequestPDU(PDU): - pass - -class Message(SEQUENCE): - EXPECTED = [ - INTEGER, - OCTET_STRING, - GetResponsePDU, - ] - - def __init__(self, version=0, community=b'public', data=None, encoding=None): - values = ( - INTEGER(version), - OCTET_STRING(community), - data, - ) if encoding is None else () - super(Message, self).__init__(*values, encoding=encoding) - - @property - def version(self): - return self.values[0] - - @property - def community(self): - return self.values[1] - - @property - def data(self): - return self.values[2] - -types = { - 0x02: INTEGER, - 0x04: OCTET_STRING, - 0x05: NULL, - 0x06: OID, - 0x30: SEQUENCE, - 0x40: IpAddress, - 0x41: Counter32, - 0x42: Gauge32, - 0x43: TimeTicks, - 0x44: Integer32, - 0x46: Counter64, - 0xa0: GetRequestPDU, - 0xa1: GetNextRequestPDU, - 0xa2: GetResponsePDU, - 0xa3: SetRequestPDU, -} - -for dtype, cls in types.items(): - cls.TYPE = dtype diff --git a/devices/snmp/v1/__init__.py b/devices/snmp/v1/__init__.py deleted file mode 100644 index 319940d1a..000000000 --- a/devices/snmp/v1/__init__.py +++ /dev/null @@ -1,655 +0,0 @@ -from binascii import hexlify -from collections import OrderedDict -import logging -import os -import random -import select -import socket -import threading -import time - -from ..exceptions import EncodingError, ProtocolError, Timeout -from ..mutex import RWLock -from ..types import * -from .exceptions import TooBig, NoSuchName, BadValue, ReadOnly, GenErr - -log = logging.getLogger(__name__) - -DUMMY_EVENT = threading.Event() -DUMMY_EVENT.set() - -ERRORS = { - 1: TooBig, - 2: NoSuchName, - 3: BadValue, - 4: ReadOnly, - 5: GenErr, -} - -PORT = 161 -RECV_SIZE = 65507 -MAX_REQUEST_ID = 0xffffffff -VERSION = 0 -WINDOWS = (os.name == "nt") - -if WINDOWS: - import ctypes - from ctypes.wintypes import HANDLE, DWORD, BOOL - import msvcrt - - ERROR_NO_DATA = 232 - LPDWORD = ctypes.POINTER(DWORD) - SELECT_TIMEOUT = 1 - - def errcheck (result, func, args): - if not result: - raise WinError (ctypes.get_last_error()) - - def setblocking (fd, blocking): - handle = msvcrt.get_osfhandle(fd) - mode = ctypes.byref(DWORD(0 if blocking else 1)) - - kernel32 = ctypes.WinDLL("kernel32", use_last_error=True) - kernel32.SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD] - kernel32.SetNamedPipeHandleState.restype = BOOL - kernel32.SetNamedPipeHandleState.errcheck = errcheck - return kernel32.SetNamedPipeHandleState(handle, mode, None, None) - -class PendTable: - def __init__(self): - self.lock = threading.Lock() - - # The two events signal the arrival of the value for the oid itself, - # or the variable returned by a GetNext request (respectively) - # { - # <oid>: [<Event>, <Event>], - # ... - # } - self.oids = {} - - # Used by set() to make sure multiple set requests to the same OID - # do not overlap in time - # { - # <oid>: <Event>, - # ... - # } - self.sets = {} - -if WINDOWS: - def _close (fd): - os.close(fd) - - def _done (_, fd): - try: - os.read(fd, 1) - except WindowsError: - if ctypes.GetLastError() != ERROR_NO_DATA: - raise - else: - return False - else: - return True - - def _select (sock, _): - return select.select([sock], [], [], SELECT_TIMEOUT)[0] - - def _setup (fd): - setblocking(fd, False) - return fd -else: - def _close (pipe): - pipe.close() - - def _done (ready, pipe): - return pipe in ready - - def _select (sock, pipe): - return select.select([sock, pipe], [], [])[0] - - def _setup (fd): - return os.fdopen(fd) - -# background thread to process responses -def _listen_thread(sock, pipe, requests, rlock, data, dlock, port=PORT): - pipe = _setup(pipe) - - while True: - # wait for data on sock or pipe - ready = _select(sock, pipe) - - if _done(ready, pipe): - # exit from this thread - # don't bother processing any more responses; the calling - # application has all the data they need - break - - for s in ready: - # listen for UDP packets from the correct port - packet, (host, p) = s.recvfrom(RECV_SIZE) - if p != port: - continue - - try: - # convert bytes to Message object - message = ASN1.deserialize(packet, cls=Message) - - # ignore garbage packets - if message.version != VERSION: - continue - - # force a full parse; invalid packet will raise an error - message.poke() - - except (EncodingError, ProtocolError) as e: - # this should take care of filtering out invalid traffic - log.debug("{}: {}: {}".format( - e.__class__.__name__, e, hexlify(packet).decode() - )) - continue - - request_id = message.data.request_id.value - try: - with rlock: - request, event = requests[request_id][:2] - except KeyError: - # ignore responses for which there was no request - msg = "Received unexpected response from {}: {}" - log.debug(msg.format(host, hexlify(packet).decode())) - continue - - # while we don't explicitly check every possible protocol violation - # this one would cause IndexErrors below, which I'd rather avoid - if len(message.data.vars) != len(request.data.vars): - msg = "VarBindList length mismatch:\n(Request) {}(Response) {}" - log.error(msg.format(request, message)) - continue - - requests.pop(request_id) - next = isinstance(request.data, GetNextRequestPDU) - - error = None - error_status = message.data.error_status.value - if error_status != 0: - log.debug(message.data) - error_index = message.data.error_index.value - try: - cls = ERRORS[error_status] - except KeyError: - msg = "Invalid error status: {}" - error = ProtocolError(msg.format(error_status)) - else: - try: - varbind = message.data.vars[error_index-1] - except IndexError: - msg = "Invalid error index: {}" - error = ProtocolError(msg.format(error_index)) - else: - error = cls(varbind.name.value) - - with dlock: - try: - host_data = data[host] - except KeyError: - host_data = {} - data[host] = host_data - - for i, varbind in enumerate(message.data.vars): - # won't make a difference if error is None - varbind.error = error - - requested = request.data.vars[i].name.value - oid = varbind.name.value - - if next: - try: - host_data[requested][1] = oid - except KeyError: - host_data[requested] = [None, oid] - elif requested != oid: - msg = "OID ({}) does not match requested ({})" - log.warning(msg.format(oid, requested)) - - # this will cause a ProtocolError to be raised in get() - # However, if this data is never accessed, the error - # will go unnoticed. - # Assuming, however, that the agent is correctly - # implemented and the channel is secure, this should - # never happen - try: - host_data[requested][0] = None - except KeyError: - host_data[requested] = [None, None] - - # update data table - try: - host_data[oid][0] = varbind - except KeyError: - host_data[oid] = [varbind, None] - - msg = "Done processing response from {} (ID={})" - log.debug(msg.format(host, request_id)) - - # alert the main thread that the data is ready - event.set() - - _close(pipe) - log.debug("Listener thread exiting") - -def _monitor_thread(sock, done, requests, rlock, data, dlock, port=PORT, resend=1): - delay = 0 - while not done.wait(timeout=delay): - with rlock: - try: - ID = next(iter(requests)) - except StopIteration: - delay = resend - else: - timestamp = requests[ID][3] - diff = time.time() - timestamp - - if diff >= resend: - delay = 0 - message, event, host, _, count = requests.pop(ID) - if count: - timestamp += resend - requests[ID] = ( - message, event, host, timestamp, count-1 - ) - else: - delay = 1-diff - - if delay == 0: - if count: - msg = "Resending to {} (ID={})" - log.debug(msg.format(host, message.data.request_id)) - sock.sendto(message.serialize(), (host, port)) - else: - with dlock: - msg = "Request to {} timed out (ID={})" - log.debug(msg.format(host, message.data.request_id)) - for varbind in message.data.vars: - varbind.error = Timeout(varbind.name.value) - oid = varbind.name.value - try: - host_data = data[host] - except KeyError: - host_data = {} - data[host] = host_data - - try: - _, next_oid = host_data[oid] - except KeyError: - next_oid = None - - # causes GETNEXT requests to register the timeout - if isinstance(message.data, GetNextRequestPDU): - next_oid = oid - - host_data[oid] = [varbind, next_oid] - - event.set() - - log.debug("Monitor thread exiting") - -class SNMPv1: - def __init__(self, community, rwcommunity=None, port=PORT, resend=1): - self.rocommunity = community - self.rwcommunity = rwcommunity or community - self.port = port - - self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self._sock.setblocking(False) - self._sock.bind(('', 0)) - - # used to shut down background threads - r, w = os.pipe() - self._write_pipe = os.fdopen(w, 'w') - self._closed = threading.Event() - - # counting by an odd number will hit every - # request id once before repeating - self._count_by = random.randint(0, MAX_REQUEST_ID//2) * 2 + 1 - self._next_id = self._count_by - - # This is an OrderedDict so the monitoring thread can iterate through - # them in order - # <timestamp> is the timestamp of the most recent transmission - # <count> is the number of remaining re-transmits before Timeout - # { - # <request_id>: (<Message>, <Event>, <host>, <timestamp>, <count>), - # ... - # } - self._requests = OrderedDict() - self._rlock = threading.Lock() - - # table of pending requests (prevents re-sending packets unnecessarily) - # { - # <host_ip>: <PendTable>, - # ... - # } - self._pending = {} - self._plock = threading.Lock() - - # table of responses - # { - # <host_ip>: { - # <oid>: [ - # <VarBind>, - # <next_oid>, - # ], - # ... - # }, - # ... - # } - self._data = {} - self._drlock, self._dwlock = RWLock() - - self._listener = threading.Thread( - target=_listen_thread, - args=( - self._sock, - r, - self._requests, - self._rlock, - self._data, - self._dwlock, - ), - kwargs={"port":port}, - ) - self._listener.start() - - self._monitor = threading.Thread( - target=_monitor_thread, - args=( - self._sock, - self._closed, - self._requests, - self._rlock, - self._data, - self._dwlock, - ), - kwargs={ - "port": port, - "resend": resend, - }, - ) - self._monitor.start() - - def close(self): - - log.debug("Sending shutdown signal to helper threads") - self._closed.set() - self._write_pipe.write('\0') - self._write_pipe.flush() - - self._listener.join() - self._monitor.join() - log.debug("All helper threads done") - - self._write_pipe.close() - self._sock.close() - - self._write_pipe = None - self._sock = None - - def __enter__(self): - return self - - def __exit__(self, *args): - if not self._closed.is_set(): - self.close() - - def _request_id(self): - request_id = self._next_id - self._next_id = (request_id + self._count_by) & MAX_REQUEST_ID - - return request_id - - def get(self, host, *oids, community=None, block=True, timeout=10, - refresh=False, next=False): - # this event will be stored in the pending table under this request ID - # the _listener_thread will signal when the data is ready - main_event = threading.Event() - - # store the first error found on a cached VarBind and raise it only - # after the request has been sent for any other oids there may be - error = None - - # used for blocking calls to wait for all responses - events = set() - - # set of oids that are neither in self._data nor self._pending - send = set() - - # return value (values[i] corresponds to oids[i]) - values = [None] * len(oids) - - with self._plock: - try: - host_pending = self._pending[host] - except KeyError: - host_pending = PendTable() - self._pending[host] = host_pending - - with self._drlock: - try: - host_data = self._data[host] - except KeyError: - host_data = {} - - # acquiring the lock all the way out here should minimize the number of - # packets sent, even if this object is being shared by multiple threads - with host_pending.lock: - for i, oid in enumerate(oids): - if not refresh: - try: - event = host_pending.oids[oid][int(next)] - except KeyError: - pass - else: - # request has been sent already and is pending - if event and not event.is_set(): - events.add(event) - # don't fetch cached value, don't re-send request - continue - - try: - # TODO: make a separate lock for each host's data - with self._drlock: - value, next_oid = host_data[oid] - if next: - value, _ = host_data[next_oid] - except KeyError: - pass - else: - # cached value found - if value is not None: - # raise any errors after the request is sent - error = error or value.error - - # set return value - values[i] = value - continue - - # add this item to the pending table - try: - host_pending.oids[oid][int(next)] = main_event - except KeyError: - if next: - host_pending.oids[oid] = [None, main_event] - else: - host_pending.oids[oid] = [main_event, None] - - # make a note to include this OID in the request - send.add(oid) - - # send any requests that are not found to be pending - if send: - events.add(main_event) - pdu_type = GetNextRequestPDU if next else GetRequestPDU - pdu = pdu_type( - request_id=self._request_id(), - vars=VarBindList( - *[VarBind(OID(oid), NULL()) for oid in send] - ), - ) - - # assign request_id variable this way rather than directly from - # self._request_id() because self._request_id() returns unsigned - # values, whereas this method returns signed values, and the key - # here has to match what is used in the _listener_thread() - request_id = pdu.request_id.value - - message = Message( - version = VERSION, - data = pdu, - community = community or self.rocommunity, - ) - - with self._rlock: - self._requests[request_id] = ( - message, main_event, host, time.time(), timeout-1 - ) - - self._sock.sendto(message.serialize(), (host, self.port)) - log.debug("Sent request to {} (ID={})".format(host, request_id)) - - if error is not None: - raise error - - if not block: - return values - - # wait for all requested oids to receive a response - for event in events: - event.wait() - - # the data table should now be all up to date - with self._drlock: - values = [] - try: - host_data = self._data[host] - except KeyError: - # shouldn't get here, ProtocolError will be triggered below - host_data = {} - - for oid in oids: - try: - value, next_oid = host_data[oid] - if next: - value, _ = host_data[next_oid] - except KeyError: - value = None - - if value is None: - raise ProtocolError("Missing variable: {}".format(oid)) - elif value.error is not None: - raise value.error - - values.append(value) - - return values - - def get_next(self, *args, **kwargs): - kwargs['next'] = True - return self.get(*args, **kwargs) - - def set(self, host, oid, value, community=None, block=True, timeout=10): - # wrap the value in an ASN1 type - if isinstance(value, int): - value = INTEGER(value) - elif value is None: - value = NULL() - elif isinstance(value, ASN1): - pass - else: - if isinstance(value, str): - value = value.encode() - value = OCTET_STRING(value) - - # create PDU - pdu = SetRequestPDU( - request_id=self._request_id(), - vars=VarBindList(VarBind(OID(oid), value)), - ) - - request_id = pdu.request_id.value - message = Message( - version = VERSION, - data = pdu, - community = community or self.rwcommunity, - ) - - # get PendTable for this host - with self._plock: - try: - host_pending = self._pending[host] - except KeyError: - host_pending = PendTable() - self._pending[host] = host_pending - - # used to wait for the previous set request to complete - event = DUMMY_EVENT - - # signaled when _listen_thread processes the response - main_event = threading.Event() - - # wait for any pending requests to complete before sending - pend_event = None - - # only allow one outstanding set request at a time - while pend_event is None: - # loop until we can put main_event in host_pending.oids - event.wait() - with host_pending.lock: - try: - event = host_pending.sets[oid] - except KeyError: - event = DUMMY_EVENT - - # event will not be set if another thread's set request - # acquires the lock first and stores its main_event to .sets - if event.is_set(): - try: - pend_event, next_oid = host_pending.oids[oid] - except KeyError: - pend_event, next_oid = DUMMY_EVENT, None - - host_pending.oids[oid] = [main_event, next_oid] - host_pending.sets[oid] = main_event - - # wait for pending requests to be serviced - pend_event.wait() - - with self._rlock: - self._requests[request_id] = ( - message, main_event, host, time.time(), timeout-1 - ) - - self._sock.sendto(message.serialize(), (host, self.port)) - msg = "SET request sent to {} (ID={}):\n{}" - log.debug(msg.format(host, request_id, pdu)) - - if not block: - return - - # no need to duplicate code; just call self.get() - return self.get(host, oid, block=True) - - def walk(self, host, oid, **kwargs): - start = oid - while True: - try: - var, = self.get_next(host, oid, block=True, **kwargs) - except NoSuchName: - break - - oid = var.name.value - - if not oid.startswith(start): - break - - # send now to speed access on the next iteration - self.get_next(host, oid, block=False, **kwargs) - - yield [var] diff --git a/devices/snmp/v1/exceptions.py b/devices/snmp/v1/exceptions.py deleted file mode 100644 index 262aecfa5..000000000 --- a/devices/snmp/v1/exceptions.py +++ /dev/null @@ -1,17 +0,0 @@ -class StatusError(Exception): - pass - -class TooBig(StatusError): - pass - -class NoSuchName(StatusError): - pass - -class BadValue(StatusError): - pass - -class ReadOnly(StatusError): - pass - -class GenErr(StatusError): - pass diff --git a/devices/snmp/v1/notes.py b/devices/snmp/v1/notes.py deleted file mode 100644 index 126ce6d9a..000000000 --- a/devices/snmp/v1/notes.py +++ /dev/null @@ -1,68 +0,0 @@ -get: - create main event - - get pend table for host or create one - get data table for host or create dummy - - with pend table lock - for each oid in the request - if not refresh - if pending - continue - - if found in cache - grab cached result - continue - - add main event to the pend table - add oid to request - - if there are oids to be sent - construct message - add message to requests table - send message - - if an oid had an error - raise it now - - if not waiting for response - return the values you do have - - wait for all events - - with data table read lock - for each oid - grab the value - - make sure it is present - - make sure there are no errors - - return the values - -listen thread: - check port number - decode message - pull request id - find the corresponding request - make sure it has the right number of varbinds - remove request from table - check error status - - with data table write lock - get/create entry for host - for each varbind - give it the error found in the request error field - find the oid requested - make sure it matches the request - save varbind to data table - - set the request event - -monitor thread: - wait for next stale request, done, or 1 second (whichever comes first) - grab next request - if it is stale - if it has not timed out - resend it - else - set varbind error to timeout - signal event -- GitLab