Skip to content
Snippets Groups Projects
SNMP_client.py 5.01 KiB
Newer Older
from src.comms_client import *
import snmp


__all__ = ["SNMP_client"]

numpy_to_snmp_dict = {
	"<class 'numpy.bool_'>": opcua.ua.VariantType.Boolean,
	"<class 'numpy.int8'>": opcua.ua.VariantType.SByte,
	"<class 'numpy.uint8'>": opcua.ua.VariantType.Byte,
	"<class 'numpy.int16'>": opcua.ua.VariantType.Int16,
	"<class 'numpy.uint16'>": opcua.ua.VariantType.UInt16,
	"<class 'numpy.int32'>": opcua.ua.VariantType.Int32,
	"<class 'numpy.uint32'>": opcua.ua.VariantType.UInt32,
	"<class 'numpy.int64'>": opcua.ua.VariantType.Int64,
	"<class 'numpy.uint64'>": opcua.ua.VariantType.UInt64,
	"<class 'numpy.datetime_data'>": opcua.ua.VariantType.DateTime, # is this the right type, does it even matter?
	"<class 'numpy.float32'>": opcua.ua.VariantType.Float,
	"<class 'numpy.float64'>": opcua.ua.VariantType.Double,
	"<class 'numpy.double'>": opcua.ua.VariantType.Double,
	"<class 'numpy.str_'>": opcua.ua.VariantType.String,
	"<class 'numpy.str'>": opcua.ua.VariantType.String,
	"str": opcua.ua.VariantType.String
}

class SNMP_client(CommClient):
	"""
	  messages to keep a check on the connection. On connection failure, reconnects once.
	"""

	def start(self):
		super().start()

	def __init__(self, community, host, timeout, on_func, fault_func, streams, try_interval=2):
		"""
		Create the SNMP and connect() to it
		"""
		super().__init__(on_func, fault_func, streams, try_interval)

		self.community = community
		self.host = host
		self.manager = snmp.Manager(community, host, timeout)

		# Explicitly connect
		if not self.connect():
			# hardware or infra is down -- needs fixing first
			fault_func()
			return

	def connect(self):
		"""
		Try to connect to the client
		"""

		self.streams.debug_stream("Connecting to server %s %s", self.community, self.host)
		self.connected = True
		return True

	def disconnect(self):
		"""
		disconnect from the client
		"""
		self.connected = False  # always force a reconnect, regardless of a successful disconnect


	def ping(self):
		"""
		ping the client to make sure the connection with the client is still functional.
		"""
		pass

	def _setup_annotation(self, annotation):
		"""
		This class's Implementation of the get_mapping function. returns the read and write functions
		"""

		if isinstance(annotation, dict):
			# check if required path inarg is present
			if annotation.get('oids') is None:
				AssertionError("SNMP get attributes require an oid")
			oids = annotation.get("oids")  # required

			if annotation.get('host') is None:
				AssertionError("SNMP get attributes require an host")
			host = annotation.get("host")  # required

		else:
			TypeError("SNMP attributes require a dict with oid and adress")
			return

		return host, oids

	def setup_value_conversion(self, attribute):
		"""
		gives the client access to the attribute_wrapper object in order to access all data it could potentially need.
		the OPC ua read/write functions require the dimensionality and the type to be known
		"""

		dim_x = attribute.dim_x
		dim_y = attribute.dim_y
		snmp_type = numpy_to_snmp_dict[str(attribute.numpy_type)]	 # convert the numpy type to a corresponding UA type

		return dim_x, dim_y, snmp_type

	def setup_attribute(self, annotation, attribute):
		"""
		MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions
		"""

		# process the annotation
		host, oids = self._setup_annotation(annotation)

		# get all the necessary data to set up the read/write functions from the attribute_wrapper
		dim_x, dim_y, snmp_type = self.setup_value_conversion(attribute)

		def _read_function(self):
			vars = self.manager.get(host, *oids)
			#TODO convert type
			#todo

		def _write_function(self, value):
			self.manager.set(host, oids, value)

		# return the read/write functions
		return _read_function, _write_function



class snmp_get:
	"""
	This class provides a small wrapper for the OPC ua read/write functions in order to better organise the code
	"""

	def __init__(self, host, oid, dim_x, dim_y, snmp_type):
		self.host = host
		self.oid = oid
		self.dim_y = dim_y
		self.dim_x = dim_x
		self.snmp_type = snmp_type

	def read_function(self):
		"""
		Read_R function
		"""
		value = numpy.array(self.node.get_value())

		if self.dim_y != 0:
			value = numpy.array(numpy.split(value, indices_or_sections=self.dim_y))
		else:
			value = numpy.array(value)
		return value

	def write_function(self, value):
		"""
		write_RW function
		"""
		# set_data_value(opcua.ua.uatypes.Variant(value = value.tolist(), varianttype=opcua.ua.VariantType.Int32))

		if self.dim_y != 0:
			v = numpy.concatenate(value)
			self.node.set_data_value(opcua.ua.uatypes.Variant(value=v.tolist(), varianttype=self.ua_type))

		elif self.dim_x != 1:
			self.node.set_data_value(opcua.ua.uatypes.Variant(value=value.tolist(), varianttype=self.ua_type))
		else:
			self.node.set_data_value(opcua.ua.uatypes.Variant(value=value, varianttype=self.ua_type))