Skip to content
Snippets Groups Projects
Commit 654c374c authored by Taya Snijder's avatar Taya Snijder
Browse files

removed unfinisehd SNMP code

parent 5b8d287a
No related branches found
No related tags found
1 merge request!52021 03 22 branched from master attribute wrapper
# -*- coding: utf-8 -*-
#
# This file is part of the PCC project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
""" PCC Device Server for LOFAR2.0
"""
# PyTango imports
from tango.server import run
from tango.server import device_property
# Additional import
from clients.SNMP_client import SNMP_client
from src.attribute_wrapper import *
from src.hardware_device import *
__all__ = ["PCC", "main"]
class PCC(hardware_device):
"""
**Properties:**
- Device Property
SNMP_community
- Type:'DevString'
SNMP_host
- Type:'DevULong'
SNMP_timeout
- Type:'DevDouble'
"""
# -----------------
# Device Properties
# -----------------
SNMP_community = device_property(
dtype='DevString',
mandatory=True
)
SNMP_host = device_property(
dtype='DevString',
mandatory=True
)
SNMP_timeout = device_property(
dtype='DevDouble',
mandatory=True
)
# ----------
# Attributes
# ----------
attr0 = attribute_wrapper(comms_annotation={"oids": [""]}, datatype=numpy.bool_, dims=(32,), access=AttrWriteType.READ_WRITE)
attr1 = attribute_wrapper(comms_annotation={"oids": ["1.3.6.1.2.1.1.6.0"]}, datatype=numpy.bool_, access=AttrWriteType.READ_WRITE)
attr2 = attribute_wrapper(comms_annotation={"host": "127.0.0.1", "oids": ["1.3.6.1.2.1.1.5.0"]}, datatype=numpy.bool_, access=AttrWriteType.READ_WRITE)
attr3 = attribute_wrapper(comms_annotation={"host": "127.0.0.1", "oids": ["1.3.6.1.2.1.1.5.1", "1.3.6.1.2.1.1.5.2", "1.3.6.1.2.1.1.5.3"]}, dims=(3,), datatype=numpy.bool_)
attr4 = attribute_wrapper(comms_annotation={"host": "127.0.0.1", "oids": ["1.3.6.1.2.1.1.5.0"]}, dims=(3,), datatype=numpy.bool_)
# ["1.3.6.1.2.1.1.5.0"] gets transformed in to an array the size of dims with ".1", ".2" .. added
# ["1.3.6.1.2.1.1.5.0.1", "1.3.6.1.2.1.1.5.0.2", "1.3.6.1.2.1.1.5.0.3"]
def always_executed_hook(self):
"""Method always executed before any TANGO command is executed."""
pass
def delete_device(self):
"""Hook to delete resources allocated in init_device.
This method allows for any memory or other resources allocated in the
init_device method to be released. This method is called by the device
destructor and by the device Init command (a Tango built-in).
"""
self.debug_stream("Shutting down...")
self.Off()
self.debug_stream("Shut down. Good bye.")
# --------
# overloaded functions
# --------
def initialise(self):
""" user code here. is called when the state is set to STANDBY """
#set up the SNMP ua client
self.snmp_manager = SNMP_client(self.SNMP_community, self.SNMP_host, self.SNMP_timeout, self.Standby, self.Fault, self)
# map the attributes to the OPC ua comm client
for i in self.attr_list():
i.set_comm_client(self.OPCua_client)
self.OPCua_client.start()
# --------
# Commands
# --------
# ----------
# Run server
# ----------
def main(args=None, **kwargs):
"""Main function of the PCC module."""
return run((PCC,), args=args, **kwargs)
if __name__ == '__main__':
main()
from src.comms_client import *
import snmp
__all__ = ["SNMP_client"]
numpy_to_snmp_dict = {
"<class 'numpy.bool_'>": opcua.ua.VariantType.Boolean,
"<class 'numpy.int8'>": opcua.ua.VariantType.SByte,
"<class 'numpy.uint8'>": opcua.ua.VariantType.Byte,
"<class 'numpy.int16'>": opcua.ua.VariantType.Int16,
"<class 'numpy.uint16'>": opcua.ua.VariantType.UInt16,
"<class 'numpy.int32'>": opcua.ua.VariantType.Int32,
"<class 'numpy.uint32'>": opcua.ua.VariantType.UInt32,
"<class 'numpy.int64'>": opcua.ua.VariantType.Int64,
"<class 'numpy.uint64'>": opcua.ua.VariantType.UInt64,
"<class 'numpy.datetime_data'>": opcua.ua.VariantType.DateTime, # is this the right type, does it even matter?
"<class 'numpy.float32'>": opcua.ua.VariantType.Float,
"<class 'numpy.float64'>": opcua.ua.VariantType.Double,
"<class 'numpy.double'>": opcua.ua.VariantType.Double,
"<class 'numpy.str_'>": opcua.ua.VariantType.String,
"<class 'numpy.str'>": opcua.ua.VariantType.String,
"str": opcua.ua.VariantType.String
}
class SNMP_client(CommClient):
"""
messages to keep a check on the connection. On connection failure, reconnects once.
"""
def start(self):
super().start()
def __init__(self, community, host, timeout, on_func, fault_func, streams, try_interval=2):
"""
Create the SNMP and connect() to it
"""
super().__init__(on_func, fault_func, streams, try_interval)
self.community = community
self.host = host
self.manager = snmp.Manager(community, host, timeout)
# Explicitly connect
if not self.connect():
# hardware or infra is down -- needs fixing first
fault_func()
return
def connect(self):
"""
Try to connect to the client
"""
self.streams.debug_stream("Connecting to server %s %s", self.community, self.host)
self.connected = True
return True
def disconnect(self):
"""
disconnect from the client
"""
self.connected = False # always force a reconnect, regardless of a successful disconnect
def ping(self):
"""
ping the client to make sure the connection with the client is still functional.
"""
pass
def _setup_annotation(self, annotation):
"""
This class's Implementation of the get_mapping function. returns the read and write functions
"""
if isinstance(annotation, dict):
# check if required path inarg is present
if annotation.get('oids') is None:
AssertionError("SNMP get attributes require an oid")
oids = annotation.get("oids") # required
if annotation.get('host') is None:
AssertionError("SNMP get attributes require an host")
host = annotation.get("host") # required
else:
TypeError("SNMP attributes require a dict with oid and adress")
return
return host, oids
def setup_value_conversion(self, attribute):
"""
gives the client access to the attribute_wrapper object in order to access all data it could potentially need.
the OPC ua read/write functions require the dimensionality and the type to be known
"""
dim_x = attribute.dim_x
dim_y = attribute.dim_y
snmp_type = numpy_to_snmp_dict[str(attribute.numpy_type)] # convert the numpy type to a corresponding UA type
return dim_x, dim_y, snmp_type
def setup_attribute(self, annotation, attribute):
"""
MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions
"""
# process the annotation
host, oids = self._setup_annotation(annotation)
# get all the necessary data to set up the read/write functions from the attribute_wrapper
dim_x, dim_y, snmp_type = self.setup_value_conversion(attribute)
def _read_function(self):
vars = self.manager.get(host, *oids)
#TODO convert type
#todo
def _write_function(self, value):
self.manager.set(host, oids, value)
# return the read/write functions
return _read_function, _write_function
class snmp_get:
"""
This class provides a small wrapper for the OPC ua read/write functions in order to better organise the code
"""
def __init__(self, host, oid, dim_x, dim_y, snmp_type):
self.host = host
self.oid = oid
self.dim_y = dim_y
self.dim_x = dim_x
self.snmp_type = snmp_type
def read_function(self):
"""
Read_R function
"""
value = numpy.array(self.node.get_value())
if self.dim_y != 0:
value = numpy.array(numpy.split(value, indices_or_sections=self.dim_y))
else:
value = numpy.array(value)
return value
def write_function(self, value):
"""
write_RW function
"""
# set_data_value(opcua.ua.uatypes.Variant(value = value.tolist(), varianttype=opcua.ua.VariantType.Int32))
if self.dim_y != 0:
v = numpy.concatenate(value)
self.node.set_data_value(opcua.ua.uatypes.Variant(value=v.tolist(), varianttype=self.ua_type))
elif self.dim_x != 1:
self.node.set_data_value(opcua.ua.uatypes.Variant(value=value.tolist(), varianttype=self.ua_type))
else:
self.node.set_data_value(opcua.ua.uatypes.Variant(value=value, varianttype=self.ua_type))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment