Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
from threading import Thread
import socket
from util.comms_client import CommClient
from collections import deque
import numpy
import logging
import socket
from datetime import datetime
from multiprocessing import Value, Array
import time
__all__ = ["sst_client"]
# <class 'numpy.bool_'>
class sst_client(CommClient):
    """
    Client that exposes values derived from SST packets received over UDP.

    connect() creates a bounded deque plus two worker threads: a UDP_Receiver
    that fills the deque and an SST that drains it. Attribute reads are served
    from the SST worker's ``parameters`` dict.
    """

    def start(self):
        """Delegate to the base class start()."""
        super().start()

    def __init__(self, host, port, fault_func, streams, try_interval=2):
        """
        Store the connection settings, initialise the base class and connect.

        host, port: passed on to UDP_Receiver by connect().
        fault_func: called with no arguments when the initial connect() fails.
        streams, try_interval: forwarded unchanged to CommClient.__init__.
        """
        self.host = host
        self.port = port
        self.timeout = 0.1
        self.buffersize = 9000

        super().__init__(fault_func, streams, try_interval)

        # Explicitly connect
        if not self.connect():
            # hardware or infra is down -- needs fixing first
            fault_func()
            return

    def _stop_workers(self):
        """
        Stop and forget the worker threads created by connect(), if any.

        Deleting the attributes alone is not enough: a running Thread is still
        referenced by the interpreter, so its __del__ is not invoked and the
        thread keeps running. The workers are therefore stopped explicitly
        through the attributes they already expose.
        """
        udp = getattr(self, "udp", None)
        sst = getattr(self, "sst", None)

        if udp is not None:
            # Stop the producer first so it cannot push the sentinel out of
            # the bounded deque.
            udp.stream_on = False
            udp.join()

        if sst is not None:
            # None is the sentinel on which SST.run() leaves its loop.
            sst.deque.appendleft(None)
            sst.join()

        for name in ("udp", "sst", "deque"):
            if hasattr(self, name):
                delattr(self, name)

    def connect(self):
        """
        Create the packet deque and the receiver/decoder threads, then
        delegate to the base class connect() and return its result.
        """
        # Workers left over from a previous connect() would otherwise leak.
        self._stop_workers()

        self.deque = deque(maxlen=1024)
        self.udp = UDP_Receiver(self.host, self.port, self.deque, self.buffersize, self.timeout)
        self.sst = SST(self.deque)

        return super().connect()

    def disconnect(self):
        """
        Stop the worker threads, drop the references to them and delegate to
        the base class disconnect(), returning its result.
        """
        self._stop_workers()

        return super().disconnect()

    def _setup_annotation(self, annotation):
        """
        Return the SST parameter name stored under the 'parameter' key of the
        annotation. Raises Exception when that key is missing or None.
        """
        parameter = annotation.get('parameter', None)

        if parameter is None:
            # Format the message explicitly; Exception() does not apply
            # %-style arguments the way the logging functions do.
            raise Exception("No SST parameter was given in the annotation: %s" % (annotation,))

        return parameter

    def setup_value_conversion(self, attribute):
        """
        Hook that receives the attribute object. Nothing needs to be set up
        for this client, so it does nothing and returns None.
        """
        return

    def setup_attribute(self, annotation, attribute):
        """
        MANDATORY function: is used by the attribute wrapper to get read/write functions.
        Returns a (read_function, write_function) tuple.
        """
        # process the annotation
        SST_param = self._setup_annotation(annotation)

        # give the client a chance to inspect the attribute (currently a no-op)
        self.setup_value_conversion(attribute)

        def read_function():
            # Looked up on every call so the current self.sst is always used.
            return self.sst.parameters[SST_param]

        def write_function(value):
            """
            Not used here
            """
            pass

        return read_function, write_function
class UDP_Receiver(Thread):
    """
    Background thread that receives UDP datagrams on (host, port) and appends
    each one, as a bytes object, to the given deque.

    The thread starts itself on construction and runs until stop() is called.
    """

    def __init__(self, host, port, deque, buffersize=9000, timeout=0.1):
        """
        host, port: local address the socket is bound to.
        deque:      container that receives every datagram via append().
        buffersize: maximum number of bytes read per datagram.
        timeout:    socket timeout in seconds; also bounds how long stop()
                    has to wait for the receive loop to notice the request.
        """
        super().__init__()

        self.deque = deque
        self.host = host
        self.port = port
        self.buffersize = buffersize
        self.stream_on = True

        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            self.sock.settimeout(timeout)
            # Without a bind the socket has no local address to receive on.
            self.sock.bind((self.host, self.port))
        except Exception:
            # Do not leak the descriptor when the address cannot be used.
            self.sock.close()
            raise

        self.start()

    def run(self):
        """Receive datagrams until stream_on is cleared."""
        while self.stream_on:
            try:
                # recv() returns a fresh bytes object per datagram, so the
                # entries in the deque never share a buffer.
                packet = self.sock.recv(self.buffersize)
            except socket.timeout:
                # Nothing arrived; loop around to re-check stream_on.
                continue

            self.deque.append(packet)

    def stop(self):
        """
        Ask the receive loop to end, wait for the thread and close the socket.

        Safe to call more than once. This has to be called explicitly: a
        running thread is kept referenced by the interpreter, so __del__ is
        not invoked while the loop is still active.
        """
        from threading import current_thread

        self.stream_on = False

        # join() raises when called on the current thread.
        if self.is_alive() and current_thread() is not self:
            self.join()

        sock = getattr(self, "sock", None)
        if sock is not None:
            sock.close()

    def __del__(self):
        self.stop()
class SST(Thread):
    """
    Background thread that takes packets from a deque and updates the
    ``parameters`` dict for each one.

    The thread starts itself on construction. A None entry in the deque is the
    sentinel that ends the loop; stop() inserts it.
    """

    def __init__(self, deque, poll_interval=0.01):
        """
        deque:         container to take packets from (shared with the producer).
        poll_interval: seconds to sleep when the deque is empty before
                       looking again.
        """
        super().__init__()

        self.deque = deque
        self.poll_interval = poll_interval
        self.last_packet = None

        self.parameters = {
            "packet_count": 0,
            "timestamp": 0
        }

        self.start()

    def run(self):
        """Decode packets until the None sentinel is taken from the deque."""
        while True:
            try:
                # NOTE(review): pop() takes the most recently appended entry
                # first -- confirm that newest-first processing is intended.
                packet = self.deque.pop()
            except IndexError:
                # An empty deque is the normal idle state, not an error:
                # wait briefly instead of letting the thread die.
                time.sleep(self.poll_interval)
                continue

            if packet is None:
                break

            self.decode(packet)

    def stop(self):
        """
        Insert the None sentinel and wait for the thread to finish.

        Entries already in the deque are decoded before the sentinel is
        reached. The producer should be stopped first: on a full bounded deque
        further appends on the right would push the sentinel out on the left.
        Safe to call more than once.
        """
        from threading import current_thread

        if not self.is_alive():
            return

        self.deque.appendleft(None)

        # join() raises when called on the current thread.
        if current_thread() is not self:
            self.join()

    def __del__(self):
        self.stop()

    def decode(self, packet):
        """Count the packet and record the wall-clock time it was handled."""
        self.parameters["packet_count"] += 1
        self.parameters["timestamp"] = time.time()