Skip to content
Snippets Groups Projects
Commit 248c4ae4 authored by Taya Snijder's avatar Taya Snijder
Browse files

restored devices/test_device.py

parent c1851723
No related branches found
No related tags found
1 merge request!26Resolve #2021 "04 16 branched from master snmp device"
...@@ -13,83 +13,63 @@ ...@@ -13,83 +13,63 @@
# PyTango imports # PyTango imports
from tango.server import run from tango.server import run
from tango.server import device_property from tango.server import device_property
from tango import DevState
# Additional import # Additional import
<<<<<<< HEAD
from clients.SNMP_client import SNMP_client
from src.attribute_wrapper import *
from src.hardware_device import *
=======
from clients.test_client import test_client from clients.test_client import test_client
from util.attribute_wrapper import * from util.attribute_wrapper import *
from util.hardware_device import * from util.hardware_device import *
>>>>>>> master
__all__ = ["example_device", "main"] __all__ = ["test_device", "main"]
class example_device(hardware_device):
class test_device(hardware_device):
# ----------------- # -----------------
# Device Properties # Device Properties
# ----------------- # -----------------
SNMP_community = b"public" OPC_Server_Name = device_property(
SNMP_host = "127.0.0.1" dtype='DevString',
SNMP_timeout = 5.0 )
OPC_Server_Port = device_property(
dtype='DevULong',
)
OPC_Time_Out = device_property(
dtype='DevDouble',
)
# ---------- # ----------
# Attributes # Attributes
# ---------- # ----------
bool_scalar_R = attribute_wrapper(comms_annotation="numpy.bool_ type read scalar", datatype=numpy.bool_)
bool_scalar_RW = attribute_wrapper(comms_annotation="numpy.bool_ type read/write scalar", datatype=numpy.bool_, access=AttrWriteType.READ_WRITE)
# simple scalar int32_spectrum_R = attribute_wrapper(comms_annotation="numpy.int32 type read spectrum (len = 8)", datatype=numpy.int32, dims=(8,))
attr1 = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.6.0"}, datatype=numpy.bool_, access=AttrWriteType.READ_WRITE) int32_spectrum_RW = attribute_wrapper(comms_annotation="numpy.int32 type read spectrum (len = 8)", datatype=numpy.int32, dims=(8,),
# simple scalar with host access=AttrWriteType.READ_WRITE)
attr2 = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.5.0"}, datatype=numpy.bool_, access=AttrWriteType.READ_WRITE)
#spectrum with all elements
attr3 = attribute_wrapper(comms_annotation={"oids": ["1.3.6.1.2.1.1.5.1", "1.3.6.1.2.1.1.5.2", "1.3.6.1.2.1.1.5.3"]}, dims=(3,), datatype=numpy.bool_)
#inferred spectrum
attr4 = attribute_wrapper(comms_annotation={"oids": ["1.3.6.1.2.1.1.5.0"]}, dims=(3,), datatype=numpy.bool_)
def always_executed_hook(self): double_image_R = attribute_wrapper(comms_annotation="numpy.double type read image (dims = 2x8)", datatype=numpy.double, dims=(2, 8))
"""Method always executed before any TANGO command is executed.""" double_image_RW = attribute_wrapper(comms_annotation="numpy.double type read/write image (dims = 8x2)", datatype=numpy.double, dims=(8, 2),
pass access=AttrWriteType.READ_WRITE)
def delete_device(self): int32_scalar_R = attribute_wrapper(comms_annotation="numpy.int32 type read scalar", datatype=numpy.int32)
"""Hook to delete resources allocated in init_device. uint16_spectrum_RW = attribute_wrapper(comms_annotation="numpy.uint16 type read/write spectrum (len = 8)", datatype=numpy.uint16, dims=(8,),
access=AttrWriteType.READ_WRITE)
This method allows for any memory or other resources allocated in the float32_image_R = attribute_wrapper(comms_annotation="numpy.float32 type read image (dims = 8x2)", datatype=numpy.float32, dims=(8, 2))
init_device method to be released. This method is called by the device uint8_image_RW = attribute_wrapper(comms_annotation="numpy.uint8 type read/write image (dims = 2x8)", datatype=numpy.uint8, dims=(2, 8),
destructor and by the device Init command (a Tango built-in). access=AttrWriteType.READ_WRITE)
"""
self.debug_stream("Shutting down...")
self.Off()
self.debug_stream("Shut down. Good bye.")
# -------- # --------
# overloaded functions # overloaded functions
# -------- # --------
def initialise(self): def configure_for_initialise(self):
""" user code here. is called when the state is set to STANDBY """ """ user code here. is called when the sate is set to INIT """
"""Initialises the attributes and properties of the PCC."""
# set up the SNMP ua client
self.snmp_manager = SNMP_client(self.SNMP_community, self.SNMP_host, self.SNMP_timeout, self.Fault, self)
<<<<<<< HEAD
# map the attributes to the OPC ua comm client
for i in self.attr_list():
i.set_comm_client(self.snmp_manager)
self.snmp_manager.start() self.set_state(DevState.INIT)
# --------
# Commands
# --------
=======
# set up the test client # set up the test client
self.test_client = test_client(self.Fault, self) self.test_client = test_client(self.Fault, self)
...@@ -98,15 +78,14 @@ class example_device(hardware_device): ...@@ -98,15 +78,14 @@ class example_device(hardware_device):
i.set_comm_client(self.test_client) i.set_comm_client(self.test_client)
self.test_client.start() self.test_client.start()
>>>>>>> master
# ---------- # ----------
# Run server # Run server
# ---------- # ----------
def main(args=None, **kwargs): def main(args=None, **kwargs):
"""Main function of the PCC module.""" """Main function of the example module."""
return run((example_device,), args=args, **kwargs) return run((test_device,), args=args, **kwargs)
if __name__ == '__main__': if __name__ == '__main__':
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment