Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# -*- coding: utf-8 -*-
#
# This file is part of the StatsCrosslet project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
""" OPC-UA client for LOFAR stations crosslet stats
"""
# PyTango imports
import tango
from tango import DebugIt
from tango.server import run
from tango.server import Device
from tango.server import attribute, command, pipe
from tango.server import device_property
from tango import AttrQuality, DispLevel, DevState
from tango import AttrWriteType, PipeWriteType
# Additional import
# PROTECTED REGION ID(StatsCrosslet.additionnal_import) ENABLED START #
import opcua
import numpy
import traceback
import threading
import time
# PROTECTED REGION END # // StatsCrosslet.additionnal_import
# Names exported by a star-import of this module: the device class and the
# server entry point.
__all__ = ["StatsCrosslet", "main"]
class StatsCrosslet(Device):
    """Tango device acting as an OPC-UA client for LOFAR station crosslet statistics.

    **Properties:**

    - Device Property
        OPC_Server_Name
            - Type:'DevString'
        OPC_Server_Port
            - Type:'DevULong'
        OPC_time_out
            - Type:'DevULong'
        Default_pause_time
            - Type:'DevDouble'
        Default_subband
            - Type:'DevULong'
        Default_integration_time
            - Type:'DevDouble'
    """
# PROTECTED REGION ID(StatsCrosslet.class_variable) ENABLED START #
# OPC-UA related handles.  0 is the "not set yet" placeholder; init_device
# replaces client, obj and record_cross on the instance once connected.
client = ns = obj = record_cross = 0
data = []
# Guards the acquired values that the reader thread writes and the
# attribute read methods return.
data_lock = threading.Lock()
# Set to ask the reader thread to leave its loop.
stop_data_read_loop = threading.Event()
# Set while the reader thread should actually fetch data.
data_acquisition_is_active = threading.Event()
# Placeholder thread object; init_device creates the real reader thread.
data_read_loop = threading.Thread()
def read_data(self):
    """Poll the OPC-UA server for crosslet statistics until told to stop.

    This is the body of the reader thread.  Every ``_pause_time`` seconds,
    while ``data_acquisition_is_active`` is set, it calls the
    ``record_cross`` method on the OPC-UA object with the current subband
    and integration time and stores the returned time stamp, raw
    visibilities, complex visibilities and RCU modes on the instance.

    The loop ends as soon as ``stop_data_read_loop`` is set; waiting on the
    event (instead of sleeping) means a stop request interrupts the pause.
    A failed call or malformed result is reported and skipped, leaving the
    previously stored values untouched.
    """
    # Event.wait() returns True once the stop event is set, False when the
    # pause simply timed out.
    while not self.stop_data_read_loop.wait(self._pause_time):
        if not self.data_acquisition_is_active.is_set():
            continue
        try:
            timestamp, visibilities, rcu_modes = self.obj.call_method(
                self.record_cross, self._subband, self._integration_time)
            # Do the conversion before taking the lock: it can raise, and
            # the readers should never be blocked by number crunching.
            raw = numpy.array(visibilities)
            complex_visibilities = raw[0] + 1j * raw[1]
        except Exception:
            print("Cannot call the method %s in the OPC-UA server %s. Trace: %s" % (self.record_cross, self.OPC_Server_Name, traceback.format_exc()))
            continue
        # Publish all four values together so readers never see a mix of
        # old and new data; "with" guarantees the lock is released.
        with self.data_lock:
            self._time_stamp = timestamp
            self._raw_visibilities = visibilities
            self._visibilities = complex_visibilities
            self._rcu_modes = rcu_modes
