Skip to content
Snippets Groups Projects
Select Git revision
  • 5098f38577999518cc11f292c48527f83331873d
  • master default protected
  • L2SS-2407-swap-iers-caltable-monitoring-port
  • L2SS-2357-fix-ruff
  • sync-up-with-meta-pypcc
  • stabilise-landing-page
  • all-stations-lofar2
  • v0.39.7-backports
  • Move-sdptr-to-v1.5.0
  • fix-build-ubuntu
  • tokens-in-env-files
  • fix-build
  • L2SS-2214-deploy-cdb
  • fix-missing-init
  • add-power-hardware-apply
  • L2SS-2129-Add-Subrack-Routine
  • Also-listen-internal-to-rpc
  • fix-build-dind
  • L2SS-2153--Improve-Error-Handling
  • L2SS-2153-Add-Grpc-Gateway-support
  • L2SS-1970-apsct-lol
  • v0.55.5 protected
  • v0.55.4 protected
  • 0.55.2.dev0
  • 0.55.1.dev0
  • 0.55.0.dev0
  • v0.54.0 protected
  • 0.53.2.dev0
  • 0.53.1.dev0
  • v0.52.3-r2 protected
  • remove-snmp-client
  • v0.52.3 protected
  • v0.52.3dev0 protected
  • 0.53.1dev0
  • v0.52.2-rc3 protected
  • v0.52.2-rc2 protected
  • v0.52.2-rc1 protected
  • v0.52.1.1 protected
  • v0.52.1 protected
  • v0.52.1-rc1 protected
  • v0.51.9-6 protected
41 results

snmp.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    snmp.py 3.73 KiB
    # -*- coding: utf-8 -*-
    #
    # This file is part of the PCC project
    #
    #
    #
    # Distributed under the terms of the APACHE license.
    # See LICENSE.txt for more info.
    
    """ SNMP Device for LOFAR2.0
    
    """
    
    # TODO(Corne): Remove sys.path.append hack once packaging is in place!
    import os, sys
    currentdir = os.path.dirname(os.path.realpath(__file__))
    parentdir = os.path.dirname(currentdir)
    parentdir = os.path.dirname(parentdir)
    sys.path.append(parentdir)
    
    # PyTango imports
    from tango.server import run
    from tango.server import device_property
    from tango import AttrWriteType
    
    # Additional import
    from examples.snmp.snmp_client import SNMP_client
    from clients.attribute_wrapper import attribute_wrapper
    from devices.hardware_device import hardware_device
    
    import numpy
    
    __all__ = ["SNMP", "main"]
    
    
    class SNMP(hardware_device):
        """
    
        **Properties:**
    
        - Device Property
            SNMP_community
            - Type:'DevString'
            SNMP_host
            - Type:'DevULong'
            SNMP_timeout
            - Type:'DevDouble'
            """
    
        # -----------------
        # Device Properties
        # -----------------
    
        SNMP_community = device_property(
            dtype='DevString',
            mandatory=True
        )
    
        SNMP_host = device_property(
            dtype='DevString',
            mandatory=True
        )
    
        SNMP_timeout = device_property(
            dtype='DevDouble',
            mandatory=True
        )
    
        # ----------
        # Attributes
        # ----------
    
        sys_description_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.1.0"}, datatype=numpy.str_)
        sys_objectID_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.2.0", "type": "OID"}, datatype=numpy.str_)
        sys_uptime_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.3.0", "type": "TimeTicks"}, datatype=numpy.int64)
        sys_name_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.5.0"}, datatype=numpy.str_)
        ip_route_mask_127_0_0_1_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.4.21.1.11.127.0.0.1", "type": "IpAddress"}, datatype=numpy.str_)
        TCP_active_open_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.6.5.0", "type": "Counter32"}, datatype=numpy.int64)
    
        sys_contact_RW = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.4.0"}, datatype=numpy.str_, access=AttrWriteType.READ_WRITE)
        sys_contact_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.4.0"}, datatype=numpy.str_)
    
        TCP_Curr_estab_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.6.9.0", "type": "Gauge"}, datatype=numpy.int64)
    
        # inferred spectrum
        if_index_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.2.2.1.1"}, dims=(10,), datatype=numpy.int64)
    
    
        # --------
        # overloaded functions
        # --------
        def configure_for_initialise(self):
            """ user code here. is called when the state is set to STANDBY """
    
            # set up the SNMP ua client
            self.snmp_manager = SNMP_client(self.SNMP_community, self.SNMP_host, self.SNMP_timeout, self.Fault, self)
    
            # map an access helper class
            for i in self.attr_list():
                try:
                    i.set_comm_client(self.snmp_manager)
                except Exception as e:
                    # use the pass function instead of setting read/write fails
                    i.set_pass_func()
                    self.warn_stream("error while setting the SNMP attribute {} read/write function. {}".format(i, e))
    
            self.snmp_manager.start()
    
    
    # --------
    # Commands
    # --------
    
    
    # ----------
    # Run server
    # ----------
    def main(args=None, **kwargs):
        """Main function of the PCC module."""
    
        from common.lofar_logging import configure_logger
        import logging
        configure_logger(logging.getLogger())
    
        return run((SNMP,), args=args, **kwargs)
    
    
    if __name__ == '__main__':
        main()