Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# -*- coding: utf-8 -*-
#
# This file is part of the PCC project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
""" PCC Device Server for LOFAR2.0
"""
# PyTango imports
from tango.server import run
from tango.server import device_property
# Additional import
from clients.SNMP_client import SNMP_client
from src.attribute_wrapper import *
from src.hardware_device import *
# Names exported by ``from <module> import *``: the device class and the
# server entry point defined below.
__all__ = ["example_device", "main"]
class example_device(hardware_device):
    """Example device that exposes values read over SNMP as attributes.

    **Properties:**

    - Device Property
        SNMP_community
            - Type:'DevString'
        SNMP_host
            - Type:'DevString'
        SNMP_timeout
            - Type:'DevDouble'
    """

    # -----------------
    # Device Properties
    # -----------------

    # The real device properties are currently disabled; the hard-coded
    # class attributes further down are used in their place.
    #
    # SNMP_community = device_property(
    #     dtype='DevString',
    #     mandatory=True
    # )
    #
    # SNMP_host = device_property(
    #     dtype='DevString',
    #     mandatory=True
    # )
    #
    # SNMP_timeout = device_property(
    #     dtype='DevDouble',
    #     mandatory=True
    # )

    # Hard-coded stand-ins for the properties above.
    SNMP_community = b"public"
    SNMP_host = "127.0.0.1"
    SNMP_timeout = 5.0

    # ----------
    # Attributes
    # ----------

    # Scalar string: description (MIB-II sysDescr).
    attr1 = attribute_wrapper(
        comms_annotation={"oids": "1.3.6.1.2.1.1.1.0"},
        datatype=numpy.str_,
        access=AttrWriteType.READ_WRITE,
    )

    # Scalar integer: uptime (MIB-II sysUpTime).
    attr2 = attribute_wrapper(
        comms_annotation={"oids": "1.3.6.1.2.1.1.3.0"},
        datatype=numpy.int64,
        access=AttrWriteType.READ_WRITE,
    )

    # Scalar string: name (MIB-II sysName).
    attr3 = attribute_wrapper(
        comms_annotation={"oids": "1.3.6.1.2.1.1.5.0"},
        datatype=numpy.str_,
        access=AttrWriteType.READ_WRITE,
    )

    # Spectrum of three values with every element's OID listed explicitly.
    attr4 = attribute_wrapper(
        comms_annotation={
            "oids": [
                "1.3.6.1.2.1.2.2.1.1.1",
                "1.3.6.1.2.1.2.2.1.1.2",
                "1.3.6.1.2.1.2.2.1.1.3",
            ]
        },
        dims=(3,),
        datatype=numpy.int64,
    )

    # Inferred spectrum: only a single base OID is given together with the
    # dimensions. NOTE(review): presumably the SNMP client derives the
    # per-element OIDs from the base OID and dims -- confirm in SNMP_client.
    attr5 = attribute_wrapper(
        comms_annotation={"oids": ".1.3.6.1.2.1.2.2.1.1"},
        dims=(3,),
        datatype=numpy.int64,
    )

    def always_executed_hook(self):
        """Hook that runs before any TANGO command is executed (no-op here)."""

    def delete_device(self):
        """Release the resources that were allocated in init_device.

        Called by the device destructor and by the Tango built-in Init
        command. Here it switches the device off via ``Off()`` and logs
        debug messages before and after doing so.
        """
        self.debug_stream("Shutting down...")

        self.Off()

        self.debug_stream("Shut down. Good bye.")

    # --------
    # overloaded functions
    # --------
    def initialise(self):
        """User initialisation code; called when the state is set to STANDBY.

        Creates the SNMP client, hands it to every attribute returned by
        ``attr_list()`` and then starts the client.
        """
        # Build the SNMP client from the connection settings. ``self.Fault``
        # and the device itself are passed through to the client.
        client = SNMP_client(
            self.SNMP_community,
            self.SNMP_host,
            self.SNMP_timeout,
            self.Fault,
            self,
        )
        self.snmp_manager = client

        # Attach the SNMP client to each wrapped attribute.
        for wrapped_attribute in self.attr_list():
            wrapped_attribute.set_comm_client(client)

        client.start()
# --------
# Commands
# --------
# ----------
# Run server
# ----------
def main(args=None, **kwargs):
    """Run the device server for ``example_device``.

    ``args`` and any extra keyword arguments are forwarded unchanged to
    ``tango.server.run``; its return value is returned to the caller.
    """
    device_classes = (example_device,)
    return run(device_classes, args=args, **kwargs)
# Start the device server only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()