# PROTECTED REGION END # // StatsCrosslet.class_variable
# -----------------
# Device Properties
# -----------------
# Host name and port used by init_device to build the "opc.tcp://host:port/"
# URL of the OPC-UA server.
OPC_Server_Name = device_property(dtype='DevString', default_value="okeanos")
OPC_Server_Port = device_property(dtype='DevULong', default_value=55556)
# Passed to opcua.Client as its second argument.
# NOTE(review): unit of this time-out is not visible here -- confirm.
OPC_time_out = device_property(dtype='DevULong', default_value=1000)
# Initial pause between two acquisitions; used as a duration in seconds by
# the reader loop.
Default_pause_time = device_property(dtype='DevDouble', default_value=60.0)
# Initial subband and integration time handed to the record_cross call.
Default_subband = device_property(dtype='DevULong', default_value=150)
Default_integration_time = device_property(dtype='DevDouble', default_value=1.0)
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# ----------
# Attributes
# ----------
# Scalar attributes.  subband and pause_time are declared read/write; the
# others give no explicit access, so the PyTango default applies.
subband = attribute(dtype='DevUShort', access=AttrWriteType.READ_WRITE)
integration_time = attribute(dtype='DevDouble')
time_stamp = attribute(dtype='DevDouble')
pause_time = attribute(dtype='DevDouble', access=AttrWriteType.READ_WRITE)
# One-dimensional attributes, at most 96 elements each.
visibilities = attribute(dtype=('DevDouble',), max_dim_x=96)
rcu_modes = attribute(dtype=('DevString',), max_dim_x=96)
# Two-dimensional attribute, at most 96 x 96 elements.
raw_visibilities = attribute(dtype=(('DevDouble',),), max_dim_x=96, max_dim_y=96)
# -----
# Pipes
# -----
# Tango pipes declared with default settings; their read methods are
# read_pipe_visibilities and read_pipe_raw_visibilities.
pipe_visibilities = pipe()
pipe_raw_visibilities = pipe()
# ---------------
# General methods
# ---------------
def init_device(self):
    """Initialises the attributes and properties of the StatsCrosslet.

    Sets every value served by the attribute read methods to a defined
    default, creates the synchronisation objects for this device instance,
    then connects to the OPC-UA server and starts the reader thread.

    A failure to connect is reported on stdout and does not raise; in that
    case no reader thread is started but all attributes remain readable.
    """
    Device.init_device(self)
    # PROTECTED REGION ID(StatsCrosslet.init_device) ENABLED START #
    # Set default values in the read/write attributes.  This happens before
    # the connection attempt so the attributes can be read even when the
    # OPC-UA server is unreachable.
    self._pause_time = self.Default_pause_time
    self._integration_time = self.Default_integration_time
    self._subband = self.Default_subband
    # Placeholders returned until the first successful acquisition.
    self._time_stamp = 0.0
    self._visibilities = (0.0,)
    self._raw_visibilities = ((0.0,),)
    self._rcu_modes = ("",)
    # Per-instance synchronisation objects: the class-level ones would be
    # shared by every device served from the same process.
    self.data_lock = threading.Lock()
    self.stop_data_read_loop = threading.Event()
    self.data_acquisition_is_active = threading.Event()
    try:
        self.client = opcua.Client("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), self.OPC_time_out)
        self.client.connect()
        ns = self.client.get_namespace_index("http://lofar.eu")
        self.obj = self.client.get_root_node().get_child(["0:Objects", "{}:StationMetrics".format(ns), "{}:RCU".format(ns)])
        self.record_cross = "{}:record_cross".format(ns)
        # Start the acquisition thread with acquisition switched on.
        self.data_read_loop = threading.Thread(target=self.read_data)
        self.data_acquisition_is_active.set()
        self.stop_data_read_loop.clear()
        self.data_read_loop.start()
    except Exception:
        print("Cannot connect to the OPC-UA server %s. Trace: %s" % (self.OPC_Server_Name, traceback.format_exc()))
    # PROTECTED REGION END # // StatsCrosslet.init_device
def always_executed_hook(self):
    """Hook invoked before any TANGO command is executed; does nothing here."""
    # PROTECTED REGION ID(StatsCrosslet.always_executed_hook) ENABLED START #
    pass
    # PROTECTED REGION END # // StatsCrosslet.always_executed_hook
def delete_device(self):
    """Hook to delete resources allocated in init_device.

    This method allows for any memory or other resources allocated in the
    init_device method to be released.  This method is called by the device
    destructor and by the device Init command.

    Stops the reader thread (if one was started) and disconnects the
    OPC-UA client (if one was created).  Problems while disconnecting are
    reported on stdout and do not raise.
    """
    # PROTECTED REGION ID(StatsCrosslet.delete_device) ENABLED START #
    self.data_acquisition_is_active.clear()
    self.stop_data_read_loop.set()
    # join() raises RuntimeError on a thread that was never started, which
    # is the case when init_device could not connect.
    if self.data_read_loop.is_alive():
        self.data_read_loop.join()
    # The "no client" placeholder (0 or None) is falsy; a real client
    # object is truthy.
    if self.client:
        try:
            # disconnect() closes the session, the secure channel and the
            # socket in one call.
            self.client.disconnect()
        except Exception:
            print("Cannot cleanly disconnect from the OPC-UA server %s. Trace: %s" % (self.OPC_Server_Name, traceback.format_exc()))
        self.client = None
    # PROTECTED REGION END # // StatsCrosslet.delete_device
# ------------------
# Attributes methods
# ------------------
def read_subband(self):
    # PROTECTED REGION ID(StatsCrosslet.subband_read) ENABLED START #
    """Read hook of the subband attribute: return the stored subband."""
    current_subband = self._subband
    return current_subband
    # PROTECTED REGION END # // StatsCrosslet.subband_read
def write_subband(self, value):
    # PROTECTED REGION ID(StatsCrosslet.subband_write) ENABLED START #
    """Write hook of the subband attribute: store *value* as the subband."""
    requested_subband = value
    self._subband = requested_subband
    # PROTECTED REGION END # // StatsCrosslet.subband_write
def read_integration_time(self):
    # PROTECTED REGION ID(StatsCrosslet.integration_time_read) ENABLED START #
    """Read hook of the integration_time attribute: return the stored value."""
    current_integration_time = self._integration_time
    return current_integration_time
    # PROTECTED REGION END # // StatsCrosslet.integration_time_read
def read_time_stamp(self):
    # PROTECTED REGION ID(StatsCrosslet.time_stamp_read) ENABLED START #
    """Return the time_stamp attribute.

    The value is read while holding ``data_lock`` so it is consistent with
    the other values written by the reader thread.  The lock is taken with
    a ``with`` block so it is released even if the read raises (for
    example when no time stamp has been stored yet).
    """
    with self.data_lock:
        return self._time_stamp
    # PROTECTED REGION END # // StatsCrosslet.time_stamp_read
def read_pause_time(self):
    # PROTECTED REGION ID(StatsCrosslet.pause_time_read) ENABLED START #
    """Read hook of the pause_time attribute: return the stored pause time."""
    current_pause_time = self._pause_time
    return current_pause_time
    # PROTECTED REGION END # // StatsCrosslet.pause_time_read
def write_pause_time(self, value):
    # PROTECTED REGION ID(StatsCrosslet.pause_time_write) ENABLED START #
    """Set the pause_time attribute.

    :param value: pause between two acquisitions, in seconds; must be a
        number greater than or equal to zero.
    :raises ValueError: if *value* is negative or not a number (NaN).  Such
        a value would otherwise only fail later, inside the reader thread,
        where it is used as a sleep duration.
    """
    # "not >=" instead of "<" so that NaN is rejected as well.
    if not value >= 0:
        raise ValueError("pause_time must be >= 0, got %r" % (value,))
    self._pause_time = value
    # PROTECTED REGION END # // StatsCrosslet.pause_time_write
def read_visibilities(self):
    # PROTECTED REGION ID(StatsCrosslet.visibilities_read) ENABLED START #
    """Return the visibilities attribute.

    The value is read while holding ``data_lock``; the ``with`` block
    guarantees the lock is released even if the read raises.
    """
    # NOTE(review): the reader thread stores a complex numpy array in
    # _visibilities while the attribute is declared as a DevDouble
    # spectrum -- confirm this is what clients are meant to receive.
    with self.data_lock:
        return self._visibilities
    # PROTECTED REGION END # // StatsCrosslet.visibilities_read
def read_rcu_modes(self):
    # PROTECTED REGION ID(StatsCrosslet.rcu_modes_read) ENABLED START #
    """Return the rcu_modes attribute.

    The value is read while holding ``data_lock``; the ``with`` block
    guarantees the lock is released even if the read raises (for example
    when no RCU modes have been stored yet).
    """
    with self.data_lock:
        return self._rcu_modes
    # PROTECTED REGION END # // StatsCrosslet.rcu_modes_read
def read_raw_visibilities(self):
    # PROTECTED REGION ID(StatsCrosslet.raw_visibilities_read) ENABLED START #
    """Return the raw_visibilities attribute.

    The value is read while holding ``data_lock``; the ``with`` block
    guarantees the lock is released even if the read raises (for example
    when no raw visibilities have been stored yet).
    """
    with self.data_lock:
        return self._raw_visibilities
    # PROTECTED REGION END # // StatsCrosslet.raw_visibilities_read
# -------------
# Pipes methods
# -------------
def read_pipe_visibilities(self):
    """Read hook of pipe_visibilities: return a fixed blob with x=0 and y=0."""
    # PROTECTED REGION ID(StatsCrosslet.pipe_visibilities_read) ENABLED START #
    blob = {"x": 0, "y": 0}
    return blob
    # PROTECTED REGION END # // StatsCrosslet.pipe_visibilities_read
def read_pipe_raw_visibilities(self):
    """Read hook of pipe_raw_visibilities: return a fixed blob with x=0 and y=0."""
    # PROTECTED REGION ID(StatsCrosslet.pipe_raw_visibilities_read) ENABLED START #
    blob = {"x": 0, "y": 0}
    return blob
    # PROTECTED REGION END # // StatsCrosslet.pipe_raw_visibilities_read
# --------
# Commands
# --------
@command()
@DebugIt()
def start_acquisition(self):
    # PROTECTED REGION ID(StatsCrosslet.start_acquisition) ENABLED START #
    """
    Start the data acquisition of the station's crosslet stats.

    Sets the flag that lets the reader thread fetch data; takes no argument.

    :return:None
    """
    acquisition_flag = self.data_acquisition_is_active
    acquisition_flag.set()
    # PROTECTED REGION END # // StatsCrosslet.start_acquisition
@command()
@DebugIt()
def stop_acquisition(self):
    # PROTECTED REGION ID(StatsCrosslet.stop_acquisition) ENABLED START #
    """
    Stop the data acquisition.

    Clears the flag checked by the reader thread, which then skips
    fetching data; the thread itself keeps running.

    :return:None
    """
    acquisition_flag = self.data_acquisition_is_active
    acquisition_flag.clear()
    # PROTECTED REGION END # // StatsCrosslet.stop_acquisition
# ----------
# Run server
# ----------
def main(args=None, **kwargs):
    """Run a Tango device server hosting StatsCrosslet and return run()'s result.

    *args* and any extra keyword arguments are forwarded unchanged to
    ``tango.server.run``.
    """
    # PROTECTED REGION ID(StatsCrosslet.main) ENABLED START #
    device_classes = (StatsCrosslet,)
    return run(device_classes, args=args, **kwargs)
    # PROTECTED REGION END # // StatsCrosslet.main
# Start the device server only when this file is executed as a script.
if "__main__" == __name__:
    main()