Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
from src.comms_client import *
import snmp
__all__ = ["SNMP_client"]
# Lookup table used by SNMP_client.setup_value_conversion: the key is
# str(<numpy type>) and the value is the variant type handed to the read/write
# helpers. Despite the "snmp" in the name, the values are OPC UA variant types.
#
# NOTE(review): "<class 'numpy.datetime_data'>", "<class 'numpy.str'>" and the
# bare "str" key do not look like strings that str(<type>) would produce --
# confirm whether these entries can ever be hit.
_variant_type = opcua.ua.VariantType
numpy_to_snmp_dict = {
    # booleans and integers
    "<class 'numpy.bool_'>": _variant_type.Boolean,
    "<class 'numpy.int8'>": _variant_type.SByte,
    "<class 'numpy.uint8'>": _variant_type.Byte,
    "<class 'numpy.int16'>": _variant_type.Int16,
    "<class 'numpy.uint16'>": _variant_type.UInt16,
    "<class 'numpy.int32'>": _variant_type.Int32,
    "<class 'numpy.uint32'>": _variant_type.UInt32,
    "<class 'numpy.int64'>": _variant_type.Int64,
    "<class 'numpy.uint64'>": _variant_type.UInt64,
    # date/time -- open question from the original author: is this the right
    # type, and does it matter?
    "<class 'numpy.datetime_data'>": _variant_type.DateTime,
    # floating point
    "<class 'numpy.float32'>": _variant_type.Float,
    "<class 'numpy.float64'>": _variant_type.Double,
    "<class 'numpy.double'>": _variant_type.Double,
    # text
    "<class 'numpy.str_'>": _variant_type.String,
    "<class 'numpy.str'>": _variant_type.String,
    "str": _variant_type.String,
}
class SNMP_client(CommClient):
    """
    Communication client that talks SNMP through an ``snmp.Manager``.

    The client stores the community string and host it was created with and
    hands out read/write callables for attributes via setup_attribute().

    NOTE(review): the original docstring spoke of periodic messages that keep a
    check on the connection and of reconnecting once on failure. That logic is
    not implemented in this class (ping() is a no-op); presumably it lives in
    CommClient -- confirm.
    """

    def start(self):
        """
        Start the client; simply defers to CommClient.start().
        """
        super().start()

    def __init__(self, community, host, timeout, on_func, fault_func, streams, try_interval=2):
        """
        Create the SNMP manager and connect() to it.

        community, host and timeout are forwarded positionally to
        snmp.Manager. on_func, fault_func, streams and try_interval are
        forwarded to CommClient.__init__. fault_func is additionally called
        here (without arguments) when connect() reports failure.
        """
        super().__init__(on_func, fault_func, streams, try_interval)

        self.community = community
        self.host = host
        self.manager = snmp.Manager(community, host, timeout)

        # Explicitly connect
        if not self.connect():
            # hardware or infra is down -- needs fixing first
            fault_func()
            return

    def connect(self):
        """
        Mark the client as connected.

        Only logs the community/host and sets the ``connected`` flag; no
        request is sent from here. Always returns True.
        """
        self.streams.debug_stream("Connecting to server %s %s", self.community, self.host)
        self.connected = True
        return True

    def disconnect(self):
        """
        Mark the client as disconnected.
        """
        self.connected = False  # always force a reconnect, regardless of a successful disconnect

    def ping(self):
        """
        Connection check hook; currently does nothing.
        """
        pass

    def _setup_annotation(self, annotation):
        """
        Validate an attribute annotation and extract its SNMP parameters.

        annotation must be a dict holding the keys 'oids' and 'host'.

        Returns the tuple (host, oids).

        Raises TypeError when annotation is not a dict, and AssertionError
        when 'oids' or 'host' is missing or None.
        """
        if not isinstance(annotation, dict):
            # These exceptions used to be constructed but never raised, which
            # let an invalid annotation slip through and fail later with an
            # unrelated error.
            raise TypeError("SNMP attributes require a dict with oid and adress")

        # check if required path inarg is present
        oids = annotation.get("oids")  # required
        if oids is None:
            raise AssertionError("SNMP get attributes require an oid")

        host = annotation.get("host")  # required
        if host is None:
            raise AssertionError("SNMP get attributes require an host")

        return host, oids

    def setup_value_conversion(self, attribute):
        """
        Collect the dimensions and mapped value type of an attribute.

        Reads dim_x, dim_y and numpy_type from the attribute wrapper and
        translates the numpy type through numpy_to_snmp_dict (keyed on
        str(numpy_type)).

        Returns the tuple (dim_x, dim_y, snmp_type).

        Raises KeyError when the numpy type has no entry in the table.
        """
        dim_x = attribute.dim_x
        dim_y = attribute.dim_y
        # convert the numpy type to its corresponding entry in the lookup table
        snmp_type = numpy_to_snmp_dict[str(attribute.numpy_type)]

        return dim_x, dim_y, snmp_type

    def setup_attribute(self, annotation, attribute):
        """
        MANDATORY function: is used by the attribute wrapper to get read/write functions.

        Validates the annotation and the attribute's type, then returns the
        tuple (read_function, write_function) bound to the annotation's host
        and oids.

        Raises whatever _setup_annotation() and setup_value_conversion() raise
        for an invalid annotation or an unmapped numpy type.
        """
        # process the annotation
        host, oids = self._setup_annotation(annotation)

        # get all the necessary data to set up the read/write functions from the attribute_wrapper
        # (not used by the closures yet -- type conversion is still a TODO --
        # but the call rejects attributes whose numpy type has no mapping)
        dim_x, dim_y, snmp_type = self.setup_value_conversion(attribute)

        # NOTE(review): these are plain closures, not bound methods, yet they
        # declare their own `self` parameter, so the caller has to pass the
        # client explicitly. Confirm this matches how the attribute wrapper
        # invokes them.
        def _read_function(self):
            response = self.manager.get(host, *oids)
            # TODO convert type and return the value; nothing is returned yet

        def _write_function(self, value):
            self.manager.set(host, oids, value)

        # return the read/write functions
        return _read_function, _write_function
