# -*- coding: utf-8 -*-
#
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
"""Tango device server for the SST (statistics) device."""
# PyTango imports
from tango.server import run
from tango.server import device_property
from tango import AttrWriteType
# Additional import
from clients.sst_client import sst_client
from util.attribute_wrapper import attribute_wrapper
from util.hardware_device import hardware_device
from util.lofar_logging import device_logging_to_python, log_exceptions
import numpy
__all__ = ["SST", "main"]
@device_logging_to_python({"device": "SST"})
class SST(hardware_device):
    """Tango device exposing values obtained through an ``sst_client``.

    The device creates one ``sst_client`` during initialisation, connects
    every wrapped attribute to it, and stops the client again when the
    device is switched OFF.
    """

    # -----------------
    # Device Properties
    # -----------------

    # Port handed to the sst_client on initialisation; must be configured.
    SST_Port = device_property(
        dtype='DevULong',
        mandatory=True
    )

    # ----------
    # Attributes
    # ----------

    # Each attribute is looked up in the client via its "parameter" annotation.
    packet_count_R = attribute_wrapper(comms_annotation={"parameter": "packet_count"}, datatype=numpy.int64)
    timestamp_R = attribute_wrapper(comms_annotation={"parameter": "timestamp"}, datatype=numpy.int64)

    # --------
    # overloaded functions
    def configure_for_off(self):
        """Stop the sst_client; called when the state is set to OFF.

        Stopping is best effort: any exception (including the client not
        having been created yet) is logged as a warning and then ignored,
        so switching OFF itself never fails because of the client.
        """
        # Stop keep-alive
        try:
            # The client lives in self.sst_client (set in
            # configure_for_initialise); the device has no set_comm_client
            # attribute, so the previous lookup always failed silently.
            self.sst_client.stop()
        except Exception as e:
            self.warn_stream("Exception while stopping sst_client in configure_for_off function: {}. Exception ignored".format(e))

    @log_exceptions()
    def configure_for_initialise(self):
        """Initialise the attributes and properties of the statistics device.

        Called when the state is set to INIT. Creates the sst_client on
        ``SST_Port``, hooks every attribute returned by ``attr_list()`` up
        to it, and finally starts the client.
        """
        self.sst_client = sst_client("0.0.0.0", self.SST_Port, self.Fault, self)

        # map an access helper class
        for i in self.attr_list():
            try:
                i.set_comm_client(self.sst_client)
            except Exception as e:
                # Fall back to the pass function so that one attribute that
                # cannot be connected does not abort the initialisation.
                i.set_pass_func()
                self.warn_stream("error while setting the sst attribute {} read/write function. {}".format(i, e))

        # Start the client only after all attributes have been mapped.
        self.sst_client.start()

    # --------
    # Commands
    # --------
# ----------
# Run server
# ----------
def main(args=None, **kwargs):
    """Run the Tango device server that hosts the SST device class.

    ``args`` and any extra keyword arguments are forwarded unchanged to
    ``tango.server.run``; its return value is returned to the caller.
    """
    device_classes = (SST,)
    return run(device_classes, args=args, **kwargs)
# Start the device server only when this file is executed as a script.
if __name__ == "__main__":
    main()