Skip to content
Snippets Groups Projects
Select Git revision
  • master
1 result

test_device.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    SNMP.py 2.96 KiB
    # -*- coding: utf-8 -*-
    #
    # This file is part of the PCC project
    #
    #
    #
    # Distributed under the terms of the APACHE license.
    # See LICENSE.txt for more info.
    
    """ PCC Device Server for LOFAR2.0
    
    """
    
    # PyTango imports
    from tango.server import run
    from tango.server import device_property
    # Additional import
    
    from clients.SNMP_client import SNMP_client
    from src.attribute_wrapper import *
    from src.hardware_device import *
    
    
    __all__ = ["PCC", "main"]
    
    class PCC(hardware_device):
    	"""Tango device whose attributes are served through an SNMP client.
    
    	The attributes declared below carry their SNMP OIDs (and optionally a
    	host) in ``comms_annotation``; ``initialise`` creates the SNMP client
    	and attaches it to every attribute.
    
    	**Properties:**
    
    	- Device Property
    		SNMP_community
    			- Type:'DevString'
    		SNMP_host
    			- Type:'DevString'
    		SNMP_timeout
    			- Type:'DevDouble'
    	"""
    
    	# -----------------
    	# Device Properties
    	# -----------------
    
    	SNMP_community = device_property(
    		dtype='DevString',
    		mandatory=True
    	)
    
    	SNMP_host = device_property(
    		dtype='DevString',
    		mandatory=True
    	)
    
    	SNMP_timeout = device_property(
    		dtype='DevDouble',
    		mandatory=True
    	)
    
    	# ----------
    	# Attributes
    	# ----------
    
    	# NOTE(review): attr0 is declared with an empty OID string -- confirm whether this is a deliberate placeholder.
    	attr0 = attribute_wrapper(comms_annotation={"oids": [""]}, datatype=numpy.bool_, dims=(32,), access=AttrWriteType.READ_WRITE)
    	attr1 = attribute_wrapper(comms_annotation={"oids": ["1.3.6.1.2.1.1.6.0"]}, datatype=numpy.bool_, access=AttrWriteType.READ_WRITE)
    
    	# Attributes below additionally name an explicit host in their annotation.
    	attr2 = attribute_wrapper(comms_annotation={"host": "127.0.0.1", "oids": ["1.3.6.1.2.1.1.5.0"]}, datatype=numpy.bool_, access=AttrWriteType.READ_WRITE)
    
    	attr3 = attribute_wrapper(comms_annotation={"host": "127.0.0.1", "oids": ["1.3.6.1.2.1.1.5.1", "1.3.6.1.2.1.1.5.2", "1.3.6.1.2.1.1.5.3"]}, dims=(3,), datatype=numpy.bool_)
    	attr4 = attribute_wrapper(comms_annotation={"host": "127.0.0.1", "oids": ["1.3.6.1.2.1.1.5.0"]}, dims=(3,), datatype=numpy.bool_)
    	# For attr4 the single OID ["1.3.6.1.2.1.1.5.0"] gets transformed into an array
    	# the size of dims, with ".1", ".2", ... appended:
    	# ["1.3.6.1.2.1.1.5.0.1", "1.3.6.1.2.1.1.5.0.2", "1.3.6.1.2.1.1.5.0.3"]
    
    	def always_executed_hook(self):
    		"""Method always executed before any TANGO command is executed."""
    		pass
    
    	def delete_device(self):
    		"""Hook to delete resources allocated in init_device.
    
    		This method allows for any memory or other resources allocated in the
    		init_device method to be released.  This method is called by the device
    		destructor and by the device Init command (a Tango built-in).
    		"""
    		self.debug_stream("Shutting down...")
    
    		self.Off()
    		self.debug_stream("Shut down.  Good bye.")
    
    	# --------
    	# overloaded functions
    	# --------
    	def initialise(self):
    		"""Create the SNMP client, attach it to all attributes and start it.
    
    		User code hook; is called when the state is set to STANDBY.
    		"""
    
    		# Set up the SNMP client from the device properties.
    		# NOTE(review): self.Standby and self.Fault are presumably used by the
    		# client as state-change callbacks -- confirm against SNMP_client.
    		self.snmp_manager = SNMP_client(self.SNMP_community, self.SNMP_host, self.SNMP_timeout, self.Standby, self.Fault, self)
    
    		# Map every attribute to the SNMP client created above. The original
    		# code referenced self.OPCua_client here, which this class never assigns.
    		for attr in self.attr_list():
    			attr.set_comm_client(self.snmp_manager)
    
    		self.snmp_manager.start()
    
    	# --------
    	# Commands
    	# --------
    
    # ----------
    # Run server
    # ----------
    def main(args=None, **kwargs):
    	"""Launch the Tango device server that hosts the PCC device class.
    
    	``args`` and any extra keyword arguments are forwarded unchanged to
    	``tango.server.run``; its result is returned to the caller.
    	"""
    	device_classes = (PCC,)
    	return run(device_classes, args=args, **kwargs)
    
    
    # Start the device server only when this file is executed as a script,
    # not when it is imported as a module.
    if __name__ == '__main__':
    	main()