# -*- coding: utf-8 -*-
#
# This file is part of the PCC project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.

""" SNMP Device for LOFAR2.0

"""

# PyTango imports
from tango.server import run
from tango.server import device_property
from tango import AttrWriteType

# Additional import
from clients.SNMP_client import SNMP_client
from util.attribute_wrapper import attribute_wrapper
from util.hardware_device import hardware_device

import numpy

__all__ = ["SNMP", "main"]


class SNMP(hardware_device):
    """Tango device exposing a set of SNMP OIDs as attributes.

    Every attribute below is declared through ``attribute_wrapper`` with a
    ``comms_annotation`` that carries the OID (and optionally an SNMP type
    name). ``configure_for_initialise`` binds those attributes to an
    ``SNMP_client`` built from the device properties.

    **Properties:**

    - Device Property
        SNMP_community
            - Type:'DevString'
        SNMP_host
            - Type:'DevString'
        SNMP_timeout
            - Type:'DevDouble'
    """

    # -----------------
    # Device Properties
    # -----------------

    SNMP_community = device_property(
        dtype='DevString',
        mandatory=True
    )

    SNMP_host = device_property(
        dtype='DevString',
        mandatory=True
    )

    SNMP_timeout = device_property(
        dtype='DevDouble',
        mandatory=True
    )

    # ----------
    # Attributes
    # ----------

    sys_description_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.1.0"}, datatype=numpy.str_)
    sys_objectID_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.2.0", "type": "OID"}, datatype=numpy.str_)
    sys_uptime_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.3.0", "type": "TimeTicks"}, datatype=numpy.int64)
    sys_name_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.5.0"}, datatype=numpy.str_)
    ip_route_mask_127_0_0_1_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.4.21.1.11.127.0.0.1", "type": "IpAddress"}, datatype=numpy.str_)
    TCP_active_open_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.6.5.0", "type": "Counter32"}, datatype=numpy.int64)

    # sys_contact_RW and sys_contact_R point at the same OID; only the access mode differs.
    sys_contact_RW = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.4.0"}, datatype=numpy.str_, access=AttrWriteType.READ_WRITE)
    sys_contact_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.4.0"}, datatype=numpy.str_)

    TCP_Curr_estab_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.6.9.0", "type": "Gauge"}, datatype=numpy.int64)

    # inferred spectrum
    if_index_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.2.2.1.1"}, dims=(10,), datatype=numpy.int64)

    # --------
    # overloaded functions
    # --------
    def configure_for_initialise(self):
        """Create the SNMP client and attach it to every wrapped attribute.

        Original author's note: user code here, called when the state is set
        to STANDBY. (NOTE(review): the caller lives in ``hardware_device``,
        which is not visible from this file -- confirm the exact state
        transition there.)

        An attribute whose binding raises is switched to its pass function
        and a warning is logged; the remaining attributes are still bound
        and the client is started regardless.
        """

        # set up the SNMP client
        self.snmp_manager = SNMP_client(self.SNMP_community, self.SNMP_host, self.SNMP_timeout, self.Fault, self)

        # map an access helper class
        for attr in self.attr_list():
            try:
                attr.set_comm_client(self.snmp_manager)
            except Exception as e:
                # Deliberate best-effort: one attribute that cannot be bound
                # must not prevent the others from being set up, so fall back
                # to the pass function instead of propagating the error.
                attr.set_pass_func()
                self.warn_stream("error while setting the SNMP attribute {} read/write function. {}".format(attr, e))

        self.snmp_manager.start()

    # --------
    # Commands
    # --------


# ----------
# Run server
# ----------
def main(args=None, **kwargs):
    """Main function of the PCC module.

    Configures the root logger through ``configure_logger`` and then hands
    control to the Tango ``run`` entry point for the ``SNMP`` device class.

    :param args: forwarded to ``tango.server.run`` as ``args``.
    :param kwargs: forwarded unchanged to ``tango.server.run``.
    :return: whatever ``tango.server.run`` returns.
    """

    # Imported here rather than at module level, as in the original file.
    from util.lofar_logging import configure_logger
    import logging
    configure_logger(logging.getLogger())

    return run((SNMP,), args=args, **kwargs)


if __name__ == '__main__':
    main()