class snmp_get:
    """
    Small wrapper that bundles the read/write functions of one attribute
    together with its host, oid, dimensions and value type.

    NOTE(review): both functions go through ``self.node`` (get_value /
    set_data_value), which this class never assigns. It has to be set on the
    instance before either function is called -- confirm where it should come
    from.
    """

    def __init__(self, host, oid, dim_x, dim_y, snmp_type):
        """
        Store the target (host, oid), the attribute dimensions and the variant
        type that write_function passes along with every value.
        """
        self.host = host
        self.oid = oid
        self.dim_y = dim_y
        self.dim_x = dim_x
        self.snmp_type = snmp_type

    def read_function(self):
        """
        Read_R function

        Returns the node's value as a numpy array. When dim_y is non-zero the
        flat value is split into dim_y equal sections, giving one row per
        section; numpy.split raises ValueError if it does not divide evenly.
        """
        value = numpy.array(self.node.get_value())

        if self.dim_y != 0:
            value = numpy.array(numpy.split(value, indices_or_sections=self.dim_y))

        return value

    def write_function(self, value):
        """
        write_RW function

        Sends value to the node wrapped in a Variant of this attribute's type.
        Two-dimensional values (dim_y != 0) are concatenated into a flat list,
        one-dimensional arrays (dim_x != 1) are converted to a list, and
        scalars are passed through unchanged.
        """
        if self.dim_y != 0:
            payload = numpy.concatenate(value).tolist()
        elif self.dim_x != 1:
            payload = value.tolist()
        else:
            payload = value

        # The type is stored as `snmp_type` by __init__; the former `ua_type`
        # attribute was never assigned anywhere in this class.
        self.node.set_data_value(opcua.ua.uatypes.Variant(value=payload, varianttype=self.snmp_type))