# -*- coding: utf-8 -*- # # This file is part of the PCC project # # # # Distributed under the terms of the APACHE license. # See LICENSE.txt for more info. """ PCC Device Server for LOFAR2.0 """ # PyTango imports from tango.server import run from tango.server import device_property # Additional import from clients.SNMP_client import SNMP_client from src.attribute_wrapper import * from src.hardware_device import * __all__ = ["example_device", "main"] class example_device(hardware_device): """ **Properties:** - Device Property SNMP_community - Type:'DevString' SNMP_host - Type:'DevULong' SNMP_timeout - Type:'DevDouble' """ # ----------------- # Device Properties # ----------------- # SNMP_community = device_property( # dtype='DevString', # mandatory=True # ) # # SNMP_host = device_property( # dtype='DevString', # mandatory=True # ) # # SNMP_timeout = device_property( # dtype='DevDouble', # mandatory=True # ) SNMP_community = b"public" SNMP_host = "127.0.0.1" SNMP_timeout = 5.0 # ---------- # Attributes # ---------- # simple scalar - description attr1 = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.1.0"}, datatype=numpy.str_, access=AttrWriteType.READ_WRITE) # simple scalar uptime attr2 = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.3.0"}, datatype=numpy.int64, access=AttrWriteType.READ_WRITE) # simple scalar with name attr3 = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.5.0"}, datatype=numpy.str_, access=AttrWriteType.READ_WRITE) #spectrum with all elements attr4 = attribute_wrapper(comms_annotation={"oids": ["1.3.6.1.2.1.2.2.1.1.1", "1.3.6.1.2.1.2.2.1.1.2", "1.3.6.1.2.1.2.2.1.1.3"]}, dims=(3,), datatype=numpy.int64) #inferred spectrum attr5 = attribute_wrapper(comms_annotation={"oids": ".1.3.6.1.2.1.2.2.1.1"}, dims=(3,), datatype=numpy.int64) def always_executed_hook(self): """Method always executed before any TANGO command is executed.""" pass def delete_device(self): """Hook to delete resources allocated in init_device. This method allows for any memory or other resources allocated in the init_device method to be released. This method is called by the device destructor and by the device Init command (a Tango built-in). """ self.debug_stream("Shutting down...") self.Off() self.debug_stream("Shut down. Good bye.") # -------- # overloaded functions # -------- def initialise(self): """ user code here. is called when the state is set to STANDBY """ # set up the SNMP ua client self.snmp_manager = SNMP_client(self.SNMP_community, self.SNMP_host, self.SNMP_timeout, self.Fault, self) # map the attributes to the OPC ua comm client for i in self.attr_list(): i.set_comm_client(self.snmp_manager) self.snmp_manager.start() # -------- # Commands # -------- # ---------- # Run server # ---------- def main(args=None, **kwargs): """Main function of the PCC module.""" return run((example_device,), args=args, **kwargs) if __name__ == '__main__': main()